diff options
Diffstat (limited to 'src/flavors/zero.rs')
-rw-r--r-- | src/flavors/zero.rs | 63 |
1 files changed, 39 insertions, 24 deletions
diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs index 9790b77..4afbd8f 100644 --- a/src/flavors/zero.rs +++ b/src/flavors/zero.rs @@ -6,6 +6,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Instant; +use std::{fmt, ptr}; use crossbeam_utils::Backoff; @@ -16,7 +17,19 @@ use crate::utils::Spinlock; use crate::waker::Waker; /// A pointer to a packet. -pub(crate) type ZeroToken = usize; +pub struct ZeroToken(*mut ()); + +impl Default for ZeroToken { + fn default() -> Self { + Self(ptr::null_mut()) + } +} + +impl fmt::Debug for ZeroToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&(self.0 as usize), f) + } +} /// A slot for passing one message from a sender to a receiver. struct Packet<T> { @@ -117,10 +130,10 @@ impl<T> Channel<T> { // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; true } else if inner.is_disconnected { - token.zero = 0; + token.zero.0 = ptr::null_mut(); true } else { false @@ -130,11 +143,11 @@ impl<T> Channel<T> { /// Writes a message into the packet. pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no packet, the channel is disconnected. - if token.zero == 0 { + if token.zero.0.is_null() { return Err(msg); } - let packet = &*(token.zero as *const Packet<T>); + let packet = &*(token.zero.0 as *const Packet<T>); packet.msg.get().write(Some(msg)); packet.ready.store(true, Ordering::Release); Ok(()) @@ -146,10 +159,10 @@ impl<T> Channel<T> { // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; true } else if inner.is_disconnected { - token.zero = 0; + token.zero.0 = ptr::null_mut(); true } else { false @@ -159,11 +172,11 @@ impl<T> Channel<T> { /// Reads a message from the packet. pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { // If there is no packet, the channel is disconnected. - if token.zero == 0 { + if token.zero.0.is_null() { return Err(()); } - let packet = &*(token.zero as *const Packet<T>); + let packet = &*(token.zero.0 as *const Packet<T>); if packet.on_stack { // The message has been in the packet from the beginning, so there is no need to wait @@ -177,7 +190,7 @@ impl<T> Channel<T> { // heap-allocated packet. packet.wait_ready(); let msg = packet.msg.get().replace(None).unwrap(); - drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>)); + drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); Ok(msg) } } @@ -189,7 +202,7 @@ impl<T> Channel<T> { // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.write(token, msg).ok().unwrap(); @@ -213,7 +226,7 @@ impl<T> Channel<T> { // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.write(token, msg).ok().unwrap(); @@ -228,10 +241,10 @@ impl<T> Channel<T> { Context::with(|cx| { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); - let packet = Packet::<T>::message_on_stack(msg); + let mut packet = Packet::<T>::message_on_stack(msg); inner .senders - .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); inner.receivers.notify(); drop(inner); @@ -266,7 +279,7 @@ impl<T> Channel<T> { // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } } else if inner.is_disconnected { @@ -283,7 +296,7 @@ impl<T> Channel<T> { // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); @@ -297,10 +310,12 @@ impl<T> Channel<T> { Context::with(|cx| { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); - let packet = Packet::<T>::empty_on_stack(); - inner - .receivers - .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + let mut packet = Packet::<T>::empty_on_stack(); + inner.receivers.register_with_packet( + oper, + &mut packet as *mut Packet<T> as *mut (), + cx, + ); inner.senders.notify(); drop(inner); @@ -385,7 +400,7 @@ impl<T> SelectHandle for Receiver<'_, T> { let mut inner = self.0.inner.lock(); inner .receivers - .register_with_packet(oper, packet as usize, cx); + .register_with_packet(oper, packet as *mut (), cx); inner.senders.notify(); inner.senders.can_select() || inner.is_disconnected } @@ -399,7 +414,7 @@ impl<T> SelectHandle for Receiver<'_, T> { } fn accept(&self, token: &mut Token, cx: &Context) -> bool { - token.zero = cx.wait_packet(); + token.zero.0 = cx.wait_packet(); true } @@ -435,7 +450,7 @@ impl<T> SelectHandle for Sender<'_, T> { let mut inner = self.0.inner.lock(); inner .senders - .register_with_packet(oper, packet as usize, cx); + .register_with_packet(oper, packet as *mut (), cx); inner.receivers.notify(); inner.receivers.can_select() || inner.is_disconnected } @@ -449,7 +464,7 @@ impl<T> SelectHandle for Sender<'_, T> { } fn accept(&self, token: &mut Token, cx: &Context) -> bool { - token.zero = cx.wait_packet(); + token.zero.0 = cx.wait_packet(); true } |