diff options
Diffstat (limited to 'src/time/driver/mod.rs')
-rw-r--r-- | src/time/driver/mod.rs | 82 |
1 files changed, 49 insertions, 33 deletions
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs index 554042f..8532c55 100644 --- a/src/time/driver/mod.rs +++ b/src/time/driver/mod.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + //! Time driver mod atomic_stack; @@ -9,15 +11,9 @@ pub(super) use self::entry::Entry; mod handle; pub(crate) use self::handle::Handle; -mod registration; -pub(crate) use self::registration::Registration; - -mod stack; -use self::stack::Stack; - use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; use crate::park::{Park, Unpark}; -use crate::time::{wheel, Error}; +use crate::time::{error::Error, wheel}; use crate::time::{Clock, Duration, Instant}; use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; @@ -26,12 +22,12 @@ use std::sync::Arc; use std::usize; use std::{cmp, fmt}; -/// Time implementation that drives [`Delay`][delay], [`Interval`][interval], and [`Timeout`][timeout]. +/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. /// /// A `Driver` instance tracks the state necessary for managing time and -/// notifying the [`Delay`][delay] instances once their deadlines are reached. +/// notifying the [`Sleep`][sleep] instances once their deadlines are reached. /// -/// It is expected that a single instance manages many individual [`Delay`][delay] +/// It is expected that a single instance manages many individual [`Sleep`][sleep] /// instances. The `Driver` implementation is thread-safe and, as such, is able /// to handle callers from across threads. /// @@ -42,9 +38,9 @@ use std::{cmp, fmt}; /// The driver has a resolution of one millisecond. Any unit of time that falls /// between milliseconds are rounded up to the next millisecond. /// -/// When an instance is dropped, any outstanding [`Delay`][delay] instance that has not +/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not /// elapsed will be notified with an error. At this point, calling `poll` on the -/// [`Delay`][delay] instance will result in panic. +/// [`Sleep`][sleep] instance will result in panic. /// /// # Implementation /// @@ -71,29 +67,32 @@ use std::{cmp, fmt}; /// * Level 5: 64 x ~12 day slots. /// /// When the timer processes entries at level zero, it will notify all the -/// `Delay` instances as their deadlines have been reached. For all higher +/// `Sleep` instances as their deadlines have been reached. For all higher /// levels, all entries will be redistributed across the wheel at the next level -/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will +/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will /// either be canceled (dropped) or their associated entries will reach level /// zero and be notified. /// /// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf -/// [delay]: crate::time::Delay +/// [sleep]: crate::time::Sleep /// [timeout]: crate::time::Timeout /// [interval]: crate::time::Interval #[derive(Debug)] -pub(crate) struct Driver<T> { +pub(crate) struct Driver<T: Park> { /// Shared state inner: Arc<Inner>, /// Timer wheel - wheel: wheel::Wheel<Stack>, + wheel: wheel::Wheel, /// Thread parker. The `Driver` park implementation delegates to this. park: T, /// Source of "now" instances clock: Clock, + + /// True if the driver is being shutdown + is_shutdown: bool, } /// Timer state shared between `Driver`, `Handle`, and `Registration`. @@ -135,12 +134,13 @@ where wheel: wheel::Wheel::new(), park, clock, + is_shutdown: false, } } /// Returns a handle to the timer. /// - /// The `Handle` is how `Delay` instances are created. The `Delay` instances + /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances /// can either be created directly or the `Handle` instance can be passed to /// `with_default`, setting the timer as the default timer for the execution /// context. @@ -159,9 +159,8 @@ where self.clock.now() - self.inner.start, crate::time::Round::Down, ); - let mut poll = wheel::Poll::new(now); - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(now) { let when = entry.when_internal().expect("invalid internal entry state"); // Fire the entry @@ -189,7 +188,7 @@ where self.clear_entry(&entry); } (None, Some(when)) => { - // Queue the entry + // Add the entry to the timer wheel self.add_entry(entry, when); } (Some(_), Some(next)) => { @@ -201,19 +200,17 @@ where } fn clear_entry(&mut self, entry: &Arc<Entry>) { - self.wheel.remove(entry, &mut ()); + self.wheel.remove(entry); entry.set_when_internal(None); } /// Fires the entry if it needs to, otherwise queue it to be processed later. - /// - /// Returns `None` if the entry was fired. fn add_entry(&mut self, entry: Arc<Entry>, when: u64) { - use crate::time::wheel::InsertError; + use crate::time::error::InsertError; entry.set_when_internal(Some(when)); - match self.wheel.insert(when, entry, &mut ()) { + match self.wheel.insert(when, entry) { Ok(_) => {} Err((entry, InsertError::Elapsed)) => { // The entry's deadline has elapsed, so fire it and update the @@ -225,7 +222,7 @@ where // The entry's deadline is invalid, so error it and update the // internal state accordingly. entry.set_when_internal(None); - entry.error(); + entry.error(Error::invalid()); } } } @@ -303,10 +300,12 @@ where Ok(()) } -} -impl<T> Drop for Driver<T> { - fn drop(&mut self) { + fn shutdown(&mut self) { + if self.is_shutdown { + return; + } + use std::u64; // Shutdown the stack of entries to process, preventing any new entries @@ -314,11 +313,24 @@ impl<T> Drop for Driver<T> { self.inner.process.shutdown(); // Clear the wheel, using u64::MAX allows us to drain everything - let mut poll = wheel::Poll::new(u64::MAX); + let end_of_time = u64::MAX; - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { - entry.error(); + while let Some(entry) = self.wheel.poll(end_of_time) { + entry.error(Error::shutdown()); } + + self.park.shutdown(); + + self.is_shutdown = true; + } +} + +impl<T> Drop for Driver<T> +where + T: Park, +{ + fn drop(&mut self) { + self.shutdown(); } } @@ -368,6 +380,10 @@ impl Inner { debug_assert!(prev <= MAX_TIMEOUTS); } + /// add the entry to the "process queue". entries are not immediately + /// pushed into the timer wheel but are instead pushed into the + /// process queue and then moved from the process queue into the timer + /// wheel on next `process` fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> { if self.process.push(entry)? { // The timer is notified so that it can process the timeout |