aboutsummaryrefslogtreecommitdiff
path: root/src/time/driver/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/time/driver/mod.rs')
-rw-r--r--src/time/driver/mod.rs82
1 files changed, 49 insertions, 33 deletions
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs
index 554042f..8532c55 100644
--- a/src/time/driver/mod.rs
+++ b/src/time/driver/mod.rs
@@ -1,3 +1,5 @@
+#![cfg_attr(not(feature = "rt"), allow(dead_code))]
+
//! Time driver
mod atomic_stack;
@@ -9,15 +11,9 @@ pub(super) use self::entry::Entry;
mod handle;
pub(crate) use self::handle::Handle;
-mod registration;
-pub(crate) use self::registration::Registration;
-
-mod stack;
-use self::stack::Stack;
-
use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
use crate::park::{Park, Unpark};
-use crate::time::{wheel, Error};
+use crate::time::{error::Error, wheel};
use crate::time::{Clock, Duration, Instant};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
@@ -26,12 +22,12 @@ use std::sync::Arc;
use std::usize;
use std::{cmp, fmt};
-/// Time implementation that drives [`Delay`][delay], [`Interval`][interval], and [`Timeout`][timeout].
+/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
///
/// A `Driver` instance tracks the state necessary for managing time and
-/// notifying the [`Delay`][delay] instances once their deadlines are reached.
+/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
///
-/// It is expected that a single instance manages many individual [`Delay`][delay]
+/// It is expected that a single instance manages many individual [`Sleep`][sleep]
/// instances. The `Driver` implementation is thread-safe and, as such, is able
/// to handle callers from across threads.
///
@@ -42,9 +38,9 @@ use std::{cmp, fmt};
/// The driver has a resolution of one millisecond. Any unit of time that falls
/// between milliseconds are rounded up to the next millisecond.
///
-/// When an instance is dropped, any outstanding [`Delay`][delay] instance that has not
+/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not
/// elapsed will be notified with an error. At this point, calling `poll` on the
-/// [`Delay`][delay] instance will result in panic.
+/// [`Sleep`][sleep] instance will result in panic.
///
/// # Implementation
///
@@ -71,29 +67,32 @@ use std::{cmp, fmt};
/// * Level 5: 64 x ~12 day slots.
///
/// When the timer processes entries at level zero, it will notify all the
-/// `Delay` instances as their deadlines have been reached. For all higher
+/// `Sleep` instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
-/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will
+/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified.
///
/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
-/// [delay]: crate::time::Delay
+/// [sleep]: crate::time::Sleep
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
-pub(crate) struct Driver<T> {
+pub(crate) struct Driver<T: Park> {
/// Shared state
inner: Arc<Inner>,
/// Timer wheel
- wheel: wheel::Wheel<Stack>,
+ wheel: wheel::Wheel,
/// Thread parker. The `Driver` park implementation delegates to this.
park: T,
/// Source of "now" instances
clock: Clock,
+
+ /// True if the driver is being shutdown
+ is_shutdown: bool,
}
/// Timer state shared between `Driver`, `Handle`, and `Registration`.
@@ -135,12 +134,13 @@ where
wheel: wheel::Wheel::new(),
park,
clock,
+ is_shutdown: false,
}
}
/// Returns a handle to the timer.
///
- /// The `Handle` is how `Delay` instances are created. The `Delay` instances
+ /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances
/// can either be created directly or the `Handle` instance can be passed to
/// `with_default`, setting the timer as the default timer for the execution
/// context.
@@ -159,9 +159,8 @@ where
self.clock.now() - self.inner.start,
crate::time::Round::Down,
);
- let mut poll = wheel::Poll::new(now);
- while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
+ while let Some(entry) = self.wheel.poll(now) {
let when = entry.when_internal().expect("invalid internal entry state");
// Fire the entry
@@ -189,7 +188,7 @@ where
self.clear_entry(&entry);
}
(None, Some(when)) => {
- // Queue the entry
+ // Add the entry to the timer wheel
self.add_entry(entry, when);
}
(Some(_), Some(next)) => {
@@ -201,19 +200,17 @@ where
}
fn clear_entry(&mut self, entry: &Arc<Entry>) {
- self.wheel.remove(entry, &mut ());
+ self.wheel.remove(entry);
entry.set_when_internal(None);
}
/// Fires the entry if it needs to, otherwise queue it to be processed later.
- ///
- /// Returns `None` if the entry was fired.
fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
- use crate::time::wheel::InsertError;
+ use crate::time::error::InsertError;
entry.set_when_internal(Some(when));
- match self.wheel.insert(when, entry, &mut ()) {
+ match self.wheel.insert(when, entry) {
Ok(_) => {}
Err((entry, InsertError::Elapsed)) => {
// The entry's deadline has elapsed, so fire it and update the
@@ -225,7 +222,7 @@ where
// The entry's deadline is invalid, so error it and update the
// internal state accordingly.
entry.set_when_internal(None);
- entry.error();
+ entry.error(Error::invalid());
}
}
}
@@ -303,10 +300,12 @@ where
Ok(())
}
-}
-impl<T> Drop for Driver<T> {
- fn drop(&mut self) {
+ fn shutdown(&mut self) {
+ if self.is_shutdown {
+ return;
+ }
+
use std::u64;
// Shutdown the stack of entries to process, preventing any new entries
@@ -314,11 +313,24 @@ impl<T> Drop for Driver<T> {
self.inner.process.shutdown();
// Clear the wheel, using u64::MAX allows us to drain everything
- let mut poll = wheel::Poll::new(u64::MAX);
+ let end_of_time = u64::MAX;
- while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
- entry.error();
+ while let Some(entry) = self.wheel.poll(end_of_time) {
+ entry.error(Error::shutdown());
}
+
+ self.park.shutdown();
+
+ self.is_shutdown = true;
+ }
+}
+
+impl<T> Drop for Driver<T>
+where
+ T: Park,
+{
+ fn drop(&mut self) {
+ self.shutdown();
}
}
@@ -368,6 +380,10 @@ impl Inner {
debug_assert!(prev <= MAX_TIMEOUTS);
}
+ /// add the entry to the "process queue". entries are not immediately
+ /// pushed into the timer wheel but are instead pushed into the
+ /// process queue and then moved from the process queue into the timer
+ /// wheel on next `process`
fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> {
if self.process.push(entry)? {
// The timer is notified so that it can process the timeout