use crate::primitive::sync::atomic::AtomicUsize;
use crate::primitive::sync::{Arc, Condvar, Mutex};
use core::sync::atomic::Ordering::SeqCst;
use std::fmt;
use std::marker::PhantomData;
use std::time::{Duration, Instant};

/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
///   which point it automatically consumes the token.
///
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
///   a specified maximum time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
///   returning immediately.
///
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
/// [`park`] and [`unpark`].
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_utils::sync::Parker;
///
/// let p = Parker::new();
/// let u = p.unparker().clone();
///
/// // Make the token available.
/// u.unpark();
/// // Wakes up immediately and consumes the token.
/// p.park();
///
/// thread::spawn(move || {
///     thread::sleep(Duration::from_millis(500));
///     u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
    // Handle to the shared parking state; all `park*` methods go through `unparker.inner`.
    unparker: Unparker,
    // A raw-pointer marker removes the auto `Send` and `Sync` impls. `Send` is added back
    // explicitly below, so the net effect is that `Parker` is `Send` but not `Sync`.
    _marker: PhantomData<*const ()>,
}

// SAFETY: the only reason `Parker` is not automatically `Send` is the `PhantomData<*const ()>`
// marker, which stores no pointer. The remaining field is an `Unparker`, which is itself declared
// `Send`.
unsafe impl Send for Parker {}

impl Default for Parker {
    /// Creates a `Parker` whose token is initially absent (state `EMPTY`).
    fn default() -> Self {
        Self {
            unparker: Unparker {
                inner: Arc::new(Inner {
                    state: AtomicUsize::new(EMPTY),
                    lock: Mutex::new(()),
                    cvar: Condvar::new(),
                }),
            },
            _marker: PhantomData,
        }
    }
}

impl Parker {
    /// Creates a new `Parker`.
    ///
    /// Equivalent to [`Parker::default`].
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// ```
    pub fn new() -> Parker {
        Self::default()
    }

    /// Blocks the current thread until the token is made available.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// // Make the token available.
    /// u.unpark();
    ///
    /// // Wakes up immediately and consumes the token.
    /// p.park();
    /// ```
    pub fn park(&self) {
        self.unparker.inner.park(None);
    }

    /// Blocks the current thread until the token is made available, but only for a limited time.
    ///
    /// If `Instant::now() + timeout` is not representable, this falls back to [`Parker::park`]
    /// and waits without a time limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    ///
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
    /// p.park_timeout(Duration::from_millis(500));
    /// ```
    pub fn park_timeout(&self, timeout: Duration) {
        match Instant::now().checked_add(timeout) {
            Some(deadline) => self.park_deadline(deadline),
            // The deadline overflowed `Instant`; treat it as "no deadline".
            None => self.park(),
        }
    }

    /// Blocks the current thread until the token is made available, or until a certain deadline.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::{Duration, Instant};
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let deadline = Instant::now() + Duration::from_millis(500);
    ///
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
    /// p.park_deadline(deadline);
    /// ```
    pub fn park_deadline(&self, deadline: Instant) {
        self.unparker.inner.park(Some(deadline))
    }

    /// Returns a reference to an associated [`Unparker`].
    ///
    /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// // Make the token available.
    /// u.unpark();
    /// // Wakes up immediately and consumes the token.
    /// p.park();
    /// ```
    pub fn unparker(&self) -> &Unparker {
        &self.unparker
    }

    /// Converts a `Parker` into a raw pointer.
    ///
    /// The pointer is the one produced by [`Unparker::into_raw`] for the parker's internal
    /// unparker. Pass it to [`Parker::from_raw`] to get a `Parker` back.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let raw = Parker::into_raw(p);
    /// # let _ = unsafe { Parker::from_raw(raw) };
    /// ```
    pub fn into_raw(this: Parker) -> *const () {
        Unparker::into_raw(this.unparker)
    }

    /// Converts a raw pointer into a `Parker`.
    ///
    /// # Safety
    ///
    /// This method is safe to use only with pointers returned by [`Parker::into_raw`], and each
    /// such pointer must be converted back at most once.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let raw = Parker::into_raw(p);
    /// let p = unsafe { Parker::from_raw(raw) };
    /// ```
    pub unsafe fn from_raw(ptr: *const ()) -> Parker {
        Parker {
            // SAFETY: the caller guarantees `ptr` came from `Parker::into_raw`, which returns
            // exactly the pointer produced by `Unparker::into_raw`.
            unparker: unsafe { Unparker::from_raw(ptr) },
            _marker: PhantomData,
        }
    }
}

impl fmt::Debug for Parker {
    /// Writes an opaque placeholder; the internal state is not exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Parker { .. }")
    }
}

