aboutsummaryrefslogtreecommitdiff
path: root/src/flavors/at.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/flavors/at.rs')
-rw-r--r--src/flavors/at.rs202
1 files changed, 202 insertions, 0 deletions
diff --git a/src/flavors/at.rs b/src/flavors/at.rs
new file mode 100644
index 0000000..a2b1b57
--- /dev/null
+++ b/src/flavors/at.rs
@@ -0,0 +1,202 @@
+//! Channel that delivers a message at a certain moment in time.
+//!
+//! Messages cannot be sent into this kind of channel; they are materialized on demand.
+
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::thread;
+use std::time::{Duration, Instant};
+
+use crate::context::Context;
+use crate::err::{RecvTimeoutError, TryRecvError};
+use crate::select::{Operation, SelectHandle, Token};
+use crate::utils;
+
+/// Result of a receive operation.
+pub type AtToken = Option<Instant>;
+
+/// Channel that delivers a message at a certain moment in time
+pub struct Channel {
+ /// The instant at which the message will be delivered.
+ delivery_time: Instant,
+
+ /// `true` if the message has been received.
+ received: AtomicBool,
+}
+
+impl Channel {
+ /// Creates a channel that delivers a message at a certain instant in time.
+ #[inline]
+ pub fn new_deadline(when: Instant) -> Self {
+ Channel {
+ delivery_time: when,
+ received: AtomicBool::new(false),
+ }
+ }
+ /// Creates a channel that delivers a message after a certain duration of time.
+ #[inline]
+ pub fn new_timeout(dur: Duration) -> Self {
+ Self::new_deadline(Instant::now() + dur)
+ }
+
+ /// Attempts to receive a message without blocking.
+ #[inline]
+ pub fn try_recv(&self) -> Result<Instant, TryRecvError> {
+ // We use relaxed ordering because this is just an optional optimistic check.
+ if self.received.load(Ordering::Relaxed) {
+ // The message has already been received.
+ return Err(TryRecvError::Empty);
+ }
+
+ if Instant::now() < self.delivery_time {
+ // The message was not delivered yet.
+ return Err(TryRecvError::Empty);
+ }
+
+ // Try receiving the message if it is still available.
+ if !self.received.swap(true, Ordering::SeqCst) {
+ // Success! Return delivery time as the message.
+ Ok(self.delivery_time)
+ } else {
+ // The message was already received.
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ #[inline]
+ pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
+ // We use relaxed ordering because this is just an optional optimistic check.
+ if self.received.load(Ordering::Relaxed) {
+ // The message has already been received.
+ utils::sleep_until(deadline);
+ return Err(RecvTimeoutError::Timeout);
+ }
+
+ // Wait until the message is received or the deadline is reached.
+ loop {
+ let now = Instant::now();
+
+ let deadline = match deadline {
+ // Check if we can receive the next message.
+ _ if now >= self.delivery_time => break,
+ // Check if the timeout deadline has been reached.
+ Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
+
+ // Sleep until one of the above happens
+ Some(d) if d < self.delivery_time => d,
+ _ => self.delivery_time,
+ };
+
+ thread::sleep(deadline - now);
+ }
+
+ // Try receiving the message if it is still available.
+ if !self.received.swap(true, Ordering::SeqCst) {
+ // Success! Return the message, which is the instant at which it was delivered.
+ Ok(self.delivery_time)
+ } else {
+ // The message was already received. Block forever.
+ utils::sleep_until(None);
+ unreachable!()
+ }
+ }
+
+ /// Reads a message from the channel.
+ #[inline]
+ pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
+ token.at.ok_or(())
+ }
+
+ /// Returns `true` if the channel is empty.
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ // We use relaxed ordering because this is just an optional optimistic check.
+ if self.received.load(Ordering::Relaxed) {
+ return true;
+ }
+
+ // If the delivery time hasn't been reached yet, the channel is empty.
+ if Instant::now() < self.delivery_time {
+ return true;
+ }
+
+ // The delivery time has been reached. The channel is empty only if the message has already
+ // been received.
+ self.received.load(Ordering::SeqCst)
+ }
+
+ /// Returns `true` if the channel is full.
+ #[inline]
+ pub fn is_full(&self) -> bool {
+ !self.is_empty()
+ }
+
+ /// Returns the number of messages in the channel.
+ #[inline]
+ pub fn len(&self) -> usize {
+ if self.is_empty() {
+ 0
+ } else {
+ 1
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ #[inline]
+ pub fn capacity(&self) -> Option<usize> {
+ Some(1)
+ }
+}
+
+impl SelectHandle for Channel {
+ #[inline]
+ fn try_select(&self, token: &mut Token) -> bool {
+ match self.try_recv() {
+ Ok(msg) => {
+ token.at = Some(msg);
+ true
+ }
+ Err(TryRecvError::Disconnected) => {
+ token.at = None;
+ true
+ }
+ Err(TryRecvError::Empty) => false,
+ }
+ }
+
+ #[inline]
+ fn deadline(&self) -> Option<Instant> {
+ // We use relaxed ordering because this is just an optional optimistic check.
+ if self.received.load(Ordering::Relaxed) {
+ None
+ } else {
+ Some(self.delivery_time)
+ }
+ }
+
+ #[inline]
+ fn register(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ #[inline]
+ fn unregister(&self, _oper: Operation) {}
+
+ #[inline]
+ fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+ self.try_select(token)
+ }
+
+ #[inline]
+ fn is_ready(&self) -> bool {
+ !self.is_empty()
+ }
+
+ #[inline]
+ fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ #[inline]
+ fn unwatch(&self, _oper: Operation) {}
+}