From b53dd06ad19d902c2155b2f616725b14b423a776 Mon Sep 17 00:00:00 2001 From: Joel Galenson Date: Tue, 22 Jun 2021 09:28:11 -0700 Subject: Upgrade rust/crates/tokio to 1.7.1 Test: make Change-Id: I7ebd839df13023db6f2057e09d8b73967436b856 --- src/time/driver/mod.rs | 93 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 18 deletions(-) (limited to 'src/time/driver/mod.rs') diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs index 3eb1004..37d2231 100644 --- a/src/time/driver/mod.rs +++ b/src/time/driver/mod.rs @@ -91,6 +91,15 @@ pub(crate) struct Driver { /// Parker to delegate to park: P, + + // When `true`, a call to `park_timeout` should immediately return and time + // should not advance. One reason for this to be `true` is if the task + // passed to `Runtime::block_on` called `task::yield_now()`. + // + // While it may look racy, it only has any effect when the clock is paused + // and pausing the clock is restricted to a single-threaded runtime. + #[cfg(feature = "test-util")] + did_wake: Arc, } /// A structure which handles conversion from Instants to u64 timestamps. @@ -178,6 +187,8 @@ where time_source, handle: Handle::new(Arc::new(inner)), park, + #[cfg(feature = "test-util")] + did_wake: Arc::new(AtomicBool::new(false)), } } @@ -192,8 +203,6 @@ where } fn park_internal(&mut self, limit: Option) -> Result<(), P::Error> { - let clock = &self.time_source.clock; - let mut lock = self.handle.get().state.lock(); assert!(!self.handle.is_shutdown()); @@ -217,26 +226,14 @@ where duration = std::cmp::min(limit, duration); } - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - - // Simulate advancing time - clock.advance(duration); - } else { - self.park.park_timeout(duration)?; - } + self.park_timeout(duration)?; } else { self.park.park_timeout(Duration::from_secs(0))?; } } None => { if let Some(duration) = limit { - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - clock.advance(duration); - } else { - self.park.park_timeout(duration)?; - } + self.park_timeout(duration)?; } else { self.park.park()?; } @@ -248,6 +245,39 @@ where Ok(()) } + + cfg_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + let clock = &self.time_source.clock; + + if clock.is_paused() { + self.park.park_timeout(Duration::from_secs(0))?; + + // If the time driver was woken, then the park completed + // before the "duration" elapsed (usually caused by a + // yield in `Runtime::block_on`). In this case, we don't + // advance the clock. + if !self.did_wake() { + // Simulate advancing time + clock.advance(duration); + } + } else { + self.park.park_timeout(duration)?; + } + + Ok(()) + } + + fn did_wake(&self) -> bool { + self.did_wake.swap(false, Ordering::SeqCst) + } + } + + cfg_not_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + self.park.park_timeout(duration) + } + } } impl Handle { @@ -387,11 +417,11 @@ impl

Park for Driver

where P: Park + 'static, { - type Unpark = P::Unpark; + type Unpark = TimerUnpark

; type Error = P::Error; fn unpark(&self) -> Self::Unpark { - self.park.unpark() + TimerUnpark::new(self) } fn park(&mut self) -> Result<(), Self::Error> { @@ -426,6 +456,33 @@ where } } +pub(crate) struct TimerUnpark { + inner: P::Unpark, + + #[cfg(feature = "test-util")] + did_wake: Arc, +} + +impl TimerUnpark

{ + fn new(driver: &Driver

) -> TimerUnpark

{ + TimerUnpark { + inner: driver.park.unpark(), + + #[cfg(feature = "test-util")] + did_wake: driver.did_wake.clone(), + } + } +} + +impl Unpark for TimerUnpark

{ + fn unpark(&self) { + #[cfg(feature = "test-util")] + self.did_wake.store(true, Ordering::SeqCst); + + self.inner.unpark(); + } +} + // ===== impl Inner ===== impl Inner { -- cgit v1.2.3