/// Unparks a thread parked by the associated [`Parker`].
pub struct Unparker {
    // Reference-counted parking state; cloning an `Unparker` clones this `Arc`, so every clone
    // operates on the same `Inner`.
    inner: Arc<Inner>,
}

unsafe impl Send for Unparker {}
unsafe impl Sync for Unparker {}

impl Unparker {
    /// Atomically makes the token available if it is not already.
    ///
    /// This method will wake up the thread blocked on [`park`], [`park_timeout`] or
    /// [`park_deadline`], if there is any.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    /// use std::time::Duration;
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// thread::spawn(move || {
    ///     thread::sleep(Duration::from_millis(500));
    ///     u.unpark();
    /// });
    ///
    /// // Wakes up when `u.unpark()` provides the token.
    /// p.park();
    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
    /// ```
    ///
    /// [`park`]: Parker::park
    /// [`park_timeout`]: Parker::park_timeout
    /// [`park_deadline`]: Parker::park_deadline
    pub fn unpark(&self) {
        self.inner.unpark()
    }

    /// Converts an `Unparker` into a raw pointer.
    ///
    /// The pointer comes from `Arc::into_raw` on the shared state, with the pointee type erased
    /// to `()`. Pass it to [`Unparker::from_raw`] to get an `Unparker` back.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::{Parker, Unparker};
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    /// let raw = Unparker::into_raw(u);
    /// # let _ = unsafe { Unparker::from_raw(raw) };
    /// ```
    pub fn into_raw(this: Unparker) -> *const () {
        Arc::into_raw(this.inner).cast::<()>()
    }

    /// Converts a raw pointer into an `Unparker`.
    ///
    /// # Safety
    ///
    /// This method is safe to use only with pointers returned by [`Unparker::into_raw`], and each
    /// such pointer must be converted back at most once.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::{Parker, Unparker};
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// let raw = Unparker::into_raw(u);
    /// let u = unsafe { Unparker::from_raw(raw) };
    /// ```
    pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
        Unparker {
            // SAFETY: the caller guarantees `ptr` was produced by `Unparker::into_raw`, i.e. by
            // `Arc::<Inner>::into_raw`, so casting back to `*const Inner` restores the original
            // pointer type expected by `Arc::from_raw`.
            inner: unsafe { Arc::from_raw(ptr.cast::<Inner>()) },
        }
    }
}

impl fmt::Debug for Unparker {
    /// Writes an opaque placeholder; the internal state is not exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Unparker { .. }")
    }
}

impl Clone for Unparker {
    /// Returns another handle to the same shared parking state.
    fn clone(&self) -> Unparker {
        Unparker {
            inner: self.inner.clone(),
        }
    }
}

// Values stored in `Inner::state`.
// No token is available and no thread is parked.
const EMPTY: usize = 0;
// A thread has committed to sleeping (or is asleep) on `Inner::cvar`.
const PARKED: usize = 1;
// The token is available and has not been consumed yet.
const NOTIFIED: usize = 2;

// State shared between a `Parker` and all of its `Unparker`s.
struct Inner {
    // One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    // Guards no data; it only orders the parking thread's "set PARKED, then wait" sequence
    // against `unpark`'s notification.
    lock: Mutex<()>,
    // Condition variable the parked thread sleeps on.
    cvar: Condvar,
}

impl Inner {
    // Blocks until the token is available and consumes it. With `Some(deadline)`, returns once
    // the deadline has passed even if no token arrived. `None` waits indefinitely.
    fn park(&self, deadline: Option<Instant>) {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // If the deadline has already passed, then there is no need to actually block.
        if let Some(deadline) = deadline {
            if deadline <= Instant::now() {
                return;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        loop {
            // Block the current thread on the condition variable.
            m = match deadline {
                None => self.cvar.wait(m).unwrap(),
                Some(deadline) => {
                    let now = Instant::now();
                    if now < deadline {
                        // We could check for a timeout here, in the return value of wait_timeout,
                        // but in the case that a timeout and an unpark arrive simultaneously, we
                        // prefer to report the former.
                        self.cvar.wait_timeout(m, deadline - now).unwrap().0
                    } else {
                        // We've timed out; swap out the state back to empty on our way out
                        match self.state.swap(EMPTY, SeqCst) {
                            NOTIFIED | PARKED => return,
                            n => panic!("inconsistent park_timeout state: {}", n),
                        };
                    }
                }
            };

            if self
                .state
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                .is_ok()
            {
                // got a notification
                return;
            }

            // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
            // in the branch above, when we discover the deadline is in the past
        }
    }

    // Makes the token available and, if a thread is currently parked, wakes it up.
    pub(crate) fn unpark(&self) {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return,    // no one was waiting
            NOTIFIED => return, // already unparked
            PARKED => {}        // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
    }
}