aboutsummaryrefslogtreecommitdiff
path: root/src/sync/parker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/parker.rs')
-rw-r--r--src/sync/parker.rs110
1 files changed, 61 insertions, 49 deletions
diff --git a/src/sync/parker.rs b/src/sync/parker.rs
index fc13d2e..aefa515 100644
--- a/src/sync/parker.rs
+++ b/src/sync/parker.rs
@@ -1,20 +1,19 @@
+use crate::primitive::sync::atomic::AtomicUsize;
+use crate::primitive::sync::{Arc, Condvar, Mutex};
+use core::sync::atomic::Ordering::SeqCst;
use std::fmt;
use std::marker::PhantomData;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::SeqCst;
-use std::sync::{Arc, Condvar, Mutex};
-use std::time::Duration;
+use std::time::{Duration, Instant};
/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
-/// which point it automatically consumes the token. It may also return *spuriously*, without
-/// consuming the token.
+/// which point it automatically consumes the token.
///
-/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
-/// time.
+/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
+/// a specified maximum time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
@@ -43,13 +42,13 @@ use std::time::Duration;
/// u.unpark();
/// });
///
-/// // Wakes up when `u.unpark()` provides the token, but may also wake up
-/// // spuriously before that without consuming the token.
+/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
+/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
unparker: Unparker,
@@ -90,9 +89,6 @@ impl Parker {
/// Blocks the current thread until the token is made available.
///
- /// A call to `park` may wake up spuriously without consuming the token, and callers should be
- /// prepared for this possibility.
- ///
/// # Examples
///
/// ```
@@ -113,9 +109,6 @@ impl Parker {
/// Blocks the current thread until the token is made available, but only for a limited time.
///
- /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
- /// should be prepared for this possibility.
- ///
/// # Examples
///
/// ```
@@ -128,7 +121,25 @@ impl Parker {
/// p.park_timeout(Duration::from_millis(500));
/// ```
pub fn park_timeout(&self, timeout: Duration) {
- self.unparker.inner.park(Some(timeout));
+ self.park_deadline(Instant::now() + timeout)
+ }
+
+ /// Blocks the current thread until the token is made available, or until a certain deadline.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::time::{Duration, Instant};
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let deadline = Instant::now() + Duration::from_millis(500);
+ ///
+ /// // Waits for the token to become available, but will not wait longer than 500 ms.
+ /// p.park_deadline(deadline);
+ /// ```
+ pub fn park_deadline(&self, deadline: Instant) {
+ self.unparker.inner.park(Some(deadline))
}
/// Returns a reference to an associated [`Unparker`].
@@ -227,8 +238,7 @@ impl Unparker {
/// u.unpark();
/// });
///
- /// // Wakes up when `u.unpark()` provides the token, but may also wake up
- /// // spuriously before that without consuming the token.
+ /// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// ```
///
@@ -302,7 +312,7 @@ struct Inner {
}
impl Inner {
- fn park(&self, timeout: Option<Duration>) {
+ fn park(&self, deadline: Option<Instant>) {
// If we were previously notified then we consume this notification and return quickly.
if self
.state
@@ -313,8 +323,8 @@ impl Inner {
}
// If the timeout is zero, then there is no need to actually block.
- if let Some(ref dur) = timeout {
- if *dur == Duration::from_millis(0) {
+ if let Some(deadline) = deadline {
+ if deadline <= Instant::now() {
return;
}
}
@@ -338,40 +348,42 @@ impl Inner {
Err(n) => panic!("inconsistent park_timeout state: {}", n),
}
- match timeout {
- None => {
- loop {
- // Block the current thread on the conditional variable.
- m = self.cvar.wait(m).unwrap();
-
- if self
- .state
- .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
- .is_ok()
- {
- // got a notification
- return;
+ loop {
+ // Block the current thread on the conditional variable.
+ m = match deadline {
+ None => self.cvar.wait(m).unwrap(),
+ Some(deadline) => {
+ let now = Instant::now();
+ if now < deadline {
+ // We could check for a timeout here, in the return value of wait_timeout,
+ // but in the case that a timeout and an unpark arrive simultaneously, we
+ // prefer to report the former.
+ self.cvar.wait_timeout(m, deadline - now).unwrap().0
+ } else {
+ // We've timed out; swap out the state back to empty on our way out
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED | PARKED => return,
+ n => panic!("inconsistent park_timeout state: {}", n),
+ };
}
-
- // spurious wakeup, go back to sleep
}
- }
- Some(timeout) => {
- // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
- // notification we just want to unconditionally set `state` back to `EMPTY`, either
- // consuming a notification or un-flagging ourselves as parked.
- let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
+ };
- match self.state.swap(EMPTY, SeqCst) {
- NOTIFIED => {} // got a notification
- PARKED => {} // no notification
- n => panic!("inconsistent park_timeout state: {}", n),
- }
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ // got a notification
+ return;
}
+
+ // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
+ // in the branch above, when we discover the deadline is in the past
}
}
- pub fn unpark(&self) {
+ pub(crate) fn unpark(&self) {
// To ensure the unparked thread will observe any writes we made before this call, we must
// perform a release operation that `park` can synchronize with. To do that we must write
// `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather