aboutsummaryrefslogtreecommitdiff
path: root/src/time/driver
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2021-03-05 09:35:44 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-03-05 09:35:44 +0000
commite29ba45f952a1c193d6091f8aead991e88882126 (patch)
tree16d053e70d21e456d52f4a7762ee41441342b7a2 /src/time/driver
parent925d648e545e70d6a4faae3d7efe5e0de885f922 (diff)
parente3d8d80d2d8744ccdcd175323e0864c8f30fcedc (diff)
downloadtokio-e29ba45f952a1c193d6091f8aead991e88882126.tar.gz
Upgrade rust/crates/tokio to 1.2.0 am: e3d8d80d2d
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1582273 MUST ONLY BE SUBMITTED BY AUTOMERGER Change-Id: I2d2cb9aba7b17be665909defd7673c645b22f2ab
Diffstat (limited to 'src/time/driver')
-rw-r--r--src/time/driver/entry.rs76
-rw-r--r--src/time/driver/handle.rs5
-rw-r--r--src/time/driver/mod.rs2
-rw-r--r--src/time/driver/sleep.rs109
-rw-r--r--src/time/driver/tests/mod.rs12
-rw-r--r--src/time/driver/wheel/level.rs4
-rw-r--r--src/time/driver/wheel/mod.rs17
7 files changed, 132 insertions, 93 deletions
diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs
index bcad988..11366d2 100644
--- a/src/time/driver/entry.rs
+++ b/src/time/driver/entry.rs
@@ -53,6 +53,7 @@
//! refuse to mark the timer as pending.
use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::atomic::Ordering;
use crate::sync::AtomicWaker;
@@ -71,79 +72,6 @@ const STATE_DEREGISTERED: u64 = u64::max_value();
const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
-/// Not all platforms support 64-bit compare-and-swap. This hack replaces the
-/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow,
-/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now.
-///
-/// Note: We use "x86 or 64-bit pointers" as the condition here because
-/// target_has_atomic is not stable.
-#[cfg(all(
- not(tokio_force_time_entry_locked),
- any(target_arch = "x86", target_pointer_width = "64")
-))]
-type AtomicU64 = crate::loom::sync::atomic::AtomicU64;
-
-#[cfg(not(all(
- not(tokio_force_time_entry_locked),
- any(target_arch = "x86", target_pointer_width = "64")
-)))]
-#[derive(Debug)]
-struct AtomicU64 {
- inner: crate::loom::sync::Mutex<u64>,
-}
-
-#[cfg(not(all(
- not(tokio_force_time_entry_locked),
- any(target_arch = "x86", target_pointer_width = "64")
-)))]
-impl AtomicU64 {
- fn new(v: u64) -> Self {
- Self {
- inner: crate::loom::sync::Mutex::new(v),
- }
- }
-
- fn load(&self, _order: Ordering) -> u64 {
- debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
- *self.inner.lock()
- }
-
- fn store(&self, v: u64, _order: Ordering) {
- debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
- *self.inner.lock() = v;
- }
-
- fn compare_exchange(
- &self,
- current: u64,
- new: u64,
- _success: Ordering,
- _failure: Ordering,
- ) -> Result<u64, u64> {
- debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock
- debug_assert_ne!(_failure, Ordering::SeqCst);
-
- let mut lock = self.inner.lock();
-
- if *lock == current {
- *lock = new;
- Ok(current)
- } else {
- Err(*lock)
- }
- }
-
- fn compare_exchange_weak(
- &self,
- current: u64,
- new: u64,
- success: Ordering,
- failure: Ordering,
- ) -> Result<u64, u64> {
- self.compare_exchange(current, new, success, failure)
- }
-}
-
/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
/// well as the registered waker.
@@ -300,7 +228,7 @@ impl StateCell {
/// expiration time.
///
/// While this function is memory-safe, it should only be called from a
- /// context holding both `&mut TimerEntry` and the driver lock.
+ /// context holding both `&mut TimerEntry` and the driver lock.
fn set_expiration(&self, timestamp: u64) {
debug_assert!(timestamp < STATE_MIN_VALUE);
diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs
index bfc49fb..e934b56 100644
--- a/src/time/driver/handle.rs
+++ b/src/time/driver/handle.rs
@@ -47,7 +47,7 @@ cfg_rt! {
/// panicking.
pub(crate) fn current() -> Self {
crate::runtime::context::time_handle()
- .expect("there is no timer running, must be called from the context of Tokio runtime")
+ .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}
}
}
@@ -71,8 +71,7 @@ cfg_not_rt! {
/// lazy, and so outside executed inside the runtime successfuly without
/// panicking.
pub(crate) fn current() -> Self {
- panic!("there is no timer running, must be called from the context of Tokio runtime or \
- `rt` is not enabled")
+ panic!(crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs
index 9fbc0b3..615307e 100644
--- a/src/time/driver/mod.rs
+++ b/src/time/driver/mod.rs
@@ -102,8 +102,8 @@ pub(self) struct ClockTime {
impl ClockTime {
pub(self) fn new(clock: Clock) -> Self {
Self {
+ start_time: clock.now(),
clock,
- start_time: super::clock::now(),
}
}
diff --git a/src/time/driver/sleep.rs b/src/time/driver/sleep.rs
index 69a6e6d..2438f14 100644
--- a/src/time/driver/sleep.rs
+++ b/src/time/driver/sleep.rs
@@ -58,8 +58,93 @@ pub fn sleep(duration: Duration) -> Sleep {
}
pin_project! {
- /// Future returned by [`sleep`](sleep) and
- /// [`sleep_until`](sleep_until).
+ /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
+ ///
+ /// This type does not implement the `Unpin` trait, which means that if you
+ /// use it with [`select!`] or by calling `poll`, you have to pin it first.
+ /// If you use it with `.await`, this does not apply.
+ ///
+ /// # Examples
+ ///
+ /// Wait 100ms and print "100 ms have elapsed".
+ ///
+ /// ```
+ /// use tokio::time::{sleep, Duration};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// sleep(Duration::from_millis(100)).await;
+ /// println!("100 ms have elapsed");
+ /// }
+ /// ```
+ ///
+ /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is
+ /// necessary when the same `Sleep` is selected on multiple times.
+ /// ```no_run
+ /// use tokio::time::{self, Duration, Instant};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let sleep = time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// loop {
+ /// tokio::select! {
+ /// () = &mut sleep => {
+ /// println!("timer elapsed");
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50));
+ /// },
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the
+ /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ ///
+ /// struct HasSleep {
+ /// sleep: Pin<Box<Sleep>>,
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.sleep.as_mut().poll(cx)
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with pin projection. This method avoids the `Box`, but
+ /// the `HasSleep` struct will not be `Unpin` as a consequence.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ /// use pin_project_lite::pin_project;
+ ///
+ /// pin_project! {
+ /// struct HasSleep {
+ /// #[pin]
+ /// sleep: Sleep,
+ /// }
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.project().sleep.poll(cx)
+ /// }
+ /// }
+ /// ```
+ ///
+ /// [`select!`]: ../macro.select.html
+ /// [`tokio::pin!`]: ../macro.pin.html
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
@@ -98,6 +183,26 @@ impl Sleep {
///
/// This function can be called both before and after the future has
/// completed.
+ ///
+ /// To call this method, you will usually combine the call with
+ /// [`Pin::as_mut`], which lets you call the method with consuming the
+ /// `Sleep` itself.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tokio::time::{Duration, Instant};
+ ///
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// # async fn main() {
+ /// let sleep = tokio::time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
+ /// # }
+ /// ```
+ ///
+ /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
diff --git a/src/time/driver/tests/mod.rs b/src/time/driver/tests/mod.rs
index cfefed3..8ae4a84 100644
--- a/src/time/driver/tests/mod.rs
+++ b/src/time/driver/tests/mod.rs
@@ -41,7 +41,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) {
#[test]
fn single_timer() {
model(|| {
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
@@ -72,7 +72,7 @@ fn single_timer() {
#[test]
fn drop_timer() {
model(|| {
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
@@ -103,7 +103,7 @@ fn drop_timer() {
#[test]
fn change_waker() {
model(|| {
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
@@ -138,7 +138,7 @@ fn reset_future() {
model(|| {
let finished_early = Arc::new(AtomicBool::new(false));
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
@@ -185,7 +185,7 @@ fn reset_future() {
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
clock.pause();
let time_source = super::ClockTime::new(clock.clone());
@@ -226,7 +226,7 @@ fn poll_process_levels() {
fn poll_process_levels_targeted() {
let mut context = Context::from_waker(noop_waker_ref());
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
clock.pause();
let time_source = super::ClockTime::new(clock.clone());
diff --git a/src/time/driver/wheel/level.rs b/src/time/driver/wheel/level.rs
index 58280b1..81d6b58 100644
--- a/src/time/driver/wheel/level.rs
+++ b/src/time/driver/wheel/level.rs
@@ -255,14 +255,13 @@ fn slot_for(duration: u64, level: usize) -> usize {
((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
}
-/*
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
#[test]
fn test_slot_for() {
- for pos in 1..64 {
+ for pos in 0..64 {
assert_eq!(pos as usize, slot_for(pos, 0));
}
@@ -274,4 +273,3 @@ mod test {
}
}
}
-*/
diff --git a/src/time/driver/wheel/mod.rs b/src/time/driver/wheel/mod.rs
index 164cac4..24bf517 100644
--- a/src/time/driver/wheel/mod.rs
+++ b/src/time/driver/wheel/mod.rs
@@ -122,6 +122,13 @@ impl Wheel {
if when == u64::max_value() {
self.pending.remove(item);
} else {
+ debug_assert!(
+ self.elapsed <= when,
+ "elapsed={}; when={}",
+ self.elapsed,
+ when
+ );
+
let level = self.level_for(when);
self.levels[level].remove_entry(item);
@@ -281,15 +288,17 @@ impl Wheel {
}
fn level_for(elapsed: u64, when: u64) -> usize {
- let mut masked = elapsed ^ when;
+ const SLOT_MASK: u64 = (1 << 6) - 1;
+
+ // Mask in the trailing bits ignored by the level calculation in order to cap
+ // the possible leading zeros
+ let mut masked = elapsed ^ when | SLOT_MASK;
if masked >= MAX_DURATION {
// Fudge the timer into the top level
masked = MAX_DURATION - 1;
}
- assert!(masked != 0, "elapsed={}; when={}", elapsed, when);
-
let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;
@@ -302,7 +311,7 @@ mod test {
#[test]
fn test_level_for() {
- for pos in 1..64 {
+ for pos in 0..64 {
assert_eq!(
0,
level_for(0, pos),