diff options
Diffstat (limited to 'src/runtime')
28 files changed, 416 insertions, 247 deletions
diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs index 88bdcfd..c42924b 100644 --- a/src/runtime/blocking/mod.rs +++ b/src/runtime/blocking/mod.rs @@ -17,8 +17,6 @@ cfg_trace! { mod schedule; mod shutdown; mod task; -#[cfg(all(test, not(tokio_wasm)))] -pub(crate) use schedule::NoopSchedule; pub(crate) use task::BlockingTask; use crate::runtime::Builder; diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 9c53614..e9f6b66 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -2,7 +2,7 @@ use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; -use crate::runtime::blocking::schedule::NoopSchedule; +use crate::runtime::blocking::schedule::BlockingSchedule; use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; @@ -120,7 +120,7 @@ struct Shared { } pub(crate) struct Task { - task: task::UnownedTask<NoopSchedule>, + task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory, } @@ -151,7 +151,7 @@ impl From<SpawnError> for io::Error { } impl Task { - pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task { + pub(crate) fn new(task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory) -> Task { Task { task, mandatory } } @@ -379,7 +379,8 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule, id); + let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); + let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) } diff --git a/src/runtime/blocking/schedule.rs b/src/runtime/blocking/schedule.rs index 5425224..edf775b 100644 --- a/src/runtime/blocking/schedule.rs +++ b/src/runtime/blocking/schedule.rs @@ -1,15 +1,52 @@ +#[cfg(feature = "test-util")] +use crate::runtime::scheduler; use crate::runtime::task::{self, Task}; +use crate::runtime::Handle; -/// `task::Schedule` implementation that does nothing. This is unique to the -/// blocking scheduler as tasks scheduled are not really futures but blocking -/// operations. +/// `task::Schedule` implementation that does nothing (except some bookkeeping +/// in test-util builds). This is unique to the blocking scheduler as tasks +/// scheduled are not really futures but blocking operations. /// /// We avoid storing the task by forgetting it in `bind` and re-materializing it -/// in `release. -pub(crate) struct NoopSchedule; +/// in `release`. +pub(crate) struct BlockingSchedule { + #[cfg(feature = "test-util")] + handle: Handle, +} + +impl BlockingSchedule { + #[cfg_attr(not(feature = "test-util"), allow(unused_variables))] + pub(crate) fn new(handle: &Handle) -> Self { + #[cfg(feature = "test-util")] + { + match &handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.inhibit_auto_advance(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } + BlockingSchedule { + #[cfg(feature = "test-util")] + handle: handle.clone(), + } + } +} -impl task::Schedule for NoopSchedule { +impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { + #[cfg(feature = "test-util")] + { + match &self.handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.allow_auto_advance(); + handle.driver.unpark(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } None } diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs index d49a4da..ea0df2e 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/builder.rs @@ -44,6 +44,7 @@ pub struct Builder { /// Whether or not to enable the I/O driver enable_io: bool, + nevents: usize, /// Whether or not to enable the time driver enable_time: bool, @@ -181,6 +182,7 @@ cfg_unstable! { pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>; +#[derive(Clone, Copy)] pub(crate) enum Kind { CurrentThread, #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] @@ -228,6 +230,7 @@ impl Builder { // I/O defaults to "off" enable_io: false, + nevents: 1024, // Time defaults to "off" enable_time: false, @@ -235,6 +238,7 @@ impl Builder { // The clock starts not-paused start_paused: false, + // Read from environment variable first in multi-threaded mode. // Default to lazy auto-detection (one thread per CPU core) worker_threads: None, @@ -302,6 +306,8 @@ impl Builder { /// This can be any number above 0 though it is advised to keep this value /// on the smaller side. /// + /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`. + /// /// # Default /// /// The default value is the number of cores available to the system. @@ -647,6 +653,7 @@ impl Builder { enable_io: self.enable_io, enable_time: self.enable_time, start_paused: self.start_paused, + nevents: self.nevents, } } @@ -938,6 +945,25 @@ cfg_io_driver! { self.enable_io = true; self } + + /// Enables the I/O driver and configures the max number of events to be + /// processed per tick. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_current_thread() + /// .enable_io() + /// .max_io_events_per_tick(1024) + /// .build() + /// .unwrap(); + /// ``` + pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self { + self.nevents = capacity; + self + } } } diff --git a/src/runtime/context.rs b/src/runtime/context.rs index 2e54c8b..fef53ca 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -15,6 +15,10 @@ cfg_rt! { } struct Context { + /// Uniquely identifies the current thread + #[cfg(feature = "rt")] + thread_id: Cell<Option<ThreadId>>, + /// Handle to the runtime scheduler running on the current thread. #[cfg(feature = "rt")] handle: RefCell<Option<scheduler::Handle>>, @@ -46,6 +50,9 @@ struct Context { tokio_thread_local! { static CONTEXT: Context = { Context { + #[cfg(feature = "rt")] + thread_id: Cell::new(None), + /// Tracks the current runtime handle to use when spawning, /// accessing drivers, etc... #[cfg(feature = "rt")] @@ -82,10 +89,23 @@ pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, A } cfg_rt! { - use crate::runtime::TryCurrentError; + use crate::runtime::{ThreadId, TryCurrentError}; use std::fmt; + pub(crate) fn thread_id() -> Result<ThreadId, AccessError> { + CONTEXT.try_with(|ctx| { + match ctx.thread_id.get() { + Some(id) => id, + None => { + let id = ThreadId::next(); + ctx.thread_id.set(Some(id)); + id + } + } + }) + } + #[derive(Debug, Clone, Copy)] #[must_use] pub(crate) enum EnterRuntime { diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs index 8f9c512..4fb6b87 100644 --- a/src/runtime/driver.rs +++ b/src/runtime/driver.rs @@ -36,11 +36,12 @@ pub(crate) struct Cfg { pub(crate) enable_time: bool, pub(crate) enable_pause_time: bool, pub(crate) start_paused: bool, + pub(crate) nevents: usize, } impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> { - let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; + let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?; let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); @@ -135,12 +136,12 @@ cfg_io_driver! { Disabled(UnparkThread), } - fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { + fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> { #[cfg(loom)] assert!(!enabled); let ret = if enabled { - let (io_driver, io_handle) = crate::runtime::io::Driver::new()?; + let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?; let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?; let process_driver = create_process_driver(signal_driver); @@ -201,7 +202,7 @@ cfg_not_io_driver! { #[derive(Debug)] pub(crate) struct IoStack(ParkThread); - fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { + fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> { let park_thread = ParkThread::new(); let unpark_thread = park_thread.unpark(); Ok((IoStack(park_thread), unpark_thread, Default::default())) diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs index da47ecb..c5dc65f 100644 --- a/src/runtime/handle.rs +++ b/src/runtime/handle.rs @@ -118,9 +118,9 @@ impl Handle { /// thread pool. The thread pool is then responsible for polling the future /// until it completes. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the background - /// immediately when `spawn` is called. + /// The provided future will start running in the background immediately + /// when `spawn` is called, even if you don't await the returned + /// `JoinHandle`. /// /// See [module level][mod] documentation for more details. /// diff --git a/src/runtime/io/mod.rs b/src/runtime/io/mod.rs index 02039f2..2e578b6 100644 --- a/src/runtime/io/mod.rs +++ b/src/runtime/io/mod.rs @@ -60,6 +60,7 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, pub(crate) ready: Ready, + is_shutdown: bool, } struct IoDispatcher { @@ -104,7 +105,7 @@ fn _assert_kinds() { impl Driver { /// Creates a new event loop, returning any error that happened during the /// creation. - pub(crate) fn new() -> io::Result<(Driver, Handle)> { + pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { let poll = mio::Poll::new()?; #[cfg(not(tokio_wasi))] let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; @@ -116,7 +117,7 @@ impl Driver { let driver = Driver { tick: 0, signal_ready: false, - events: mio::Events::with_capacity(1024), + events: mio::Events::with_capacity(nevents), poll, resources: slab, }; @@ -147,9 +148,8 @@ impl Driver { if handle.shutdown() { self.resources.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. And shutdown will clear all wakers. + // If a task is waiting on the I/O resource, notify it that the + // runtime is being shutdown. And shutdown will clear all wakers. io.shutdown(); }); } @@ -282,16 +282,12 @@ impl Handle { true } - fn is_shutdown(&self) -> bool { - return self.io_dispatch.read().unwrap().is_shutdown; - } - fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> { let io = self.io_dispatch.read().unwrap(); if io.is_shutdown { return Err(io::Error::new( io::ErrorKind::Other, - "failed to find event loop", + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, )); } io.allocator.allocate().ok_or_else(|| { diff --git a/src/runtime/io/registration.rs b/src/runtime/io/registration.rs index 7b95f7f..140b924 100644 --- a/src/runtime/io/registration.rs +++ b/src/runtime/io/registration.rs @@ -148,7 +148,7 @@ impl Registration { let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); - if self.handle().is_shutdown() { + if ev.is_shutdown { return Poll::Ready(Err(gone())); } @@ -217,28 +217,22 @@ impl Drop for Registration { } fn gone() -> io::Error { - io::Error::new(io::ErrorKind::Other, "IO driver has terminated") + io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, + ) } cfg_io_readiness! { impl Registration { pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> { - use std::future::Future; - use std::pin::Pin; + let ev = self.shared.readiness(interest).await; - let fut = self.shared.readiness(interest); - pin!(fut); - - crate::future::poll_fn(|cx| { - if self.handle().is_shutdown() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR - ))); - } + if ev.is_shutdown { + return Err(gone()) + } - Pin::new(&mut fut).poll(cx).map(Ok) - }).await + Ok(ev) } pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> { diff --git a/src/runtime/io/scheduled_io.rs b/src/runtime/io/scheduled_io.rs index 1709091..197a4e0 100644 --- a/src/runtime/io/scheduled_io.rs +++ b/src/runtime/io/scheduled_io.rs @@ -46,9 +46,6 @@ struct Waiters { /// Waker used for AsyncWrite. writer: Option<Waker>, - - /// True if this ScheduledIo has been killed due to IO driver shutdown. - is_shutdown: bool, } cfg_io_readiness! { @@ -95,7 +92,7 @@ cfg_io_readiness! { // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. // -// | reserved | generation | driver tick | readiness | +// | shutdown | generation | driver tick | readiness | // |----------+------------+--------------+-----------| // | 1 bit | 7 bits + 8 bits + 16 bits | @@ -105,6 +102,8 @@ const TICK: bit::Pack = READINESS.then(8); const GENERATION: bit::Pack = TICK.then(7); +const SHUTDOWN: bit::Pack = GENERATION.then(1); + #[test] fn test_generations_assert_same() { assert_eq!(super::GENERATION, GENERATION); @@ -138,9 +137,11 @@ impl ScheduledIo { } /// Invoked when the IO driver is shut down; forces this ScheduledIo into a - /// permanently ready state. + /// permanently shutdown state. pub(super) fn shutdown(&self) { - self.wake0(Ready::ALL, true) + let mask = SHUTDOWN.pack(1, 0); + self.readiness.fetch_or(mask, AcqRel); + self.wake(Ready::ALL); } /// Sets the readiness on this `ScheduledIo` by invoking the given closure on @@ -219,16 +220,10 @@ impl ScheduledIo { /// than 32 wakers to notify, if the stack array fills up, the lock is /// released, the array is cleared, and the iteration continues. pub(super) fn wake(&self, ready: Ready) { - self.wake0(ready, false); - } - - fn wake0(&self, ready: Ready, shutdown: bool) { let mut wakers = WakeList::new(); let mut waiters = self.waiters.lock(); - waiters.is_shutdown |= shutdown; - // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { @@ -283,6 +278,7 @@ impl ScheduledIo { ReadyEvent { tick: TICK.unpack(curr) as u8, ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + is_shutdown: SHUTDOWN.unpack(curr) != 0, } } @@ -299,8 +295,9 @@ impl ScheduledIo { let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; - if ready.is_empty() { + if ready.is_empty() && !is_shutdown { // Update the task info let mut waiters = self.waiters.lock(); let slot = match direction { @@ -325,10 +322,12 @@ impl ScheduledIo { // taking the waiters lock let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - if waiters.is_shutdown { + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + if is_shutdown { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready: direction.mask(), + is_shutdown, }) } else if ready.is_empty() { Poll::Pending @@ -336,12 +335,14 @@ impl ScheduledIo { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready, + is_shutdown, }) } } else { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready, + is_shutdown, }) } } @@ -433,16 +434,17 @@ cfg_io_readiness! { // Optimistically check existing readiness let curr = scheduled_io.readiness.load(SeqCst); let ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; let ready = ready.intersection(interest); - if !ready.is_empty() { + if !ready.is_empty() || is_shutdown { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); + return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); } // Wasn't ready, take the lock (and check again while locked). @@ -450,18 +452,19 @@ cfg_io_readiness! { let curr = scheduled_io.readiness.load(SeqCst); let mut ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; - if waiters.is_shutdown { + if is_shutdown { ready = Ready::ALL; } let ready = ready.intersection(interest); - if !ready.is_empty() { + if !ready.is_empty() || is_shutdown { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); + return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); } // Not ready even after locked, insert into list... @@ -514,6 +517,7 @@ cfg_io_readiness! { let w = unsafe { &mut *waiter.get() }; let curr = scheduled_io.readiness.load(Acquire); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; // The returned tick might be newer than the event // which notified our waker. This is ok because the future @@ -528,6 +532,7 @@ cfg_io_readiness! { return Poll::Ready(ReadyEvent { tick, ready, + is_shutdown, }); } } diff --git a/src/runtime/metrics/batch.rs b/src/runtime/metrics/batch.rs index f1c3fa6..4e6b28d 100644 --- a/src/runtime/metrics/batch.rs +++ b/src/runtime/metrics/batch.rs @@ -11,9 +11,12 @@ pub(crate) struct MetricsBatch { /// Number of times the worker woke w/o doing work. noop_count: u64, - /// Number of times stolen. + /// Number of tasks stolen. steal_count: u64, + /// Number of times tasks where stolen. + steal_operations: u64, + /// Number of tasks that were polled by the worker. poll_count: u64, @@ -39,6 +42,7 @@ impl MetricsBatch { park_count: 0, noop_count: 0, steal_count: 0, + steal_operations: 0, poll_count: 0, poll_count_on_last_park: 0, local_schedule_count: 0, @@ -52,6 +56,9 @@ impl MetricsBatch { worker.park_count.store(self.park_count, Relaxed); worker.noop_count.store(self.noop_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); + worker + .steal_operations + .store(self.steal_operations, Relaxed); worker.poll_count.store(self.poll_count, Relaxed); worker @@ -98,6 +105,10 @@ cfg_rt_multi_thread! { self.steal_count += by as u64; } + pub(crate) fn incr_steal_operations(&mut self) { + self.steal_operations += 1; + } + pub(crate) fn incr_overflow_count(&mut self) { self.overflow_count += 1; } diff --git a/src/runtime/metrics/mock.rs b/src/runtime/metrics/mock.rs index 6b9cf70..c388dc0 100644 --- a/src/runtime/metrics/mock.rs +++ b/src/runtime/metrics/mock.rs @@ -38,6 +38,7 @@ impl MetricsBatch { cfg_rt_multi_thread! { impl MetricsBatch { pub(crate) fn incr_steal_count(&mut self, _by: u16) {} + pub(crate) fn incr_steal_operations(&mut self) {} pub(crate) fn incr_overflow_count(&mut self) {} } } diff --git a/src/runtime/metrics/runtime.rs b/src/runtime/metrics/runtime.rs index dee14a4..d29cb3d 100644 --- a/src/runtime/metrics/runtime.rs +++ b/src/runtime/metrics/runtime.rs @@ -210,10 +210,57 @@ impl RuntimeMetrics { .load(Relaxed) } + /// Returns the number of tasks the given worker thread stole from + /// another worker thread. + /// + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. + /// + /// The worker steal count starts at zero when the runtime is created and + /// increases by `N` each time the worker has processed its scheduled queue + /// and successfully steals `N` more pending tasks from another worker. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_steal_count(0); + /// println!("worker 0 has stolen {} tasks", n); + /// } + /// ``` + pub fn worker_steal_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .steal_count + .load(Relaxed) + } + /// Returns the number of times the given worker thread stole tasks from /// another worker thread. /// - /// This metric only applies to the **multi-threaded** runtime and will always return `0` when using the current thread runtime. + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. /// /// The worker steal count starts at zero when the runtime is created and /// increases by one each time the worker has processed its scheduled queue @@ -243,15 +290,15 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_noop_count(0); + /// let n = metrics.worker_steal_operations(0); /// println!("worker 0 has stolen tasks {} times", n); /// } /// ``` - pub fn worker_steal_count(&self, worker: usize) -> u64 { + pub fn worker_steal_operations(&self, worker: usize) -> u64 { self.handle .inner .worker_metrics(worker) - .steal_count + .steal_operations .load(Relaxed) } @@ -328,8 +375,8 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_poll_count(0); - /// println!("worker 0 has polled {} tasks", n); + /// let n = metrics.worker_total_busy_duration(0); + /// println!("worker 0 was busy for a total of {:?}", n); /// } /// ``` pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { diff --git a/src/runtime/metrics/worker.rs b/src/runtime/metrics/worker.rs index ec58de6..a40c76e 100644 --- a/src/runtime/metrics/worker.rs +++ b/src/runtime/metrics/worker.rs @@ -17,9 +17,12 @@ pub(crate) struct WorkerMetrics { /// Number of times the worker woke then parked again without doing work. pub(crate) noop_count: AtomicU64, - /// Number of times the worker attempted to steal. + /// Number of tasks the worker stole. pub(crate) steal_count: AtomicU64, + /// Number of times the worker stole + pub(crate) steal_operations: AtomicU64, + /// Number of tasks the worker polled. pub(crate) poll_count: AtomicU64, @@ -43,6 +46,7 @@ impl WorkerMetrics { park_count: AtomicU64::new(0), noop_count: AtomicU64::new(0), steal_count: AtomicU64::new(0), + steal_operations: AtomicU64::new(0), poll_count: AtomicU64::new(0), overflow_count: AtomicU64::new(0), busy_duration_total: AtomicU64::new(0), diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 45b79b0..b6f43ea 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -237,6 +237,9 @@ cfg_rt! { mod runtime; pub use runtime::{Runtime, RuntimeFlavor}; + mod thread_id; + pub(crate) use thread_id::ThreadId; + cfg_metrics! { mod metrics; pub use metrics::RuntimeMetrics; diff --git a/src/runtime/runtime.rs b/src/runtime/runtime.rs index 9ede0a7..1985673 100644 --- a/src/runtime/runtime.rs +++ b/src/runtime/runtime.rs @@ -163,9 +163,9 @@ impl Runtime { /// thread pool. The thread pool is then responsible for polling the future /// until it completes. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the - /// background immediately when `spawn` is called. + /// The provided future will start running in the background immediately + /// when `spawn` is called, even if you don't await the returned + /// `JoinHandle`. /// /// See [module level][mod] documentation for more details. /// diff --git a/src/runtime/scheduler/multi_thread/queue.rs b/src/runtime/scheduler/multi_thread/queue.rs index 59b448d..faf56db 100644 --- a/src/runtime/scheduler/multi_thread/queue.rs +++ b/src/runtime/scheduler/multi_thread/queue.rs @@ -263,7 +263,7 @@ impl<T> Local<T> { // safety: The CAS above ensures that no consumer will look at these // values again, and we are the only producer. let batch_iter = BatchTaskIter { - buffer: &*self.inner.buffer, + buffer: &self.inner.buffer, head: head as UnsignedLong, i: 0, }; @@ -353,6 +353,7 @@ impl<T> Steal<T> { } dst_metrics.incr_steal_count(n as u16); + dst_metrics.incr_steal_operations(); // We are returning a task here n -= 1; diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs index c079297..8e3c3d1 100644 --- a/src/runtime/task/harness.rs +++ b/src/runtime/task/harness.rs @@ -194,7 +194,7 @@ where TransitionToRunning::Success => { let header_ptr = self.header_ptr(); let waker_ref = waker_ref::<T, S>(&header_ptr); - let cx = Context::from_waker(&*waker_ref); + let cx = Context::from_waker(&waker_ref); let res = poll_future(self.core(), cx); if res == Poll::Ready(()) { @@ -315,9 +315,10 @@ where // this task. It is our responsibility to drop the // output. self.core().drop_future_or_output(); - } else if snapshot.has_join_waker() { - // Notify the join handle. The previous transition obtains the - // lock on the waker cell. + } else if snapshot.is_join_waker_set() { + // Notify the waker. Reading the waker field is safe per rule 4 + // in task/mod.rs, since the JOIN_WAKER bit is set and the call + // to transition_to_complete() above set the COMPLETE bit. self.trailer().wake_join(); } })); @@ -367,36 +368,30 @@ fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool { debug_assert!(snapshot.is_join_interested()); if !snapshot.is_complete() { - // The waker must be stored in the task struct. - let res = if snapshot.has_join_waker() { - // There already is a waker stored in the struct. If it matches - // the provided waker, then there is no further work to do. - // Otherwise, the waker must be swapped. - let will_wake = unsafe { - // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE` - // may mutate the `waker` field. - trailer.will_wake(waker) - }; - - if will_wake { - // The task is not complete **and** the waker is up to date, - // there is nothing further that needs to be done. + // If the task is not complete, try storing the provided waker in the + // task's waker field. + + let res = if snapshot.is_join_waker_set() { + // If JOIN_WAKER is set, then JoinHandle has previously stored a + // waker in the waker field per step (iii) of rule 5 in task/mod.rs. + + // Optimization: if the stored waker and the provided waker wake the + // same task, then return without touching the waker field. (Reading + // the waker field below is safe per rule 3 in task/mod.rs.) + if unsafe { trailer.will_wake(waker) } { return false; } - // Unset the `JOIN_WAKER` to gain mutable access to the `waker` - // field then update the field with the new join worker. - // - // This requires two atomic operations, unsetting the bit and - // then resetting it. If the task transitions to complete - // concurrently to either one of those operations, then setting - // the join waker fails and we proceed to reading the task - // output. + // Otherwise swap the stored waker with the provided waker by + // following the rule 5 in task/mod.rs. header .state .unset_waker() .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot)) } else { + // If JOIN_WAKER is unset, then JoinHandle has mutable access to the + // waker field per rule 2 in task/mod.rs; therefore, skip step (i) + // of rule 5 and try to store the provided waker in the waker field. set_join_waker(header, trailer, waker.clone(), snapshot) }; @@ -417,7 +412,7 @@ fn set_join_waker( snapshot: Snapshot, ) -> Result<Snapshot, Snapshot> { assert!(snapshot.is_join_interested()); - assert!(!snapshot.has_join_waker()); + assert!(!snapshot.is_join_waker_set()); // Safety: Only the `JoinHandle` may set the `waker` field. When // `JOIN_INTEREST` is **not** set, nothing else will touch the field. diff --git a/src/runtime/task/id.rs b/src/runtime/task/id.rs new file mode 100644 index 0000000..2b0d95c --- /dev/null +++ b/src/runtime/task/id.rs @@ -0,0 +1,87 @@ +use crate::runtime::context; + +use std::fmt; + +/// An opaque ID that uniquely identifies a task relative to all other currently +/// running tasks. +/// +/// # Notes +/// +/// - Task IDs are unique relative to other *currently running* tasks. When a +/// task completes, the same ID may be used for another task. +/// - Task IDs are *not* sequential, and do not indicate the order in which +/// tasks are spawned, what runtime a task is spawned on, or any other data. +/// - The task ID of the currently running task can be obtained from inside the +/// task via the [`task::try_id()`](crate::task::try_id()) and +/// [`task::id()`](crate::task::id()) functions and from outside the task via +/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [unstable]: crate#unstable-features +#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] +pub struct Id(u64); + +/// Returns the [`Id`] of the currently running task. +/// +/// # Panics +/// +/// This function panics if called from outside a task. Please note that calls +/// to `block_on` do not have task IDs, so the method will panic if called from +/// within a call to `block_on`. For a version of this function that doesn't +/// panic, see [`task::try_id()`](crate::runtime::task::try_id()). +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [task ID]: crate::task::Id +/// [unstable]: crate#unstable-features +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[track_caller] +pub fn id() -> Id { + context::current_task_id().expect("Can't get a task id when not inside a task") +} + +/// Returns the [`Id`] of the currently running task, or `None` if called outside +/// of a task. +/// +/// This function is similar to [`task::id()`](crate::runtime::task::id()), except +/// that it returns `None` rather than panicking if called outside of a task +/// context. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [task ID]: crate::task::Id +/// [unstable]: crate#unstable-features +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[track_caller] +pub fn try_id() -> Option<Id> { + context::current_task_id() +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl Id { + pub(crate) fn next() -> Self { + use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64}; + + static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); + + Self(NEXT_ID.fetch_add(1, Relaxed)) + } + + pub(crate) fn as_u64(&self) -> u64 { + self.0 + } +} diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs index 5660575..11c4b9b 100644 --- a/src/runtime/task/join.rs +++ b/src/runtime/task/join.rs @@ -11,9 +11,9 @@ cfg_rt! { /// An owned permission to join on a task (await its termination). /// /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] - /// for a Tokio task rather than a thread. You do not need to `.await` the - /// `JoinHandle` to make the task execute — it will start running in the - /// background immediately. + /// for a Tokio task rather than a thread. Note that the background task + /// associated with this `JoinHandle` started running immediately when you + /// called spawn, even if you have not yet awaited the `JoinHandle`. /// /// A `JoinHandle` *detaches* the associated task when it is dropped, which /// means that there is no longer any handle to the task, and no way to `join` diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index fea6e0f..55131ac 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -168,19 +168,20 @@ // unstable. This should be removed once `JoinSet` is stabilized. #![cfg_attr(not(tokio_unstable), allow(dead_code))] -use crate::runtime::context; - mod core; use self::core::Cell; use self::core::Header; mod error; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::error::JoinError; mod harness; use self::harness::Harness; +mod id; +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +pub use id::{id, try_id, Id}; + cfg_rt_multi_thread! { mod inject; pub(super) use self::inject::Inject; @@ -191,10 +192,8 @@ mod abort; mod join; #[cfg(feature = "rt")] -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::abort::AbortHandle; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::join::JoinHandle; mod list; @@ -215,70 +214,6 @@ use std::marker::PhantomData; use std::ptr::NonNull; use std::{fmt, mem}; -/// An opaque ID that uniquely identifies a task relative to all other currently -/// running tasks. -/// -/// # Notes -/// -/// - Task IDs are unique relative to other *currently running* tasks. When a -/// task completes, the same ID may be used for another task. -/// - Task IDs are *not* sequential, and do not indicate the order in which -/// tasks are spawned, what runtime a task is spawned on, or any other data. -/// - The task ID of the currently running task can be obtained from inside the -/// task via the [`task::try_id()`](crate::task::try_id()) and -/// [`task::id()`](crate::task::id()) functions and from outside the task via -/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function. -/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [unstable]: crate#unstable-features -#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] -pub struct Id(u64); - -/// Returns the [`Id`] of the currently running task. -/// -/// # Panics -/// -/// This function panics if called from outside a task. Please note that calls -/// to `block_on` do not have task IDs, so the method will panic if called from -/// within a call to `block_on`. For a version of this function that doesn't -/// panic, see [`task::try_id()`](crate::runtime::task::try_id()). -/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [task ID]: crate::task::Id -/// [unstable]: crate#unstable-features -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[track_caller] -pub fn id() -> Id { - context::current_task_id().expect("Can't get a task id when not inside a task") -} - -/// Returns the [`Id`] of the currently running task, or `None` if called outside -/// of a task. -/// -/// This function is similar to [`task::id()`](crate::runtime::task::id()), except -/// that it returns `None` rather than panicking if called outside of a task -/// context. -/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [task ID]: crate::task::Id -/// [unstable]: crate#unstable-features -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[track_caller] -pub fn try_id() -> Option<Id> { - context::current_task_id() -} - /// An owned handle to the task, tracked by ref count. #[repr(transparent)] pub(crate) struct Task<S: 'static> { @@ -554,66 +489,3 @@ unsafe impl<S> linked_list::Link for Task<S> { self::core::Trailer::addr_of_owned(Header::get_trailer(target)) } } - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -impl Id { - // When 64-bit atomics are available, use a static `AtomicU64` counter to - // generate task IDs. - // - // Note(eliza): we _could_ just use `crate::loom::AtomicU64`, which switches - // between an atomic and mutex-based implementation here, rather than having - // two separate functions for targets with and without 64-bit atomics. - // However, because we can't use the mutex-based implementation in a static - // initializer directly, the 32-bit impl also has to use a `OnceCell`, and I - // thought it was nicer to avoid the `OnceCell` overhead on 64-bit - // platforms... - cfg_has_atomic_u64! { - pub(crate) fn next() -> Self { - use std::sync::atomic::{AtomicU64, Ordering::Relaxed}; - static NEXT_ID: AtomicU64 = AtomicU64::new(1); - Self(NEXT_ID.fetch_add(1, Relaxed)) - } - } - - cfg_not_has_atomic_u64! { - cfg_has_const_mutex_new! { - pub(crate) fn next() -> Self { - use crate::loom::sync::Mutex; - static NEXT_ID: Mutex<u64> = Mutex::const_new(1); - - let mut lock = NEXT_ID.lock(); - let id = *lock; - *lock += 1; - Self(id) - } - } - - cfg_not_has_const_mutex_new! { - pub(crate) fn next() -> Self { - use crate::util::once_cell::OnceCell; - use crate::loom::sync::Mutex; - - fn init_next_id() -> Mutex<u64> { - Mutex::new(1) - } - - static NEXT_ID: OnceCell<Mutex<u64>> = OnceCell::new(); - - let next_id = NEXT_ID.get(init_next_id); - let mut lock = next_id.lock(); - let id = *lock; - *lock += 1; - Self(id) - } - } - } - - pub(crate) fn as_u64(&self) -> u64 { - self.0 - } -} diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs index c2d5b28..7728312 100644 --- a/src/runtime/task/state.rs +++ b/src/runtime/task/state.rs @@ -378,7 +378,7 @@ impl State { pub(super) fn set_join_waker(&self) -> UpdateResult { self.fetch_update(|curr| { assert!(curr.is_join_interested()); - assert!(!curr.has_join_waker()); + assert!(!curr.is_join_waker_set()); if curr.is_complete() { return None; @@ -398,7 +398,7 @@ impl State { pub(super) fn unset_waker(&self) -> UpdateResult { self.fetch_update(|curr| { assert!(curr.is_join_interested()); - assert!(curr.has_join_waker()); + assert!(curr.is_join_waker_set()); if curr.is_complete() { return None; @@ -546,7 +546,7 @@ impl Snapshot { self.0 &= !JOIN_INTEREST } - pub(super) fn has_join_waker(self) -> bool { + pub(super) fn is_join_waker_set(self) -> bool { self.0 & JOIN_WAKER == JOIN_WAKER } @@ -588,7 +588,7 @@ impl fmt::Debug for Snapshot { .field("is_notified", &self.is_notified()) .field("is_cancelled", &self.is_cancelled()) .field("is_join_interested", &self.is_join_interested()) - .field("has_join_waker", &self.has_join_waker()) + .field("is_join_waker_set", &self.is_join_waker_set()) .field("ref_count", &self.ref_count()) .finish() } diff --git a/src/runtime/tests/loom_blocking.rs b/src/runtime/tests/loom_blocking.rs index 89de85e..5c4aeae 100644 --- a/src/runtime/tests/loom_blocking.rs +++ b/src/runtime/tests/loom_blocking.rs @@ -73,6 +73,27 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread }); } +#[test] +fn spawn_blocking_when_paused() { + use std::time::Duration; + loom::model(|| { + let rt = crate::runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + let handle = rt.handle(); + let _enter = handle.enter(); + let a = crate::task::spawn_blocking(|| {}); + let b = crate::task::spawn_blocking(|| {}); + rt.block_on(crate::time::timeout(Duration::from_millis(1), async move { + a.await.expect("blocking task should finish"); + b.await.expect("blocking task should finish"); + })) + .expect("timeout should not trigger"); + }); +} + fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) diff --git a/src/runtime/tests/loom_queue.rs b/src/runtime/tests/loom_queue.rs index 8d4e1d3..fc93bf3 100644 --- a/src/runtime/tests/loom_queue.rs +++ b/src/runtime/tests/loom_queue.rs @@ -1,6 +1,6 @@ -use crate::runtime::blocking::NoopSchedule; use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::Inject; +use crate::runtime::tests::NoopSchedule; use crate::runtime::MetricsBatch; use loom::thread; diff --git a/src/runtime/tests/mod.rs b/src/runtime/tests/mod.rs index 1c67dfe..4e7c245 100644 --- a/src/runtime/tests/mod.rs +++ b/src/runtime/tests/mod.rs @@ -2,11 +2,29 @@ // other code when running loom tests. #![cfg_attr(loom, warn(dead_code, unreachable_pub))] +use self::noop_scheduler::NoopSchedule; use self::unowned_wrapper::unowned; +mod noop_scheduler { + use crate::runtime::task::{self, Task}; + + /// `task::Schedule` implementation that does nothing, for testing. + pub(crate) struct NoopSchedule; + + impl task::Schedule for NoopSchedule { + fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { + None + } + + fn schedule(&self, _task: task::Notified<Self>) { + unreachable!(); + } + } +} + mod unowned_wrapper { - use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::{Id, JoinHandle, Notified}; + use crate::runtime::tests::NoopSchedule; #[cfg(all(tokio_unstable, feature = "tracing"))] pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>) diff --git a/src/runtime/tests/task.rs b/src/runtime/tests/task.rs index 173e5b0..a79c0f5 100644 --- a/src/runtime/tests/task.rs +++ b/src/runtime/tests/task.rs @@ -1,5 +1,5 @@ -use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::tests::NoopSchedule; use crate::util::TryLock; use std::collections::VecDeque; diff --git a/src/runtime/thread_id.rs b/src/runtime/thread_id.rs new file mode 100644 index 0000000..ef39289 --- /dev/null +++ b/src/runtime/thread_id.rs @@ -0,0 +1,31 @@ +use std::num::NonZeroU64; + +#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)] +pub(crate) struct ThreadId(NonZeroU64); + +impl ThreadId { + pub(crate) fn next() -> Self { + use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64}; + + static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(0); + + let mut last = NEXT_ID.load(Relaxed); + loop { + let id = match last.checked_add(1) { + Some(id) => id, + None => exhausted(), + }; + + match NEXT_ID.compare_exchange_weak(last, id, Relaxed, Relaxed) { + Ok(_) => return ThreadId(NonZeroU64::new(id).unwrap()), + Err(id) => last = id, + } + } + } +} + +#[cold] +#[allow(dead_code)] +fn exhausted() -> ! { + panic!("failed to generate unique thread ID: bitspace exhausted") +} diff --git a/src/runtime/time/mod.rs b/src/runtime/time/mod.rs index 240f8f1..f81cab8 100644 --- a/src/runtime/time/mod.rs +++ b/src/runtime/time/mod.rs @@ -222,7 +222,7 @@ impl Driver { let handle = rt_handle.time(); let clock = &handle.time_source.clock; - if clock.is_paused() { + if clock.can_auto_advance() { self.park.park_timeout(rt_handle, Duration::from_secs(0)); // If the time driver was woken, then the park completed |