aboutsummaryrefslogtreecommitdiff
path: root/src/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime')
-rw-r--r--src/runtime/blocking/mod.rs2
-rw-r--r--src/runtime/blocking/pool.rs9
-rw-r--r--src/runtime/blocking/schedule.rs49
-rw-r--r--src/runtime/builder.rs26
-rw-r--r--src/runtime/context.rs22
-rw-r--r--src/runtime/driver.rs9
-rw-r--r--src/runtime/handle.rs6
-rw-r--r--src/runtime/io/mod.rs16
-rw-r--r--src/runtime/io/registration.rs26
-rw-r--r--src/runtime/io/scheduled_io.rs43
-rw-r--r--src/runtime/metrics/batch.rs13
-rw-r--r--src/runtime/metrics/mock.rs1
-rw-r--r--src/runtime/metrics/runtime.rs59
-rw-r--r--src/runtime/metrics/worker.rs6
-rw-r--r--src/runtime/mod.rs3
-rw-r--r--src/runtime/runtime.rs6
-rw-r--r--src/runtime/scheduler/multi_thread/queue.rs3
-rw-r--r--src/runtime/task/harness.rs49
-rw-r--r--src/runtime/task/id.rs87
-rw-r--r--src/runtime/task/join.rs6
-rw-r--r--src/runtime/task/mod.rs136
-rw-r--r--src/runtime/task/state.rs8
-rw-r--r--src/runtime/tests/loom_blocking.rs21
-rw-r--r--src/runtime/tests/loom_queue.rs2
-rw-r--r--src/runtime/tests/mod.rs20
-rw-r--r--src/runtime/tests/task.rs2
-rw-r--r--src/runtime/thread_id.rs31
-rw-r--r--src/runtime/time/mod.rs2
28 files changed, 416 insertions, 247 deletions
diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs
index 88bdcfd..c42924b 100644
--- a/src/runtime/blocking/mod.rs
+++ b/src/runtime/blocking/mod.rs
@@ -17,8 +17,6 @@ cfg_trace! {
mod schedule;
mod shutdown;
mod task;
-#[cfg(all(test, not(tokio_wasm)))]
-pub(crate) use schedule::NoopSchedule;
pub(crate) use task::BlockingTask;
use crate::runtime::Builder;
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
index 9c53614..e9f6b66 100644
--- a/src/runtime/blocking/pool.rs
+++ b/src/runtime/blocking/pool.rs
@@ -2,7 +2,7 @@
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
-use crate::runtime::blocking::schedule::NoopSchedule;
+use crate::runtime::blocking::schedule::BlockingSchedule;
use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
@@ -120,7 +120,7 @@ struct Shared {
}
pub(crate) struct Task {
- task: task::UnownedTask<NoopSchedule>,
+ task: task::UnownedTask<BlockingSchedule>,
mandatory: Mandatory,
}
@@ -151,7 +151,7 @@ impl From<SpawnError> for io::Error {
}
impl Task {
- pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task {
+ pub(crate) fn new(task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory) -> Task {
Task { task, mandatory }
}
@@ -379,7 +379,8 @@ impl Spawner {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;
- let (task, handle) = task::unowned(fut, NoopSchedule, id);
+ let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
+
let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
}
diff --git a/src/runtime/blocking/schedule.rs b/src/runtime/blocking/schedule.rs
index 5425224..edf775b 100644
--- a/src/runtime/blocking/schedule.rs
+++ b/src/runtime/blocking/schedule.rs
@@ -1,15 +1,52 @@
+#[cfg(feature = "test-util")]
+use crate::runtime::scheduler;
use crate::runtime::task::{self, Task};
+use crate::runtime::Handle;
-/// `task::Schedule` implementation that does nothing. This is unique to the
-/// blocking scheduler as tasks scheduled are not really futures but blocking
-/// operations.
+/// `task::Schedule` implementation that does nothing (except some bookkeeping
+/// in test-util builds). This is unique to the blocking scheduler as tasks
+/// scheduled are not really futures but blocking operations.
///
/// We avoid storing the task by forgetting it in `bind` and re-materializing it
-/// in `release.
-pub(crate) struct NoopSchedule;
+/// in `release`.
+pub(crate) struct BlockingSchedule {
+ #[cfg(feature = "test-util")]
+ handle: Handle,
+}
+
+impl BlockingSchedule {
+ #[cfg_attr(not(feature = "test-util"), allow(unused_variables))]
+ pub(crate) fn new(handle: &Handle) -> Self {
+ #[cfg(feature = "test-util")]
+ {
+ match &handle.inner {
+ scheduler::Handle::CurrentThread(handle) => {
+ handle.driver.clock.inhibit_auto_advance();
+ }
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ scheduler::Handle::MultiThread(_) => {}
+ }
+ }
+ BlockingSchedule {
+ #[cfg(feature = "test-util")]
+ handle: handle.clone(),
+ }
+ }
+}
-impl task::Schedule for NoopSchedule {
+impl task::Schedule for BlockingSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ #[cfg(feature = "test-util")]
+ {
+ match &self.handle.inner {
+ scheduler::Handle::CurrentThread(handle) => {
+ handle.driver.clock.allow_auto_advance();
+ handle.driver.unpark();
+ }
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ scheduler::Handle::MultiThread(_) => {}
+ }
+ }
None
}
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs
index d49a4da..ea0df2e 100644
--- a/src/runtime/builder.rs
+++ b/src/runtime/builder.rs
@@ -44,6 +44,7 @@ pub struct Builder {
/// Whether or not to enable the I/O driver
enable_io: bool,
+ nevents: usize,
/// Whether or not to enable the time driver
enable_time: bool,
@@ -181,6 +182,7 @@ cfg_unstable! {
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
+#[derive(Clone, Copy)]
pub(crate) enum Kind {
CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
@@ -228,6 +230,7 @@ impl Builder {
// I/O defaults to "off"
enable_io: false,
+ nevents: 1024,
// Time defaults to "off"
enable_time: false,
@@ -235,6 +238,7 @@ impl Builder {
// The clock starts not-paused
start_paused: false,
+ // Read from environment variable first in multi-threaded mode.
// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,
@@ -302,6 +306,8 @@ impl Builder {
/// This can be any number above 0 though it is advised to keep this value
/// on the smaller side.
///
+ /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
+ ///
/// # Default
///
/// The default value is the number of cores available to the system.
@@ -647,6 +653,7 @@ impl Builder {
enable_io: self.enable_io,
enable_time: self.enable_time,
start_paused: self.start_paused,
+ nevents: self.nevents,
}
}
@@ -938,6 +945,25 @@ cfg_io_driver! {
self.enable_io = true;
self
}
+
+ /// Enables the I/O driver and configures the max number of events to be
+ /// processed per tick.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new_current_thread()
+ /// .enable_io()
+ /// .max_io_events_per_tick(1024)
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
+ self.nevents = capacity;
+ self
+ }
}
}
diff --git a/src/runtime/context.rs b/src/runtime/context.rs
index 2e54c8b..fef53ca 100644
--- a/src/runtime/context.rs
+++ b/src/runtime/context.rs
@@ -15,6 +15,10 @@ cfg_rt! {
}
struct Context {
+ /// Uniquely identifies the current thread
+ #[cfg(feature = "rt")]
+ thread_id: Cell<Option<ThreadId>>,
+
/// Handle to the runtime scheduler running on the current thread.
#[cfg(feature = "rt")]
handle: RefCell<Option<scheduler::Handle>>,
@@ -46,6 +50,9 @@ struct Context {
tokio_thread_local! {
static CONTEXT: Context = {
Context {
+ #[cfg(feature = "rt")]
+ thread_id: Cell::new(None),
+
/// Tracks the current runtime handle to use when spawning,
/// accessing drivers, etc...
#[cfg(feature = "rt")]
@@ -82,10 +89,23 @@ pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, A
}
cfg_rt! {
- use crate::runtime::TryCurrentError;
+ use crate::runtime::{ThreadId, TryCurrentError};
use std::fmt;
+ pub(crate) fn thread_id() -> Result<ThreadId, AccessError> {
+ CONTEXT.try_with(|ctx| {
+ match ctx.thread_id.get() {
+ Some(id) => id,
+ None => {
+ let id = ThreadId::next();
+ ctx.thread_id.set(Some(id));
+ id
+ }
+ }
+ })
+ }
+
#[derive(Debug, Clone, Copy)]
#[must_use]
pub(crate) enum EnterRuntime {
diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs
index 8f9c512..4fb6b87 100644
--- a/src/runtime/driver.rs
+++ b/src/runtime/driver.rs
@@ -36,11 +36,12 @@ pub(crate) struct Cfg {
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
+ pub(crate) nevents: usize,
}
impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
- let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
+ let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
@@ -135,12 +136,12 @@ cfg_io_driver! {
Disabled(UnparkThread),
}
- fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
+ fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
#[cfg(loom)]
assert!(!enabled);
let ret = if enabled {
- let (io_driver, io_handle) = crate::runtime::io::Driver::new()?;
+ let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);
@@ -201,7 +202,7 @@ cfg_not_io_driver! {
#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);
- fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
+ fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((IoStack(park_thread), unpark_thread, Default::default()))
diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs
index da47ecb..c5dc65f 100644
--- a/src/runtime/handle.rs
+++ b/src/runtime/handle.rs
@@ -118,9 +118,9 @@ impl Handle {
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the background
- /// immediately when `spawn` is called.
+ /// The provided future will start running in the background immediately
+ /// when `spawn` is called, even if you don't await the returned
+ /// `JoinHandle`.
///
/// See [module level][mod] documentation for more details.
///
diff --git a/src/runtime/io/mod.rs b/src/runtime/io/mod.rs
index 02039f2..2e578b6 100644
--- a/src/runtime/io/mod.rs
+++ b/src/runtime/io/mod.rs
@@ -60,6 +60,7 @@ pub(crate) struct Handle {
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
+ is_shutdown: bool,
}
struct IoDispatcher {
@@ -104,7 +105,7 @@ fn _assert_kinds() {
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
- pub(crate) fn new() -> io::Result<(Driver, Handle)> {
+ pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
@@ -116,7 +117,7 @@ impl Driver {
let driver = Driver {
tick: 0,
signal_ready: false,
- events: mio::Events::with_capacity(1024),
+ events: mio::Events::with_capacity(nevents),
poll,
resources: slab,
};
@@ -147,9 +148,8 @@ impl Driver {
if handle.shutdown() {
self.resources.for_each(|io| {
- // If a task is waiting on the I/O resource, notify it. The task
- // will then attempt to use the I/O resource and fail due to the
- // driver being shutdown. And shutdown will clear all wakers.
+ // If a task is waiting on the I/O resource, notify it that the
+ // runtime is being shutdown. And shutdown will clear all wakers.
io.shutdown();
});
}
@@ -282,16 +282,12 @@ impl Handle {
true
}
- fn is_shutdown(&self) -> bool {
- return self.io_dispatch.read().unwrap().is_shutdown;
- }
-
fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
let io = self.io_dispatch.read().unwrap();
if io.is_shutdown {
return Err(io::Error::new(
io::ErrorKind::Other,
- "failed to find event loop",
+ crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
));
}
io.allocator.allocate().ok_or_else(|| {
diff --git a/src/runtime/io/registration.rs b/src/runtime/io/registration.rs
index 7b95f7f..140b924 100644
--- a/src/runtime/io/registration.rs
+++ b/src/runtime/io/registration.rs
@@ -148,7 +148,7 @@ impl Registration {
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));
- if self.handle().is_shutdown() {
+ if ev.is_shutdown {
return Poll::Ready(Err(gone()));
}
@@ -217,28 +217,22 @@ impl Drop for Registration {
}
fn gone() -> io::Error {
- io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
+ io::Error::new(
+ io::ErrorKind::Other,
+ crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
+ )
}
cfg_io_readiness! {
impl Registration {
pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
- use std::future::Future;
- use std::pin::Pin;
+ let ev = self.shared.readiness(interest).await;
- let fut = self.shared.readiness(interest);
- pin!(fut);
-
- crate::future::poll_fn(|cx| {
- if self.handle().is_shutdown() {
- return Poll::Ready(Err(io::Error::new(
- io::ErrorKind::Other,
- crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
- )));
- }
+ if ev.is_shutdown {
+ return Err(gone())
+ }
- Pin::new(&mut fut).poll(cx).map(Ok)
- }).await
+ Ok(ev)
}
pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
diff --git a/src/runtime/io/scheduled_io.rs b/src/runtime/io/scheduled_io.rs
index 1709091..197a4e0 100644
--- a/src/runtime/io/scheduled_io.rs
+++ b/src/runtime/io/scheduled_io.rs
@@ -46,9 +46,6 @@ struct Waiters {
/// Waker used for AsyncWrite.
writer: Option<Waker>,
-
- /// True if this ScheduledIo has been killed due to IO driver shutdown.
- is_shutdown: bool,
}
cfg_io_readiness! {
@@ -95,7 +92,7 @@ cfg_io_readiness! {
// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
-// | reserved | generation | driver tick | readiness |
+// | shutdown | generation | driver tick | readiness |
// |----------+------------+--------------+-----------|
// | 1 bit | 7 bits + 8 bits + 16 bits |
@@ -105,6 +102,8 @@ const TICK: bit::Pack = READINESS.then(8);
const GENERATION: bit::Pack = TICK.then(7);
+const SHUTDOWN: bit::Pack = GENERATION.then(1);
+
#[test]
fn test_generations_assert_same() {
assert_eq!(super::GENERATION, GENERATION);
@@ -138,9 +137,11 @@ impl ScheduledIo {
}
/// Invoked when the IO driver is shut down; forces this ScheduledIo into a
- /// permanently ready state.
+ /// permanently shutdown state.
pub(super) fn shutdown(&self) {
- self.wake0(Ready::ALL, true)
+ let mask = SHUTDOWN.pack(1, 0);
+ self.readiness.fetch_or(mask, AcqRel);
+ self.wake(Ready::ALL);
}
/// Sets the readiness on this `ScheduledIo` by invoking the given closure on
@@ -219,16 +220,10 @@ impl ScheduledIo {
/// than 32 wakers to notify, if the stack array fills up, the lock is
/// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
- self.wake0(ready, false);
- }
-
- fn wake0(&self, ready: Ready, shutdown: bool) {
let mut wakers = WakeList::new();
let mut waiters = self.waiters.lock();
- waiters.is_shutdown |= shutdown;
-
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
@@ -283,6 +278,7 @@ impl ScheduledIo {
ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
+ is_shutdown: SHUTDOWN.unpack(curr) != 0,
}
}
@@ -299,8 +295,9 @@ impl ScheduledIo {
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
- if ready.is_empty() {
+ if ready.is_empty() && !is_shutdown {
// Update the task info
let mut waiters = self.waiters.lock();
let slot = match direction {
@@ -325,10 +322,12 @@ impl ScheduledIo {
// taking the waiters lock
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
- if waiters.is_shutdown {
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
+ if is_shutdown {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: direction.mask(),
+ is_shutdown,
})
} else if ready.is_empty() {
Poll::Pending
@@ -336,12 +335,14 @@ impl ScheduledIo {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
+ is_shutdown,
})
}
} else {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
+ is_shutdown,
})
}
}
@@ -433,16 +434,17 @@ cfg_io_readiness! {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
let ready = ready.intersection(interest);
- if !ready.is_empty() {
+ if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
- return Poll::Ready(ReadyEvent { tick, ready });
+ return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
}
// Wasn't ready, take the lock (and check again while locked).
@@ -450,18 +452,19 @@ cfg_io_readiness! {
let curr = scheduled_io.readiness.load(SeqCst);
let mut ready = Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
- if waiters.is_shutdown {
+ if is_shutdown {
ready = Ready::ALL;
}
let ready = ready.intersection(interest);
- if !ready.is_empty() {
+ if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
- return Poll::Ready(ReadyEvent { tick, ready });
+ return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
}
// Not ready even after locked, insert into list...
@@ -514,6 +517,7 @@ cfg_io_readiness! {
let w = unsafe { &mut *waiter.get() };
let curr = scheduled_io.readiness.load(Acquire);
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// The returned tick might be newer than the event
// which notified our waker. This is ok because the future
@@ -528,6 +532,7 @@ cfg_io_readiness! {
return Poll::Ready(ReadyEvent {
tick,
ready,
+ is_shutdown,
});
}
}
diff --git a/src/runtime/metrics/batch.rs b/src/runtime/metrics/batch.rs
index f1c3fa6..4e6b28d 100644
--- a/src/runtime/metrics/batch.rs
+++ b/src/runtime/metrics/batch.rs
@@ -11,9 +11,12 @@ pub(crate) struct MetricsBatch {
/// Number of times the worker woke w/o doing work.
noop_count: u64,
- /// Number of times stolen.
+ /// Number of tasks stolen.
steal_count: u64,
+ /// Number of times tasks where stolen.
+ steal_operations: u64,
+
/// Number of tasks that were polled by the worker.
poll_count: u64,
@@ -39,6 +42,7 @@ impl MetricsBatch {
park_count: 0,
noop_count: 0,
steal_count: 0,
+ steal_operations: 0,
poll_count: 0,
poll_count_on_last_park: 0,
local_schedule_count: 0,
@@ -52,6 +56,9 @@ impl MetricsBatch {
worker.park_count.store(self.park_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
+ worker
+ .steal_operations
+ .store(self.steal_operations, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);
worker
@@ -98,6 +105,10 @@ cfg_rt_multi_thread! {
self.steal_count += by as u64;
}
+ pub(crate) fn incr_steal_operations(&mut self) {
+ self.steal_operations += 1;
+ }
+
pub(crate) fn incr_overflow_count(&mut self) {
self.overflow_count += 1;
}
diff --git a/src/runtime/metrics/mock.rs b/src/runtime/metrics/mock.rs
index 6b9cf70..c388dc0 100644
--- a/src/runtime/metrics/mock.rs
+++ b/src/runtime/metrics/mock.rs
@@ -38,6 +38,7 @@ impl MetricsBatch {
cfg_rt_multi_thread! {
impl MetricsBatch {
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
+ pub(crate) fn incr_steal_operations(&mut self) {}
pub(crate) fn incr_overflow_count(&mut self) {}
}
}
diff --git a/src/runtime/metrics/runtime.rs b/src/runtime/metrics/runtime.rs
index dee14a4..d29cb3d 100644
--- a/src/runtime/metrics/runtime.rs
+++ b/src/runtime/metrics/runtime.rs
@@ -210,10 +210,57 @@ impl RuntimeMetrics {
.load(Relaxed)
}
+ /// Returns the number of tasks the given worker thread stole from
+ /// another worker thread.
+ ///
+ /// This metric only applies to the **multi-threaded** runtime and will
+ /// always return `0` when using the current thread runtime.
+ ///
+ /// The worker steal count starts at zero when the runtime is created and
+ /// increases by `N` each time the worker has processed its scheduled queue
+ /// and successfully steals `N` more pending tasks from another worker.
+ ///
+ /// The counter is monotonically increasing. It is never decremented or
+ /// reset to zero.
+ ///
+ /// # Arguments
+ ///
+ /// `worker` is the index of the worker being queried. The given value must
+ /// be between 0 and `num_workers()`. The index uniquely identifies a single
+ /// worker and will continue to identify the worker throughout the lifetime
+ /// of the runtime instance.
+ ///
+ /// # Panics
+ ///
+ /// The method panics when `worker` represents an invalid worker, i.e. is
+ /// greater than or equal to `num_workers()`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Handle;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let metrics = Handle::current().metrics();
+ ///
+ /// let n = metrics.worker_steal_count(0);
+ /// println!("worker 0 has stolen {} tasks", n);
+ /// }
+ /// ```
+ pub fn worker_steal_count(&self, worker: usize) -> u64 {
+ self.handle
+ .inner
+ .worker_metrics(worker)
+ .steal_count
+ .load(Relaxed)
+ }
+
/// Returns the number of times the given worker thread stole tasks from
/// another worker thread.
///
- /// This metric only applies to the **multi-threaded** runtime and will always return `0` when using the current thread runtime.
+ /// This metric only applies to the **multi-threaded** runtime and will
+ /// always return `0` when using the current thread runtime.
///
/// The worker steal count starts at zero when the runtime is created and
/// increases by one each time the worker has processed its scheduled queue
@@ -243,15 +290,15 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
- /// let n = metrics.worker_noop_count(0);
+ /// let n = metrics.worker_steal_operations(0);
/// println!("worker 0 has stolen tasks {} times", n);
/// }
/// ```
- pub fn worker_steal_count(&self, worker: usize) -> u64 {
+ pub fn worker_steal_operations(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
- .steal_count
+ .steal_operations
.load(Relaxed)
}
@@ -328,8 +375,8 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
- /// let n = metrics.worker_poll_count(0);
- /// println!("worker 0 has polled {} tasks", n);
+ /// let n = metrics.worker_total_busy_duration(0);
+ /// println!("worker 0 was busy for a total of {:?}", n);
/// }
/// ```
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
diff --git a/src/runtime/metrics/worker.rs b/src/runtime/metrics/worker.rs
index ec58de6..a40c76e 100644
--- a/src/runtime/metrics/worker.rs
+++ b/src/runtime/metrics/worker.rs
@@ -17,9 +17,12 @@ pub(crate) struct WorkerMetrics {
/// Number of times the worker woke then parked again without doing work.
pub(crate) noop_count: AtomicU64,
- /// Number of times the worker attempted to steal.
+ /// Number of tasks the worker stole.
pub(crate) steal_count: AtomicU64,
+ /// Number of times the worker stole
+ pub(crate) steal_operations: AtomicU64,
+
/// Number of tasks the worker polled.
pub(crate) poll_count: AtomicU64,
@@ -43,6 +46,7 @@ impl WorkerMetrics {
park_count: AtomicU64::new(0),
noop_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
+ steal_operations: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
busy_duration_total: AtomicU64::new(0),
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 45b79b0..b6f43ea 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -237,6 +237,9 @@ cfg_rt! {
mod runtime;
pub use runtime::{Runtime, RuntimeFlavor};
+ mod thread_id;
+ pub(crate) use thread_id::ThreadId;
+
cfg_metrics! {
mod metrics;
pub use metrics::RuntimeMetrics;
diff --git a/src/runtime/runtime.rs b/src/runtime/runtime.rs
index 9ede0a7..1985673 100644
--- a/src/runtime/runtime.rs
+++ b/src/runtime/runtime.rs
@@ -163,9 +163,9 @@ impl Runtime {
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the
- /// background immediately when `spawn` is called.
+ /// The provided future will start running in the background immediately
+ /// when `spawn` is called, even if you don't await the returned
+ /// `JoinHandle`.
///
/// See [module level][mod] documentation for more details.
///
diff --git a/src/runtime/scheduler/multi_thread/queue.rs b/src/runtime/scheduler/multi_thread/queue.rs
index 59b448d..faf56db 100644
--- a/src/runtime/scheduler/multi_thread/queue.rs
+++ b/src/runtime/scheduler/multi_thread/queue.rs
@@ -263,7 +263,7 @@ impl<T> Local<T> {
// safety: The CAS above ensures that no consumer will look at these
// values again, and we are the only producer.
let batch_iter = BatchTaskIter {
- buffer: &*self.inner.buffer,
+ buffer: &self.inner.buffer,
head: head as UnsignedLong,
i: 0,
};
@@ -353,6 +353,7 @@ impl<T> Steal<T> {
}
dst_metrics.incr_steal_count(n as u16);
+ dst_metrics.incr_steal_operations();
// We are returning a task here
n -= 1;
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs
index c079297..8e3c3d1 100644
--- a/src/runtime/task/harness.rs
+++ b/src/runtime/task/harness.rs
@@ -194,7 +194,7 @@ where
TransitionToRunning::Success => {
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
- let cx = Context::from_waker(&*waker_ref);
+ let cx = Context::from_waker(&waker_ref);
let res = poll_future(self.core(), cx);
if res == Poll::Ready(()) {
@@ -315,9 +315,10 @@ where
// this task. It is our responsibility to drop the
// output.
self.core().drop_future_or_output();
- } else if snapshot.has_join_waker() {
- // Notify the join handle. The previous transition obtains the
- // lock on the waker cell.
+ } else if snapshot.is_join_waker_set() {
+ // Notify the waker. Reading the waker field is safe per rule 4
+ // in task/mod.rs, since the JOIN_WAKER bit is set and the call
+ // to transition_to_complete() above set the COMPLETE bit.
self.trailer().wake_join();
}
}));
@@ -367,36 +368,30 @@ fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
debug_assert!(snapshot.is_join_interested());
if !snapshot.is_complete() {
- // The waker must be stored in the task struct.
- let res = if snapshot.has_join_waker() {
- // There already is a waker stored in the struct. If it matches
- // the provided waker, then there is no further work to do.
- // Otherwise, the waker must be swapped.
- let will_wake = unsafe {
- // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
- // may mutate the `waker` field.
- trailer.will_wake(waker)
- };
-
- if will_wake {
- // The task is not complete **and** the waker is up to date,
- // there is nothing further that needs to be done.
+ // If the task is not complete, try storing the provided waker in the
+ // task's waker field.
+
+ let res = if snapshot.is_join_waker_set() {
+ // If JOIN_WAKER is set, then JoinHandle has previously stored a
+ // waker in the waker field per step (iii) of rule 5 in task/mod.rs.
+
+ // Optimization: if the stored waker and the provided waker wake the
+ // same task, then return without touching the waker field. (Reading
+ // the waker field below is safe per rule 3 in task/mod.rs.)
+ if unsafe { trailer.will_wake(waker) } {
return false;
}
- // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
- // field then update the field with the new join worker.
- //
- // This requires two atomic operations, unsetting the bit and
- // then resetting it. If the task transitions to complete
- // concurrently to either one of those operations, then setting
- // the join waker fails and we proceed to reading the task
- // output.
+ // Otherwise swap the stored waker with the provided waker by
+ // following the rule 5 in task/mod.rs.
header
.state
.unset_waker()
.and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
} else {
+ // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
+ // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
+ // of rule 5 and try to store the provided waker in the waker field.
set_join_waker(header, trailer, waker.clone(), snapshot)
};
@@ -417,7 +412,7 @@ fn set_join_waker(
snapshot: Snapshot,
) -> Result<Snapshot, Snapshot> {
assert!(snapshot.is_join_interested());
- assert!(!snapshot.has_join_waker());
+ assert!(!snapshot.is_join_waker_set());
// Safety: Only the `JoinHandle` may set the `waker` field. When
// `JOIN_INTEREST` is **not** set, nothing else will touch the field.
diff --git a/src/runtime/task/id.rs b/src/runtime/task/id.rs
new file mode 100644
index 0000000..2b0d95c
--- /dev/null
+++ b/src/runtime/task/id.rs
@@ -0,0 +1,87 @@
+use crate::runtime::context;
+
+use std::fmt;
+
+/// An opaque ID that uniquely identifies a task relative to all other currently
+/// running tasks.
+///
+/// # Notes
+///
+/// - Task IDs are unique relative to other *currently running* tasks. When a
+/// task completes, the same ID may be used for another task.
+/// - Task IDs are *not* sequential, and do not indicate the order in which
+/// tasks are spawned, what runtime a task is spawned on, or any other data.
+/// - The task ID of the currently running task can be obtained from inside the
+/// task via the [`task::try_id()`](crate::task::try_id()) and
+/// [`task::id()`](crate::task::id()) functions and from outside the task via
+/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [unstable]: crate#unstable-features
+#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
+pub struct Id(u64);
+
+/// Returns the [`Id`] of the currently running task.
+///
+/// # Panics
+///
+/// This function panics if called from outside a task. Please note that calls
+/// to `block_on` do not have task IDs, so the method will panic if called from
+/// within a call to `block_on`. For a version of this function that doesn't
+/// panic, see [`task::try_id()`](crate::runtime::task::try_id()).
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [task ID]: crate::task::Id
+/// [unstable]: crate#unstable-features
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[track_caller]
+pub fn id() -> Id {
+ context::current_task_id().expect("Can't get a task id when not inside a task")
+}
+
+/// Returns the [`Id`] of the currently running task, or `None` if called outside
+/// of a task.
+///
+/// This function is similar to [`task::id()`](crate::runtime::task::id()), except
+/// that it returns `None` rather than panicking if called outside of a task
+/// context.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [task ID]: crate::task::Id
+/// [unstable]: crate#unstable-features
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[track_caller]
+pub fn try_id() -> Option<Id> {
+ context::current_task_id()
+}
+
+impl fmt::Display for Id {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(f)
+ }
+}
+
+impl Id {
+ pub(crate) fn next() -> Self {
+ use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
+
+ static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+
+ Self(NEXT_ID.fetch_add(1, Relaxed))
+ }
+
+ pub(crate) fn as_u64(&self) -> u64 {
+ self.0
+ }
+}
diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs
index 5660575..11c4b9b 100644
--- a/src/runtime/task/join.rs
+++ b/src/runtime/task/join.rs
@@ -11,9 +11,9 @@ cfg_rt! {
/// An owned permission to join on a task (await its termination).
///
/// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
- /// for a Tokio task rather than a thread. You do not need to `.await` the
- /// `JoinHandle` to make the task execute — it will start running in the
- /// background immediately.
+ /// for a Tokio task rather than a thread. Note that the background task
+ /// associated with this `JoinHandle` started running immediately when you
+ /// called spawn, even if you have not yet awaited the `JoinHandle`.
///
/// A `JoinHandle` *detaches* the associated task when it is dropped, which
/// means that there is no longer any handle to the task, and no way to `join`
diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs
index fea6e0f..55131ac 100644
--- a/src/runtime/task/mod.rs
+++ b/src/runtime/task/mod.rs
@@ -168,19 +168,20 @@
// unstable. This should be removed once `JoinSet` is stabilized.
#![cfg_attr(not(tokio_unstable), allow(dead_code))]
-use crate::runtime::context;
-
mod core;
use self::core::Cell;
use self::core::Header;
mod error;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::error::JoinError;
mod harness;
use self::harness::Harness;
+mod id;
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+pub use id::{id, try_id, Id};
+
cfg_rt_multi_thread! {
mod inject;
pub(super) use self::inject::Inject;
@@ -191,10 +192,8 @@ mod abort;
mod join;
#[cfg(feature = "rt")]
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::abort::AbortHandle;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::join::JoinHandle;
mod list;
@@ -215,70 +214,6 @@ use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};
-/// An opaque ID that uniquely identifies a task relative to all other currently
-/// running tasks.
-///
-/// # Notes
-///
-/// - Task IDs are unique relative to other *currently running* tasks. When a
-/// task completes, the same ID may be used for another task.
-/// - Task IDs are *not* sequential, and do not indicate the order in which
-/// tasks are spawned, what runtime a task is spawned on, or any other data.
-/// - The task ID of the currently running task can be obtained from inside the
-/// task via the [`task::try_id()`](crate::task::try_id()) and
-/// [`task::id()`](crate::task::id()) functions and from outside the task via
-/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
-///
-/// **Note**: This is an [unstable API][unstable]. The public API of this type
-/// may break in 1.x releases. See [the documentation on unstable
-/// features][unstable] for details.
-///
-/// [unstable]: crate#unstable-features
-#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
-#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
-#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
-pub struct Id(u64);
-
-/// Returns the [`Id`] of the currently running task.
-///
-/// # Panics
-///
-/// This function panics if called from outside a task. Please note that calls
-/// to `block_on` do not have task IDs, so the method will panic if called from
-/// within a call to `block_on`. For a version of this function that doesn't
-/// panic, see [`task::try_id()`](crate::runtime::task::try_id()).
-///
-/// **Note**: This is an [unstable API][unstable]. The public API of this type
-/// may break in 1.x releases. See [the documentation on unstable
-/// features][unstable] for details.
-///
-/// [task ID]: crate::task::Id
-/// [unstable]: crate#unstable-features
-#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
-#[track_caller]
-pub fn id() -> Id {
- context::current_task_id().expect("Can't get a task id when not inside a task")
-}
-
-/// Returns the [`Id`] of the currently running task, or `None` if called outside
-/// of a task.
-///
-/// This function is similar to [`task::id()`](crate::runtime::task::id()), except
-/// that it returns `None` rather than panicking if called outside of a task
-/// context.
-///
-/// **Note**: This is an [unstable API][unstable]. The public API of this type
-/// may break in 1.x releases. See [the documentation on unstable
-/// features][unstable] for details.
-///
-/// [task ID]: crate::task::Id
-/// [unstable]: crate#unstable-features
-#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
-#[track_caller]
-pub fn try_id() -> Option<Id> {
- context::current_task_id()
-}
-
/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
@@ -554,66 +489,3 @@ unsafe impl<S> linked_list::Link for Task<S> {
self::core::Trailer::addr_of_owned(Header::get_trailer(target))
}
}
-
-impl fmt::Display for Id {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
- }
-}
-
-impl Id {
- // When 64-bit atomics are available, use a static `AtomicU64` counter to
- // generate task IDs.
- //
- // Note(eliza): we _could_ just use `crate::loom::AtomicU64`, which switches
- // between an atomic and mutex-based implementation here, rather than having
- // two separate functions for targets with and without 64-bit atomics.
- // However, because we can't use the mutex-based implementation in a static
- // initializer directly, the 32-bit impl also has to use a `OnceCell`, and I
- // thought it was nicer to avoid the `OnceCell` overhead on 64-bit
- // platforms...
- cfg_has_atomic_u64! {
- pub(crate) fn next() -> Self {
- use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
- static NEXT_ID: AtomicU64 = AtomicU64::new(1);
- Self(NEXT_ID.fetch_add(1, Relaxed))
- }
- }
-
- cfg_not_has_atomic_u64! {
- cfg_has_const_mutex_new! {
- pub(crate) fn next() -> Self {
- use crate::loom::sync::Mutex;
- static NEXT_ID: Mutex<u64> = Mutex::const_new(1);
-
- let mut lock = NEXT_ID.lock();
- let id = *lock;
- *lock += 1;
- Self(id)
- }
- }
-
- cfg_not_has_const_mutex_new! {
- pub(crate) fn next() -> Self {
- use crate::util::once_cell::OnceCell;
- use crate::loom::sync::Mutex;
-
- fn init_next_id() -> Mutex<u64> {
- Mutex::new(1)
- }
-
- static NEXT_ID: OnceCell<Mutex<u64>> = OnceCell::new();
-
- let next_id = NEXT_ID.get(init_next_id);
- let mut lock = next_id.lock();
- let id = *lock;
- *lock += 1;
- Self(id)
- }
- }
- }
-
- pub(crate) fn as_u64(&self) -> u64 {
- self.0
- }
-}
diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs
index c2d5b28..7728312 100644
--- a/src/runtime/task/state.rs
+++ b/src/runtime/task/state.rs
@@ -378,7 +378,7 @@ impl State {
pub(super) fn set_join_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
- assert!(!curr.has_join_waker());
+ assert!(!curr.is_join_waker_set());
if curr.is_complete() {
return None;
@@ -398,7 +398,7 @@ impl State {
pub(super) fn unset_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
- assert!(curr.has_join_waker());
+ assert!(curr.is_join_waker_set());
if curr.is_complete() {
return None;
@@ -546,7 +546,7 @@ impl Snapshot {
self.0 &= !JOIN_INTEREST
}
- pub(super) fn has_join_waker(self) -> bool {
+ pub(super) fn is_join_waker_set(self) -> bool {
self.0 & JOIN_WAKER == JOIN_WAKER
}
@@ -588,7 +588,7 @@ impl fmt::Debug for Snapshot {
.field("is_notified", &self.is_notified())
.field("is_cancelled", &self.is_cancelled())
.field("is_join_interested", &self.is_join_interested())
- .field("has_join_waker", &self.has_join_waker())
+ .field("is_join_waker_set", &self.is_join_waker_set())
.field("ref_count", &self.ref_count())
.finish()
}
diff --git a/src/runtime/tests/loom_blocking.rs b/src/runtime/tests/loom_blocking.rs
index 89de85e..5c4aeae 100644
--- a/src/runtime/tests/loom_blocking.rs
+++ b/src/runtime/tests/loom_blocking.rs
@@ -73,6 +73,27 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread
});
}
+#[test]
+fn spawn_blocking_when_paused() {
+ use std::time::Duration;
+ loom::model(|| {
+ let rt = crate::runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+ let handle = rt.handle();
+ let _enter = handle.enter();
+ let a = crate::task::spawn_blocking(|| {});
+ let b = crate::task::spawn_blocking(|| {});
+ rt.block_on(crate::time::timeout(Duration::from_millis(1), async move {
+ a.await.expect("blocking task should finish");
+ b.await.expect("blocking task should finish");
+ }))
+ .expect("timeout should not trigger");
+ });
+}
+
fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
diff --git a/src/runtime/tests/loom_queue.rs b/src/runtime/tests/loom_queue.rs
index 8d4e1d3..fc93bf3 100644
--- a/src/runtime/tests/loom_queue.rs
+++ b/src/runtime/tests/loom_queue.rs
@@ -1,6 +1,6 @@
-use crate::runtime::blocking::NoopSchedule;
use crate::runtime::scheduler::multi_thread::queue;
use crate::runtime::task::Inject;
+use crate::runtime::tests::NoopSchedule;
use crate::runtime::MetricsBatch;
use loom::thread;
diff --git a/src/runtime/tests/mod.rs b/src/runtime/tests/mod.rs
index 1c67dfe..4e7c245 100644
--- a/src/runtime/tests/mod.rs
+++ b/src/runtime/tests/mod.rs
@@ -2,11 +2,29 @@
// other code when running loom tests.
#![cfg_attr(loom, warn(dead_code, unreachable_pub))]
+use self::noop_scheduler::NoopSchedule;
use self::unowned_wrapper::unowned;
+mod noop_scheduler {
+ use crate::runtime::task::{self, Task};
+
+ /// `task::Schedule` implementation that does nothing, for testing.
+ pub(crate) struct NoopSchedule;
+
+ impl task::Schedule for NoopSchedule {
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+ }
+}
+
mod unowned_wrapper {
- use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::{Id, JoinHandle, Notified};
+ use crate::runtime::tests::NoopSchedule;
#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>)
diff --git a/src/runtime/tests/task.rs b/src/runtime/tests/task.rs
index 173e5b0..a79c0f5 100644
--- a/src/runtime/tests/task.rs
+++ b/src/runtime/tests/task.rs
@@ -1,5 +1,5 @@
-use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
+use crate::runtime::tests::NoopSchedule;
use crate::util::TryLock;
use std::collections::VecDeque;
diff --git a/src/runtime/thread_id.rs b/src/runtime/thread_id.rs
new file mode 100644
index 0000000..ef39289
--- /dev/null
+++ b/src/runtime/thread_id.rs
@@ -0,0 +1,31 @@
+use std::num::NonZeroU64;
+
+#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)]
+pub(crate) struct ThreadId(NonZeroU64);
+
+impl ThreadId {
+ pub(crate) fn next() -> Self {
+ use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
+
+ static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(0);
+
+ let mut last = NEXT_ID.load(Relaxed);
+ loop {
+ let id = match last.checked_add(1) {
+ Some(id) => id,
+ None => exhausted(),
+ };
+
+ match NEXT_ID.compare_exchange_weak(last, id, Relaxed, Relaxed) {
+ Ok(_) => return ThreadId(NonZeroU64::new(id).unwrap()),
+ Err(id) => last = id,
+ }
+ }
+ }
+}
+
+#[cold]
+#[allow(dead_code)]
+fn exhausted() -> ! {
+ panic!("failed to generate unique thread ID: bitspace exhausted")
+}
diff --git a/src/runtime/time/mod.rs b/src/runtime/time/mod.rs
index 240f8f1..f81cab8 100644
--- a/src/runtime/time/mod.rs
+++ b/src/runtime/time/mod.rs
@@ -222,7 +222,7 @@ impl Driver {
let handle = rt_handle.time();
let clock = &handle.time_source.clock;
- if clock.is_paused() {
+ if clock.can_auto_advance() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
// If the time driver was woken, then the park completed