aboutsummaryrefslogtreecommitdiff
path: root/src/flavors/zero.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/flavors/zero.rs')
-rw-r--r--src/flavors/zero.rs466
1 files changed, 466 insertions, 0 deletions
diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs
new file mode 100644
index 0000000..be647b5
--- /dev/null
+++ b/src/flavors/zero.rs
@@ -0,0 +1,466 @@
+//! Zero-capacity channel.
+//!
+//! This kind of channel is also known as *rendezvous* channel.
+
+use std::cell::UnsafeCell;
+use std::marker::PhantomData;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Instant;
+
+use crossbeam_utils::Backoff;
+
+use crate::context::Context;
+use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
+use crate::select::{Operation, SelectHandle, Selected, Token};
+use crate::utils::Spinlock;
+use crate::waker::Waker;
+
+/// A pointer to a packet.
+pub type ZeroToken = usize;
+
+/// A slot for passing one message from a sender to a receiver.
+struct Packet<T> {
+ /// Equals `true` if the packet is allocated on the stack.
+ on_stack: bool,
+
+ /// Equals `true` once the packet is ready for reading or writing.
+ ready: AtomicBool,
+
+ /// The message.
+ msg: UnsafeCell<Option<T>>,
+}
+
+impl<T> Packet<T> {
+ /// Creates an empty packet on the stack.
+ fn empty_on_stack() -> Packet<T> {
+ Packet {
+ on_stack: true,
+ ready: AtomicBool::new(false),
+ msg: UnsafeCell::new(None),
+ }
+ }
+
+ /// Creates an empty packet on the heap.
+ fn empty_on_heap() -> Box<Packet<T>> {
+ Box::new(Packet {
+ on_stack: false,
+ ready: AtomicBool::new(false),
+ msg: UnsafeCell::new(None),
+ })
+ }
+
+ /// Creates a packet on the stack, containing a message.
+ fn message_on_stack(msg: T) -> Packet<T> {
+ Packet {
+ on_stack: true,
+ ready: AtomicBool::new(false),
+ msg: UnsafeCell::new(Some(msg)),
+ }
+ }
+
+ /// Waits until the packet becomes ready for reading or writing.
+ fn wait_ready(&self) {
+ let backoff = Backoff::new();
+ while !self.ready.load(Ordering::Acquire) {
+ backoff.snooze();
+ }
+ }
+}
+
+/// Inner representation of a zero-capacity channel.
+struct Inner {
+ /// Senders waiting to pair up with a receive operation.
+ senders: Waker,
+
+ /// Receivers waiting to pair up with a send operation.
+ receivers: Waker,
+
+ /// Equals `true` when the channel is disconnected.
+ is_disconnected: bool,
+}
+
+/// Zero-capacity channel.
+pub struct Channel<T> {
+ /// Inner representation of the channel.
+ inner: Spinlock<Inner>,
+
+ /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
+ _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+ /// Constructs a new zero-capacity channel.
+ pub fn new() -> Self {
+ Channel {
+ inner: Spinlock::new(Inner {
+ senders: Waker::new(),
+ receivers: Waker::new(),
+ is_disconnected: false,
+ }),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Returns a receiver handle to the channel.
+ pub fn receiver(&self) -> Receiver<'_, T> {
+ Receiver(self)
+ }
+
+ /// Returns a sender handle to the channel.
+ pub fn sender(&self) -> Sender<'_, T> {
+ Sender(self)
+ }
+
+ /// Attempts to reserve a slot for sending a message.
+ fn start_send(&self, token: &mut Token) -> bool {
+ let mut inner = self.inner.lock();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero = operation.packet;
+ true
+ } else if inner.is_disconnected {
+ token.zero = 0;
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Writes a message into the packet.
+ pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no packet, the channel is disconnected.
+ if token.zero == 0 {
+ return Err(msg);
+ }
+
+ let packet = &*(token.zero as *const Packet<T>);
+ packet.msg.get().write(Some(msg));
+ packet.ready.store(true, Ordering::Release);
+ Ok(())
+ }
+
+ /// Attempts to pair up with a sender.
+ fn start_recv(&self, token: &mut Token) -> bool {
+ let mut inner = self.inner.lock();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero = operation.packet;
+ true
+ } else if inner.is_disconnected {
+ token.zero = 0;
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Reads a message from the packet.
+ pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ // If there is no packet, the channel is disconnected.
+ if token.zero == 0 {
+ return Err(());
+ }
+
+ let packet = &*(token.zero as *const Packet<T>);
+
+ if packet.on_stack {
+ // The message has been in the packet from the beginning, so there is no need to wait
+ // for it. However, after reading the message, we need to set `ready` to `true` in
+ // order to signal that the packet can be destroyed.
+ let msg = packet.msg.get().replace(None).unwrap();
+ packet.ready.store(true, Ordering::Release);
+ Ok(msg)
+ } else {
+ // Wait until the message becomes available, then read it and destroy the
+ // heap-allocated packet.
+ packet.wait_ready();
+ let msg = packet.msg.get().replace(None).unwrap();
+ drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>));
+ Ok(msg)
+ }
+ }
+
+ /// Attempts to send a message into the channel.
+ pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero = operation.packet;
+ drop(inner);
+ unsafe {
+ self.write(token, msg).ok().unwrap();
+ }
+ Ok(())
+ } else if inner.is_disconnected {
+ Err(TrySendError::Disconnected(msg))
+ } else {
+ Err(TrySendError::Full(msg))
+ }
+ }
+
+ /// Sends a message into the channel.
+ pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero = operation.packet;
+ drop(inner);
+ unsafe {
+ self.write(token, msg).ok().unwrap();
+ }
+ return Ok(());
+ }
+
+ if inner.is_disconnected {
+ return Err(SendTimeoutError::Disconnected(msg));
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a receiver wakes us up.
+ let oper = Operation::hook(token);
+ let packet = Packet::<T>::message_on_stack(msg);
+ inner
+ .senders
+ .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
+ inner.receivers.notify();
+ drop(inner);
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ self.inner.lock().senders.unregister(oper).unwrap();
+ let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+ Err(SendTimeoutError::Timeout(msg))
+ }
+ Selected::Disconnected => {
+ self.inner.lock().senders.unregister(oper).unwrap();
+ let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+ Err(SendTimeoutError::Disconnected(msg))
+ }
+ Selected::Operation(_) => {
+ // Wait until the message is read, then drop the packet.
+ packet.wait_ready();
+ Ok(())
+ }
+ }
+ })
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero = operation.packet;
+ drop(inner);
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else if inner.is_disconnected {
+ Err(TryRecvError::Disconnected)
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero = operation.packet;
+ drop(inner);
+ unsafe {
+ return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+ }
+ }
+
+ if inner.is_disconnected {
+ return Err(RecvTimeoutError::Disconnected);
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a sender wakes us up.
+ let oper = Operation::hook(token);
+ let packet = Packet::<T>::empty_on_stack();
+ inner
+ .receivers
+ .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
+ inner.senders.notify();
+ drop(inner);
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ self.inner.lock().receivers.unregister(oper).unwrap();
+ Err(RecvTimeoutError::Timeout)
+ }
+ Selected::Disconnected => {
+ self.inner.lock().receivers.unregister(oper).unwrap();
+ Err(RecvTimeoutError::Disconnected)
+ }
+ Selected::Operation(_) => {
+ // Wait until the message is provided, then read it.
+ packet.wait_ready();
+ unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
+ }
+ }
+ })
+ }
+
+ /// Disconnects the channel and wakes up all blocked senders and receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub fn disconnect(&self) -> bool {
+ let mut inner = self.inner.lock();
+
+ if !inner.is_disconnected {
+ inner.is_disconnected = true;
+ inner.senders.disconnect();
+ inner.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub fn len(&self) -> usize {
+ 0
+ }
+
+ /// Returns the capacity of the channel.
+ pub fn capacity(&self) -> Option<usize> {
+ Some(0)
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub fn is_empty(&self) -> bool {
+ true
+ }
+
+ /// Returns `true` if the channel is full.
+ pub fn is_full(&self) -> bool {
+ true
+ }
+}
+
+/// Receiver handle to a channel.
+pub struct Receiver<'a, T>(&'a Channel<T>);
+
+/// Sender handle to a channel.
+pub struct Sender<'a, T>(&'a Channel<T>);
+
+impl<T> SelectHandle for Receiver<'_, T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ self.0.start_recv(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ let packet = Box::into_raw(Packet::<T>::empty_on_heap());
+
+ let mut inner = self.0.inner.lock();
+ inner
+ .receivers
+ .register_with_packet(oper, packet as usize, cx);
+ inner.senders.notify();
+ inner.senders.can_select() || inner.is_disconnected
+ }
+
+ fn unregister(&self, oper: Operation) {
+ if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) {
+ unsafe {
+ drop(Box::from_raw(operation.packet as *mut Packet<T>));
+ }
+ }
+ }
+
+ fn accept(&self, token: &mut Token, cx: &Context) -> bool {
+ token.zero = cx.wait_packet();
+ true
+ }
+
+ fn is_ready(&self) -> bool {
+ let inner = self.0.inner.lock();
+ inner.senders.can_select() || inner.is_disconnected
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ let mut inner = self.0.inner.lock();
+ inner.receivers.watch(oper, cx);
+ inner.senders.can_select() || inner.is_disconnected
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ let mut inner = self.0.inner.lock();
+ inner.receivers.unwatch(oper);
+ }
+}
+
+impl<T> SelectHandle for Sender<'_, T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ self.0.start_send(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ let packet = Box::into_raw(Packet::<T>::empty_on_heap());
+
+ let mut inner = self.0.inner.lock();
+ inner
+ .senders
+ .register_with_packet(oper, packet as usize, cx);
+ inner.receivers.notify();
+ inner.receivers.can_select() || inner.is_disconnected
+ }
+
+ fn unregister(&self, oper: Operation) {
+ if let Some(operation) = self.0.inner.lock().senders.unregister(oper) {
+ unsafe {
+ drop(Box::from_raw(operation.packet as *mut Packet<T>));
+ }
+ }
+ }
+
+ fn accept(&self, token: &mut Token, cx: &Context) -> bool {
+ token.zero = cx.wait_packet();
+ true
+ }
+
+ fn is_ready(&self) -> bool {
+ let inner = self.0.inner.lock();
+ inner.receivers.can_select() || inner.is_disconnected
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ let mut inner = self.0.inner.lock();
+ inner.senders.watch(oper, cx);
+ inner.receivers.can_select() || inner.is_disconnected
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ let mut inner = self.0.inner.lock();
+ inner.senders.unwatch(oper);
+ }
+}