aboutsummaryrefslogtreecommitdiff
path: root/src/mpsc
diff options
context:
space:
mode:
Diffstat (limited to 'src/mpsc')
-rw-r--r--src/mpsc/mod.rs193
-rw-r--r--src/mpsc/queue.rs22
-rw-r--r--src/mpsc/sink_impl.rs58
3 files changed, 95 insertions, 178 deletions
diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs
index dd50343..28612da 100644
--- a/src/mpsc/mod.rs
+++ b/src/mpsc/mod.rs
@@ -79,13 +79,13 @@
// by the queue structure.
use futures_core::stream::{FusedStream, Stream};
-use futures_core::task::{Context, Poll, Waker};
use futures_core::task::__internal::AtomicWaker;
+use futures_core::task::{Context, Poll, Waker};
use std::fmt;
use std::pin::Pin;
-use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Mutex};
use std::thread;
use crate::mpsc::queue::Queue;
@@ -209,9 +209,7 @@ impl SendError {
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TrySendError")
- .field("kind", &self.err.kind)
- .finish()
+ f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
}
}
@@ -251,8 +249,7 @@ impl<T> TrySendError<T> {
impl fmt::Debug for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_tuple("TryRecvError")
- .finish()
+ f.debug_tuple("TryRecvError").finish()
}
}
@@ -335,10 +332,7 @@ struct SenderTask {
impl SenderTask {
fn new() -> Self {
- Self {
- task: None,
- is_parked: false,
- }
+ Self { task: None, is_parked: false }
}
fn notify(&mut self) {
@@ -381,9 +375,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
maybe_parked: false,
};
- let rx = Receiver {
- inner: Some(inner),
- };
+ let rx = Receiver { inner: Some(inner) };
(Sender(Some(tx)), rx)
}
@@ -399,7 +391,6 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
/// the channel. Using an `unbounded` channel has the ability of causing the
/// process to run out of memory. In this case, the process will be aborted.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
-
let inner = Arc::new(UnboundedInner {
state: AtomicUsize::new(INIT_STATE),
message_queue: Queue::new(),
@@ -407,13 +398,9 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
recv_task: AtomicWaker::new(),
});
- let tx = UnboundedSenderInner {
- inner: inner.clone(),
- };
+ let tx = UnboundedSenderInner { inner: inner.clone() };
- let rx = UnboundedReceiver {
- inner: Some(inner),
- };
+ let rx = UnboundedReceiver { inner: Some(inner) };
(UnboundedSender(Some(tx)), rx)
}
@@ -430,13 +417,10 @@ impl<T> UnboundedSenderInner<T> {
if state.is_open {
Poll::Ready(Ok(()))
} else {
- Poll::Ready(Err(SendError {
- kind: SendErrorKind::Disconnected,
- }))
+ Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
}
}
-
// Push message to the queue and signal to the receiver
fn queue_push_and_signal(&self, msg: T) {
// Push the message onto the message queue
@@ -462,16 +446,17 @@ impl<T> UnboundedSenderInner<T> {
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
- assert!(state.num_messages < MAX_CAPACITY, "buffer space \
- exhausted; sending this messages would overflow the state");
+ assert!(
+ state.num_messages < MAX_CAPACITY,
+ "buffer space \
+ exhausted; sending this messages would overflow the state"
+ );
state.num_messages += 1;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
- Ok(_) => {
- return Some(state.num_messages)
- }
+ Ok(_) => return Some(state.num_messages),
Err(actual) => curr = actual,
}
}
@@ -516,12 +501,7 @@ impl<T> BoundedSenderInner<T> {
fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
// If the sender is currently blocked, reject the message
if !self.poll_unparked(None).is_ready() {
- return Err(TrySendError {
- err: SendError {
- kind: SendErrorKind::Full,
- },
- val: msg,
- });
+ return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
}
// The channel has capacity to accept the message, so send it
@@ -531,9 +511,7 @@ impl<T> BoundedSenderInner<T> {
// Do the send without failing.
// Can be called only by bounded sender.
#[allow(clippy::debug_assert_with_mut_call)]
- fn do_send_b(&mut self, msg: T)
- -> Result<(), TrySendError<T>>
- {
+ fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
// Anyone callig do_send *should* make sure there is room first,
// but assert here for tests as a sanity check.
debug_assert!(self.poll_unparked(None).is_ready());
@@ -551,12 +529,12 @@ impl<T> BoundedSenderInner<T> {
// the configured buffer size
num_messages > self.inner.buffer
}
- None => return Err(TrySendError {
- err: SendError {
- kind: SendErrorKind::Disconnected,
- },
- val: msg,
- }),
+ None => {
+ return Err(TrySendError {
+ err: SendError { kind: SendErrorKind::Disconnected },
+ val: msg,
+ })
+ }
};
// If the channel has reached capacity, then the sender task needs to
@@ -600,16 +578,17 @@ impl<T> BoundedSenderInner<T> {
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
- assert!(state.num_messages < MAX_CAPACITY, "buffer space \
- exhausted; sending this messages would overflow the state");
+ assert!(
+ state.num_messages < MAX_CAPACITY,
+ "buffer space \
+ exhausted; sending this messages would overflow the state"
+ );
state.num_messages += 1;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
- Ok(_) => {
- return Some(state.num_messages)
- }
+ Ok(_) => return Some(state.num_messages),
Err(actual) => curr = actual,
}
}
@@ -644,15 +623,10 @@ impl<T> BoundedSenderInner<T> {
/// capacity, in which case the current task is queued to be notified once
/// capacity is available;
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
- fn poll_ready(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), SendError>> {
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
- return Poll::Ready(Err(SendError {
- kind: SendErrorKind::Disconnected,
- }));
+ return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
}
self.poll_unparked(Some(cx)).map(Ok)
@@ -699,7 +673,7 @@ impl<T> BoundedSenderInner<T> {
if !task.is_parked {
self.maybe_parked = false;
- return Poll::Ready(())
+ return Poll::Ready(());
}
// At this point, an unpark request is pending, so there will be an
@@ -724,12 +698,7 @@ impl<T> Sender<T> {
if let Some(inner) = &mut self.0 {
inner.try_send(msg)
} else {
- Err(TrySendError {
- err: SendError {
- kind: SendErrorKind::Disconnected,
- },
- val: msg,
- })
+ Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
}
}
@@ -739,8 +708,7 @@ impl<T> Sender<T> {
/// [`poll_ready`](Sender::poll_ready) has reported that the channel is
/// ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
- self.try_send(msg)
- .map_err(|e| e.err)
+ self.try_send(msg).map_err(|e| e.err)
}
/// Polls the channel to determine if there is guaranteed capacity to send
@@ -755,13 +723,8 @@ impl<T> Sender<T> {
/// capacity, in which case the current task is queued to be notified once
/// capacity is available;
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
- pub fn poll_ready(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), SendError>> {
- let inner = self.0.as_mut().ok_or(SendError {
- kind: SendErrorKind::Disconnected,
- })?;
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
inner.poll_ready(cx)
}
@@ -799,7 +762,10 @@ impl<T> Sender<T> {
}
/// Hashes the receiver into the provided hasher
- pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
+ pub fn hash_receiver<H>(&self, hasher: &mut H)
+ where
+ H: std::hash::Hasher,
+ {
use std::hash::Hash;
let ptr = self.0.as_ref().map(|inner| inner.ptr());
@@ -809,13 +775,8 @@ impl<T> Sender<T> {
impl<T> UnboundedSender<T> {
/// Check if the channel is ready to receive a message.
- pub fn poll_ready(
- &self,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), SendError>> {
- let inner = self.0.as_ref().ok_or(SendError {
- kind: SendErrorKind::Disconnected,
- })?;
+ pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
inner.poll_ready_nb()
}
@@ -845,12 +806,7 @@ impl<T> UnboundedSender<T> {
}
}
- Err(TrySendError {
- err: SendError {
- kind: SendErrorKind::Disconnected,
- },
- val: msg,
- })
+ Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
}
/// Send a message on the channel.
@@ -858,8 +814,7 @@ impl<T> UnboundedSender<T> {
/// This method should only be called after `poll_ready` has been used to
/// verify that the channel is ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
- self.do_send_nb(msg)
- .map_err(|e| e.err)
+ self.do_send_nb(msg).map_err(|e| e.err)
}
/// Sends a message along this channel.
@@ -888,7 +843,10 @@ impl<T> UnboundedSender<T> {
}
/// Hashes the receiver into the provided hasher
- pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
+ pub fn hash_receiver<H>(&self, hasher: &mut H)
+ where
+ H: std::hash::Hasher,
+ {
use std::hash::Hash;
let ptr = self.0.as_ref().map(|inner| inner.ptr());
@@ -928,9 +886,7 @@ impl<T> Clone for UnboundedSenderInner<T> {
Ok(_) => {
// The ABA problem doesn't matter here. We only care that the
// number of senders never exceeds the maximum.
- return Self {
- inner: self.inner.clone(),
- };
+ return Self { inner: self.inner.clone() };
}
Err(actual) => curr = actual,
}
@@ -1021,19 +977,22 @@ impl<T> Receiver<T> {
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
- /// This function will panic if called after `try_next` or `poll_next` has
- /// returned `None`.
+ /// This function returns:
+ /// * `Ok(Some(t))` when message is fetched
+ /// * `Ok(None)` when channel is closed and no messages left in the queue
+ /// * `Err(e)` when there are no messages available, but channel is not yet closed
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
- Poll::Ready(msg) => {
- Ok(msg)
- },
+ Poll::Ready(msg) => Ok(msg),
Poll::Pending => Err(TryRecvError { _priv: () }),
}
}
fn next_message(&mut self) -> Poll<Option<T>> {
- let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
+ let inner = match self.inner.as_mut() {
+ None => return Poll::Ready(None),
+ Some(inner) => inner,
+ };
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
@@ -1098,18 +1057,15 @@ impl<T> FusedStream for Receiver<T> {
impl<T> Stream for Receiver<T> {
type Item = T;
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<T>> {
- // Try to read a message off of the message queue.
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ // Try to read a message off of the message queue.
match self.next_message() {
Poll::Ready(msg) => {
if msg.is_none() {
self.inner = None;
}
Poll::Ready(msg)
- },
+ }
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
@@ -1169,19 +1125,22 @@ impl<T> UnboundedReceiver<T> {
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
- /// This function will panic if called after `try_next` or `poll_next` has
- /// returned `None`.
+ /// This function returns:
+ /// * `Ok(Some(t))` when message is fetched
+ /// * `Ok(None)` when channel is closed and no messages left in the queue
+ /// * `Err(e)` when there are no messages available, but channel is not yet closed
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
- Poll::Ready(msg) => {
- Ok(msg)
- },
+ Poll::Ready(msg) => Ok(msg),
Poll::Pending => Err(TryRecvError { _priv: () }),
}
}
fn next_message(&mut self) -> Poll<Option<T>> {
- let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
+ let inner = match self.inner.as_mut() {
+ None => return Poll::Ready(None),
+ Some(inner) => inner,
+ };
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
@@ -1230,10 +1189,7 @@ impl<T> FusedStream for UnboundedReceiver<T> {
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<T>> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
// Try to read a message off of the message queue.
match self.next_message() {
Poll::Ready(msg) => {
@@ -1241,7 +1197,7 @@ impl<T> Stream for UnboundedReceiver<T> {
self.inner = None;
}
Poll::Ready(msg)
- },
+ }
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
@@ -1339,10 +1295,7 @@ impl State {
*/
fn decode_state(num: usize) -> State {
- State {
- is_open: num & OPEN_MASK == OPEN_MASK,
- num_messages: num & MAX_CAPACITY,
- }
+ State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
}
fn encode_state(state: &State) -> usize {
diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs
index b00e1b1..57dc7f5 100644
--- a/src/mpsc/queue.rs
+++ b/src/mpsc/queue.rs
@@ -43,10 +43,10 @@
pub(super) use self::PopResult::*;
-use std::thread;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
+use std::thread;
/// A result of the `pop` function.
pub(super) enum PopResult<T> {
@@ -76,15 +76,12 @@ pub(super) struct Queue<T> {
tail: UnsafeCell<*mut Node<T>>,
}
-unsafe impl<T: Send> Send for Queue<T> { }
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send> Send for Queue<T> {}
+unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> Node<T> {
unsafe fn new(v: Option<T>) -> *mut Self {
- Box::into_raw(Box::new(Self {
- next: AtomicPtr::new(ptr::null_mut()),
- value: v,
- }))
+ Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v }))
}
}
@@ -93,10 +90,7 @@ impl<T> Queue<T> {
/// one consumer.
pub(super) fn new() -> Self {
let stub = unsafe { Node::new(None) };
- Self {
- head: AtomicPtr::new(stub),
- tail: UnsafeCell::new(stub),
- }
+ Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
}
/// Pushes a new value onto this queue.
@@ -133,7 +127,11 @@ impl<T> Queue<T> {
return Data(ret);
}
- if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
+ if self.head.load(Ordering::Acquire) == tail {
+ Empty
+ } else {
+ Inconsistent
+ }
}
/// Pop an element similarly to `pop` function, but spin-wait on inconsistent
diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs
index 4ce66b4..1be2016 100644
--- a/src/mpsc/sink_impl.rs
+++ b/src/mpsc/sink_impl.rs
@@ -6,24 +6,15 @@ use std::pin::Pin;
impl<T> Sink<T> for Sender<T> {
type Error = SendError;
- fn poll_ready(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(*self).poll_ready(cx)
}
- fn start_send(
- mut self: Pin<&mut Self>,
- msg: T,
- ) -> Result<(), Self::Error> {
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
(*self).start_send(msg)
}
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match (*self).poll_ready(cx) {
Poll::Ready(Err(ref e)) if e.is_disconnected() => {
// If the receiver disconnected, we consider the sink to be flushed.
@@ -33,10 +24,7 @@ impl<T> Sink<T> for Sender<T> {
}
}
- fn poll_close(
- mut self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.disconnect();
Poll::Ready(Ok(()))
}
@@ -45,31 +33,19 @@ impl<T> Sink<T> for Sender<T> {
impl<T> Sink<T> for UnboundedSender<T> {
type Error = SendError;
- fn poll_ready(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Self::poll_ready(&*self, cx)
}
- fn start_send(
- mut self: Pin<&mut Self>,
- msg: T,
- ) -> Result<(), Self::Error> {
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
Self::start_send(&mut *self, msg)
}
- fn poll_flush(
- self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
- fn poll_close(
- mut self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.disconnect();
Poll::Ready(Ok(()))
}
@@ -78,29 +54,19 @@ impl<T> Sink<T> for UnboundedSender<T> {
impl<T> Sink<T> for &UnboundedSender<T> {
type Error = SendError;
- fn poll_ready(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
UnboundedSender::poll_ready(*self, cx)
}
fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
- self.unbounded_send(msg)
- .map_err(TrySendError::into_send_error)
+ self.unbounded_send(msg).map_err(TrySendError::into_send_error)
}
- fn poll_flush(
- self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
- fn poll_close(
- self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.close_channel();
Poll::Ready(Ok(()))
}