aboutsummaryrefslogtreecommitdiff
path: root/src/context.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/context.rs')
-rw-r--r--src/context.rs191
1 files changed, 191 insertions, 0 deletions
diff --git a/src/context.rs b/src/context.rs
new file mode 100644
index 0000000..e2e8480
--- /dev/null
+++ b/src/context.rs
@@ -0,0 +1,191 @@
+//! Thread-local context used in select.
+
+use std::cell::Cell;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::thread::{self, Thread, ThreadId};
+use std::time::Instant;
+
+use crossbeam_utils::Backoff;
+
+use crate::select::Selected;
+
+/// Thread-local context used in select.
+#[derive(Debug, Clone)]
+pub struct Context {
+ inner: Arc<Inner>,
+}
+
+/// Inner representation of `Context`.
+#[derive(Debug)]
+struct Inner {
+ /// Selected operation.
+ select: AtomicUsize,
+
+ /// A slot into which another thread may store a pointer to its `Packet`.
+ packet: AtomicUsize,
+
+ /// Thread handle.
+ thread: Thread,
+
+ /// Thread id.
+ thread_id: ThreadId,
+}
+
+impl Context {
+ /// Creates a new context for the duration of the closure.
+ #[inline]
+ pub fn with<F, R>(f: F) -> R
+ where
+ F: FnOnce(&Context) -> R,
+ {
+ thread_local! {
+ /// Cached thread-local context.
+ static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
+ }
+
+ let mut f = Some(f);
+ let mut f = move |cx: &Context| -> R {
+ let f = f.take().unwrap();
+ f(cx)
+ };
+
+ CONTEXT
+ .try_with(|cell| match cell.take() {
+ None => f(&Context::new()),
+ Some(cx) => {
+ cx.reset();
+ let res = f(&cx);
+ cell.set(Some(cx));
+ res
+ }
+ })
+ .unwrap_or_else(|_| f(&Context::new()))
+ }
+
+ /// Creates a new `Context`.
+ #[cold]
+ fn new() -> Context {
+ Context {
+ inner: Arc::new(Inner {
+ select: AtomicUsize::new(Selected::Waiting.into()),
+ packet: AtomicUsize::new(0),
+ thread: thread::current(),
+ thread_id: thread::current().id(),
+ }),
+ }
+ }
+
+ /// Resets `select` and `packet`.
+ #[inline]
+ fn reset(&self) {
+ self.inner
+ .select
+ .store(Selected::Waiting.into(), Ordering::Release);
+ self.inner.packet.store(0, Ordering::Release);
+ }
+
+ /// Attempts to select an operation.
+ ///
+ /// On failure, the previously selected operation is returned.
+ #[inline]
+ pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
+ self.inner
+ .select
+ .compare_exchange(
+ Selected::Waiting.into(),
+ select.into(),
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ )
+ .map(|_| ())
+ .map_err(|e| e.into())
+ }
+
+ /// Returns the selected operation.
+ #[inline]
+ pub fn selected(&self) -> Selected {
+ Selected::from(self.inner.select.load(Ordering::Acquire))
+ }
+
+ /// Stores a packet.
+ ///
+ /// This method must be called after `try_select` succeeds and there is a packet to provide.
+ #[inline]
+ pub fn store_packet(&self, packet: usize) {
+ if packet != 0 {
+ self.inner.packet.store(packet, Ordering::Release);
+ }
+ }
+
+ /// Waits until a packet is provided and returns it.
+ #[inline]
+ pub fn wait_packet(&self) -> usize {
+ let backoff = Backoff::new();
+ loop {
+ let packet = self.inner.packet.load(Ordering::Acquire);
+ if packet != 0 {
+ return packet;
+ }
+ backoff.snooze();
+ }
+ }
+
+ /// Waits until an operation is selected and returns it.
+ ///
+ /// If the deadline is reached, `Selected::Aborted` will be selected.
+ #[inline]
+ pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
+ // Spin for a short time, waiting until an operation is selected.
+ let backoff = Backoff::new();
+ loop {
+ let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
+ if sel != Selected::Waiting {
+ return sel;
+ }
+
+ if backoff.is_completed() {
+ break;
+ } else {
+ backoff.snooze();
+ }
+ }
+
+ loop {
+ // Check whether an operation has been selected.
+ let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
+ if sel != Selected::Waiting {
+ return sel;
+ }
+
+ // If there's a deadline, park the current thread until the deadline is reached.
+ if let Some(end) = deadline {
+ let now = Instant::now();
+
+ if now < end {
+ thread::park_timeout(end - now);
+ } else {
+ // The deadline has been reached. Try aborting select.
+ return match self.try_select(Selected::Aborted) {
+ Ok(()) => Selected::Aborted,
+ Err(s) => s,
+ };
+ }
+ } else {
+ thread::park();
+ }
+ }
+ }
+
+ /// Unparks the thread this context belongs to.
+ #[inline]
+ pub fn unpark(&self) {
+ self.inner.thread.unpark();
+ }
+
+ /// Returns the id of the thread this context belongs to.
+ #[inline]
+ pub fn thread_id(&self) -> ThreadId {
+ self.inner.thread_id
+ }
+}