aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/blocking
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2020-10-23 09:39:31 -0700
committerJoel Galenson <jgalenson@google.com>2020-10-23 09:52:09 -0700
commitd5495b03381a3ebe0805db353d198b285b535b5c (patch)
tree778b8524d15fca8b73db0253ee0e1919d0848bb6 /src/runtime/blocking
parentba45c5bedf31df8562364c61d3dfb5262f10642e (diff)
downloadtokio-d5495b03381a3ebe0805db353d198b285b535b5c.tar.gz
Update to tokio-0.3.1 and add new features
Test: Build Change-Id: I5b5b9b386a21982a019653d0cf0bd3afc505cfac
Diffstat (limited to 'src/runtime/blocking')
-rw-r--r--src/runtime/blocking/mod.rs21
-rw-r--r--src/runtime/blocking/pool.rs72
-rw-r--r--src/runtime/blocking/shutdown.rs11
3 files changed, 65 insertions, 39 deletions
diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs
index 0b36a75..fece3c2 100644
--- a/src/runtime/blocking/mod.rs
+++ b/src/runtime/blocking/mod.rs
@@ -3,22 +3,20 @@
//! shells. This isolates the complexity of dealing with conditional
//! compilation.
-cfg_blocking_impl! {
- mod pool;
- pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner};
+mod pool;
+pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
- mod schedule;
- mod shutdown;
- pub(crate) mod task;
+mod schedule;
+mod shutdown;
+pub(crate) mod task;
- use crate::runtime::Builder;
-
- pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
- BlockingPool::new(builder, thread_cap)
+use crate::runtime::Builder;
- }
+pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ BlockingPool::new(builder, thread_cap)
}
+/*
cfg_not_blocking_impl! {
use crate::runtime::Builder;
use std::time::Duration;
@@ -41,3 +39,4 @@ cfg_not_blocking_impl! {
}
}
}
+*/
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
index 40d417b..2967a10 100644
--- a/src/runtime/blocking/pool.rs
+++ b/src/runtime/blocking/pool.rs
@@ -5,9 +5,13 @@ use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
+use crate::runtime::builder::ThreadNameFn;
+use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
+use slab::Slab;
+
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
@@ -30,7 +34,7 @@ struct Inner {
condvar: Condvar,
/// Spawned threads use this name
- thread_name: String,
+ thread_name: ThreadNameFn,
/// Spawned thread stack size
stack_size: Option<usize>,
@@ -41,7 +45,11 @@ struct Inner {
/// Call before a thread stops
before_stop: Option<Callback>,
+ // Maximum number of threads
thread_cap: usize,
+
+ // Customizable wait timeout
+ keep_alive: Duration,
}
struct Shared {
@@ -51,6 +59,7 @@ struct Shared {
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
+ worker_threads: Slab<thread::JoinHandle<()>>,
}
type Task = task::Notified<NoopSchedule>;
@@ -62,11 +71,8 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
- let rt = Handle::current();
-
- let (task, handle) = task::joinable(BlockingTask::new(func));
- let _ = rt.blocking_spawner.spawn(task, &rt);
- handle
+ let rt = context::current().expect("not currently running on the Tokio runtime.");
+ rt.spawn_blocking(func)
}
#[allow(dead_code)]
@@ -74,7 +80,7 @@ pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
where
F: FnOnce() -> R + Send + 'static,
{
- let rt = Handle::current();
+ let rt = context::current().expect("not currently running on the Tokio runtime.");
let (task, _handle) = task::joinable(BlockingTask::new(func));
rt.blocking_spawner.spawn(task, &rt)
@@ -85,6 +91,7 @@ where
impl BlockingPool {
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
+ let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);
BlockingPool {
spawner: Spawner {
@@ -96,6 +103,7 @@ impl BlockingPool {
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
+ worker_threads: Slab::new(),
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
@@ -103,6 +111,7 @@ impl BlockingPool {
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
thread_cap,
+ keep_alive,
}),
},
shutdown_rx,
@@ -114,7 +123,7 @@ impl BlockingPool {
}
pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
- let mut shared = self.spawner.inner.shared.lock().unwrap();
+ let mut shared = self.spawner.inner.shared.lock();
// The function can be called multiple times. First, by explicitly
// calling `shutdown` then by the drop handler calling `shutdown`. This
@@ -126,10 +135,15 @@ impl BlockingPool {
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
+ let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());
drop(shared);
- self.shutdown_rx.wait(timeout);
+ if self.shutdown_rx.wait(timeout) {
+ for handle in workers.drain() {
+ let _ = handle.join();
+ }
+ }
}
}
@@ -150,7 +164,7 @@ impl fmt::Debug for BlockingPool {
impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
- let mut shared = self.inner.shared.lock().unwrap();
+ let mut shared = self.inner.shared.lock();
if shared.shutdown {
// Shutdown the task
@@ -187,14 +201,24 @@ impl Spawner {
};
if let Some(shutdown_tx) = shutdown_tx {
- self.spawn_thread(shutdown_tx, rt);
+ let mut shared = self.inner.shared.lock();
+ let entry = shared.worker_threads.vacant_entry();
+
+ let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
+
+ entry.insert(handle);
}
Ok(())
}
- fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
- let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
+ fn spawn_thread(
+ &self,
+ shutdown_tx: shutdown::Sender,
+ rt: &Handle,
+ worker_id: usize,
+ ) -> thread::JoinHandle<()> {
+ let mut builder = thread::Builder::new().name((self.inner.thread_name)());
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
@@ -205,23 +229,21 @@ impl Spawner {
builder
.spawn(move || {
// Only the reference should be moved into the closure
- let rt = &rt;
- rt.enter(move || {
- rt.blocking_spawner.inner.run();
- drop(shutdown_tx);
- })
+ let _enter = crate::runtime::context::enter(rt.clone());
+ rt.blocking_spawner.inner.run(worker_id);
+ drop(shutdown_tx);
})
- .unwrap();
+ .unwrap()
}
}
impl Inner {
- fn run(&self) {
+ fn run(&self, worker_id: usize) {
if let Some(f) = &self.after_start {
f()
}
- let mut shared = self.shared.lock().unwrap();
+ let mut shared = self.shared.lock();
'main: loop {
// BUSY
@@ -229,14 +251,14 @@ impl Inner {
drop(shared);
task.run();
- shared = self.shared.lock().unwrap();
+ shared = self.shared.lock();
}
// IDLE
shared.num_idle += 1;
while !shared.shutdown {
- let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
+ let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
@@ -252,6 +274,8 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
+ shared.worker_threads.remove(worker_id);
+
break 'main;
}
@@ -264,7 +288,7 @@ impl Inner {
drop(shared);
task.shutdown();
- shared = self.shared.lock().unwrap();
+ shared = self.shared.lock();
}
// Work was produced, and we "took" it (by decrementing num_notify).
diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs
index e76a701..3b6cc59 100644
--- a/src/runtime/blocking/shutdown.rs
+++ b/src/runtime/blocking/shutdown.rs
@@ -32,11 +32,13 @@ impl Receiver {
/// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
/// duration. If `timeout` is `None`, then the thread is blocked until the
/// shutdown signal is received.
- pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
+ ///
+ /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
+ pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
use crate::runtime::enter::try_enter;
if timeout == Some(Duration::from_nanos(0)) {
- return;
+ return true;
}
let mut e = match try_enter(false) {
@@ -44,7 +46,7 @@ impl Receiver {
_ => {
if std::thread::panicking() {
// Don't panic in a panic
- return;
+ return false;
} else {
panic!(
"Cannot drop a runtime in a context where blocking is not allowed. \
@@ -60,9 +62,10 @@ impl Receiver {
// current thread (usually, shutting down a runtime stored in a
// thread-local).
if let Some(timeout) = timeout {
- let _ = e.block_on_timeout(&mut self.rx, timeout);
+ e.block_on_timeout(&mut self.rx, timeout).is_ok()
} else {
let _ = e.block_on(&mut self.rx);
+ true
}
}
}