aboutsummaryrefslogtreecommitdiff
path: root/tests/sync_broadcast.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sync_broadcast.rs')
-rw-r--r--tests/sync_broadcast.rs64
1 files changed, 59 insertions, 5 deletions
diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs
index cd66924..16b9a0a 100644
--- a/tests/sync_broadcast.rs
+++ b/tests/sync_broadcast.rs
@@ -2,7 +2,7 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]
-#[cfg(tokio_wasm_not_wasi)]
+#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test as test;
use tokio::sync::broadcast;
@@ -276,14 +276,14 @@ fn send_no_rx() {
#[test]
#[should_panic]
-#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
+#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn zero_capacity() {
broadcast::channel::<()>(0);
}
#[test]
#[should_panic]
-#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
+#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn capacity_too_big() {
use std::usize;
@@ -292,7 +292,7 @@ fn capacity_too_big() {
#[test]
#[cfg(panic = "unwind")]
-#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
+#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn panic_in_clone() {
use std::panic::{self, AssertUnwindSafe};
@@ -563,7 +563,7 @@ fn sender_len() {
}
#[test]
-#[cfg(not(tokio_wasm_not_wasi))]
+#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
fn sender_len_random() {
use rand::Rng;
@@ -587,3 +587,57 @@ fn sender_len_random() {
assert_eq!(tx.len(), expected_len);
}
}
+
+#[test]
+fn send_in_waker_drop() {
+ use futures::task::ArcWake;
+ use std::future::Future;
+ use std::task::Context;
+
+ struct SendOnDrop(broadcast::Sender<()>);
+
+ impl Drop for SendOnDrop {
+ fn drop(&mut self) {
+ let _ = self.0.send(());
+ }
+ }
+
+ impl ArcWake for SendOnDrop {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {}
+ }
+
+ // Test if there is no deadlock when replacing the old waker.
+
+ let (tx, mut rx) = broadcast::channel(16);
+
+ let mut fut = Box::pin(async {
+ let _ = rx.recv().await;
+ });
+
+ // Store our special waker in the receiving future.
+ let waker = futures::task::waker(Arc::new(SendOnDrop(tx)));
+ let mut cx = Context::from_waker(&waker);
+ assert!(fut.as_mut().poll(&mut cx).is_pending());
+ drop(waker);
+
+ // Second poll shouldn't deadlock.
+ let mut cx = Context::from_waker(futures::task::noop_waker_ref());
+ let _ = fut.as_mut().poll(&mut cx);
+
+ // Test if there is no deadlock when calling waker.wake().
+
+ let (tx, mut rx) = broadcast::channel(16);
+
+ let mut fut = Box::pin(async {
+ let _ = rx.recv().await;
+ });
+
+ // Store our special waker in the receiving future.
+ let waker = futures::task::waker(Arc::new(SendOnDrop(tx.clone())));
+ let mut cx = Context::from_waker(&waker);
+ assert!(fut.as_mut().poll(&mut cx).is_pending());
+ drop(waker);
+
+ // Shouldn't deadlock.
+ let _ = tx.send(());
+}