diff options
author | Haibo Huang <hhb@google.com> | 2021-03-05 09:35:44 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-03-05 09:35:44 +0000 |
commit | e29ba45f952a1c193d6091f8aead991e88882126 (patch) | |
tree | 16d053e70d21e456d52f4a7762ee41441342b7a2 /src/time/driver | |
parent | 925d648e545e70d6a4faae3d7efe5e0de885f922 (diff) | |
parent | e3d8d80d2d8744ccdcd175323e0864c8f30fcedc (diff) | |
download | tokio-e29ba45f952a1c193d6091f8aead991e88882126.tar.gz |
Upgrade rust/crates/tokio to 1.2.0 am: e3d8d80d2d
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1582273
MUST ONLY BE SUBMITTED BY AUTOMERGER
Change-Id: I2d2cb9aba7b17be665909defd7673c645b22f2ab
Diffstat (limited to 'src/time/driver')
-rw-r--r-- | src/time/driver/entry.rs | 76 | ||||
-rw-r--r-- | src/time/driver/handle.rs | 5 | ||||
-rw-r--r-- | src/time/driver/mod.rs | 2 | ||||
-rw-r--r-- | src/time/driver/sleep.rs | 109 | ||||
-rw-r--r-- | src/time/driver/tests/mod.rs | 12 | ||||
-rw-r--r-- | src/time/driver/wheel/level.rs | 4 | ||||
-rw-r--r-- | src/time/driver/wheel/mod.rs | 17 |
7 files changed, 132 insertions, 93 deletions
diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs index bcad988..11366d2 100644 --- a/src/time/driver/entry.rs +++ b/src/time/driver/entry.rs @@ -53,6 +53,7 @@ //! refuse to mark the timer as pending. use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::AtomicU64; use crate::loom::sync::atomic::Ordering; use crate::sync::AtomicWaker; @@ -71,79 +72,6 @@ const STATE_DEREGISTERED: u64 = u64::max_value(); const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; -/// Not all platforms support 64-bit compare-and-swap. This hack replaces the -/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow, -/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now. -/// -/// Note: We use "x86 or 64-bit pointers" as the condition here because -/// target_has_atomic is not stable. -#[cfg(all( - not(tokio_force_time_entry_locked), - any(target_arch = "x86", target_pointer_width = "64") -))] -type AtomicU64 = crate::loom::sync::atomic::AtomicU64; - -#[cfg(not(all( - not(tokio_force_time_entry_locked), - any(target_arch = "x86", target_pointer_width = "64") -)))] -#[derive(Debug)] -struct AtomicU64 { - inner: crate::loom::sync::Mutex<u64>, -} - -#[cfg(not(all( - not(tokio_force_time_entry_locked), - any(target_arch = "x86", target_pointer_width = "64") -)))] -impl AtomicU64 { - fn new(v: u64) -> Self { - Self { - inner: crate::loom::sync::Mutex::new(v), - } - } - - fn load(&self, _order: Ordering) -> u64 { - debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock - *self.inner.lock() - } - - fn store(&self, v: u64, _order: Ordering) { - debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock - *self.inner.lock() = v; - } - - fn compare_exchange( - &self, - current: u64, - new: u64, - _success: Ordering, - _failure: Ordering, - ) -> Result<u64, u64> { - debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock - debug_assert_ne!(_failure, Ordering::SeqCst); - - let mut lock = self.inner.lock(); - - if *lock == current { - *lock = new; - Ok(current) - } else { - Err(*lock) - } - } - - fn compare_exchange_weak( - &self, - current: u64, - new: u64, - success: Ordering, - failure: Ordering, - ) -> Result<u64, u64> { - self.compare_exchange(current, new, success, failure) - } -} - /// This structure holds the current shared state of the timer - its scheduled /// time (if registered), or otherwise the result of the timer completing, as /// well as the registered waker. @@ -300,7 +228,7 @@ impl StateCell { /// expiration time. /// /// While this function is memory-safe, it should only be called from a - /// context holding both `&mut TimerEntry` and the driver lock. + /// context holding both `&mut TimerEntry` and the driver lock. fn set_expiration(&self, timestamp: u64) { debug_assert!(timestamp < STATE_MIN_VALUE); diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs index bfc49fb..e934b56 100644 --- a/src/time/driver/handle.rs +++ b/src/time/driver/handle.rs @@ -47,7 +47,7 @@ cfg_rt! { /// panicking. pub(crate) fn current() -> Self { crate::runtime::context::time_handle() - .expect("there is no timer running, must be called from the context of Tokio runtime") + .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") } } } @@ -71,8 +71,7 @@ cfg_not_rt! { /// lazy, and so outside executed inside the runtime successfuly without /// panicking. pub(crate) fn current() -> Self { - panic!("there is no timer running, must be called from the context of Tokio runtime or \ - `rt` is not enabled") + panic!(crate::util::error::CONTEXT_MISSING_ERROR) } } } diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs index 9fbc0b3..615307e 100644 --- a/src/time/driver/mod.rs +++ b/src/time/driver/mod.rs @@ -102,8 +102,8 @@ pub(self) struct ClockTime { impl ClockTime { pub(self) fn new(clock: Clock) -> Self { Self { + start_time: clock.now(), clock, - start_time: super::clock::now(), } } diff --git a/src/time/driver/sleep.rs b/src/time/driver/sleep.rs index 69a6e6d..2438f14 100644 --- a/src/time/driver/sleep.rs +++ b/src/time/driver/sleep.rs @@ -58,8 +58,93 @@ pub fn sleep(duration: Duration) -> Sleep { } pin_project! { - /// Future returned by [`sleep`](sleep) and - /// [`sleep_until`](sleep_until). + /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). + /// + /// This type does not implement the `Unpin` trait, which means that if you + /// use it with [`select!`] or by calling `poll`, you have to pin it first. + /// If you use it with `.await`, this does not apply. + /// + /// # Examples + /// + /// Wait 100ms and print "100 ms have elapsed". + /// + /// ``` + /// use tokio::time::{sleep, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// sleep(Duration::from_millis(100)).await; + /// println!("100 ms have elapsed"); + /// } + /// ``` + /// + /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is + /// necessary when the same `Sleep` is selected on multiple times. + /// ```no_run + /// use tokio::time::{self, Duration, Instant}; + /// + /// #[tokio::main] + /// async fn main() { + /// let sleep = time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// loop { + /// tokio::select! { + /// () = &mut sleep => { + /// println!("timer elapsed"); + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50)); + /// }, + /// } + /// } + /// } + /// ``` + /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the + /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// + /// struct HasSleep { + /// sleep: Pin<Box<Sleep>>, + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.sleep.as_mut().poll(cx) + /// } + /// } + /// ``` + /// Use in a struct with pin projection. This method avoids the `Box`, but + /// the `HasSleep` struct will not be `Unpin` as a consequence. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// use pin_project_lite::pin_project; + /// + /// pin_project! { + /// struct HasSleep { + /// #[pin] + /// sleep: Sleep, + /// } + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.project().sleep.poll(cx) + /// } + /// } + /// ``` + /// + /// [`select!`]: ../macro.select.html + /// [`tokio::pin!`]: ../macro.pin.html #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Sleep { @@ -98,6 +183,26 @@ impl Sleep { /// /// This function can be called both before and after the future has /// completed. + /// + /// To call this method, you will usually combine the call with + /// [`Pin::as_mut`], which lets you call the method with consuming the + /// `Sleep` itself. + /// + /// # Example + /// + /// ``` + /// use tokio::time::{Duration, Instant}; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let sleep = tokio::time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20)); + /// # } + /// ``` + /// + /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut pub fn reset(self: Pin<&mut Self>, deadline: Instant) { let me = self.project(); me.entry.reset(deadline); diff --git a/src/time/driver/tests/mod.rs b/src/time/driver/tests/mod.rs index cfefed3..8ae4a84 100644 --- a/src/time/driver/tests/mod.rs +++ b/src/time/driver/tests/mod.rs @@ -41,7 +41,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) { #[test] fn single_timer() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -72,7 +72,7 @@ fn single_timer() { #[test] fn drop_timer() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -103,7 +103,7 @@ fn drop_timer() { #[test] fn change_waker() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -138,7 +138,7 @@ fn reset_future() { model(|| { let finished_early = Arc::new(AtomicBool::new(false)); - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -185,7 +185,7 @@ fn reset_future() { #[test] #[cfg(not(loom))] fn poll_process_levels() { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); clock.pause(); let time_source = super::ClockTime::new(clock.clone()); @@ -226,7 +226,7 @@ fn poll_process_levels() { fn poll_process_levels_targeted() { let mut context = Context::from_waker(noop_waker_ref()); - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); clock.pause(); let time_source = super::ClockTime::new(clock.clone()); diff --git a/src/time/driver/wheel/level.rs b/src/time/driver/wheel/level.rs index 58280b1..81d6b58 100644 --- a/src/time/driver/wheel/level.rs +++ b/src/time/driver/wheel/level.rs @@ -255,14 +255,13 @@ fn slot_for(duration: u64, level: usize) -> usize { ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize } -/* #[cfg(all(test, not(loom)))] mod test { use super::*; #[test] fn test_slot_for() { - for pos in 1..64 { + for pos in 0..64 { assert_eq!(pos as usize, slot_for(pos, 0)); } @@ -274,4 +273,3 @@ mod test { } } } -*/ diff --git a/src/time/driver/wheel/mod.rs b/src/time/driver/wheel/mod.rs index 164cac4..24bf517 100644 --- a/src/time/driver/wheel/mod.rs +++ b/src/time/driver/wheel/mod.rs @@ -122,6 +122,13 @@ impl Wheel { if when == u64::max_value() { self.pending.remove(item); } else { + debug_assert!( + self.elapsed <= when, + "elapsed={}; when={}", + self.elapsed, + when + ); + let level = self.level_for(when); self.levels[level].remove_entry(item); @@ -281,15 +288,17 @@ impl Wheel { } fn level_for(elapsed: u64, when: u64) -> usize { - let mut masked = elapsed ^ when; + const SLOT_MASK: u64 = (1 << 6) - 1; + + // Mask in the trailing bits ignored by the level calculation in order to cap + // the possible leading zeros + let mut masked = elapsed ^ when | SLOT_MASK; if masked >= MAX_DURATION { // Fudge the timer into the top level masked = MAX_DURATION - 1; } - assert!(masked != 0, "elapsed={}; when={}", elapsed, when); - let leading_zeros = masked.leading_zeros() as usize; let significant = 63 - leading_zeros; @@ -302,7 +311,7 @@ mod test { #[test] fn test_level_for() { - for pos in 1..64 { + for pos in 0..64 { assert_eq!( 0, level_for(0, pos), |