diff options
Diffstat (limited to 'tests/task_blocking.rs')
-rw-r--r-- | tests/task_blocking.rs | 108 |
1 files changed, 76 insertions, 32 deletions
diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs index 50c070a..eec19cc 100644 --- a/tests/task_blocking.rs +++ b/tests/task_blocking.rs @@ -28,7 +28,7 @@ async fn basic_blocking() { } } -#[tokio::test(threaded_scheduler)] +#[tokio::test(flavor = "multi_thread")] async fn block_in_blocking() { // Run a few times for _ in 0..100 { @@ -51,7 +51,7 @@ async fn block_in_blocking() { } } -#[tokio::test(threaded_scheduler)] +#[tokio::test(flavor = "multi_thread")] async fn block_in_block() { // Run a few times for _ in 0..100 { @@ -71,7 +71,7 @@ async fn block_in_block() { } } -#[tokio::test(basic_scheduler)] +#[tokio::test(flavor = "current_thread")] #[should_panic] async fn no_block_in_basic_scheduler() { task::block_in_place(|| {}); @@ -79,10 +79,7 @@ async fn no_block_in_basic_scheduler() { #[test] fn yes_block_in_threaded_block_on() { - let mut rt = runtime::Builder::new() - .threaded_scheduler() - .build() - .unwrap(); + let rt = runtime::Runtime::new().unwrap(); rt.block_on(async { task::block_in_place(|| {}); }); @@ -91,7 +88,7 @@ fn yes_block_in_threaded_block_on() { #[test] #[should_panic] fn no_block_in_basic_block_on() { - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let rt = runtime::Builder::new_current_thread().build().unwrap(); rt.block_on(async { task::block_in_place(|| {}); }); @@ -99,15 +96,11 @@ fn no_block_in_basic_block_on() { #[test] fn can_enter_basic_rt_from_within_block_in_place() { - let mut outer = tokio::runtime::Builder::new() - .threaded_scheduler() - .build() - .unwrap(); + let outer = tokio::runtime::Runtime::new().unwrap(); outer.block_on(async { tokio::task::block_in_place(|| { - let mut inner = tokio::runtime::Builder::new() - .basic_scheduler() + let inner = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); @@ -120,15 +113,11 @@ fn can_enter_basic_rt_from_within_block_in_place() { fn useful_panic_message_when_dropping_rt_in_rt() { use std::panic::{catch_unwind, AssertUnwindSafe}; - let mut outer = tokio::runtime::Builder::new() - .threaded_scheduler() - .build() - .unwrap(); + let outer = tokio::runtime::Runtime::new().unwrap(); let result = catch_unwind(AssertUnwindSafe(|| { outer.block_on(async { - let _ = tokio::runtime::Builder::new() - .basic_scheduler() + let _ = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); }); @@ -147,14 +136,10 @@ fn useful_panic_message_when_dropping_rt_in_rt() { #[test] fn can_shutdown_with_zero_timeout_in_runtime() { - let mut outer = tokio::runtime::Builder::new() - .threaded_scheduler() - .build() - .unwrap(); + let outer = tokio::runtime::Runtime::new().unwrap(); outer.block_on(async { - let rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); rt.shutdown_timeout(Duration::from_nanos(0)); @@ -163,16 +148,75 @@ fn can_shutdown_with_zero_timeout_in_runtime() { #[test] fn can_shutdown_now_in_runtime() { - let mut outer = tokio::runtime::Builder::new() - .threaded_scheduler() - .build() - .unwrap(); + let outer = tokio::runtime::Runtime::new().unwrap(); outer.block_on(async { - let rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); rt.shutdown_background(); }); } + +#[test] +fn coop_disabled_in_block_in_place() { + let outer = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .build() + .unwrap(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + for i in 0..200 { + tx.send(i).unwrap(); + } + drop(tx); + + outer.block_on(async move { + let jh = tokio::spawn(async move { + tokio::task::block_in_place(move || { + futures::executor::block_on(async move { + use tokio::stream::StreamExt; + assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); + }) + }) + }); + + tokio::time::timeout(Duration::from_secs(1), jh) + .await + .expect("timed out (probably hanging)") + .unwrap() + }); +} + +#[test] +fn coop_disabled_in_block_in_place_in_block_on() { + let (done_tx, done_rx) = std::sync::mpsc::channel(); + let done = done_tx.clone(); + thread::spawn(move || { + let outer = tokio::runtime::Runtime::new().unwrap(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + for i in 0..200 { + tx.send(i).unwrap(); + } + drop(tx); + + outer.block_on(async move { + tokio::task::block_in_place(move || { + futures::executor::block_on(async move { + use tokio::stream::StreamExt; + assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); + }) + }) + }); + + let _ = done.send(Ok(())); + }); + + thread::spawn(move || { + thread::sleep(Duration::from_secs(1)); + let _ = done_tx.send(Err("timed out (probably hanging)")); + }); + + done_rx.recv().unwrap().unwrap(); +} |