aboutsummaryrefslogtreecommitdiff
path: root/tests/task_blocking.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/task_blocking.rs')
-rw-r--r--tests/task_blocking.rs108
1 files changed, 76 insertions, 32 deletions
diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs
index 50c070a..eec19cc 100644
--- a/tests/task_blocking.rs
+++ b/tests/task_blocking.rs
@@ -28,7 +28,7 @@ async fn basic_blocking() {
}
}
-#[tokio::test(threaded_scheduler)]
+#[tokio::test(flavor = "multi_thread")]
async fn block_in_blocking() {
// Run a few times
for _ in 0..100 {
@@ -51,7 +51,7 @@ async fn block_in_blocking() {
}
}
-#[tokio::test(threaded_scheduler)]
+#[tokio::test(flavor = "multi_thread")]
async fn block_in_block() {
// Run a few times
for _ in 0..100 {
@@ -71,7 +71,7 @@ async fn block_in_block() {
}
}
-#[tokio::test(basic_scheduler)]
+#[tokio::test(flavor = "current_thread")]
#[should_panic]
async fn no_block_in_basic_scheduler() {
task::block_in_place(|| {});
@@ -79,10 +79,7 @@ async fn no_block_in_basic_scheduler() {
#[test]
fn yes_block_in_threaded_block_on() {
- let mut rt = runtime::Builder::new()
- .threaded_scheduler()
- .build()
- .unwrap();
+ let rt = runtime::Runtime::new().unwrap();
rt.block_on(async {
task::block_in_place(|| {});
});
@@ -91,7 +88,7 @@ fn yes_block_in_threaded_block_on() {
#[test]
#[should_panic]
fn no_block_in_basic_block_on() {
- let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap();
+ let rt = runtime::Builder::new_current_thread().build().unwrap();
rt.block_on(async {
task::block_in_place(|| {});
});
@@ -99,15 +96,11 @@ fn no_block_in_basic_block_on() {
#[test]
fn can_enter_basic_rt_from_within_block_in_place() {
- let mut outer = tokio::runtime::Builder::new()
- .threaded_scheduler()
- .build()
- .unwrap();
+ let outer = tokio::runtime::Runtime::new().unwrap();
outer.block_on(async {
tokio::task::block_in_place(|| {
- let mut inner = tokio::runtime::Builder::new()
- .basic_scheduler()
+ let inner = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
@@ -120,15 +113,11 @@ fn can_enter_basic_rt_from_within_block_in_place() {
fn useful_panic_message_when_dropping_rt_in_rt() {
use std::panic::{catch_unwind, AssertUnwindSafe};
- let mut outer = tokio::runtime::Builder::new()
- .threaded_scheduler()
- .build()
- .unwrap();
+ let outer = tokio::runtime::Runtime::new().unwrap();
let result = catch_unwind(AssertUnwindSafe(|| {
outer.block_on(async {
- let _ = tokio::runtime::Builder::new()
- .basic_scheduler()
+ let _ = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
});
@@ -147,14 +136,10 @@ fn useful_panic_message_when_dropping_rt_in_rt() {
#[test]
fn can_shutdown_with_zero_timeout_in_runtime() {
- let mut outer = tokio::runtime::Builder::new()
- .threaded_scheduler()
- .build()
- .unwrap();
+ let outer = tokio::runtime::Runtime::new().unwrap();
outer.block_on(async {
- let rt = tokio::runtime::Builder::new()
- .basic_scheduler()
+ let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.shutdown_timeout(Duration::from_nanos(0));
@@ -163,16 +148,75 @@ fn can_shutdown_with_zero_timeout_in_runtime() {
#[test]
fn can_shutdown_now_in_runtime() {
- let mut outer = tokio::runtime::Builder::new()
- .threaded_scheduler()
- .build()
- .unwrap();
+ let outer = tokio::runtime::Runtime::new().unwrap();
outer.block_on(async {
- let rt = tokio::runtime::Builder::new()
- .basic_scheduler()
+ let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.shutdown_background();
});
}
+
+#[test]
+fn coop_disabled_in_block_in_place() {
+ let outer = tokio::runtime::Builder::new_multi_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ for i in 0..200 {
+ tx.send(i).unwrap();
+ }
+ drop(tx);
+
+ outer.block_on(async move {
+ let jh = tokio::spawn(async move {
+ tokio::task::block_in_place(move || {
+ futures::executor::block_on(async move {
+ use tokio::stream::StreamExt;
+ assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
+ })
+ })
+ });
+
+ tokio::time::timeout(Duration::from_secs(1), jh)
+ .await
+ .expect("timed out (probably hanging)")
+ .unwrap()
+ });
+}
+
+#[test]
+fn coop_disabled_in_block_in_place_in_block_on() {
+ let (done_tx, done_rx) = std::sync::mpsc::channel();
+ let done = done_tx.clone();
+ thread::spawn(move || {
+ let outer = tokio::runtime::Runtime::new().unwrap();
+
+ let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ for i in 0..200 {
+ tx.send(i).unwrap();
+ }
+ drop(tx);
+
+ outer.block_on(async move {
+ tokio::task::block_in_place(move || {
+ futures::executor::block_on(async move {
+ use tokio::stream::StreamExt;
+ assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
+ })
+ })
+ });
+
+ let _ = done.send(Ok(()));
+ });
+
+ thread::spawn(move || {
+ thread::sleep(Duration::from_secs(1));
+ let _ = done_tx.send(Err("timed out (probably hanging)"));
+ });
+
+ done_rx.recv().unwrap().unwrap();
+}