aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/blocking
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2020-10-05 08:16:15 -0700
committerJoel Galenson <jgalenson@google.com>2020-10-05 08:16:15 -0700
commitf03b3ba785a6d336884bfc525046906f8c2a9904 (patch)
tree14e2bd707d8d152ea0476ec9e686deb2a2f55b34 /src/runtime/blocking
parent40b8b369b069afb314a9d4bb92be1bdd038979f8 (diff)
downloadtokio-f03b3ba785a6d336884bfc525046906f8c2a9904.tar.gz
Import tokio-0.2.22
Test: None Change-Id: Iea7ee5e62819c9b16dbfad05a6146775df72506a
Diffstat (limited to 'src/runtime/blocking')
-rw-r--r--src/runtime/blocking/mod.rs43
-rw-r--r--src/runtime/blocking/pool.rs307
-rw-r--r--src/runtime/blocking/schedule.rs24
-rw-r--r--src/runtime/blocking/shutdown.rs68
-rw-r--r--src/runtime/blocking/task.rs43
5 files changed, 485 insertions, 0 deletions
diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs
new file mode 100644
index 0000000..0b36a75
--- /dev/null
+++ b/src/runtime/blocking/mod.rs
@@ -0,0 +1,43 @@
+//! Abstracts out the APIs necessary to `Runtime` for integrating the blocking
+//! pool. When the `blocking` feature flag is **not** enabled, these APIs are
+//! shells. This isolates the complexity of dealing with conditional
+//! compilation.
+
+cfg_blocking_impl! {
+ mod pool;
+ pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner};
+
+ mod schedule;
+ mod shutdown;
+ pub(crate) mod task;
+
+ use crate::runtime::Builder;
+
+ pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ BlockingPool::new(builder, thread_cap)
+
+ }
+}
+
+cfg_not_blocking_impl! {
+ use crate::runtime::Builder;
+ use std::time::Duration;
+
+ #[derive(Debug, Clone)]
+ pub(crate) struct BlockingPool {}
+
+ pub(crate) use BlockingPool as Spawner;
+
+ pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
+ BlockingPool {}
+ }
+
+ impl BlockingPool {
+ pub(crate) fn spawner(&self) -> &BlockingPool {
+ self
+ }
+
+ pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) {
+ }
+ }
+}
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
new file mode 100644
index 0000000..40d417b
--- /dev/null
+++ b/src/runtime/blocking/pool.rs
@@ -0,0 +1,307 @@
+//! Thread pool for blocking operations
+
+use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::loom::thread;
+use crate::runtime::blocking::schedule::NoopSchedule;
+use crate::runtime::blocking::shutdown;
+use crate::runtime::blocking::task::BlockingTask;
+use crate::runtime::task::{self, JoinHandle};
+use crate::runtime::{Builder, Callback, Handle};
+
+use std::collections::VecDeque;
+use std::fmt;
+use std::time::Duration;
+
+pub(crate) struct BlockingPool {
+ spawner: Spawner,
+ shutdown_rx: shutdown::Receiver,
+}
+
+#[derive(Clone)]
+pub(crate) struct Spawner {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ /// State shared between worker threads
+ shared: Mutex<Shared>,
+
+ /// Pool threads wait on this.
+ condvar: Condvar,
+
+ /// Spawned threads use this name
+ thread_name: String,
+
+ /// Spawned thread stack size
+ stack_size: Option<usize>,
+
+ /// Call after a thread starts
+ after_start: Option<Callback>,
+
+ /// Call before a thread stops
+ before_stop: Option<Callback>,
+
+ thread_cap: usize,
+}
+
+struct Shared {
+ queue: VecDeque<Task>,
+ num_th: usize,
+ num_idle: u32,
+ num_notify: u32,
+ shutdown: bool,
+ shutdown_tx: Option<shutdown::Sender>,
+}
+
+type Task = task::Notified<NoopSchedule>;
+
+const KEEP_ALIVE: Duration = Duration::from_secs(10);
+
+/// Run the provided function on an executor dedicated to blocking operations.
+pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
+where
+ F: FnOnce() -> R + Send + 'static,
+{
+ let rt = Handle::current();
+
+ let (task, handle) = task::joinable(BlockingTask::new(func));
+ let _ = rt.blocking_spawner.spawn(task, &rt);
+ handle
+}
+
+#[allow(dead_code)]
+pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
+where
+ F: FnOnce() -> R + Send + 'static,
+{
+ let rt = Handle::current();
+
+ let (task, _handle) = task::joinable(BlockingTask::new(func));
+ rt.blocking_spawner.spawn(task, &rt)
+}
+
+// ===== impl BlockingPool =====
+
+impl BlockingPool {
+ pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ let (shutdown_tx, shutdown_rx) = shutdown::channel();
+
+ BlockingPool {
+ spawner: Spawner {
+ inner: Arc::new(Inner {
+ shared: Mutex::new(Shared {
+ queue: VecDeque::new(),
+ num_th: 0,
+ num_idle: 0,
+ num_notify: 0,
+ shutdown: false,
+ shutdown_tx: Some(shutdown_tx),
+ }),
+ condvar: Condvar::new(),
+ thread_name: builder.thread_name.clone(),
+ stack_size: builder.thread_stack_size,
+ after_start: builder.after_start.clone(),
+ before_stop: builder.before_stop.clone(),
+ thread_cap,
+ }),
+ },
+ shutdown_rx,
+ }
+ }
+
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
+ let mut shared = self.spawner.inner.shared.lock().unwrap();
+
+ // The function can be called multiple times. First, by explicitly
+ // calling `shutdown` then by the drop handler calling `shutdown`. This
+ // prevents shutting down twice.
+ if shared.shutdown {
+ return;
+ }
+
+ shared.shutdown = true;
+ shared.shutdown_tx = None;
+ self.spawner.inner.condvar.notify_all();
+
+ drop(shared);
+
+ self.shutdown_rx.wait(timeout);
+ }
+}
+
+impl Drop for BlockingPool {
+ fn drop(&mut self) {
+ self.shutdown(None);
+ }
+}
+
+impl fmt::Debug for BlockingPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BlockingPool").finish()
+ }
+}
+
+// ===== impl Spawner =====
+
+impl Spawner {
+ pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
+ let shutdown_tx = {
+ let mut shared = self.inner.shared.lock().unwrap();
+
+ if shared.shutdown {
+ // Shutdown the task
+ task.shutdown();
+
+ // no need to even push this task; it would never get picked up
+ return Err(());
+ }
+
+ shared.queue.push_back(task);
+
+ if shared.num_idle == 0 {
+ // No threads are able to process the task.
+
+ if shared.num_th == self.inner.thread_cap {
+ // At max number of threads
+ None
+ } else {
+ shared.num_th += 1;
+ assert!(shared.shutdown_tx.is_some());
+ shared.shutdown_tx.clone()
+ }
+ } else {
+ // Notify an idle worker thread. The notification counter
+ // is used to count the needed amount of notifications
+ // exactly. Thread libraries may generate spurious
+ // wakeups, this counter is used to keep us in a
+ // consistent state.
+ shared.num_idle -= 1;
+ shared.num_notify += 1;
+ self.inner.condvar.notify_one();
+ None
+ }
+ };
+
+ if let Some(shutdown_tx) = shutdown_tx {
+ self.spawn_thread(shutdown_tx, rt);
+ }
+
+ Ok(())
+ }
+
+ fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
+ let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
+
+ if let Some(stack_size) = self.inner.stack_size {
+ builder = builder.stack_size(stack_size);
+ }
+
+ let rt = rt.clone();
+
+ builder
+ .spawn(move || {
+ // Only the reference should be moved into the closure
+ let rt = &rt;
+ rt.enter(move || {
+ rt.blocking_spawner.inner.run();
+ drop(shutdown_tx);
+ })
+ })
+ .unwrap();
+ }
+}
+
+impl Inner {
+ fn run(&self) {
+ if let Some(f) = &self.after_start {
+ f()
+ }
+
+ let mut shared = self.shared.lock().unwrap();
+
+ 'main: loop {
+ // BUSY
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ task.run();
+
+ shared = self.shared.lock().unwrap();
+ }
+
+ // IDLE
+ shared.num_idle += 1;
+
+ while !shared.shutdown {
+ let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
+
+ shared = lock_result.0;
+ let timeout_result = lock_result.1;
+
+ if shared.num_notify != 0 {
+ // We have received a legitimate wakeup,
+ // acknowledge it by decrementing the counter
+ // and transition to the BUSY state.
+ shared.num_notify -= 1;
+ break;
+ }
+
+ // Even if the condvar "timed out", if the pool is entering the
+ // shutdown phase, we want to perform the cleanup logic.
+ if !shared.shutdown && timeout_result.timed_out() {
+ break 'main;
+ }
+
+ // Spurious wakeup detected, go back to sleep.
+ }
+
+ if shared.shutdown {
+ // Drain the queue
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ task.shutdown();
+
+ shared = self.shared.lock().unwrap();
+ }
+
+ // Work was produced, and we "took" it (by decrementing num_notify).
+ // This means that num_idle was decremented once for our wakeup.
+ // But, since we are exiting, we need to "undo" that, as we'll stay idle.
+ shared.num_idle += 1;
+ // NOTE: Technically we should also do num_notify++ and notify again,
+ // but since we're shutting down anyway, that won't be necessary.
+ break;
+ }
+ }
+
+ // Thread exit
+ shared.num_th -= 1;
+
+ // num_idle should now be tracked exactly, panic
+ // with a descriptive message if it is not the
+ // case.
+ shared.num_idle = shared
+ .num_idle
+ .checked_sub(1)
+ .expect("num_idle underflowed on thread exit");
+
+ if shared.shutdown && shared.num_th == 0 {
+ self.condvar.notify_one();
+ }
+
+ drop(shared);
+
+ if let Some(f) = &self.before_stop {
+ f()
+ }
+ }
+}
+
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("blocking::Spawner").finish()
+ }
+}
diff --git a/src/runtime/blocking/schedule.rs b/src/runtime/blocking/schedule.rs
new file mode 100644
index 0000000..4e044ab
--- /dev/null
+++ b/src/runtime/blocking/schedule.rs
@@ -0,0 +1,24 @@
+use crate::runtime::task::{self, Task};
+
+/// `task::Schedule` implementation that does nothing. This is unique to the
+/// blocking scheduler as tasks scheduled are not really futures but blocking
+/// operations.
+///
+/// We avoid storing the task by forgetting it in `bind` and re-materializing it
+/// in `release.
+pub(crate) struct NoopSchedule;
+
+impl task::Schedule for NoopSchedule {
+ fn bind(_task: Task<Self>) -> NoopSchedule {
+ // Do nothing w/ the task
+ NoopSchedule
+ }
+
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+}
diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs
new file mode 100644
index 0000000..e76a701
--- /dev/null
+++ b/src/runtime/blocking/shutdown.rs
@@ -0,0 +1,68 @@
+//! A shutdown channel.
+//!
+//! Each worker holds the `Sender` half. When all the `Sender` halves are
+//! dropped, the `Receiver` receives a notification.
+
+use crate::loom::sync::Arc;
+use crate::sync::oneshot;
+
+use std::time::Duration;
+
+#[derive(Debug, Clone)]
+pub(super) struct Sender {
+ tx: Arc<oneshot::Sender<()>>,
+}
+
+#[derive(Debug)]
+pub(super) struct Receiver {
+ rx: oneshot::Receiver<()>,
+}
+
+pub(super) fn channel() -> (Sender, Receiver) {
+ let (tx, rx) = oneshot::channel();
+ let tx = Sender { tx: Arc::new(tx) };
+ let rx = Receiver { rx };
+
+ (tx, rx)
+}
+
+impl Receiver {
+ /// Blocks the current thread until all `Sender` handles drop.
+ ///
+ /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
+ /// duration. If `timeout` is `None`, then the thread is blocked until the
+ /// shutdown signal is received.
+ pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
+ use crate::runtime::enter::try_enter;
+
+ if timeout == Some(Duration::from_nanos(0)) {
+ return;
+ }
+
+ let mut e = match try_enter(false) {
+ Some(enter) => enter,
+ _ => {
+ if std::thread::panicking() {
+ // Don't panic in a panic
+ return;
+ } else {
+ panic!(
+ "Cannot drop a runtime in a context where blocking is not allowed. \
+ This happens when a runtime is dropped from within an asynchronous context."
+ );
+ }
+ }
+ };
+
+ // The oneshot completes with an Err
+ //
+ // If blocking fails to wait, this indicates a problem parking the
+ // current thread (usually, shutting down a runtime stored in a
+ // thread-local).
+ if let Some(timeout) = timeout {
+ let _ = e.block_on_timeout(&mut self.rx, timeout);
+ } else {
+ let _ = e.block_on(&mut self.rx);
+ }
+ }
+}
diff --git a/src/runtime/blocking/task.rs b/src/runtime/blocking/task.rs
new file mode 100644
index 0000000..a521af4
--- /dev/null
+++ b/src/runtime/blocking/task.rs
@@ -0,0 +1,43 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Converts a function to a future that completes on poll
+pub(crate) struct BlockingTask<T> {
+ func: Option<T>,
+}
+
+impl<T> BlockingTask<T> {
+ /// Initializes a new blocking task from the given function
+ pub(crate) fn new(func: T) -> BlockingTask<T> {
+ BlockingTask { func: Some(func) }
+ }
+}
+
+// The closure `F` is never pinned
+impl<T> Unpin for BlockingTask<T> {}
+
+impl<T, R> Future for BlockingTask<T>
+where
+ T: FnOnce() -> R,
+{
+ type Output = R;
+
+ fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
+ let me = &mut *self;
+ let func = me
+ .func
+ .take()
+ .expect("[internal exception] blocking task ran twice.");
+
+ // This is a little subtle:
+ // For convenience, we'd like _every_ call tokio ever makes to Task::poll() to be budgeted
+ // using coop. However, the way things are currently modeled, even running a blocking task
+ // currently goes through Task::poll(), and so is subject to budgeting. That isn't really
+ // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
+ // we want it to start without any budgeting.
+ crate::coop::stop();
+
+ Poll::Ready(func())
+ }
+}