Diffstat (limited to 'src/runtime/basic_scheduler.rs')
-rw-r--r--    src/runtime/basic_scheduler.rs    519
 1 file changed, 0 insertions(+), 519 deletions(-)
diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs
deleted file mode 100644
index 872d0d5..0000000
--- a/src/runtime/basic_scheduler.rs
+++ /dev/null
@@ -1,519 +0,0 @@
-use crate::future::poll_fn;
-use crate::loom::sync::atomic::AtomicBool;
-use crate::loom::sync::Mutex;
-use crate::park::{Park, Unpark};
-use crate::runtime::context::EnterGuard;
-use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
-use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
-use crate::runtime::Callback;
-use crate::sync::notify::Notify;
-use crate::util::{waker_ref, Wake, WakerRef};
-
-use std::cell::RefCell;
-use std::collections::VecDeque;
-use std::fmt;
-use std::future::Future;
-use std::sync::atomic::Ordering::{AcqRel, Release};
-use std::sync::Arc;
-use std::task::Poll::{Pending, Ready};
-use std::time::Duration;
-
-/// Executes tasks on the current thread
-pub(crate) struct BasicScheduler<P: Park> {
-    /// Inner state guarded by a mutex that is shared
-    /// between all `block_on` calls.
-    inner: Mutex<Option<Inner<P>>>,
-
-    /// Notifier for waking up other threads to steal the
-    /// parker.
-    notify: Notify,
-
-    /// Sendable task spawner
-    spawner: Spawner,
-
-    /// This is usually None, but right before dropping the BasicScheduler, it
-    /// is changed to `Some` with the context being the runtime's own context.
-    /// This ensures that any tasks dropped in the `BasicScheduler`'s
-    /// destructor run in that runtime's context.
-    context_guard: Option<EnterGuard>,
-}
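
For context, this mutex-plus-notify layout is what lets several threads call `block_on` on the same current-thread runtime: whichever caller takes `inner` drives tasks on its own thread, while the rest wait on `notify` for a chance to steal the parker. A minimal sketch against the public tokio API, assuming only that the runtime is shared behind an `Arc` (the async bodies are illustrative):

    use std::sync::Arc;

    fn main() {
        // A current-thread runtime is backed by this BasicScheduler.
        let rt = Arc::new(
            tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap(),
        );

        let rt2 = Arc::clone(&rt);
        let handle = std::thread::spawn(move || {
            // If the other thread already holds the parker, this call
            // waits until the parker is released, then drives the future.
            rt2.block_on(async { 2 })
        });

        let a = rt.block_on(async { 1 });
        assert_eq!(a + handle.join().unwrap(), 3);
    }
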
-
-/// The inner scheduler that owns the task queue and the main parker P.
-struct Inner<P: Park> {
-    /// Scheduler run queue
-    ///
-    /// When the scheduler is executed, the queue is removed from `self` and
-    /// moved into `Context`.
-    ///
-    /// This indirection is to allow `BasicScheduler` to be `Send`.
-    tasks: Option<Tasks>,
-
-    /// Sendable task spawner
-    spawner: Spawner,
-
-    /// Current tick
-    tick: u8,
-
-    /// Thread park handle
-    park: P,
-
-    /// Callback for a worker parking itself
-    before_park: Option<Callback>,
-    /// Callback for a worker unparking itself
-    after_unpark: Option<Callback>,
-
-    /// Stats batcher
-    stats: WorkerStatsBatcher,
-}
-
-#[derive(Clone)]
-pub(crate) struct Spawner {
-    shared: Arc<Shared>,
-}
-
-struct Tasks {
-    /// Local run queue.
-    ///
-    /// Tasks notified from the current thread are pushed into this queue.
-    queue: VecDeque<task::Notified<Arc<Shared>>>,
-}
-
-/// A remote scheduler entry.
-///
-/// These are filled in by remote threads sending instructions to the scheduler.
-enum RemoteMsg {
-    /// A remote thread wants to spawn a task.
-    Schedule(task::Notified<Arc<Shared>>),
-}
-
-// Safety: Used correctly, the task header is "thread safe". Ultimately the
-// task is owned by the current-thread executor to which this message is
-// being sent.
-unsafe impl Send for RemoteMsg {}
-
-/// Scheduler state shared between threads.
-struct Shared {
-    /// Remote run queue. None if the `Runtime` has been dropped.
-    queue: Mutex<Option<VecDeque<RemoteMsg>>>,
-
-    /// Collection of all active tasks spawned onto this executor.
-    owned: OwnedTasks<Arc<Shared>>,
-
-    /// Unpark the blocked thread.
-    unpark: Box<dyn Unpark>,
-
-    /// Indicates whether the blocked-on thread was woken.
-    woken: AtomicBool,
-
-    /// Keeps track of various runtime stats.
-    stats: RuntimeStats,
-}
-
-/// Thread-local context.
-struct Context {
-    /// Shared scheduler state
-    shared: Arc<Shared>,
-
-    /// Local queue
-    tasks: RefCell<Tasks>,
-}
-
-/// Initial queue capacity.
-const INITIAL_CAPACITY: usize = 64;
-
-/// Max number of tasks to poll per tick.
-#[cfg(loom)]
-const MAX_TASKS_PER_TICK: usize = 4;
-#[cfg(not(loom))]
-const MAX_TASKS_PER_TICK: usize = 61;
-
-/// How often to check the remote queue first.
-const REMOTE_FIRST_INTERVAL: u8 = 31;
-
-// Tracks the current BasicScheduler.
-scoped_thread_local!(static CURRENT: Context);
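
The two constants above encode the scheduler's fairness policy: at most `MAX_TASKS_PER_TICK` tasks are polled between yields to the parker, and on every `REMOTE_FIRST_INTERVAL`-th tick the remote queue is checked before the local one, so tasks woken from other threads cannot be starved by a busy local queue. A standalone sketch of that selection rule (the `pick` helper is illustrative, not part of this file):

    use std::collections::VecDeque;

    const REMOTE_FIRST_INTERVAL: u8 = 31;

    fn pick<T>(tick: u8, local: &mut VecDeque<T>, remote: &mut VecDeque<T>) -> Option<T> {
        if tick % REMOTE_FIRST_INTERVAL == 0 {
            // Periodically prefer the remote queue so it is never starved.
            remote.pop_front().or_else(|| local.pop_front())
        } else {
            // Common case: local tasks first, remote queue as fallback.
            local.pop_front().or_else(|| remote.pop_front())
        }
    }
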
-
-impl<P: Park> BasicScheduler<P> {
-    pub(crate) fn new(
-        park: P,
-        before_park: Option<Callback>,
-        after_unpark: Option<Callback>,
-    ) -> BasicScheduler<P> {
-        let unpark = Box::new(park.unpark());
-
-        let spawner = Spawner {
-            shared: Arc::new(Shared {
-                queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
-                owned: OwnedTasks::new(),
-                unpark: unpark as Box<dyn Unpark>,
-                woken: AtomicBool::new(false),
-                stats: RuntimeStats::new(1),
-            }),
-        };
-
-        let inner = Mutex::new(Some(Inner {
-            tasks: Some(Tasks {
-                queue: VecDeque::with_capacity(INITIAL_CAPACITY),
-            }),
-            spawner: spawner.clone(),
-            tick: 0,
-            park,
-            before_park,
-            after_unpark,
-            stats: WorkerStatsBatcher::new(0),
-        }));
-
-        BasicScheduler {
-            inner,
-            notify: Notify::new(),
-            spawner,
-            context_guard: None,
-        }
-    }
-
-    pub(crate) fn spawner(&self) -> &Spawner {
-        &self.spawner
-    }
-
-    pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
-        pin!(future);
-
-        // Attempt to steal the dedicated parker and block on the future there
-        // if we can; otherwise, select on a notification that the parker is
-        // available or on the future completing.
-        loop {
-            if let Some(inner) = &mut self.take_inner() {
-                return inner.block_on(future);
-            } else {
-                let mut enter = crate::runtime::enter(false);
-
-                let notified = self.notify.notified();
-                pin!(notified);
-
-                if let Some(out) = enter
-                    .block_on(poll_fn(|cx| {
-                        if notified.as_mut().poll(cx).is_ready() {
-                            return Ready(None);
-                        }
-
-                        if let Ready(out) = future.as_mut().poll(cx) {
-                            return Ready(Some(out));
-                        }
-
-                        Pending
-                    }))
-                    .expect("Failed to `Enter::block_on`")
-                {
-                    return out;
-                }
-            }
-        }
-    }
-
-    fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
-        let inner = self.inner.lock().take()?;
-
-        Some(InnerGuard {
-            inner: Some(inner),
-            basic_scheduler: self,
-        })
-    }
-
-    pub(super) fn set_context_guard(&mut self, guard: EnterGuard) {
-        self.context_guard = Some(guard);
-    }
-}
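
The hand-rolled `poll_fn` in `block_on` above is a two-way select: it yields `None` when the notification fires (so the loop retries stealing the parker) and `Some(out)` when the user's future completes first. The same shape in isolation, using only std's `poll_fn` and `pin!` (the `race` helper is illustrative):

    use std::future::{poll_fn, Future};
    use std::pin::pin;
    use std::task::Poll::{Pending, Ready};

    // Resolves to None if `signal` wins the race, Some(output) if `fut` wins.
    async fn race<S, F>(signal: S, fut: F) -> Option<F::Output>
    where
        S: Future<Output = ()>,
        F: Future,
    {
        let mut signal = pin!(signal);
        let mut fut = pin!(fut);
        poll_fn(move |cx| {
            if signal.as_mut().poll(cx).is_ready() {
                return Ready(None);
            }
            if let Ready(out) = fut.as_mut().poll(cx) {
                return Ready(Some(out));
            }
            Pending
        })
        .await
    }
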
-
-impl<P: Park> Inner<P> {
-    /// Blocks on the provided future and drives the runtime's driver.
-    fn block_on<F: Future>(&mut self, future: F) -> F::Output {
-        enter(self, |scheduler, context| {
-            let _enter = crate::runtime::enter(false);
-            let waker = scheduler.spawner.waker_ref();
-            let mut cx = std::task::Context::from_waker(&waker);
-
-            pin!(future);
-
-            'outer: loop {
-                if scheduler.spawner.reset_woken() {
-                    scheduler.stats.incr_poll_count();
-                    if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
-                        return v;
-                    }
-                }
-
-                for _ in 0..MAX_TASKS_PER_TICK {
-                    // Get and increment the current tick
-                    let tick = scheduler.tick;
-                    scheduler.tick = scheduler.tick.wrapping_add(1);
-
-                    let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
-                        scheduler.spawner.pop().or_else(|| {
-                            context
-                                .tasks
-                                .borrow_mut()
-                                .queue
-                                .pop_front()
-                                .map(RemoteMsg::Schedule)
-                        })
-                    } else {
-                        context
-                            .tasks
-                            .borrow_mut()
-                            .queue
-                            .pop_front()
-                            .map(RemoteMsg::Schedule)
-                            .or_else(|| scheduler.spawner.pop())
-                    };
-
-                    let entry = match entry {
-                        Some(entry) => entry,
-                        None => {
-                            if let Some(f) = &scheduler.before_park {
-                                f();
-                            }
-                            // This check will fail if `before_park` spawns a task
-                            // for us to run instead of parking the thread.
-                            if context.tasks.borrow_mut().queue.is_empty() {
-                                // Park until the thread is signaled
-                                scheduler.stats.about_to_park();
-                                scheduler.stats.submit(&scheduler.spawner.shared.stats);
-                                scheduler.park.park().expect("failed to park");
-                                scheduler.stats.returned_from_park();
-                            }
-                            if let Some(f) = &scheduler.after_unpark {
-                                f();
-                            }
-
-                            // Try polling the `block_on` future next
-                            continue 'outer;
-                        }
-                    };
-
-                    match entry {
-                        RemoteMsg::Schedule(task) => {
-                            scheduler.stats.incr_poll_count();
-                            let task = context.shared.owned.assert_owner(task);
-                            crate::coop::budget(|| task.run())
-                        }
-                    }
-                }
-
-                // Yield to the park; this drives the timer and pulls any
-                // pending I/O events.
-                scheduler.stats.submit(&scheduler.spawner.shared.stats);
-                scheduler
-                    .park
-                    .park_timeout(Duration::from_millis(0))
-                    .expect("failed to park");
-            }
-        })
-    }
-}
-
-/// Enters the scheduler context. This sets the queue and other necessary
-/// scheduler state in the thread-local.
-fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
-where
-    F: FnOnce(&mut Inner<P>, &Context) -> R,
-    P: Park,
-{
-    // Ensures the run queue is placed back in the `BasicScheduler` instance
-    // once `block_on` returns.
-    struct Guard<'a, P: Park> {
-        context: Option<Context>,
-        scheduler: &'a mut Inner<P>,
-    }
-
-    impl<P: Park> Drop for Guard<'_, P> {
-        fn drop(&mut self) {
-            let Context { tasks, .. } = self.context.take().expect("context missing");
-            self.scheduler.tasks = Some(tasks.into_inner());
-        }
-    }
-
-    // Remove `tasks` from `self` and place it in a `Context`.
-    let tasks = scheduler.tasks.take().expect("invalid state");
-
-    let guard = Guard {
-        context: Some(Context {
-            shared: scheduler.spawner.shared.clone(),
-            tasks: RefCell::new(tasks),
-        }),
-        scheduler,
-    };
-
-    let context = guard.context.as_ref().unwrap();
-    let scheduler = &mut *guard.scheduler;
-
-    CURRENT.set(context, || f(scheduler, context))
-}
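
`enter` relies on `Guard`'s `Drop` impl to move the run queue back into the scheduler even if the closure panics, since destructors also run during unwinding. The same RAII shape in miniature (names are illustrative):

    struct Restore<'a, T> {
        slot: &'a mut Option<T>, // where the value must be put back
        taken: Option<T>,        // the value borrowed out of `slot`
    }

    impl<T> Drop for Restore<'_, T> {
        fn drop(&mut self) {
            // Runs on normal exit *and* during a panic's unwind.
            *self.slot = self.taken.take();
        }
    }
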
-
-impl<P: Park> Drop for BasicScheduler<P> {
-    fn drop(&mut self) {
-        // Avoid a double panic if we are currently panicking and
-        // the lock may be poisoned.
-        let mut inner = match self.inner.lock().take() {
-            Some(inner) => inner,
-            None if std::thread::panicking() => return,
-            None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
-        };
-
-        enter(&mut inner, |scheduler, context| {
-            // Drain the OwnedTasks collection. This call also closes the
-            // collection, ensuring that no tasks are ever pushed after this
-            // call returns.
-            context.shared.owned.close_and_shutdown_all();
-
-            // Drain the local queue. We already shut down every task, so we
-            // just need to drop each one.
-            for task in context.tasks.borrow_mut().queue.drain(..) {
-                drop(task);
-            }
-
-            // Drain the remote queue, using `Option::take` to replace it with
-            // `None`. Every task was already shut down, so each entry just
-            // needs to be dropped.
-            let remote_queue = scheduler.spawner.shared.queue.lock().take();
-            if let Some(remote_queue) = remote_queue {
-                for entry in remote_queue {
-                    match entry {
-                        RemoteMsg::Schedule(task) => {
-                            drop(task);
-                        }
-                    }
-                }
-            }
-
-            assert!(context.shared.owned.is_empty());
-        });
-    }
-}
-
-impl<P: Park> fmt::Debug for BasicScheduler<P> {
-    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
-        fmt.debug_struct("BasicScheduler").finish()
-    }
-}
-
-// ===== impl Spawner =====
-
-impl Spawner {
-    /// Spawns a future onto the basic scheduler
-    pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
-    where
-        F: crate::future::Future + Send + 'static,
-        F::Output: Send + 'static,
-    {
-        let (handle, notified) = self.shared.owned.bind(future, self.shared.clone());
-
-        if let Some(notified) = notified {
-            self.shared.schedule(notified);
-        }
-
-        handle
-    }
-
-    pub(crate) fn stats(&self) -> &RuntimeStats {
-        &self.shared.stats
-    }
-
-    fn pop(&self) -> Option<RemoteMsg> {
-        match self.shared.queue.lock().as_mut() {
-            Some(queue) => queue.pop_front(),
-            None => None,
-        }
-    }
-
-    fn waker_ref(&self) -> WakerRef<'_> {
-        // Set `woken` to true when entering `block_on`, ensuring the outer
-        // future is polled on the first iteration of the loop.
-        self.shared.woken.store(true, Release);
-        waker_ref(&self.shared)
-    }
-
-    // Resets `woken` to false and returns the original value.
-    pub(crate) fn reset_woken(&self) -> bool {
-        self.shared.woken.swap(false, AcqRel)
-    }
-}
-
-impl fmt::Debug for Spawner {
-    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
-        fmt.debug_struct("Spawner").finish()
-    }
-}
-
-// ===== impl Shared =====
-
-impl Schedule for Arc<Shared> {
-    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
-        self.owned.remove(task)
-    }
-
-    fn schedule(&self, task: task::Notified<Self>) {
-        CURRENT.with(|maybe_cx| match maybe_cx {
-            Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
-                cx.tasks.borrow_mut().queue.push_back(task);
-            }
-            _ => {
-                // If the queue is `None`, then the runtime has shut down. We
-                // don't need to do anything with the notification in that case.
-                let mut guard = self.queue.lock();
-                if let Some(queue) = guard.as_mut() {
-                    queue.push_back(RemoteMsg::Schedule(task));
-                    drop(guard);
-                    self.unpark.unpark();
-                }
-            }
-        });
-    }
-}
-
-impl Wake for Shared {
-    fn wake(self: Arc<Self>) {
-        Wake::wake_by_ref(&self)
-    }
-
-    /// Wake by reference
-    fn wake_by_ref(arc_self: &Arc<Self>) {
-        arc_self.woken.store(true, Release);
-        arc_self.unpark.unpark();
-    }
-}
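
The `woken` flag paired with `Spawner::reset_woken` forms a one-shot wakeup latch: the waker stores `true` with `Release` ordering, and the scheduler consumes the flag with an `AcqRel` swap, so a wakeup that races with the scheduler's check is never lost. The same handshake in isolation (illustrative, not part of this file):

    use std::sync::atomic::{AtomicBool, Ordering::{AcqRel, Release}};

    struct WakeLatch {
        woken: AtomicBool,
    }

    impl WakeLatch {
        // Waker side: record that the outer future should be polled again.
        fn wake(&self) {
            self.woken.store(true, Release);
        }

        // Scheduler side: consume the wakeup, reporting whether one occurred.
        fn reset(&self) -> bool {
            self.woken.swap(false, AcqRel)
        }
    }
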
-
-// ===== InnerGuard =====
-
-/// Used to ensure we always place the Inner value
-/// back into its slot in `BasicScheduler`, even if the
-/// future panics.
-struct InnerGuard<'a, P: Park> {
-    inner: Option<Inner<P>>,
-    basic_scheduler: &'a BasicScheduler<P>,
-}
-
-impl<P: Park> InnerGuard<'_, P> {
-    fn block_on<F: Future>(&mut self, future: F) -> F::Output {
-        // The only time `inner` is set to `None` is if we have already been
-        // dropped, so this unwrap is safe.
-        self.inner.as_mut().unwrap().block_on(future)
-    }
-}
-
-impl<P: Park> Drop for InnerGuard<'_, P> {
-    fn drop(&mut self) {
-        if let Some(scheduler) = self.inner.take() {
-            let mut lock = self.basic_scheduler.inner.lock();
-
-            // Place the scheduler back into its slot so that other threads
-            // can pick it up and drive it.
-            lock.replace(scheduler);
-
-            // Wake up any other threads that could steal the dedicated
-            // parker P.
-            self.basic_scheduler.notify.notify_one()
-        }
-    }
-}