diff options
Diffstat (limited to 'src/sync/broadcast.rs')
-rw-r--r-- | src/sync/broadcast.rs | 478 |
1 files changed, 202 insertions, 276 deletions
diff --git a/src/sync/broadcast.rs b/src/sync/broadcast.rs index 0c8716f..ee9aba0 100644 --- a/src/sync/broadcast.rs +++ b/src/sync/broadcast.rs @@ -21,7 +21,7 @@ //! ## Lagging //! //! As sent messages must be retained until **all** [`Receiver`] handles receive -//! a clone, broadcast channels are suspectible to the "slow receiver" problem. +//! a clone, broadcast channels are susceptible to the "slow receiver" problem. //! In this case, all but one receiver are able to receive values at the rate //! they are sent. Because one receiver is stalled, the channel starts to fill //! up. @@ -55,8 +55,8 @@ //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe //! [`Receiver`]: crate::sync::broadcast::Receiver //! [`channel`]: crate::sync::broadcast::channel -//! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged -//! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed +//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged +//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed //! [`recv`]: crate::sync::broadcast::Receiver::recv //! //! # Examples @@ -107,6 +107,7 @@ //! assert_eq!(20, rx.recv().await.unwrap()); //! assert_eq!(30, rx.recv().await.unwrap()); //! } +//! ``` use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; @@ -194,58 +195,99 @@ pub struct Receiver<T> { /// Next position to read from next: u64, - - /// Used to support the deprecated `poll_recv` fn - waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>, } -/// Error returned by [`Sender::send`][Sender::send]. -/// -/// A **send** operation can only fail if there are no active receivers, -/// implying that the message could never be received. The error contains the -/// message being sent as a payload so it can be recovered. -#[derive(Debug)] -pub struct SendError<T>(pub T); +pub mod error { + //! Broadcast error types -/// An error returned from the [`recv`] function on a [`Receiver`]. -/// -/// [`recv`]: crate::sync::broadcast::Receiver::recv -/// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug, PartialEq)] -pub enum RecvError { - /// There are no more active senders implying no further messages will ever - /// be sent. - Closed, + use std::fmt; - /// The receiver lagged too far behind. Attempting to receive again will - /// return the oldest message still retained by the channel. + /// Error returned by from the [`send`] function on a [`Sender`]. /// - /// Includes the number of skipped messages. - Lagged(u64), -} + /// A **send** operation can only fail if there are no active receivers, + /// implying that the message could never be received. The error contains the + /// message being sent as a payload so it can be recovered. + /// + /// [`send`]: crate::sync::broadcast::Sender::send + /// [`Sender`]: crate::sync::broadcast::Sender + #[derive(Debug)] + pub struct SendError<T>(pub T); -/// An error returned from the [`try_recv`] function on a [`Receiver`]. -/// -/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv -/// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug, PartialEq)] -pub enum TryRecvError { - /// The channel is currently empty. There are still active - /// [`Sender`][Sender] handles, so data may yet become available. - Empty, - - /// There are no more active senders implying no further messages will ever - /// be sent. - Closed, - - /// The receiver lagged too far behind and has been forcibly disconnected. - /// Attempting to receive again will return the oldest message still - /// retained by the channel. - /// - /// Includes the number of skipped messages. - Lagged(u64), + impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "channel closed") + } + } + + impl<T: fmt::Debug> std::error::Error for SendError<T> {} + + /// An error returned from the [`recv`] function on a [`Receiver`]. + /// + /// [`recv`]: crate::sync::broadcast::Receiver::recv + /// [`Receiver`]: crate::sync::broadcast::Receiver + #[derive(Debug, PartialEq)] + pub enum RecvError { + /// There are no more active senders implying no further messages will ever + /// be sent. + Closed, + + /// The receiver lagged too far behind. Attempting to receive again will + /// return the oldest message still retained by the channel. + /// + /// Includes the number of skipped messages. + Lagged(u64), + } + + impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RecvError::Closed => write!(f, "channel closed"), + RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), + } + } + } + + impl std::error::Error for RecvError {} + + /// An error returned from the [`try_recv`] function on a [`Receiver`]. + /// + /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv + /// [`Receiver`]: crate::sync::broadcast::Receiver + #[derive(Debug, PartialEq)] + pub enum TryRecvError { + /// The channel is currently empty. There are still active + /// [`Sender`] handles, so data may yet become available. + /// + /// [`Sender`]: crate::sync::broadcast::Sender + Empty, + + /// There are no more active senders implying no further messages will ever + /// be sent. + Closed, + + /// The receiver lagged too far behind and has been forcibly disconnected. + /// Attempting to receive again will return the oldest message still + /// retained by the channel. + /// + /// Includes the number of skipped messages. + Lagged(u64), + } + + impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TryRecvError::Empty => write!(f, "channel empty"), + TryRecvError::Closed => write!(f, "channel closed"), + TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), + } + } + } + + impl std::error::Error for TryRecvError {} } +use self::error::*; + /// Data shared between senders and receivers struct Shared<T> { /// slots in the channel @@ -273,7 +315,7 @@ struct Tail { closed: bool, /// Receivers waiting for a value - waiters: LinkedList<Waiter>, + waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>, } /// Slot in the buffer @@ -373,8 +415,8 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2; /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`recv`]: crate::sync::broadcast::Receiver::recv -/// [`SendError`]: crate::sync::broadcast::SendError -/// [`RecvError`]: crate::sync::broadcast::RecvError +/// [`SendError`]: crate::sync::broadcast::error::SendError +/// [`RecvError`]: crate::sync::broadcast::error::RecvError /// /// # Examples /// @@ -400,7 +442,7 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2; /// tx.send(20).unwrap(); /// } /// ``` -pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { +pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { assert!(capacity > 0, "capacity is empty"); assert!(capacity <= usize::MAX >> 1, "requested capacity too large"); @@ -433,7 +475,6 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { let rx = Receiver { shared: shared.clone(), next: 0, - waiter: None, }; let tx = Sender { shared }; @@ -528,23 +569,7 @@ impl<T> Sender<T> { /// ``` pub fn subscribe(&self) -> Receiver<T> { let shared = self.shared.clone(); - - let mut tail = shared.tail.lock().unwrap(); - - if tail.rx_cnt == MAX_RECEIVERS { - panic!("max receivers"); - } - - tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); - let next = tail.pos; - - drop(tail); - - Receiver { - shared, - next, - waiter: None, - } + new_receiver(shared) } /// Returns the number of active receivers @@ -584,12 +609,12 @@ impl<T> Sender<T> { /// } /// ``` pub fn receiver_count(&self) -> usize { - let tail = self.shared.tail.lock().unwrap(); + let tail = self.shared.tail.lock(); tail.rx_cnt } fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> { - let mut tail = self.shared.tail.lock().unwrap(); + let mut tail = self.shared.tail.lock(); if tail.rx_cnt == 0 { return Err(SendError(value)); @@ -634,6 +659,22 @@ impl<T> Sender<T> { } } +fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> { + let mut tail = shared.tail.lock(); + + if tail.rx_cnt == MAX_RECEIVERS { + panic!("max receivers"); + } + + tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + + let next = tail.pos; + + drop(tail); + + Receiver { shared, next } +} + impl Tail { fn notify_rx(&mut self) { while let Some(mut waiter) = self.waiters.pop_back() { @@ -695,7 +736,7 @@ impl<T> Receiver<T> { // the slot lock. drop(slot); - let mut tail = self.shared.tail.lock().unwrap(); + let mut tail = self.shared.tail.lock(); // Acquire slot lock again slot = self.shared.buffer[idx].read().unwrap(); @@ -784,106 +825,7 @@ impl<T> Receiver<T> { } } -impl<T> Receiver<T> -where - T: Clone, -{ - /// Attempts to return a pending value on this receiver without awaiting. - /// - /// This is useful for a flavor of "optimistic check" before deciding to - /// await on a receiver. - /// - /// Compared with [`recv`], this function has three failure cases instead of one - /// (one for closed, one for an empty buffer, one for a lagging receiver). - /// - /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have - /// dropped, indicating that no further values can be sent on the channel. - /// - /// If the [`Receiver`] handle falls behind, once the channel is full, newly - /// sent values will overwrite old values. At this point, a call to [`recv`] - /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s - /// internal cursor is updated to point to the oldest value still held by - /// the channel. A subsequent call to [`try_recv`] will return this value - /// **unless** it has been since overwritten. If there are no values to - /// receive, `Err(TryRecvError::Empty)` is returned. - /// - /// [`recv`]: crate::sync::broadcast::Receiver::recv - /// [`Receiver`]: crate::sync::broadcast::Receiver - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::broadcast; - /// - /// #[tokio::main] - /// async fn main() { - /// let (tx, mut rx) = broadcast::channel(16); - /// - /// assert!(rx.try_recv().is_err()); - /// - /// tx.send(10).unwrap(); - /// - /// let value = rx.try_recv().unwrap(); - /// assert_eq!(10, value); - /// } - /// ``` - pub fn try_recv(&mut self) -> Result<T, TryRecvError> { - let guard = self.recv_ref(None)?; - guard.clone_value().ok_or(TryRecvError::Closed) - } - - #[doc(hidden)] - #[deprecated(since = "0.2.21", note = "use async fn recv()")] - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { - use Poll::{Pending, Ready}; - - // The borrow checker prohibits calling `self.poll_ref` while passing in - // a mutable ref to a field (as it should). To work around this, - // `waiter` is first *removed* from `self` then `poll_recv` is called. - // - // However, for safety, we must ensure that `waiter` is **not** dropped. - // It could be contained in the intrusive linked list. The `Receiver` - // drop implementation handles cleanup. - // - // The guard pattern is used to ensure that, on return, even due to - // panic, the waiter node is replaced on `self`. - - struct Guard<'a, T> { - waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>, - receiver: &'a mut Receiver<T>, - } - - impl<'a, T> Drop for Guard<'a, T> { - fn drop(&mut self) { - self.receiver.waiter = self.waiter.take(); - } - } - - let waiter = self.waiter.take().or_else(|| { - Some(Box::pin(UnsafeCell::new(Waiter { - queued: false, - waker: None, - pointers: linked_list::Pointers::new(), - _p: PhantomPinned, - }))) - }); - - let guard = Guard { - waiter, - receiver: self, - }; - let res = guard - .receiver - .recv_ref(Some((&guard.waiter.as_ref().unwrap(), cx.waker()))); - - match res { - Ok(guard) => Ready(guard.clone_value().ok_or(RecvError::Closed)), - Err(TryRecvError::Closed) => Ready(Err(RecvError::Closed)), - Err(TryRecvError::Lagged(n)) => Ready(Err(RecvError::Lagged(n))), - Err(TryRecvError::Empty) => Pending, - } - } - +impl<T: Clone> Receiver<T> { /// Receives the next value for this receiver. /// /// Each [`Receiver`] handle will receive a clone of all values sent @@ -948,54 +890,103 @@ where /// assert_eq!(20, rx.recv().await.unwrap()); /// assert_eq!(30, rx.recv().await.unwrap()); /// } + /// ``` pub async fn recv(&mut self) -> Result<T, RecvError> { let fut = Recv::<_, T>::new(Borrow(self)); fut.await } -} -#[cfg(feature = "stream")] -#[doc(hidden)] -#[deprecated(since = "0.2.21", note = "use `into_stream()`")] -impl<T> crate::stream::Stream for Receiver<T> -where - T: Clone, -{ - type Item = Result<T, RecvError>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Result<T, RecvError>>> { - #[allow(deprecated)] - self.poll_recv(cx).map(|v| match v { - Ok(v) => Some(Ok(v)), - lag @ Err(RecvError::Lagged(_)) => Some(lag), - Err(RecvError::Closed) => None, - }) + /// Attempts to return a pending value on this receiver without awaiting. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// await on a receiver. + /// + /// Compared with [`recv`], this function has three failure cases instead of two + /// (one for closed, one for an empty buffer, one for a lagging receiver). + /// + /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have + /// dropped, indicating that no further values can be sent on the channel. + /// + /// If the [`Receiver`] handle falls behind, once the channel is full, newly + /// sent values will overwrite old values. At this point, a call to [`recv`] + /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s + /// internal cursor is updated to point to the oldest value still held by + /// the channel. A subsequent call to [`try_recv`] will return this value + /// **unless** it has been since overwritten. If there are no values to + /// receive, `Err(TryRecvError::Empty)` is returned. + /// + /// [`recv`]: crate::sync::broadcast::Receiver::recv + /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv + /// [`Receiver`]: crate::sync::broadcast::Receiver + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = broadcast::channel(16); + /// + /// assert!(rx.try_recv().is_err()); + /// + /// tx.send(10).unwrap(); + /// + /// let value = rx.try_recv().unwrap(); + /// assert_eq!(10, value); + /// } + /// ``` + pub fn try_recv(&mut self) -> Result<T, TryRecvError> { + let guard = self.recv_ref(None)?; + guard.clone_value().ok_or(TryRecvError::Closed) + } + + /// Convert the receiver into a `Stream`. + /// + /// The conversion allows using `Receiver` with APIs that require stream + /// values. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::StreamExt; + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = broadcast::channel(128); + /// + /// tokio::spawn(async move { + /// for i in 0..10_i32 { + /// tx.send(i).unwrap(); + /// } + /// }); + /// + /// // Streams must be pinned to iterate. + /// tokio::pin! { + /// let stream = rx + /// .into_stream() + /// .filter(Result::is_ok) + /// .map(Result::unwrap) + /// .filter(|v| v % 2 == 0) + /// .map(|v| v + 1); + /// } + /// + /// while let Some(i) = stream.next().await { + /// println!("{}", i); + /// } + /// } + /// ``` + #[cfg(feature = "stream")] + #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] + pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> { + Recv::new(Borrow(self)) } } impl<T> Drop for Receiver<T> { fn drop(&mut self) { - let mut tail = self.shared.tail.lock().unwrap(); - - if let Some(waiter) = &self.waiter { - // safety: tail lock is held - let queued = waiter.with(|ptr| unsafe { (*ptr).queued }); - - if queued { - // Remove the node - // - // safety: tail lock is held and the wait node is verified to be in - // the list. - unsafe { - waiter.with_mut(|ptr| { - tail.waiters.remove((&mut *ptr).into()); - }); - } - } - } + let mut tail = self.shared.tail.lock(); tail.rx_cnt -= 1; let until = tail.pos; @@ -1070,48 +1061,6 @@ where cfg_stream! { use futures_core::Stream; - impl<T: Clone> Receiver<T> { - /// Convert the receiver into a `Stream`. - /// - /// The conversion allows using `Receiver` with APIs that require stream - /// values. - /// - /// # Examples - /// - /// ``` - /// use tokio::stream::StreamExt; - /// use tokio::sync::broadcast; - /// - /// #[tokio::main] - /// async fn main() { - /// let (tx, rx) = broadcast::channel(128); - /// - /// tokio::spawn(async move { - /// for i in 0..10_i32 { - /// tx.send(i).unwrap(); - /// } - /// }); - /// - /// // Streams must be pinned to iterate. - /// tokio::pin! { - /// let stream = rx - /// .into_stream() - /// .filter(Result::is_ok) - /// .map(Result::unwrap) - /// .filter(|v| v % 2 == 0) - /// .map(|v| v + 1); - /// } - /// - /// while let Some(i) = stream.next().await { - /// println!("{}", i); - /// } - /// } - /// ``` - pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> { - Recv::new(Borrow(self)) - } - } - impl<R, T: Clone> Stream for Recv<R, T> where R: AsMut<Receiver<T>>, @@ -1141,7 +1090,7 @@ where fn drop(&mut self) { // Acquire the tail lock. This is required for safety before accessing // the waiter node. - let mut tail = self.receiver.as_mut().shared.tail.lock().unwrap(); + let mut tail = self.receiver.as_mut().shared.tail.lock(); // safety: tail lock is held let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); @@ -1211,27 +1160,4 @@ impl<'a, T> Drop for RecvGuard<'a, T> { } } -impl fmt::Display for RecvError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RecvError::Closed => write!(f, "channel closed"), - RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), - } - } -} - -impl std::error::Error for RecvError {} - -impl fmt::Display for TryRecvError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TryRecvError::Empty => write!(f, "channel empty"), - TryRecvError::Closed => write!(f, "channel closed"), - TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), - } - } -} - -impl std::error::Error for TryRecvError {} - fn is_unpin<T: Unpin>() {} |