aboutsummaryrefslogtreecommitdiff
path: root/tests/task_local_set.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/task_local_set.rs')
-rw-r--r--tests/task_local_set.rs206
1 files changed, 187 insertions, 19 deletions
diff --git a/tests/task_local_set.rs b/tests/task_local_set.rs
index f8a35d0..2da87f5 100644
--- a/tests/task_local_set.rs
+++ b/tests/task_local_set.rs
@@ -6,18 +6,23 @@ use futures::{
FutureExt,
};
-use tokio::runtime::{self, Runtime};
+use tokio::runtime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::{self, LocalSet};
use tokio::time;
+#[cfg(not(tokio_wasi))]
use std::cell::Cell;
-use std::sync::atomic::Ordering::{self, SeqCst};
-use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::atomic::AtomicBool;
+#[cfg(not(tokio_wasi))]
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+#[cfg(not(tokio_wasi))]
+use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
#[tokio::test(flavor = "current_thread")]
-async fn local_basic_scheduler() {
+async fn local_current_thread_scheduler() {
LocalSet::new()
.run_until(async {
task::spawn_local(async {}).await.unwrap();
@@ -25,6 +30,7 @@ async fn local_basic_scheduler() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool() {
thread_local! {
@@ -45,6 +51,7 @@ async fn local_threadpool() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_future_threadpool() {
thread_local! {
@@ -60,6 +67,7 @@ async fn localset_future_threadpool() {
local.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_future_timers() {
static RAN1: AtomicBool = AtomicBool::new(false);
@@ -104,6 +112,7 @@ async fn localset_future_drives_all_local_futs() {
assert!(RAN3.load(Ordering::SeqCst));
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool_timer() {
// This test ensures that runtime services like the timer are properly
@@ -126,7 +135,23 @@ async fn local_threadpool_timer() {
})
.await;
}
+#[test]
+fn enter_guard_spawn() {
+ let local = LocalSet::new();
+ let _guard = local.enter();
+ // Run the local task set.
+ let join = task::spawn_local(async { true });
+ let rt = runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ local.block_on(&rt, async move {
+ assert!(join.await.unwrap());
+ });
+}
+
+#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[test]
// This will panic, since the thread that calls `block_on` cannot use
// in-place blocking inside of `block_on`.
@@ -153,6 +178,7 @@ fn local_threadpool_blocking_in_place() {
});
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool_blocking_run() {
thread_local! {
@@ -181,6 +207,7 @@ async fn local_threadpool_blocking_run() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn all_spawns_are_local() {
use futures::future;
@@ -207,6 +234,7 @@ async fn all_spawns_are_local() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn nested_spawn_is_local() {
thread_local! {
@@ -242,6 +270,7 @@ async fn nested_spawn_is_local() {
.await;
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[test]
fn join_local_future_elsewhere() {
thread_local! {
@@ -255,14 +284,12 @@ fn join_local_future_elsewhere() {
local.block_on(&rt, async move {
let (tx, rx) = oneshot::channel();
let join = task::spawn_local(async move {
- println!("hello world running...");
assert!(
ON_RT_THREAD.with(|cell| cell.get()),
"local task must run on local thread, no matter where it is awaited"
);
rx.await.unwrap();
- println!("hello world task done");
"hello world"
});
let join2 = task::spawn(async move {
@@ -272,16 +299,34 @@ fn join_local_future_elsewhere() {
);
tx.send(()).expect("task shouldn't have ended yet");
- println!("waking up hello world...");
join.await.expect("task should complete successfully");
-
- println!("hello world task joined");
});
join2.await.unwrap()
});
}
+// Tests for <https://github.com/tokio-rs/tokio/issues/4973>
+#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
+#[tokio::test(flavor = "multi_thread")]
+async fn localset_in_thread_local() {
+ thread_local! {
+ static LOCAL_SET: LocalSet = LocalSet::new();
+ }
+
+ // holds runtime thread until end of main fn.
+ let (_tx, rx) = oneshot::channel::<()>();
+ let handle = tokio::runtime::Handle::current();
+
+ std::thread::spawn(move || {
+ LOCAL_SET.with(|local_set| {
+ handle.block_on(local_set.run_until(async move {
+ let _ = rx.await;
+ }))
+ });
+ });
+}
+
#[test]
fn drop_cancels_tasks() {
use std::rc::Rc;
@@ -345,9 +390,7 @@ fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
),
// Did the test thread panic? We'll find out for sure when we `join`
// with it.
- Err(RecvTimeoutError::Disconnected) => {
- println!("done_rx dropped, did the test thread panic?");
- }
+ Err(RecvTimeoutError::Disconnected) => {}
// Test completed successfully!
Ok(()) => {}
}
@@ -355,6 +398,7 @@ fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
thread.join().expect("test thread should not panic!")
}
+#[cfg_attr(tokio_wasi, ignore = "`unwrap()` in `with_timeout()` panics on Wasi")]
#[test]
fn drop_cancels_remote_tasks() {
// This test reproduces issue #1885.
@@ -377,6 +421,10 @@ fn drop_cancels_remote_tasks() {
});
}
+#[cfg_attr(
+ tokio_wasi,
+ ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
+)]
#[test]
fn local_tasks_wake_join_all() {
// This test reproduces issue #2460.
@@ -398,6 +446,7 @@ fn local_tasks_wake_join_all() {
});
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[test]
fn local_tasks_are_polled_after_tick() {
// This test depends on timing, so we run it up to five times.
@@ -414,6 +463,7 @@ fn local_tasks_are_polled_after_tick() {
local_tasks_are_polled_after_tick_inner();
}
+#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[tokio::main(flavor = "current_thread")]
async fn local_tasks_are_polled_after_tick_inner() {
// Reproduces issues #1899 and #1900
@@ -450,12 +500,15 @@ async fn local_tasks_are_polled_after_tick_inner() {
tx.send(()).unwrap();
}
- time::sleep(Duration::from_millis(20)).await;
- let rx1 = RX1.load(SeqCst);
- let rx2 = RX2.load(SeqCst);
- println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2);
- assert_eq!(EXPECTED, rx1);
- assert_eq!(EXPECTED, rx2);
+ loop {
+ time::sleep(Duration::from_millis(20)).await;
+ let rx1 = RX1.load(SeqCst);
+ let rx2 = RX2.load(SeqCst);
+
+ if rx1 == EXPECTED && rx2 == EXPECTED {
+ break;
+ }
+ }
});
while let Some(oneshot) = rx.recv().await {
@@ -517,7 +570,122 @@ async fn spawn_wakes_localset() {
}
}
-fn rt() -> Runtime {
+#[test]
+fn store_local_set_in_thread_local_with_runtime() {
+ use tokio::runtime::Runtime;
+
+ thread_local! {
+ static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
+ }
+
+ struct RtAndLocalSet {
+ rt: Runtime,
+ local: LocalSet,
+ }
+
+ impl RtAndLocalSet {
+ fn new() -> RtAndLocalSet {
+ RtAndLocalSet {
+ rt: tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap(),
+ local: LocalSet::new(),
+ }
+ }
+
+ async fn inner_method(&self) {
+ self.local
+ .run_until(async move {
+ tokio::task::spawn_local(async {});
+ })
+ .await
+ }
+
+ fn method(&self) {
+ self.rt.block_on(self.inner_method());
+ }
+ }
+
+ CURRENT.with(|f| {
+ f.method();
+ });
+}
+
+#[cfg(tokio_unstable)]
+mod unstable {
+ use tokio::runtime::UnhandledPanic;
+ use tokio::task::LocalSet;
+
+ #[tokio::test]
+ #[should_panic(
+ expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
+ )]
+ async fn shutdown_on_panic() {
+ LocalSet::new()
+ .unhandled_panic(UnhandledPanic::ShutdownRuntime)
+ .run_until(async {
+ tokio::task::spawn_local(async {
+ panic!("boom");
+ });
+
+ futures::future::pending::<()>().await;
+ })
+ .await;
+ }
+
+ // This test compares that, when the task driving `run_until` has already
+ // consumed budget, the `run_until` future has less budget than a "spawned"
+ // task.
+ //
+ // "Budget" is a fuzzy metric as the Tokio runtime is able to change values
+ // internally. This is why the test uses indirection to test this.
+ #[tokio::test]
+ async fn run_until_does_not_get_own_budget() {
+ // Consume some budget
+ tokio::task::consume_budget().await;
+
+ LocalSet::new()
+ .run_until(async {
+ let spawned = tokio::spawn(async {
+ let mut spawned_n = 0;
+
+ {
+ let mut spawned = tokio_test::task::spawn(async {
+ loop {
+ spawned_n += 1;
+ tokio::task::consume_budget().await;
+ }
+ });
+ // Poll once
+ assert!(!spawned.poll().is_ready());
+ }
+
+ spawned_n
+ });
+
+ let mut run_until_n = 0;
+ {
+ let mut run_until = tokio_test::task::spawn(async {
+ loop {
+ run_until_n += 1;
+ tokio::task::consume_budget().await;
+ }
+ });
+ // Poll once
+ assert!(!run_until.poll().is_ready());
+ }
+
+ let spawned_n = spawned.await.unwrap();
+ assert_ne!(spawned_n, 0);
+ assert_ne!(run_until_n, 0);
+ assert!(spawned_n > run_until_n);
+ })
+ .await
+ }
+}
+
+fn rt() -> runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()