aboutsummaryrefslogtreecommitdiff
path: root/src/time/driver/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/time/driver/mod.rs')
-rw-r--r--src/time/driver/mod.rs93
1 files changed, 75 insertions, 18 deletions
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs
index 3eb1004..37d2231 100644
--- a/src/time/driver/mod.rs
+++ b/src/time/driver/mod.rs
@@ -91,6 +91,15 @@ pub(crate) struct Driver<P: Park + 'static> {
/// Parker to delegate to
park: P,
+
+ // When `true`, a call to `park_timeout` should immediately return and time
+ // should not advance. One reason for this to be `true` is if the task
+ // passed to `Runtime::block_on` called `task::yield_now()`.
+ //
+ // While it may look racy, it only has any effect when the clock is paused
+ // and pausing the clock is restricted to a single-threaded runtime.
+ #[cfg(feature = "test-util")]
+ did_wake: Arc<AtomicBool>,
}
/// A structure which handles conversion from Instants to u64 timestamps.
@@ -178,6 +187,8 @@ where
time_source,
handle: Handle::new(Arc::new(inner)),
park,
+ #[cfg(feature = "test-util")]
+ did_wake: Arc::new(AtomicBool::new(false)),
}
}
@@ -192,8 +203,6 @@ where
}
fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
- let clock = &self.time_source.clock;
-
let mut lock = self.handle.get().state.lock();
assert!(!self.handle.is_shutdown());
@@ -217,26 +226,14 @@ where
duration = std::cmp::min(limit, duration);
}
- if clock.is_paused() {
- self.park.park_timeout(Duration::from_secs(0))?;
-
- // Simulate advancing time
- clock.advance(duration);
- } else {
- self.park.park_timeout(duration)?;
- }
+ self.park_timeout(duration)?;
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
}
None => {
if let Some(duration) = limit {
- if clock.is_paused() {
- self.park.park_timeout(Duration::from_secs(0))?;
- clock.advance(duration);
- } else {
- self.park.park_timeout(duration)?;
- }
+ self.park_timeout(duration)?;
} else {
self.park.park()?;
}
@@ -248,6 +245,39 @@ where
Ok(())
}
+
+ cfg_test_util! {
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
+ let clock = &self.time_source.clock;
+
+ if clock.is_paused() {
+ self.park.park_timeout(Duration::from_secs(0))?;
+
+ // If the time driver was woken, then the park completed
+ // before the "duration" elapsed (usually caused by a
+ // yield in `Runtime::block_on`). In this case, we don't
+ // advance the clock.
+ if !self.did_wake() {
+ // Simulate advancing time
+ clock.advance(duration);
+ }
+ } else {
+ self.park.park_timeout(duration)?;
+ }
+
+ Ok(())
+ }
+
+ fn did_wake(&self) -> bool {
+ self.did_wake.swap(false, Ordering::SeqCst)
+ }
+ }
+
+ cfg_not_test_util! {
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
+ self.park.park_timeout(duration)
+ }
+ }
}
impl Handle {
@@ -387,11 +417,11 @@ impl<P> Park for Driver<P>
where
P: Park + 'static,
{
- type Unpark = P::Unpark;
+ type Unpark = TimerUnpark<P>;
type Error = P::Error;
fn unpark(&self) -> Self::Unpark {
- self.park.unpark()
+ TimerUnpark::new(self)
}
fn park(&mut self) -> Result<(), Self::Error> {
@@ -426,6 +456,33 @@ where
}
}
+pub(crate) struct TimerUnpark<P: Park + 'static> {
+ inner: P::Unpark,
+
+ #[cfg(feature = "test-util")]
+ did_wake: Arc<AtomicBool>,
+}
+
+impl<P: Park + 'static> TimerUnpark<P> {
+ fn new(driver: &Driver<P>) -> TimerUnpark<P> {
+ TimerUnpark {
+ inner: driver.park.unpark(),
+
+ #[cfg(feature = "test-util")]
+ did_wake: driver.did_wake.clone(),
+ }
+ }
+}
+
+impl<P: Park + 'static> Unpark for TimerUnpark<P> {
+ fn unpark(&self) {
+ #[cfg(feature = "test-util")]
+ self.did_wake.store(true, Ordering::SeqCst);
+
+ self.inner.unpark();
+ }
+}
+
// ===== impl Inner =====
impl Inner {