diff options
Diffstat (limited to 'tests/task_blocking.rs')
-rw-r--r-- | tests/task_blocking.rs | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs index eec19cc..82bef8a 100644 --- a/tests/task_blocking.rs +++ b/tests/task_blocking.rs @@ -7,6 +7,10 @@ use tokio_test::assert_ok; use std::thread; use std::time::Duration; +mod support { + pub(crate) mod mpsc_stream; +} + #[tokio::test] async fn basic_blocking() { // Run a few times @@ -165,7 +169,8 @@ fn coop_disabled_in_block_in_place() { .build() .unwrap(); - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream(); + for i in 0..200 { tx.send(i).unwrap(); } @@ -175,7 +180,7 @@ fn coop_disabled_in_block_in_place() { let jh = tokio::spawn(async move { tokio::task::block_in_place(move || { futures::executor::block_on(async move { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); }) }) @@ -195,7 +200,8 @@ fn coop_disabled_in_block_in_place_in_block_on() { thread::spawn(move || { let outer = tokio::runtime::Runtime::new().unwrap(); - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream(); + for i in 0..200 { tx.send(i).unwrap(); } @@ -204,7 +210,7 @@ fn coop_disabled_in_block_in_place_in_block_on() { outer.block_on(async move { tokio::task::block_in_place(move || { futures::executor::block_on(async move { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); }) }) |