diff options
author | Joel Galenson <jgalenson@google.com> | 2021-06-22 09:28:11 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-06-22 09:28:30 -0700 |
commit | b53dd06ad19d902c2155b2f616725b14b423a776 (patch) | |
tree | b92904fad1f985434ad7622c3960e6e670b1c918 /src/time/driver/mod.rs | |
parent | 8c2e0e8165f4f0132f3a5c78337fbba15b102768 (diff) | |
download | tokio-b53dd06ad19d902c2155b2f616725b14b423a776.tar.gz |
Upgrade rust/crates/tokio to 1.7.1
Test: make
Change-Id: I7ebd839df13023db6f2057e09d8b73967436b856
Diffstat (limited to 'src/time/driver/mod.rs')
-rw-r--r-- | src/time/driver/mod.rs | 93 |
1 files changed, 75 insertions, 18 deletions
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs index 3eb1004..37d2231 100644 --- a/src/time/driver/mod.rs +++ b/src/time/driver/mod.rs @@ -91,6 +91,15 @@ pub(crate) struct Driver<P: Park + 'static> { /// Parker to delegate to park: P, + + // When `true`, a call to `park_timeout` should immediately return and time + // should not advance. One reason for this to be `true` is if the task + // passed to `Runtime::block_on` called `task::yield_now()`. + // + // While it may look racy, it only has any effect when the clock is paused + // and pausing the clock is restricted to a single-threaded runtime. + #[cfg(feature = "test-util")] + did_wake: Arc<AtomicBool>, } /// A structure which handles conversion from Instants to u64 timestamps. @@ -178,6 +187,8 @@ where time_source, handle: Handle::new(Arc::new(inner)), park, + #[cfg(feature = "test-util")] + did_wake: Arc::new(AtomicBool::new(false)), } } @@ -192,8 +203,6 @@ where } fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> { - let clock = &self.time_source.clock; - let mut lock = self.handle.get().state.lock(); assert!(!self.handle.is_shutdown()); @@ -217,26 +226,14 @@ where duration = std::cmp::min(limit, duration); } - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - - // Simulate advancing time - clock.advance(duration); - } else { - self.park.park_timeout(duration)?; - } + self.park_timeout(duration)?; } else { self.park.park_timeout(Duration::from_secs(0))?; } } None => { if let Some(duration) = limit { - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - clock.advance(duration); - } else { - self.park.park_timeout(duration)?; - } + self.park_timeout(duration)?; } else { self.park.park()?; } @@ -248,6 +245,39 @@ where Ok(()) } + + cfg_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + let clock = &self.time_source.clock; + + if clock.is_paused() { + self.park.park_timeout(Duration::from_secs(0))?; + + // If the time driver was woken, then the park completed + // before the "duration" elapsed (usually caused by a + // yield in `Runtime::block_on`). In this case, we don't + // advance the clock. + if !self.did_wake() { + // Simulate advancing time + clock.advance(duration); + } + } else { + self.park.park_timeout(duration)?; + } + + Ok(()) + } + + fn did_wake(&self) -> bool { + self.did_wake.swap(false, Ordering::SeqCst) + } + } + + cfg_not_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + self.park.park_timeout(duration) + } + } } impl Handle { @@ -387,11 +417,11 @@ impl<P> Park for Driver<P> where P: Park + 'static, { - type Unpark = P::Unpark; + type Unpark = TimerUnpark<P>; type Error = P::Error; fn unpark(&self) -> Self::Unpark { - self.park.unpark() + TimerUnpark::new(self) } fn park(&mut self) -> Result<(), Self::Error> { @@ -426,6 +456,33 @@ where } } +pub(crate) struct TimerUnpark<P: Park + 'static> { + inner: P::Unpark, + + #[cfg(feature = "test-util")] + did_wake: Arc<AtomicBool>, +} + +impl<P: Park + 'static> TimerUnpark<P> { + fn new(driver: &Driver<P>) -> TimerUnpark<P> { + TimerUnpark { + inner: driver.park.unpark(), + + #[cfg(feature = "test-util")] + did_wake: driver.did_wake.clone(), + } + } +} + +impl<P: Park + 'static> Unpark for TimerUnpark<P> { + fn unpark(&self) { + #[cfg(feature = "test-util")] + self.did_wake.store(true, Ordering::SeqCst); + + self.inner.unpark(); + } +} + // ===== impl Inner ===== impl Inner { |