aboutsummaryrefslogtreecommitdiff
path: root/src/select.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/select.rs')
-rw-r--r--src/select.rs1251
1 files changed, 1251 insertions, 0 deletions
diff --git a/src/select.rs b/src/select.rs
new file mode 100644
index 0000000..1488f80
--- /dev/null
+++ b/src/select.rs
@@ -0,0 +1,1251 @@
+//! Interface to the select mechanism.
+
+use std::fmt;
+use std::marker::PhantomData;
+use std::mem;
+use std::time::{Duration, Instant};
+
+use crossbeam_utils::Backoff;
+
+use crate::channel::{self, Receiver, Sender};
+use crate::context::Context;
+use crate::err::{ReadyTimeoutError, TryReadyError};
+use crate::err::{RecvError, SendError};
+use crate::err::{SelectTimeoutError, TrySelectError};
+use crate::flavors;
+use crate::utils;
+
+/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
+/// `read` or `write`.
+///
+/// Each field contains data associated with a specific channel flavor.
+#[derive(Debug, Default)]
+pub struct Token {
+ pub at: flavors::at::AtToken,
+ pub array: flavors::array::ArrayToken,
+ pub list: flavors::list::ListToken,
+ pub never: flavors::never::NeverToken,
+ pub tick: flavors::tick::TickToken,
+ pub zero: flavors::zero::ZeroToken,
+}
+
+/// Identifier associated with an operation by a specific thread on a specific channel.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct Operation(usize);
+
+impl Operation {
+ /// Creates an operation identifier from a mutable reference.
+ ///
+ /// This function essentially just turns the address of the reference into a number. The
+ /// reference should point to a variable that is specific to the thread and the operation,
+ /// and is alive for the entire duration of select or blocking operation.
+ #[inline]
+ pub fn hook<T>(r: &mut T) -> Operation {
+ let val = r as *mut T as usize;
+ // Make sure that the pointer address doesn't equal the numerical representation of
+ // `Selected::{Waiting, Aborted, Disconnected}`.
+ assert!(val > 2);
+ Operation(val)
+ }
+}
+
+/// Current state of a select or a blocking operation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Selected {
+ /// Still waiting for an operation.
+ Waiting,
+
+ /// The attempt to block the current thread has been aborted.
+ Aborted,
+
+ /// An operation became ready because a channel is disconnected.
+ Disconnected,
+
+ /// An operation became ready because a message can be sent or received.
+ Operation(Operation),
+}
+
+impl From<usize> for Selected {
+ #[inline]
+ fn from(val: usize) -> Selected {
+ match val {
+ 0 => Selected::Waiting,
+ 1 => Selected::Aborted,
+ 2 => Selected::Disconnected,
+ oper => Selected::Operation(Operation(oper)),
+ }
+ }
+}
+
+impl Into<usize> for Selected {
+ #[inline]
+ fn into(self) -> usize {
+ match self {
+ Selected::Waiting => 0,
+ Selected::Aborted => 1,
+ Selected::Disconnected => 2,
+ Selected::Operation(Operation(val)) => val,
+ }
+ }
+}
+
+/// A receiver or a sender that can participate in select.
+///
+/// This is a handle that assists select in executing an operation, registration, deciding on the
+/// appropriate deadline for blocking, etc.
+pub trait SelectHandle {
+ /// Attempts to select an operation and returns `true` on success.
+ fn try_select(&self, token: &mut Token) -> bool;
+
+ /// Returns a deadline for an operation, if there is one.
+ fn deadline(&self) -> Option<Instant>;
+
+ /// Registers an operation for execution and returns `true` if it is now ready.
+ fn register(&self, oper: Operation, cx: &Context) -> bool;
+
+ /// Unregisters an operation for execution.
+ fn unregister(&self, oper: Operation);
+
+ /// Attempts to select an operation the thread got woken up for and returns `true` on success.
+ fn accept(&self, token: &mut Token, cx: &Context) -> bool;
+
+ /// Returns `true` if an operation can be executed without blocking.
+ fn is_ready(&self) -> bool;
+
+ /// Registers an operation for readiness notification and returns `true` if it is now ready.
+ fn watch(&self, oper: Operation, cx: &Context) -> bool;
+
+ /// Unregisters an operation for readiness notification.
+ fn unwatch(&self, oper: Operation);
+}
+
+impl<T: SelectHandle> SelectHandle for &T {
+ fn try_select(&self, token: &mut Token) -> bool {
+ (**self).try_select(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ (**self).deadline()
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ (**self).register(oper, cx)
+ }
+
+ fn unregister(&self, oper: Operation) {
+ (**self).unregister(oper);
+ }
+
+ fn accept(&self, token: &mut Token, cx: &Context) -> bool {
+ (**self).accept(token, cx)
+ }
+
+ fn is_ready(&self) -> bool {
+ (**self).is_ready()
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ (**self).watch(oper, cx)
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ (**self).unwatch(oper)
+ }
+}
+
+/// Determines when a select operation should time out.
+#[derive(Clone, Copy, Eq, PartialEq)]
+enum Timeout {
+ /// No blocking.
+ Now,
+
+ /// Block forever.
+ Never,
+
+ /// Time out after the time instant.
+ At(Instant),
+}
+
+/// Runs until one of the operations is selected, potentially blocking the current thread.
+///
+/// Successful receive operations will have to be followed up by `channel::read()` and successful
+/// send operations by `channel::write()`.
+fn run_select(
+ handles: &mut [(&dyn SelectHandle, usize, *const u8)],
+ timeout: Timeout,
+) -> Option<(Token, usize, *const u8)> {
+ if handles.is_empty() {
+ // Wait until the timeout and return.
+ match timeout {
+ Timeout::Now => return None,
+ Timeout::Never => {
+ utils::sleep_until(None);
+ unreachable!();
+ }
+ Timeout::At(when) => {
+ utils::sleep_until(Some(when));
+ return None;
+ }
+ }
+ }
+
+ // Shuffle the operations for fairness.
+ utils::shuffle(handles);
+
+ // Create a token, which serves as a temporary variable that gets initialized in this function
+ // and is later used by a call to `channel::read()` or `channel::write()` that completes the
+ // selected operation.
+ let mut token = Token::default();
+
+ // Try selecting one of the operations without blocking.
+ for &(handle, i, ptr) in handles.iter() {
+ if handle.try_select(&mut token) {
+ return Some((token, i, ptr));
+ }
+ }
+
+ loop {
+ // Prepare for blocking.
+ let res = Context::with(|cx| {
+ let mut sel = Selected::Waiting;
+ let mut registered_count = 0;
+ let mut index_ready = None;
+
+ if let Timeout::Now = timeout {
+ cx.try_select(Selected::Aborted).unwrap();
+ }
+
+ // Register all operations.
+ for (handle, i, _) in handles.iter_mut() {
+ registered_count += 1;
+
+ // If registration returns `false`, that means the operation has just become ready.
+ if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
+ // Try aborting select.
+ sel = match cx.try_select(Selected::Aborted) {
+ Ok(()) => {
+ index_ready = Some(*i);
+ Selected::Aborted
+ }
+ Err(s) => s,
+ };
+ break;
+ }
+
+ // If another thread has already selected one of the operations, stop registration.
+ sel = cx.selected();
+ if sel != Selected::Waiting {
+ break;
+ }
+ }
+
+ if sel == Selected::Waiting {
+ // Check with each operation for how long we're allowed to block, and compute the
+ // earliest deadline.
+ let mut deadline: Option<Instant> = match timeout {
+ Timeout::Now => return None,
+ Timeout::Never => None,
+ Timeout::At(when) => Some(when),
+ };
+ for &(handle, _, _) in handles.iter() {
+ if let Some(x) = handle.deadline() {
+ deadline = deadline.map(|y| x.min(y)).or(Some(x));
+ }
+ }
+
+ // Block the current thread.
+ sel = cx.wait_until(deadline);
+ }
+
+ // Unregister all registered operations.
+ for (handle, _, _) in handles.iter_mut().take(registered_count) {
+ handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
+ }
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ // If an operation became ready during registration, try selecting it.
+ if let Some(index_ready) = index_ready {
+ for &(handle, i, ptr) in handles.iter() {
+ if i == index_ready && handle.try_select(&mut token) {
+ return Some((i, ptr));
+ }
+ }
+ }
+ }
+ Selected::Disconnected => {}
+ Selected::Operation(_) => {
+ // Find the selected operation.
+ for (handle, i, ptr) in handles.iter_mut() {
+ // Is this the selected operation?
+ if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
+ {
+ // Try selecting this operation.
+ if handle.accept(&mut token, cx) {
+ return Some((*i, *ptr));
+ }
+ }
+ }
+ }
+ }
+
+ None
+ });
+
+ // Return if an operation was selected.
+ if let Some((i, ptr)) = res {
+ return Some((token, i, ptr));
+ }
+
+ // Try selecting one of the operations without blocking.
+ for &(handle, i, ptr) in handles.iter() {
+ if handle.try_select(&mut token) {
+ return Some((token, i, ptr));
+ }
+ }
+
+ match timeout {
+ Timeout::Now => return None,
+ Timeout::Never => {}
+ Timeout::At(when) => {
+ if Instant::now() >= when {
+ return None;
+ }
+ }
+ }
+ }
+}
+
+/// Runs until one of the operations becomes ready, potentially blocking the current thread.
+fn run_ready(
+ handles: &mut [(&dyn SelectHandle, usize, *const u8)],
+ timeout: Timeout,
+) -> Option<usize> {
+ if handles.is_empty() {
+ // Wait until the timeout and return.
+ match timeout {
+ Timeout::Now => return None,
+ Timeout::Never => {
+ utils::sleep_until(None);
+ unreachable!();
+ }
+ Timeout::At(when) => {
+ utils::sleep_until(Some(when));
+ return None;
+ }
+ }
+ }
+
+ // Shuffle the operations for fairness.
+ utils::shuffle(handles);
+
+ loop {
+ let backoff = Backoff::new();
+ loop {
+ // Check operations for readiness.
+ for &(handle, i, _) in handles.iter() {
+ if handle.is_ready() {
+ return Some(i);
+ }
+ }
+
+ if backoff.is_completed() {
+ break;
+ } else {
+ backoff.snooze();
+ }
+ }
+
+ // Check for timeout.
+ match timeout {
+ Timeout::Now => return None,
+ Timeout::Never => {}
+ Timeout::At(when) => {
+ if Instant::now() >= when {
+ return None;
+ }
+ }
+ }
+
+ // Prepare for blocking.
+ let res = Context::with(|cx| {
+ let mut sel = Selected::Waiting;
+ let mut registered_count = 0;
+
+ // Begin watching all operations.
+ for (handle, _, _) in handles.iter_mut() {
+ registered_count += 1;
+ let oper = Operation::hook::<&dyn SelectHandle>(handle);
+
+ // If registration returns `false`, that means the operation has just become ready.
+ if handle.watch(oper, cx) {
+ sel = match cx.try_select(Selected::Operation(oper)) {
+ Ok(()) => Selected::Operation(oper),
+ Err(s) => s,
+ };
+ break;
+ }
+
+ // If another thread has already chosen one of the operations, stop registration.
+ sel = cx.selected();
+ if sel != Selected::Waiting {
+ break;
+ }
+ }
+
+ if sel == Selected::Waiting {
+ // Check with each operation for how long we're allowed to block, and compute the
+ // earliest deadline.
+ let mut deadline: Option<Instant> = match timeout {
+ Timeout::Now => unreachable!(),
+ Timeout::Never => None,
+ Timeout::At(when) => Some(when),
+ };
+ for &(handle, _, _) in handles.iter() {
+ if let Some(x) = handle.deadline() {
+ deadline = deadline.map(|y| x.min(y)).or(Some(x));
+ }
+ }
+
+ // Block the current thread.
+ sel = cx.wait_until(deadline);
+ }
+
+ // Unwatch all operations.
+ for (handle, _, _) in handles.iter_mut().take(registered_count) {
+ handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
+ }
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {}
+ Selected::Disconnected => {}
+ Selected::Operation(_) => {
+ for (handle, i, _) in handles.iter_mut() {
+ let oper = Operation::hook::<&dyn SelectHandle>(handle);
+ if sel == Selected::Operation(oper) {
+ return Some(*i);
+ }
+ }
+ }
+ }
+
+ None
+ });
+
+ // Return if an operation became ready.
+ if res.is_some() {
+ return res;
+ }
+ }
+}
+
+/// Attempts to select one of the operations without blocking.
+#[inline]
+pub fn try_select<'a>(
+ handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
+) -> Result<SelectedOperation<'a>, TrySelectError> {
+ match run_select(handles, Timeout::Now) {
+ None => Err(TrySelectError),
+ Some((token, index, ptr)) => Ok(SelectedOperation {
+ token,
+ index,
+ ptr,
+ _marker: PhantomData,
+ }),
+ }
+}
+
+/// Blocks until one of the operations becomes ready and selects it.
+#[inline]
+pub fn select<'a>(
+ handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
+) -> SelectedOperation<'a> {
+ if handles.is_empty() {
+ panic!("no operations have been added to `Select`");
+ }
+
+ let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
+ SelectedOperation {
+ token,
+ index,
+ ptr,
+ _marker: PhantomData,
+ }
+}
+
+/// Blocks for a limited time until one of the operations becomes ready and selects it.
+#[inline]
+pub fn select_timeout<'a>(
+ handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
+ timeout: Duration,
+) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
+ select_deadline(handles, Instant::now() + timeout)
+}
+
+/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
+#[inline]
+pub fn select_deadline<'a>(
+ handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
+ deadline: Instant,
+) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
+ match run_select(handles, Timeout::At(deadline)) {
+ None => Err(SelectTimeoutError),
+ Some((token, index, ptr)) => Ok(SelectedOperation {
+ token,
+ index,
+ ptr,
+ _marker: PhantomData,
+ }),
+ }
+}
+
+/// Selects from a set of channel operations.
+///
+/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
+/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
+/// among them is selected.
+///
+/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
+/// when it will simply return an error because the channel is disconnected.
+///
+/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
+/// dynamically created list of channel operations.
+///
+/// Once a list of operations has been built with `Select`, there are two different ways of
+/// proceeding:
+///
+/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
+/// the returned selected operation has already begun and **must** be completed. If we don't
+/// complete it, a panic will occur.
+///
+/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
+/// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
+/// possible for another thread to make the operation not ready just before we try executing it,
+/// so it's wise to use a retry loop. However, note that these methods might return with success
+/// spuriously, so it's a good idea to always double check if the operation is really ready.
+///
+/// # Examples
+///
+/// Use [`select`] to receive a message from a list of receivers:
+///
+/// ```
+/// use crossbeam_channel::{Receiver, RecvError, Select};
+///
+/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
+/// // Build a list of operations.
+/// let mut sel = Select::new();
+/// for r in rs {
+/// sel.recv(r);
+/// }
+///
+/// // Complete the selected operation.
+/// let oper = sel.select();
+/// let index = oper.index();
+/// oper.recv(&rs[index])
+/// }
+/// ```
+///
+/// Use [`ready`] to receive a message from a list of receivers:
+///
+/// ```
+/// use crossbeam_channel::{Receiver, RecvError, Select};
+///
+/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
+/// // Build a list of operations.
+/// let mut sel = Select::new();
+/// for r in rs {
+/// sel.recv(r);
+/// }
+///
+/// loop {
+/// // Wait until a receive operation becomes ready and try executing it.
+/// let index = sel.ready();
+/// let res = rs[index].try_recv();
+///
+/// // If the operation turns out not to be ready, retry.
+/// if let Err(e) = res {
+/// if e.is_empty() {
+/// continue;
+/// }
+/// }
+///
+/// // Success!
+/// return res.map_err(|_| RecvError);
+/// }
+/// }
+/// ```
+///
+/// [`try_select`]: Select::try_select
+/// [`select`]: Select::select
+/// [`select_timeout`]: Select::select_timeout
+/// [`try_ready`]: Select::try_ready
+/// [`ready`]: Select::ready
+/// [`ready_timeout`]: Select::ready_timeout
+pub struct Select<'a> {
+ /// A list of senders and receivers participating in selection.
+ handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
+
+ /// The next index to assign to an operation.
+ next_index: usize,
+}
+
+unsafe impl Send for Select<'_> {}
+unsafe impl Sync for Select<'_> {}
+
+impl<'a> Select<'a> {
+ /// Creates an empty list of channel operations for selection.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::Select;
+ ///
+ /// let mut sel = Select::new();
+ ///
+ /// // The list of operations is empty, which means no operation can be selected.
+ /// assert!(sel.try_select().is_err());
+ /// ```
+ pub fn new() -> Select<'a> {
+ Select {
+ handles: Vec::with_capacity(4),
+ next_index: 0,
+ }
+ }
+
+ /// Adds a send operation.
+ ///
+ /// Returns the index of the added operation.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s, r) = unbounded::<i32>();
+ ///
+ /// let mut sel = Select::new();
+ /// let index = sel.send(&s);
+ /// ```
+ pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
+ let i = self.next_index;
+ let ptr = s as *const Sender<_> as *const u8;
+ self.handles.push((s, i, ptr));
+ self.next_index += 1;
+ i
+ }
+
+ /// Adds a receive operation.
+ ///
+ /// Returns the index of the added operation.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s, r) = unbounded::<i32>();
+ ///
+ /// let mut sel = Select::new();
+ /// let index = sel.recv(&r);
+ /// ```
+ pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
+ let i = self.next_index;
+ let ptr = r as *const Receiver<_> as *const u8;
+ self.handles.push((r, i, ptr));
+ self.next_index += 1;
+ i
+ }
+
+ /// Removes a previously added operation.
+ ///
+ /// This is useful when an operation is selected because the channel got disconnected and we
+ /// want to try again to select a different operation instead.
+ ///
+ /// If new operations are added after removing some, the indices of removed operations will not
+ /// be reused.
+ ///
+ /// # Panics
+ ///
+ /// An attempt to remove a non-existing or already removed operation will panic.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s1, r1) = unbounded::<i32>();
+ /// let (_, r2) = unbounded::<i32>();
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// // Both operations are initially ready, so a random one will be executed.
+ /// let oper = sel.select();
+ /// assert_eq!(oper.index(), oper2);
+ /// assert!(oper.recv(&r2).is_err());
+ /// sel.remove(oper2);
+ ///
+ /// s1.send(10).unwrap();
+ ///
+ /// let oper = sel.select();
+ /// assert_eq!(oper.index(), oper1);
+ /// assert_eq!(oper.recv(&r1), Ok(10));
+ /// ```
+ pub fn remove(&mut self, index: usize) {
+ assert!(
+ index < self.next_index,
+ "index out of bounds; {} >= {}",
+ index,
+ self.next_index,
+ );
+
+ let i = self
+ .handles
+ .iter()
+ .enumerate()
+ .find(|(_, (_, i, _))| *i == index)
+ .expect("no operation with this index")
+ .0;
+
+ self.handles.swap_remove(i);
+ }
+
+ /// Attempts to select one of the operations without blocking.
+ ///
+ /// If an operation is ready, it is selected and returned. If multiple operations are ready at
+ /// the same time, a random one among them is selected. If none of the operations are ready, an
+ /// error is returned.
+ ///
+ /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
+ /// even when it will simply return an error because the channel is disconnected.
+ ///
+ /// The selected operation must be completed with [`SelectedOperation::send`]
+ /// or [`SelectedOperation::recv`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s1, r1) = unbounded();
+ /// let (s2, r2) = unbounded();
+ ///
+ /// s1.send(10).unwrap();
+ /// s2.send(20).unwrap();
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// // Both operations are initially ready, so a random one will be executed.
+ /// let oper = sel.try_select();
+ /// match oper {
+ /// Err(_) => panic!("both operations should be ready"),
+ /// Ok(oper) => match oper.index() {
+ /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
+ /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
+ /// _ => unreachable!(),
+ /// }
+ /// }
+ /// ```
+ pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
+ try_select(&mut self.handles)
+ }
+
+ /// Blocks until one of the operations becomes ready and selects it.
+ ///
+ /// Once an operation becomes ready, it is selected and returned. If multiple operations are
+ /// ready at the same time, a random one among them is selected.
+ ///
+ /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
+ /// even when it will simply return an error because the channel is disconnected.
+ ///
+ /// The selected operation must be completed with [`SelectedOperation::send`]
+ /// or [`SelectedOperation::recv`].
+ ///
+ /// # Panics
+ ///
+ /// Panics if no operations have been added to `Select`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s1, r1) = unbounded();
+ /// let (s2, r2) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s1.send(10).unwrap();
+ /// });
+ /// thread::spawn(move || s2.send(20).unwrap());
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// // The second operation will be selected because it becomes ready first.
+ /// let oper = sel.select();
+ /// match oper.index() {
+ /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
+ /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
+ /// _ => unreachable!(),
+ /// }
+ /// ```
+ pub fn select(&mut self) -> SelectedOperation<'a> {
+ select(&mut self.handles)
+ }
+
+ /// Blocks for a limited time until one of the operations becomes ready and selects it.
+ ///
+ /// If an operation becomes ready, it is selected and returned. If multiple operations are
+ /// ready at the same time, a random one among them is selected. If none of the operations
+ /// become ready for the specified duration, an error is returned.
+ ///
+ /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
+ /// even when it will simply return an error because the channel is disconnected.
+ ///
+ /// The selected operation must be completed with [`SelectedOperation::send`]
+ /// or [`SelectedOperation::recv`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s1, r1) = unbounded();
+ /// let (s2, r2) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s1.send(10).unwrap();
+ /// });
+ /// thread::spawn(move || s2.send(20).unwrap());
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// // The second operation will be selected because it becomes ready first.
+ /// let oper = sel.select_timeout(Duration::from_millis(500));
+ /// match oper {
+ /// Err(_) => panic!("should not have timed out"),
+ /// Ok(oper) => match oper.index() {
+ /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
+ /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
+ /// _ => unreachable!(),
+ /// }
+ /// }
+ /// ```
+ pub fn select_timeout(
+ &mut self,
+ timeout: Duration,
+ ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
+ select_timeout(&mut self.handles, timeout)
+ }
+
+ /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
+ ///
+ /// If an operation becomes ready, it is selected and returned. If multiple operations are
+ /// ready at the same time, a random one among them is selected. If none of the operations
+ /// become ready before the given deadline, an error is returned.
+ ///
+ /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
+ /// even when it will simply return an error because the channel is disconnected.
+ ///
+ /// The selected operation must be completed with [`SelectedOperation::send`]
+ /// or [`SelectedOperation::recv`].
+ ///
+ /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
+ /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::{Instant, Duration};
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s1, r1) = unbounded();
+ /// let (s2, r2) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s1.send(10).unwrap();
+ /// });
+ /// thread::spawn(move || s2.send(20).unwrap());
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// let deadline = Instant::now() + Duration::from_millis(500);
+ ///
+ /// // The second operation will be selected because it becomes ready first.
+ /// let oper = sel.select_deadline(deadline);
+ /// match oper {
+ /// Err(_) => panic!("should not have timed out"),
+ /// Ok(oper) => match oper.index() {
+ /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
+ /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
+ /// _ => unreachable!(),
+ /// }
+ /// }
+ /// ```
+ pub fn select_deadline(
+ &mut self,
+ deadline: Instant,
+ ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
+ select_deadline(&mut self.handles, deadline)
+ }
+
+ /// Attempts to find a ready operation without blocking.
+ ///
+ /// If an operation is ready, its index is returned. If multiple operations are ready at the
+ /// same time, a random one among them is chosen. If none of the operations are ready, an error
+ /// is returned.
+ ///
+ /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
+ /// even when it will simply return an error because the channel is disconnected.
+ ///
+ /// Note that this method might return with success spuriously, so it's a good idea to always
+ /// double check if the operation is really ready.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s1, r1) = unbounded();
+ /// let (s2, r2) = unbounded();
+ ///
+ /// s1.send(10).unwrap();
+ /// s2.send(20).unwrap();
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// // Both operations are initially ready, so a random one will be chosen.
+ /// match sel.try_ready() {
+ /// Err(_) => panic!("both operations should be ready"),
+ /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
+ /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
+ /// Ok(_) => unreachable!(),
+ /// }
+ /// ```
+ pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
+ match run_ready(&mut self.handles, Timeout::Now) {
+ None => Err(TryReadyError),
+ Some(index) => Ok(index),
+ }
+ }
+
+ /// Blocks until one of the operations becomes ready.
+ ///
+ /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
+ /// the same time, a random one among them is chosen.
+ ///
+ /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
+ /// even when it will simply return an error because the channel is disconnected.
+ ///
+ /// Note that this method might return with success spuriously, so it's a good idea to always
+ /// double check if the operation is really ready.
+ ///
+ /// # Panics
+ ///
+ /// Panics if no operations have been added to `Select`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s1, r1) = unbounded();
+ /// let (s2, r2) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s1.send(10).unwrap();
+ /// });
+ /// thread::spawn(move || s2.send(20).unwrap());
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// // The second operation will be selected because it becomes ready first.
+ /// match sel.ready() {
+ /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
+ /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
+ /// _ => unreachable!(),
+ /// }
+ /// ```
+ pub fn ready(&mut self) -> usize {
+ if self.handles.is_empty() {
+ panic!("no operations have been added to `Select`");
+ }
+
+ run_ready(&mut self.handles, Timeout::Never).unwrap()
+ }
+
+ /// Blocks for a limited time until one of the operations becomes ready.
+ ///
+ /// If an operation becomes ready, its index is returned. If multiple operations are ready at
+ /// the same time, a random one among them is chosen. If none of the operations become ready
+ /// for the specified duration, an error is returned.
+ ///
+ /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
+ /// even when it will simply return an error because the channel is disconnected.
+ ///
+ /// Note that this method might return with success spuriously, so it's a good idea to double
+ /// check if the operation is really ready.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let (s1, r1) = unbounded();
+ /// let (s2, r2) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s1.send(10).unwrap();
+ /// });
+ /// thread::spawn(move || s2.send(20).unwrap());
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// // The second operation will be selected because it becomes ready first.
+ /// match sel.ready_timeout(Duration::from_millis(500)) {
+ /// Err(_) => panic!("should not have timed out"),
+ /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
+ /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
+ /// Ok(_) => unreachable!(),
+ /// }
+ /// ```
+ pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
+ self.ready_deadline(Instant::now() + timeout)
+ }
+
+ /// Blocks until a given deadline, or until one of the operations becomes ready.
+ ///
+ /// If an operation becomes ready, its index is returned. If multiple operations are ready at
+ /// the same time, a random one among them is chosen. If none of the operations become ready
+ /// before the deadline, an error is returned.
+ ///
+ /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
+ /// even when it will simply return an error because the channel is disconnected.
+ ///
+ /// Note that this method might return with success spuriously, so it's a good idea to double
+ /// check if the operation is really ready.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::{Duration, Instant};
+ /// use crossbeam_channel::{unbounded, Select};
+ ///
+ /// let deadline = Instant::now() + Duration::from_millis(500);
+ ///
+ /// let (s1, r1) = unbounded();
+ /// let (s2, r2) = unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// s1.send(10).unwrap();
+ /// });
+ /// thread::spawn(move || s2.send(20).unwrap());
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r1);
+ /// let oper2 = sel.recv(&r2);
+ ///
+ /// // The second operation will be selected because it becomes ready first.
+ /// match sel.ready_deadline(deadline) {
+ /// Err(_) => panic!("should not have timed out"),
+ /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
+ /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
+ /// Ok(_) => unreachable!(),
+ /// }
+ /// ```
+ pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
+ match run_ready(&mut self.handles, Timeout::At(deadline)) {
+ None => Err(ReadyTimeoutError),
+ Some(index) => Ok(index),
+ }
+ }
+}
+
+impl<'a> Clone for Select<'a> {
+ fn clone(&self) -> Select<'a> {
+ Select {
+ handles: self.handles.clone(),
+ next_index: self.next_index,
+ }
+ }
+}
+
+impl<'a> Default for Select<'a> {
+ fn default() -> Select<'a> {
+ Select::new()
+ }
+}
+
+impl fmt::Debug for Select<'_> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Select { .. }")
+ }
+}
+
+/// A selected operation that needs to be completed.
+///
+/// To complete the operation, call [`send`] or [`recv`].
+///
+/// # Panics
+///
+/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
+/// `SelectedOperation` is dropped without completion, a panic occurs.
+///
+/// [`send`]: SelectedOperation::send
+/// [`recv`]: SelectedOperation::recv
+#[must_use]
+pub struct SelectedOperation<'a> {
+ /// Token needed to complete the operation.
+ token: Token,
+
+ /// The index of the selected operation.
+ index: usize,
+
+ /// The address of the selected `Sender` or `Receiver`.
+ ptr: *const u8,
+
+ /// Indicates that `Sender`s and `Receiver`s are borrowed.
+ _marker: PhantomData<&'a ()>,
+}
+
+impl SelectedOperation<'_> {
+ /// Returns the index of the selected operation.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{bounded, Select};
+ ///
+ /// let (s1, r1) = bounded::<()>(0);
+ /// let (s2, r2) = bounded::<()>(0);
+ /// let (s3, r3) = bounded::<()>(1);
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.send(&s1);
+ /// let oper2 = sel.recv(&r2);
+ /// let oper3 = sel.send(&s3);
+ ///
+ /// // Only the last operation is ready.
+ /// let oper = sel.select();
+ /// assert_eq!(oper.index(), 2);
+ /// assert_eq!(oper.index(), oper3);
+ ///
+ /// // Complete the operation.
+ /// oper.send(&s3, ()).unwrap();
+ /// ```
+ pub fn index(&self) -> usize {
+ self.index
+ }
+
+ /// Completes the send operation.
+ ///
+ /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
+ /// when the operation was added.
+ ///
+ /// # Panics
+ ///
+ /// Panics if an incorrect [`Sender`] reference is passed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{bounded, Select, SendError};
+ ///
+ /// let (s, r) = bounded::<i32>(0);
+ /// drop(r);
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.send(&s);
+ ///
+ /// let oper = sel.select();
+ /// assert_eq!(oper.index(), oper1);
+ /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
+ /// ```
+ pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
+ assert!(
+ s as *const Sender<T> as *const u8 == self.ptr,
+ "passed a sender that wasn't selected",
+ );
+ let res = unsafe { channel::write(s, &mut self.token, msg) };
+ mem::forget(self);
+ res.map_err(SendError)
+ }
+
+ /// Completes the receive operation.
+ ///
+ /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
+ /// when the operation was added.
+ ///
+ /// # Panics
+ ///
+ /// Panics if an incorrect [`Receiver`] reference is passed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_channel::{bounded, Select, RecvError};
+ ///
+ /// let (s, r) = bounded::<i32>(0);
+ /// drop(s);
+ ///
+ /// let mut sel = Select::new();
+ /// let oper1 = sel.recv(&r);
+ ///
+ /// let oper = sel.select();
+ /// assert_eq!(oper.index(), oper1);
+ /// assert_eq!(oper.recv(&r), Err(RecvError));
+ /// ```
+ pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
+ assert!(
+ r as *const Receiver<T> as *const u8 == self.ptr,
+ "passed a receiver that wasn't selected",
+ );
+ let res = unsafe { channel::read(r, &mut self.token) };
+ mem::forget(self);
+ res.map_err(|_| RecvError)
+ }
+}
+
+impl fmt::Debug for SelectedOperation<'_> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("SelectedOperation { .. }")
+ }
+}
+
+impl Drop for SelectedOperation<'_> {
+ fn drop(&mut self) {
+ panic!("dropped `SelectedOperation` without completing the operation");
+ }
+}