diff options
Diffstat (limited to 'tests/sync_broadcast.rs')
-rw-r--r-- | tests/sync_broadcast.rs | 64 |
1 files changed, 59 insertions, 5 deletions
diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs index cd66924..16b9a0a 100644 --- a/tests/sync_broadcast.rs +++ b/tests/sync_broadcast.rs @@ -2,7 +2,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "sync")] -#[cfg(tokio_wasm_not_wasi)] +#[cfg(all(target_family = "wasm", not(target_os = "wasi")))] use wasm_bindgen_test::wasm_bindgen_test as test; use tokio::sync::broadcast; @@ -276,14 +276,14 @@ fn send_no_rx() { #[test] #[should_panic] -#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding +#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding fn zero_capacity() { broadcast::channel::<()>(0); } #[test] #[should_panic] -#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding +#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding fn capacity_too_big() { use std::usize; @@ -292,7 +292,7 @@ fn capacity_too_big() { #[test] #[cfg(panic = "unwind")] -#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding +#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding fn panic_in_clone() { use std::panic::{self, AssertUnwindSafe}; @@ -563,7 +563,7 @@ fn sender_len() { } #[test] -#[cfg(not(tokio_wasm_not_wasi))] +#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))] fn sender_len_random() { use rand::Rng; @@ -587,3 +587,57 @@ fn sender_len_random() { assert_eq!(tx.len(), expected_len); } } + +#[test] +fn send_in_waker_drop() { + use futures::task::ArcWake; + use std::future::Future; + use std::task::Context; + + struct SendOnDrop(broadcast::Sender<()>); + + impl Drop for SendOnDrop { + fn drop(&mut self) { + let _ = self.0.send(()); + } + } + + impl ArcWake for SendOnDrop { + fn wake_by_ref(_arc_self: &Arc<Self>) {} + } + + // Test if there is no deadlock when replacing the old waker. + + let (tx, mut rx) = broadcast::channel(16); + + let mut fut = Box::pin(async { + let _ = rx.recv().await; + }); + + // Store our special waker in the receiving future. + let waker = futures::task::waker(Arc::new(SendOnDrop(tx))); + let mut cx = Context::from_waker(&waker); + assert!(fut.as_mut().poll(&mut cx).is_pending()); + drop(waker); + + // Second poll shouldn't deadlock. + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + let _ = fut.as_mut().poll(&mut cx); + + // Test if there is no deadlock when calling waker.wake(). + + let (tx, mut rx) = broadcast::channel(16); + + let mut fut = Box::pin(async { + let _ = rx.recv().await; + }); + + // Store our special waker in the receiving future. + let waker = futures::task::waker(Arc::new(SendOnDrop(tx.clone()))); + let mut cx = Context::from_waker(&waker); + assert!(fut.as_mut().poll(&mut cx).is_pending()); + drop(waker); + + // Shouldn't deadlock. + let _ = tx.send(()); +} |