diff options
Diffstat (limited to 'src/flavors/zero.rs')
-rw-r--r-- | src/flavors/zero.rs | 102 |
1 files changed, 61 insertions, 41 deletions
diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs index be647b5..4afbd8f 100644 --- a/src/flavors/zero.rs +++ b/src/flavors/zero.rs @@ -6,6 +6,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Instant; +use std::{fmt, ptr}; use crossbeam_utils::Backoff; @@ -16,7 +17,19 @@ use crate::utils::Spinlock; use crate::waker::Waker; /// A pointer to a packet. -pub type ZeroToken = usize; +pub struct ZeroToken(*mut ()); + +impl Default for ZeroToken { + fn default() -> Self { + Self(ptr::null_mut()) + } +} + +impl fmt::Debug for ZeroToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&(self.0 as usize), f) + } +} /// A slot for passing one message from a sender to a receiver. struct Packet<T> { @@ -80,7 +93,7 @@ struct Inner { } /// Zero-capacity channel. -pub struct Channel<T> { +pub(crate) struct Channel<T> { /// Inner representation of the channel. inner: Spinlock<Inner>, @@ -90,7 +103,7 @@ pub struct Channel<T> { impl<T> Channel<T> { /// Constructs a new zero-capacity channel. - pub fn new() -> Self { + pub(crate) fn new() -> Self { Channel { inner: Spinlock::new(Inner { senders: Waker::new(), @@ -102,12 +115,12 @@ impl<T> Channel<T> { } /// Returns a receiver handle to the channel. - pub fn receiver(&self) -> Receiver<'_, T> { + pub(crate) fn receiver(&self) -> Receiver<'_, T> { Receiver(self) } /// Returns a sender handle to the channel. - pub fn sender(&self) -> Sender<'_, T> { + pub(crate) fn sender(&self) -> Sender<'_, T> { Sender(self) } @@ -117,10 +130,10 @@ impl<T> Channel<T> { // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; true } else if inner.is_disconnected { - token.zero = 0; + token.zero.0 = ptr::null_mut(); true } else { false @@ -128,13 +141,13 @@ impl<T> Channel<T> { } /// Writes a message into the packet. - pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no packet, the channel is disconnected. - if token.zero == 0 { + if token.zero.0.is_null() { return Err(msg); } - let packet = &*(token.zero as *const Packet<T>); + let packet = &*(token.zero.0 as *const Packet<T>); packet.msg.get().write(Some(msg)); packet.ready.store(true, Ordering::Release); Ok(()) @@ -146,10 +159,10 @@ impl<T> Channel<T> { // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; true } else if inner.is_disconnected { - token.zero = 0; + token.zero.0 = ptr::null_mut(); true } else { false @@ -157,13 +170,13 @@ impl<T> Channel<T> { } /// Reads a message from the packet. - pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { // If there is no packet, the channel is disconnected. - if token.zero == 0 { + if token.zero.0.is_null() { return Err(()); } - let packet = &*(token.zero as *const Packet<T>); + let packet = &*(token.zero.0 as *const Packet<T>); if packet.on_stack { // The message has been in the packet from the beginning, so there is no need to wait @@ -177,19 +190,19 @@ impl<T> Channel<T> { // heap-allocated packet. packet.wait_ready(); let msg = packet.msg.get().replace(None).unwrap(); - drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>)); + drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); Ok(msg) } } /// Attempts to send a message into the channel. - pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { let token = &mut Token::default(); let mut inner = self.inner.lock(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.write(token, msg).ok().unwrap(); @@ -203,13 +216,17 @@ impl<T> Channel<T> { } /// Sends a message into the channel. - pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { + pub(crate) fn send( + &self, + msg: T, + deadline: Option<Instant>, + ) -> Result<(), SendTimeoutError<T>> { let token = &mut Token::default(); let mut inner = self.inner.lock(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.write(token, msg).ok().unwrap(); @@ -224,10 +241,10 @@ impl<T> Channel<T> { Context::with(|cx| { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); - let packet = Packet::<T>::message_on_stack(msg); + let mut packet = Packet::<T>::message_on_stack(msg); inner .senders - .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); inner.receivers.notify(); drop(inner); @@ -256,13 +273,13 @@ impl<T> Channel<T> { } /// Attempts to receive a message without blocking. - pub fn try_recv(&self) -> Result<T, TryRecvError> { + pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { let token = &mut Token::default(); let mut inner = self.inner.lock(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } } else if inner.is_disconnected { @@ -273,13 +290,13 @@ impl<T> Channel<T> { } /// Receives a message from the channel. - pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { let token = &mut Token::default(); let mut inner = self.inner.lock(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); @@ -293,10 +310,12 @@ impl<T> Channel<T> { Context::with(|cx| { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); - let packet = Packet::<T>::empty_on_stack(); - inner - .receivers - .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + let mut packet = Packet::<T>::empty_on_stack(); + inner.receivers.register_with_packet( + oper, + &mut packet as *mut Packet<T> as *mut (), + cx, + ); inner.senders.notify(); drop(inner); @@ -325,7 +344,7 @@ impl<T> Channel<T> { /// Disconnects the channel and wakes up all blocked senders and receivers. /// /// Returns `true` if this call disconnected the channel. - pub fn disconnect(&self) -> bool { + pub(crate) fn disconnect(&self) -> bool { let mut inner = self.inner.lock(); if !inner.is_disconnected { @@ -339,31 +358,32 @@ impl<T> Channel<T> { } /// Returns the current number of messages inside the channel. - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { 0 } /// Returns the capacity of the channel. - pub fn capacity(&self) -> Option<usize> { + #[allow(clippy::unnecessary_wraps)] // This is intentional. + pub(crate) fn capacity(&self) -> Option<usize> { Some(0) } /// Returns `true` if the channel is empty. - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { true } /// Returns `true` if the channel is full. - pub fn is_full(&self) -> bool { + pub(crate) fn is_full(&self) -> bool { true } } /// Receiver handle to a channel. -pub struct Receiver<'a, T>(&'a Channel<T>); +pub(crate) struct Receiver<'a, T>(&'a Channel<T>); /// Sender handle to a channel. -pub struct Sender<'a, T>(&'a Channel<T>); +pub(crate) struct Sender<'a, T>(&'a Channel<T>); impl<T> SelectHandle for Receiver<'_, T> { fn try_select(&self, token: &mut Token) -> bool { @@ -380,7 +400,7 @@ impl<T> SelectHandle for Receiver<'_, T> { let mut inner = self.0.inner.lock(); inner .receivers - .register_with_packet(oper, packet as usize, cx); + .register_with_packet(oper, packet as *mut (), cx); inner.senders.notify(); inner.senders.can_select() || inner.is_disconnected } @@ -394,7 +414,7 @@ impl<T> SelectHandle for Receiver<'_, T> { } fn accept(&self, token: &mut Token, cx: &Context) -> bool { - token.zero = cx.wait_packet(); + token.zero.0 = cx.wait_packet(); true } @@ -430,7 +450,7 @@ impl<T> SelectHandle for Sender<'_, T> { let mut inner = self.0.inner.lock(); inner .senders - .register_with_packet(oper, packet as usize, cx); + .register_with_packet(oper, packet as *mut (), cx); inner.receivers.notify(); inner.receivers.can_select() || inner.is_disconnected } @@ -444,7 +464,7 @@ impl<T> SelectHandle for Sender<'_, T> { } fn accept(&self, token: &mut Token, cx: &Context) -> bool { - token.zero = cx.wait_packet(); + token.zero.0 = cx.wait_packet(); true } |