diff options
author | Haibo Huang <hhb@google.com> | 2021-01-14 17:23:22 -0800 |
---|---|---|
committer | Jeff Vander Stoep <jeffv@google.com> | 2021-01-15 20:44:08 +0100 |
commit | 290fc4903cd00fc31d93e0ecd49c402e6833c569 (patch) | |
tree | 4a9646d2ab712bae1ead875992160c7248588daf /src/time/driver/mod.rs | |
parent | 84cad6596f48e471881980dcba7df9cb5b4b0139 (diff) | |
download | tokio-290fc4903cd00fc31d93e0ecd49c402e6833c569.tar.gz |
Upgrade rust/crates/tokio to 1.0.2platform-tools-31.0.0
Test: make
Change-Id: Ic48ff709bade266749eac8c146856901ce78da7f
Diffstat (limited to 'src/time/driver/mod.rs')
-rw-r--r-- | src/time/driver/mod.rs | 496 |
1 files changed, 266 insertions, 230 deletions
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs index 8532c55..9fbc0b3 100644 --- a/src/time/driver/mod.rs +++ b/src/time/driver/mod.rs @@ -1,26 +1,29 @@ +// Currently, rust warns when an unsafe fn contains an unsafe {} block. However, +// in the future, this will change to the reverse. For now, suppress this +// warning and generally stick with being explicit about unsafety. +#![allow(unused_unsafe)] #![cfg_attr(not(feature = "rt"), allow(dead_code))] //! Time driver -mod atomic_stack; -use self::atomic_stack::AtomicStack; - mod entry; -pub(super) use self::entry::Entry; +pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared}; mod handle; pub(crate) use self::handle::Handle; -use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; +mod wheel; + +pub(super) mod sleep; + +use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; -use crate::time::{error::Error, wheel}; +use crate::time::error::Error; use crate::time::{Clock, Duration, Instant}; -use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; - -use std::sync::Arc; -use std::usize; -use std::{cmp, fmt}; +use std::convert::TryInto; +use std::fmt; +use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. /// @@ -78,63 +81,96 @@ use std::{cmp, fmt}; /// [timeout]: crate::time::Timeout /// [interval]: crate::time::Interval #[derive(Debug)] -pub(crate) struct Driver<T: Park> { +pub(crate) struct Driver<P: Park + 'static> { + /// Timing backend in use + time_source: ClockTime, + /// Shared state - inner: Arc<Inner>, + inner: Handle, - /// Timer wheel - wheel: wheel::Wheel, + /// Parker to delegate to + park: P, +} + +/// A structure which handles conversion from Instants to u64 timestamps. +#[derive(Debug, Clone)] +pub(self) struct ClockTime { + clock: super::clock::Clock, + start_time: Instant, +} - /// Thread parker. The `Driver` park implementation delegates to this. - park: T, +impl ClockTime { + pub(self) fn new(clock: Clock) -> Self { + Self { + clock, + start_time: super::clock::now(), + } + } - /// Source of "now" instances - clock: Clock, + pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 { + // Round up to the end of a ms + self.instant_to_tick(t + Duration::from_nanos(999_999)) + } - /// True if the driver is being shutdown - is_shutdown: bool, + pub(self) fn instant_to_tick(&self, t: Instant) -> u64 { + // round up + let dur: Duration = t + .checked_duration_since(self.start_time) + .unwrap_or_else(|| Duration::from_secs(0)); + let ms = dur.as_millis(); + + ms.try_into().expect("Duration too far into the future") + } + + pub(self) fn tick_to_duration(&self, t: u64) -> Duration { + Duration::from_millis(t) + } + + pub(self) fn now(&self) -> u64 { + self.instant_to_tick(self.clock.now()) + } } /// Timer state shared between `Driver`, `Handle`, and `Registration`. -pub(crate) struct Inner { - /// The instant at which the timer started running. - start: Instant, +pub(self) struct Inner { + /// Timing backend in use + time_source: ClockTime, /// The last published timer `elapsed` value. - elapsed: AtomicU64, + elapsed: u64, - /// Number of active timeouts - num: AtomicUsize, + /// The earliest time at which we promise to wake up without unparking + next_wake: Option<NonZeroU64>, - /// Head of the "process" linked list. - process: AtomicStack, + /// Timer wheel + wheel: wheel::Wheel, + + /// True if the driver is being shutdown + is_shutdown: bool, - /// Unparks the timer thread. + /// Unparker that can be used to wake the time driver unpark: Box<dyn Unpark>, } -/// Maximum number of timeouts the system can handle concurrently. -const MAX_TIMEOUTS: usize = usize::MAX >> 1; - // ===== impl Driver ===== -impl<T> Driver<T> +impl<P> Driver<P> where - T: Park, + P: Park + 'static, { /// Creates a new `Driver` instance that uses `park` to block the current - /// thread and `clock` to get the current `Instant`. + /// thread and `time_source` to get the current time and convert to ticks. /// /// Specifying the source of time is useful when testing. - pub(crate) fn new(park: T, clock: Clock) -> Driver<T> { - let unpark = Box::new(park.unpark()); + pub(crate) fn new(park: P, clock: Clock) -> Driver<P> { + let time_source = ClockTime::new(clock); + + let inner = Inner::new(time_source.clone(), Box::new(park.unpark())); Driver { - inner: Arc::new(Inner::new(clock.now(), unpark)), - wheel: wheel::Wheel::new(), + time_source, + inner: Handle::new(Arc::new(Mutex::new(inner))), park, - clock, - is_shutdown: false, } } @@ -145,189 +181,242 @@ where /// `with_default`, setting the timer as the default timer for the execution /// context. pub(crate) fn handle(&self) -> Handle { - Handle::new(Arc::downgrade(&self.inner)) + self.inner.clone() } - /// Converts an `Expiration` to an `Instant`. - fn expiration_instant(&self, when: u64) -> Instant { - self.inner.start + Duration::from_millis(when) - } + fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> { + let clock = &self.time_source.clock; - /// Runs timer related logic - fn process(&mut self) { - let now = crate::time::ms( - self.clock.now() - self.inner.start, - crate::time::Round::Down, - ); + let mut lock = self.inner.lock(); - while let Some(entry) = self.wheel.poll(now) { - let when = entry.when_internal().expect("invalid internal entry state"); + assert!(!lock.is_shutdown); - // Fire the entry - entry.fire(when); + let next_wake = lock.wheel.next_expiration_time(); + lock.next_wake = + next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); - // Track that the entry has been fired - entry.set_when_internal(None); - } + drop(lock); - // Update the elapsed cache - self.inner.elapsed.store(self.wheel.elapsed(), SeqCst); - } + match next_wake { + Some(when) => { + let now = self.time_source.now(); + // Note that we effectively round up to 1ms here - this avoids + // very short-duration microsecond-resolution sleeps that the OS + // might treat as zero-length. + let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now)); + + if duration > Duration::from_millis(0) { + if let Some(limit) = limit { + duration = std::cmp::min(limit, duration); + } - /// Processes the entry queue - /// - /// This handles adding and canceling timeouts. - fn process_queue(&mut self) { - for entry in self.inner.process.take() { - match (entry.when_internal(), entry.load_state()) { - (None, None) => { - // Nothing to do - } - (Some(_), None) => { - // Remove the entry - self.clear_entry(&entry); - } - (None, Some(when)) => { - // Add the entry to the timer wheel - self.add_entry(entry, when); + if clock.is_paused() { + self.park.park_timeout(Duration::from_secs(0))?; + + // Simulate advancing time + clock.advance(duration); + } else { + self.park.park_timeout(duration)?; + } + } else { + self.park.park_timeout(Duration::from_secs(0))?; } - (Some(_), Some(next)) => { - self.clear_entry(&entry); - self.add_entry(entry, next); + } + None => { + if let Some(duration) = limit { + if clock.is_paused() { + self.park.park_timeout(Duration::from_secs(0))?; + clock.advance(duration); + } else { + self.park.park_timeout(duration)?; + } + } else { + self.park.park()?; } } } - } - - fn clear_entry(&mut self, entry: &Arc<Entry>) { - self.wheel.remove(entry); - entry.set_when_internal(None); - } - - /// Fires the entry if it needs to, otherwise queue it to be processed later. - fn add_entry(&mut self, entry: Arc<Entry>, when: u64) { - use crate::time::error::InsertError; - entry.set_when_internal(Some(when)); + // Process pending timers after waking up + self.inner.process(); - match self.wheel.insert(when, entry) { - Ok(_) => {} - Err((entry, InsertError::Elapsed)) => { - // The entry's deadline has elapsed, so fire it and update the - // internal state accordingly. - entry.set_when_internal(None); - entry.fire(when); - } - Err((entry, InsertError::Invalid)) => { - // The entry's deadline is invalid, so error it and update the - // internal state accordingly. - entry.set_when_internal(None); - entry.error(Error::invalid()); - } - } + Ok(()) } } -impl<T> Park for Driver<T> -where - T: Park, -{ - type Unpark = T::Unpark; - type Error = T::Error; +impl Handle { + /// Runs timer related logic, and returns the next wakeup time + pub(self) fn process(&self) { + let now = self.time_source().now(); - fn unpark(&self) -> Self::Unpark { - self.park.unpark() + self.process_at_time(now) } - fn park(&mut self) -> Result<(), Self::Error> { - self.process_queue(); + pub(self) fn process_at_time(&self, now: u64) { + let mut waker_list: [Option<Waker>; 32] = Default::default(); + let mut waker_idx = 0; - match self.wheel.poll_at() { - Some(when) => { - let now = self.clock.now(); - let deadline = self.expiration_instant(when); + let mut lock = self.lock(); - if deadline > now { - let dur = deadline - now; + assert!(now >= lock.elapsed); - if self.clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - self.clock.advance(dur); - } else { - self.park.park_timeout(dur)?; + while let Some(entry) = lock.wheel.poll(now) { + debug_assert!(unsafe { entry.is_pending() }); + + // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. + if let Some(waker) = unsafe { entry.fire(Ok(())) } { + waker_list[waker_idx] = Some(waker); + + waker_idx += 1; + + if waker_idx == waker_list.len() { + // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. + drop(lock); + + for waker in waker_list.iter_mut() { + waker.take().unwrap().wake(); } - } else { - self.park.park_timeout(Duration::from_secs(0))?; + + waker_idx = 0; + + lock = self.lock(); } } - None => { - self.park.park()?; - } } - self.process(); + // Update the elapsed cache + lock.elapsed = lock.wheel.elapsed(); + lock.next_wake = lock + .wheel + .poll_at() + .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); - Ok(()) + drop(lock); + + for waker in waker_list[0..waker_idx].iter_mut() { + waker.take().unwrap().wake(); + } } - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.process_queue(); + /// Removes a registered timer from the driver. + /// + /// The timer will be moved to the cancelled state. Wakers will _not_ be + /// invoked. If the timer is already completed, this function is a no-op. + /// + /// This function always acquires the driver lock, even if the entry does + /// not appear to be registered. + /// + /// SAFETY: The timer must not be registered with some other driver, and + /// `add_entry` must not be called concurrently. + pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) { + unsafe { + let mut lock = self.lock(); + + if entry.as_ref().might_be_registered() { + lock.wheel.remove(entry); + } - match self.wheel.poll_at() { - Some(when) => { - let now = self.clock.now(); - let deadline = self.expiration_instant(when); + entry.as_ref().handle().fire(Ok(())); + } + } - if deadline > now { - let duration = cmp::min(deadline - now, duration); + /// Removes and re-adds an entry to the driver. + /// + /// SAFETY: The timer must be either unregistered, or registered with this + /// driver. No other threads are allowed to concurrently manipulate the + /// timer at all (the current thread should hold an exclusive reference to + /// the `TimerEntry`) + pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) { + let waker = unsafe { + let mut lock = self.lock(); + + // We may have raced with a firing/deregistration, so check before + // deregistering. + if unsafe { entry.as_ref().might_be_registered() } { + lock.wheel.remove(entry); + } - if self.clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - self.clock.advance(duration); - } else { - self.park.park_timeout(duration)?; + // Now that we have exclusive control of this entry, mint a handle to reinsert it. + let entry = entry.as_ref().handle(); + + if lock.is_shutdown { + unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } + } else { + entry.set_expiration(new_tick); + + // Note: We don't have to worry about racing with some other resetting + // thread, because add_entry and reregister require exclusive control of + // the timer entry. + match unsafe { lock.wheel.insert(entry) } { + Ok(when) => { + if lock + .next_wake + .map(|next_wake| when < next_wake.get()) + .unwrap_or(true) + { + lock.unpark.unpark(); + } + + None } - } else { - self.park.park_timeout(Duration::from_secs(0))?; + Err((entry, super::error::InsertError::Elapsed)) => unsafe { + entry.fire(Ok(())) + }, } } - None => { - self.park.park_timeout(duration)?; - } + + // Must release lock before invoking waker to avoid the risk of deadlock. + }; + + // The timer was fired synchronously as a result of the reregistration. + // Wake the waker; this is needed because we might reset _after_ a poll, + // and otherwise the task won't be awoken to poll again. + if let Some(waker) = waker { + waker.wake(); } + } +} - self.process(); +impl<P> Park for Driver<P> +where + P: Park + 'static, +{ + type Unpark = P::Unpark; + type Error = P::Error; - Ok(()) + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park_internal(None) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park_internal(Some(duration)) } fn shutdown(&mut self) { - if self.is_shutdown { + let mut lock = self.inner.lock(); + + if lock.is_shutdown { return; } - use std::u64; + lock.is_shutdown = true; - // Shutdown the stack of entries to process, preventing any new entries - // from being pushed. - self.inner.process.shutdown(); + drop(lock); - // Clear the wheel, using u64::MAX allows us to drain everything - let end_of_time = u64::MAX; + // Advance time forward to the end of time. - while let Some(entry) = self.wheel.poll(end_of_time) { - entry.error(Error::shutdown()); - } + self.inner.process_at_time(u64::MAX); self.park.shutdown(); - - self.is_shutdown = true; } } -impl<T> Drop for Driver<T> +impl<P> Drop for Driver<P> where - T: Park, + P: Park + 'static, { fn drop(&mut self) { self.shutdown(); @@ -337,69 +426,16 @@ where // ===== impl Inner ===== impl Inner { - fn new(start: Instant, unpark: Box<dyn Unpark>) -> Inner { + pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self { Inner { - num: AtomicUsize::new(0), - elapsed: AtomicU64::new(0), - process: AtomicStack::new(), - start, + time_source, + elapsed: 0, + next_wake: None, unpark, + wheel: wheel::Wheel::new(), + is_shutdown: false, } } - - fn elapsed(&self) -> u64 { - self.elapsed.load(SeqCst) - } - - #[cfg(all(test, loom))] - fn num(&self, ordering: std::sync::atomic::Ordering) -> usize { - self.num.load(ordering) - } - - /// Increments the number of active timeouts - fn increment(&self) -> Result<(), Error> { - let mut curr = self.num.load(Relaxed); - loop { - if curr == MAX_TIMEOUTS { - return Err(Error::at_capacity()); - } - - match self - .num - .compare_exchange_weak(curr, curr + 1, Release, Relaxed) - { - Ok(_) => return Ok(()), - Err(next) => curr = next, - } - } - } - - /// Decrements the number of active timeouts - fn decrement(&self) { - let prev = self.num.fetch_sub(1, Acquire); - debug_assert!(prev <= MAX_TIMEOUTS); - } - - /// add the entry to the "process queue". entries are not immediately - /// pushed into the timer wheel but are instead pushed into the - /// process queue and then moved from the process queue into the timer - /// wheel on next `process` - fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> { - if self.process.push(entry)? { - // The timer is notified so that it can process the timeout - self.unpark.unpark(); - } - - Ok(()) - } - - fn normalize_deadline(&self, deadline: Instant) -> u64 { - if deadline < self.start { - return 0; - } - - crate::time::ms(deadline - self.start, crate::time::Round::Up) - } } impl fmt::Debug for Inner { @@ -408,5 +444,5 @@ impl fmt::Debug for Inner { } } -#[cfg(all(test, loom))] +#[cfg(test)] mod tests; |