diff options
Diffstat (limited to 'src/mpsc')
-rw-r--r-- | src/mpsc/mod.rs | 193 | ||||
-rw-r--r-- | src/mpsc/queue.rs | 22 | ||||
-rw-r--r-- | src/mpsc/sink_impl.rs | 58 |
3 files changed, 95 insertions, 178 deletions
diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs index dd50343..28612da 100644 --- a/src/mpsc/mod.rs +++ b/src/mpsc/mod.rs @@ -79,13 +79,13 @@ // by the queue structure. use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll, Waker}; use futures_core::task::__internal::AtomicWaker; +use futures_core::task::{Context, Poll, Waker}; use std::fmt; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; use std::thread; use crate::mpsc::queue::Queue; @@ -209,9 +209,7 @@ impl SendError { impl<T> fmt::Debug for TrySendError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TrySendError") - .field("kind", &self.err.kind) - .finish() + f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() } } @@ -251,8 +249,7 @@ impl<T> TrySendError<T> { impl fmt::Debug for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("TryRecvError") - .finish() + f.debug_tuple("TryRecvError").finish() } } @@ -335,10 +332,7 @@ struct SenderTask { impl SenderTask { fn new() -> Self { - Self { - task: None, - is_parked: false, - } + Self { task: None, is_parked: false } } fn notify(&mut self) { @@ -381,9 +375,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { maybe_parked: false, }; - let rx = Receiver { - inner: Some(inner), - }; + let rx = Receiver { inner: Some(inner) }; (Sender(Some(tx)), rx) } @@ -399,7 +391,6 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { - let inner = Arc::new(UnboundedInner { state: AtomicUsize::new(INIT_STATE), message_queue: Queue::new(), @@ -407,13 +398,9 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { recv_task: AtomicWaker::new(), }); - let tx = UnboundedSenderInner { - inner: inner.clone(), - }; + let tx = UnboundedSenderInner { inner: inner.clone() }; - let rx = UnboundedReceiver { - inner: Some(inner), - }; + let rx = UnboundedReceiver { inner: Some(inner) }; (UnboundedSender(Some(tx)), rx) } @@ -430,13 +417,10 @@ impl<T> UnboundedSenderInner<T> { if state.is_open { Poll::Ready(Ok(())) } else { - Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })) + Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) } } - // Push message to the queue and signal to the receiver fn queue_push_and_signal(&self, msg: T) { // Push the message onto the message queue @@ -462,16 +446,17 @@ impl<T> UnboundedSenderInner<T> { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -516,12 +501,7 @@ impl<T> BoundedSenderInner<T> { fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { // If the sender is currently blocked, reject the message if !self.poll_unparked(None).is_ready() { - return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Full, - }, - val: msg, - }); + return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); } // The channel has capacity to accept the message, so send it @@ -531,9 +511,7 @@ impl<T> BoundedSenderInner<T> { // Do the send without failing. // Can be called only by bounded sender. #[allow(clippy::debug_assert_with_mut_call)] - fn do_send_b(&mut self, msg: T) - -> Result<(), TrySendError<T>> - { + fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { // Anyone callig do_send *should* make sure there is room first, // but assert here for tests as a sanity check. debug_assert!(self.poll_unparked(None).is_ready()); @@ -551,12 +529,12 @@ impl<T> BoundedSenderInner<T> { // the configured buffer size num_messages > self.inner.buffer } - None => return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }), + None => { + return Err(TrySendError { + err: SendError { kind: SendErrorKind::Disconnected }, + val: msg, + }) + } }; // If the channel has reached capacity, then the sender task needs to @@ -600,16 +578,17 @@ impl<T> BoundedSenderInner<T> { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -644,15 +623,10 @@ impl<T> BoundedSenderInner<T> { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { let state = decode_state(self.inner.state.load(SeqCst)); if !state.is_open { - return Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })); + return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); } self.poll_unparked(Some(cx)).map(Ok) @@ -699,7 +673,7 @@ impl<T> BoundedSenderInner<T> { if !task.is_parked { self.maybe_parked = false; - return Poll::Ready(()) + return Poll::Ready(()); } // At this point, an unpark request is pending, so there will be an @@ -724,12 +698,7 @@ impl<T> Sender<T> { if let Some(inner) = &mut self.0 { inner.try_send(msg) } else { - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } } @@ -739,8 +708,7 @@ impl<T> Sender<T> { /// [`poll_ready`](Sender::poll_ready) has reported that the channel is /// ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.try_send(msg) - .map_err(|e| e.err) + self.try_send(msg).map_err(|e| e.err) } /// Polls the channel to determine if there is guaranteed capacity to send @@ -755,13 +723,8 @@ impl<T> Sender<T> { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - pub fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { - let inner = self.0.as_mut().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready(cx) } @@ -799,7 +762,10 @@ impl<T> Sender<T> { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver<H>(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -809,13 +775,8 @@ impl<T> Sender<T> { impl<T> UnboundedSender<T> { /// Check if the channel is ready to receive a message. - pub fn poll_ready( - &self, - _: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { - let inner = self.0.as_ref().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready_nb() } @@ -845,12 +806,7 @@ impl<T> UnboundedSender<T> { } } - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } /// Send a message on the channel. @@ -858,8 +814,7 @@ impl<T> UnboundedSender<T> { /// This method should only be called after `poll_ready` has been used to /// verify that the channel is ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.do_send_nb(msg) - .map_err(|e| e.err) + self.do_send_nb(msg).map_err(|e| e.err) } /// Sends a message along this channel. @@ -888,7 +843,10 @@ impl<T> UnboundedSender<T> { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver<H>(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -928,9 +886,7 @@ impl<T> Clone for UnboundedSenderInner<T> { Ok(_) => { // The ABA problem doesn't matter here. We only care that the // number of senders never exceeds the maximum. - return Self { - inner: self.inner.clone(), - }; + return Self { inner: self.inner.clone() }; } Err(actual) => curr = actual, } @@ -1021,19 +977,22 @@ impl<T> Receiver<T> { /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. /// - /// This function will panic if called after `try_next` or `poll_next` has - /// returned `None`. + /// This function returns: + /// * `Ok(Some(t))` when message is fetched + /// * `Ok(None)` when channel is closed and no messages left in the queue + /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } fn next_message(&mut self) -> Poll<Option<T>> { - let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1098,18 +1057,15 @@ impl<T> FusedStream for Receiver<T> { impl<T> Stream for Receiver<T> { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T>> { - // Try to read a message off of the message queue. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { if msg.is_none() { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1169,19 +1125,22 @@ impl<T> UnboundedReceiver<T> { /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. /// - /// This function will panic if called after `try_next` or `poll_next` has - /// returned `None`. + /// This function returns: + /// * `Ok(Some(t))` when message is fetched + /// * `Ok(None)` when channel is closed and no messages left in the queue + /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } fn next_message(&mut self) -> Poll<Option<T>> { - let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1230,10 +1189,7 @@ impl<T> FusedStream for UnboundedReceiver<T> { impl<T> Stream for UnboundedReceiver<T> { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { @@ -1241,7 +1197,7 @@ impl<T> Stream for UnboundedReceiver<T> { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1339,10 +1295,7 @@ impl State { */ fn decode_state(num: usize) -> State { - State { - is_open: num & OPEN_MASK == OPEN_MASK, - num_messages: num & MAX_CAPACITY, - } + State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } } fn encode_state(state: &State) -> usize { diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs index b00e1b1..57dc7f5 100644 --- a/src/mpsc/queue.rs +++ b/src/mpsc/queue.rs @@ -43,10 +43,10 @@ pub(super) use self::PopResult::*; -use std::thread; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; +use std::thread; /// A result of the `pop` function. pub(super) enum PopResult<T> { @@ -76,15 +76,12 @@ pub(super) struct Queue<T> { tail: UnsafeCell<*mut Node<T>>, } -unsafe impl<T: Send> Send for Queue<T> { } -unsafe impl<T: Send> Sync for Queue<T> { } +unsafe impl<T: Send> Send for Queue<T> {} +unsafe impl<T: Send> Sync for Queue<T> {} impl<T> Node<T> { unsafe fn new(v: Option<T>) -> *mut Self { - Box::into_raw(Box::new(Self { - next: AtomicPtr::new(ptr::null_mut()), - value: v, - })) + Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v })) } } @@ -93,10 +90,7 @@ impl<T> Queue<T> { /// one consumer. pub(super) fn new() -> Self { let stub = unsafe { Node::new(None) }; - Self { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } + Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } } /// Pushes a new value onto this queue. @@ -133,7 +127,11 @@ impl<T> Queue<T> { return Data(ret); } - if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} + if self.head.load(Ordering::Acquire) == tail { + Empty + } else { + Inconsistent + } } /// Pop an element similarly to `pop` function, but spin-wait on inconsistent diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs index 4ce66b4..1be2016 100644 --- a/src/mpsc/sink_impl.rs +++ b/src/mpsc/sink_impl.rs @@ -6,24 +6,15 @@ use std::pin::Pin; impl<T> Sink<T> for Sender<T> { type Error = SendError; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { (*self).poll_ready(cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { (*self).start_send(msg) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match (*self).poll_ready(cx) { Poll::Ready(Err(ref e)) if e.is_disconnected() => { // If the receiver disconnected, we consider the sink to be flushed. @@ -33,10 +24,7 @@ impl<T> Sink<T> for Sender<T> { } } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.disconnect(); Poll::Ready(Ok(())) } @@ -45,31 +33,19 @@ impl<T> Sink<T> for Sender<T> { impl<T> Sink<T> for UnboundedSender<T> { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Self::poll_ready(&*self, cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { Self::start_send(&mut *self, msg) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.disconnect(); Poll::Ready(Ok(())) } @@ -78,29 +54,19 @@ impl<T> Sink<T> for UnboundedSender<T> { impl<T> Sink<T> for &UnboundedSender<T> { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { UnboundedSender::poll_ready(*self, cx) } fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.unbounded_send(msg) - .map_err(TrySendError::into_send_error) + self.unbounded_send(msg).map_err(TrySendError::into_send_error) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.close_channel(); Poll::Ready(Ok(())) } |