aboutsummaryrefslogtreecommitdiff
path: root/src/flavors/zero.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/flavors/zero.rs')
-rw-r--r--src/flavors/zero.rs102
1 files changed, 61 insertions, 41 deletions
diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs
index be647b5..4afbd8f 100644
--- a/src/flavors/zero.rs
+++ b/src/flavors/zero.rs
@@ -6,6 +6,7 @@ use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
+use std::{fmt, ptr};
use crossbeam_utils::Backoff;
@@ -16,7 +17,19 @@ use crate::utils::Spinlock;
use crate::waker::Waker;
/// A pointer to a packet.
-pub type ZeroToken = usize;
+pub struct ZeroToken(*mut ());
+
+impl Default for ZeroToken {
+ fn default() -> Self {
+ Self(ptr::null_mut())
+ }
+}
+
+impl fmt::Debug for ZeroToken {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&(self.0 as usize), f)
+ }
+}
/// A slot for passing one message from a sender to a receiver.
struct Packet<T> {
@@ -80,7 +93,7 @@ struct Inner {
}
/// Zero-capacity channel.
-pub struct Channel<T> {
+pub(crate) struct Channel<T> {
/// Inner representation of the channel.
inner: Spinlock<Inner>,
@@ -90,7 +103,7 @@ pub struct Channel<T> {
impl<T> Channel<T> {
/// Constructs a new zero-capacity channel.
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
Channel {
inner: Spinlock::new(Inner {
senders: Waker::new(),
@@ -102,12 +115,12 @@ impl<T> Channel<T> {
}
/// Returns a receiver handle to the channel.
- pub fn receiver(&self) -> Receiver<'_, T> {
+ pub(crate) fn receiver(&self) -> Receiver<'_, T> {
Receiver(self)
}
/// Returns a sender handle to the channel.
- pub fn sender(&self) -> Sender<'_, T> {
+ pub(crate) fn sender(&self) -> Sender<'_, T> {
Sender(self)
}
@@ -117,10 +130,10 @@ impl<T> Channel<T> {
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
- token.zero = 0;
+ token.zero.0 = ptr::null_mut();
true
} else {
false
@@ -128,13 +141,13 @@ impl<T> Channel<T> {
}
/// Writes a message into the packet.
- pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no packet, the channel is disconnected.
- if token.zero == 0 {
+ if token.zero.0.is_null() {
return Err(msg);
}
- let packet = &*(token.zero as *const Packet<T>);
+ let packet = &*(token.zero.0 as *const Packet<T>);
packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
Ok(())
@@ -146,10 +159,10 @@ impl<T> Channel<T> {
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
- token.zero = 0;
+ token.zero.0 = ptr::null_mut();
true
} else {
false
@@ -157,13 +170,13 @@ impl<T> Channel<T> {
}
/// Reads a message from the packet.
- pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
// If there is no packet, the channel is disconnected.
- if token.zero == 0 {
+ if token.zero.0.is_null() {
return Err(());
}
- let packet = &*(token.zero as *const Packet<T>);
+ let packet = &*(token.zero.0 as *const Packet<T>);
if packet.on_stack {
// The message has been in the packet from the beginning, so there is no need to wait
@@ -177,19 +190,19 @@ impl<T> Channel<T> {
// heap-allocated packet.
packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap();
- drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>));
+ drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
Ok(msg)
}
}
/// Attempts to send a message into the channel.
- pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
@@ -203,13 +216,17 @@ impl<T> Channel<T> {
}
/// Sends a message into the channel.
- pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
@@ -224,10 +241,10 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
- let packet = Packet::<T>::message_on_stack(msg);
+ let mut packet = Packet::<T>::message_on_stack(msg);
inner
.senders
- .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
+ .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
inner.receivers.notify();
drop(inner);
@@ -256,13 +273,13 @@ impl<T> Channel<T> {
}
/// Attempts to receive a message without blocking.
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else if inner.is_disconnected {
@@ -273,13 +290,13 @@ impl<T> Channel<T> {
}
/// Receives a message from the channel.
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
@@ -293,10 +310,12 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
- let packet = Packet::<T>::empty_on_stack();
- inner
- .receivers
- .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
+ let mut packet = Packet::<T>::empty_on_stack();
+ inner.receivers.register_with_packet(
+ oper,
+ &mut packet as *mut Packet<T> as *mut (),
+ cx,
+ );
inner.senders.notify();
drop(inner);
@@ -325,7 +344,7 @@ impl<T> Channel<T> {
/// Disconnects the channel and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
- pub fn disconnect(&self) -> bool {
+ pub(crate) fn disconnect(&self) -> bool {
let mut inner = self.inner.lock();
if !inner.is_disconnected {
@@ -339,31 +358,32 @@ impl<T> Channel<T> {
}
/// Returns the current number of messages inside the channel.
- pub fn len(&self) -> usize {
+ pub(crate) fn len(&self) -> usize {
0
}
/// Returns the capacity of the channel.
- pub fn capacity(&self) -> Option<usize> {
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
+ pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
/// Returns `true` if the channel is empty.
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
true
}
/// Returns `true` if the channel is full.
- pub fn is_full(&self) -> bool {
+ pub(crate) fn is_full(&self) -> bool {
true
}
}
/// Receiver handle to a channel.
-pub struct Receiver<'a, T>(&'a Channel<T>);
+pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
-pub struct Sender<'a, T>(&'a Channel<T>);
+pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
@@ -380,7 +400,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
let mut inner = self.0.inner.lock();
inner
.receivers
- .register_with_packet(oper, packet as usize, cx);
+ .register_with_packet(oper, packet as *mut (), cx);
inner.senders.notify();
inner.senders.can_select() || inner.is_disconnected
}
@@ -394,7 +414,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
- token.zero = cx.wait_packet();
+ token.zero.0 = cx.wait_packet();
true
}
@@ -430,7 +450,7 @@ impl<T> SelectHandle for Sender<'_, T> {
let mut inner = self.0.inner.lock();
inner
.senders
- .register_with_packet(oper, packet as usize, cx);
+ .register_with_packet(oper, packet as *mut (), cx);
inner.receivers.notify();
inner.receivers.can_select() || inner.is_disconnected
}
@@ -444,7 +464,7 @@ impl<T> SelectHandle for Sender<'_, T> {
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
- token.zero = cx.wait_packet();
+ token.zero.0 = cx.wait_packet();
true
}