aboutsummaryrefslogtreecommitdiff
path: root/tests/task_blocking.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/task_blocking.rs')
-rw-r--r--tests/task_blocking.rs14
1 files changed, 10 insertions, 4 deletions
diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs
index eec19cc..82bef8a 100644
--- a/tests/task_blocking.rs
+++ b/tests/task_blocking.rs
@@ -7,6 +7,10 @@ use tokio_test::assert_ok;
use std::thread;
use std::time::Duration;
+mod support {
+ pub(crate) mod mpsc_stream;
+}
+
#[tokio::test]
async fn basic_blocking() {
// Run a few times
@@ -165,7 +169,8 @@ fn coop_disabled_in_block_in_place() {
.build()
.unwrap();
- let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
+
for i in 0..200 {
tx.send(i).unwrap();
}
@@ -175,7 +180,7 @@ fn coop_disabled_in_block_in_place() {
let jh = tokio::spawn(async move {
tokio::task::block_in_place(move || {
futures::executor::block_on(async move {
- use tokio::stream::StreamExt;
+ use tokio_stream::StreamExt;
assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
})
})
@@ -195,7 +200,8 @@ fn coop_disabled_in_block_in_place_in_block_on() {
thread::spawn(move || {
let outer = tokio::runtime::Runtime::new().unwrap();
- let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
+
for i in 0..200 {
tx.send(i).unwrap();
}
@@ -204,7 +210,7 @@ fn coop_disabled_in_block_in_place_in_block_on() {
outer.block_on(async move {
tokio::task::block_in_place(move || {
futures::executor::block_on(async move {
- use tokio::stream::StreamExt;
+ use tokio_stream::StreamExt;
assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
})
})