diff options
Diffstat (limited to 'src/runtime/basic_scheduler.rs')
-rw-r--r-- | src/runtime/basic_scheduler.rs | 190 |
1 files changed, 148 insertions, 42 deletions
diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs index 7e1c257..5ca8467 100644 --- a/src/runtime/basic_scheduler.rs +++ b/src/runtime/basic_scheduler.rs @@ -1,22 +1,35 @@ +use crate::future::poll_fn; +use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; -use crate::runtime; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; -use crate::util::linked_list::LinkedList; -use crate::util::{waker_ref, Wake}; +use crate::sync::notify::Notify; +use crate::util::linked_list::{Link, LinkedList}; +use crate::util::{waker_ref, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::{Arc, Mutex}; -use std::task::Poll::Ready; +use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; use std::time::Duration; /// Executes tasks on the current thread -pub(crate) struct BasicScheduler<P> -where - P: Park, -{ +pub(crate) struct BasicScheduler<P: Park> { + /// Inner state guarded by a mutex that is shared + /// between all `block_on` calls. + inner: Mutex<Option<Inner<P>>>, + + /// Notifier for waking up other threads to steal the + /// parker. + notify: Notify, + + /// Sendable task spawner + spawner: Spawner, +} + +/// The inner scheduler that owns the task queue and the main parker P. +struct Inner<P: Park> { /// Scheduler run queue /// /// When the scheduler is executed, the queue is removed from `self` and @@ -42,7 +55,7 @@ pub(crate) struct Spawner { struct Tasks { /// Collection of all active tasks spawned onto this executor. - owned: LinkedList<Task<Arc<Shared>>>, + owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>, /// Local run queue. /// @@ -59,7 +72,7 @@ struct Shared { unpark: Box<dyn Unpark>, } -/// Thread-local context +/// Thread-local context. 
struct Context { /// Shared scheduler state shared: Arc<Shared>, @@ -68,38 +81,43 @@ struct Context { tasks: RefCell<Tasks>, } -/// Initial queue capacity +/// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -/// How often ot check the remote queue first +/// How often to check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; -// Tracks the current BasicScheduler +// Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); -impl<P> BasicScheduler<P> -where - P: Park, -{ +impl<P: Park> BasicScheduler<P> { pub(crate) fn new(park: P) -> BasicScheduler<P> { let unpark = Box::new(park.unpark()); - BasicScheduler { + let spawner = Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + unpark: unpark as Box<dyn Unpark>, + }), + }; + + let inner = Mutex::new(Some(Inner { tasks: Some(Tasks { owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), - spawner: Spawner { - shared: Arc::new(Shared { - queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), - unpark: unpark as Box<dyn Unpark>, - }), - }, + spawner: spawner.clone(), tick: 0, park, + })); + + BasicScheduler { + inner, + notify: Notify::new(), + spawner, } } @@ -116,13 +134,57 @@ where self.spawner.spawn(future) } - pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output - where - F: Future, - { + pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output { + pin!(future); + + // Attempt to steal the dedicated parker and block_on the future if we can there, + // otherwise, let's select on a notification that the parker is available + // or the future is complete. 
+ loop { + if let Some(inner) = &mut self.take_inner() { + return inner.block_on(future); + } else { + let mut enter = crate::runtime::enter(false); + + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } + + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } + } + } + + fn take_inner(&self) -> Option<InnerGuard<'_, P>> { + let inner = self.inner.lock().take()?; + + Some(InnerGuard { + inner: Some(inner), + basic_scheduler: &self, + }) + } +} + +impl<P: Park> Inner<P> { + /// Block on the future provided and drive the runtime's driver. + fn block_on<F: Future>(&mut self, future: F) -> F::Output { enter(self, |scheduler, context| { - let _enter = runtime::enter(false); - let waker = waker_ref(&scheduler.spawner.shared); + let _enter = crate::runtime::enter(false); + let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); pin!(future); @@ -177,16 +239,16 @@ where /// Enter the scheduler context. 
This sets the queue and other necessary /// scheduler state in the thread-local -fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R +fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R where - F: FnOnce(&mut BasicScheduler<P>, &Context) -> R, + F: FnOnce(&mut Inner<P>, &Context) -> R, P: Park, { // Ensures the run queue is placed back in the `BasicScheduler` instance // once `block_on` returns.` struct Guard<'a, P: Park> { context: Option<Context>, - scheduler: &'a mut BasicScheduler<P>, + scheduler: &'a mut Inner<P>, } impl<P: Park> Drop for Guard<'_, P> { @@ -213,12 +275,18 @@ where CURRENT.set(context, || f(scheduler, context)) } -impl<P> Drop for BasicScheduler<P> -where - P: Park, -{ +impl<P: Park> Drop for BasicScheduler<P> { fn drop(&mut self) { - enter(self, |scheduler, context| { + // Avoid a double panic if we are currently panicking and + // the lock may be poisoned. + + let mut inner = match self.inner.lock().take() { + Some(inner) => inner, + None if std::thread::panicking() => return, + None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), + }; + + enter(&mut inner, |scheduler, context| { // Loop required here to ensure borrow is dropped between iterations #[allow(clippy::while_let_loop)] loop { @@ -236,7 +304,7 @@ where } // Drain remote queue - for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) { + for task in scheduler.spawner.shared.queue.lock().drain(..) 
{ task.shutdown(); } @@ -266,7 +334,11 @@ impl Spawner { } fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { - self.shared.queue.lock().unwrap().pop_front() + self.shared.queue.lock().pop_front() + } + + fn waker_ref(&self) -> WakerRef<'_> { + waker_ref(&self.shared) } } @@ -307,7 +379,7 @@ impl Schedule for Arc<Shared> { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().unwrap().push_back(task); + self.queue.lock().push_back(task); self.unpark.unpark(); } }); @@ -324,3 +396,37 @@ impl Wake for Shared { arc_self.unpark.unpark(); } } + +// ===== InnerGuard ===== + +/// Used to ensure we always place the Inner value +/// back into its slot in `BasicScheduler`, even if the +/// future panics. +struct InnerGuard<'a, P: Park> { + inner: Option<Inner<P>>, + basic_scheduler: &'a BasicScheduler<P>, +} + +impl<P: Park> InnerGuard<'_, P> { + fn block_on<F: Future>(&mut self, future: F) -> F::Output { + // The only time inner gets set to `None` is if we have dropped + // already so this unwrap is safe. + self.inner.as_mut().unwrap().block_on(future) + } +} + +impl<P: Park> Drop for InnerGuard<'_, P> { + fn drop(&mut self) { + if let Some(scheduler) = self.inner.take() { + let mut lock = self.basic_scheduler.inner.lock(); + + // Replace old scheduler back into the state to allow + // other threads to pick it up and drive it. + lock.replace(scheduler); + + // Wake up other possible threads that could steal + // the dedicated parker P. + self.basic_scheduler.notify.notify_one() + } + } +} |