diff options
Diffstat (limited to 'tests/sync_broadcast.rs')
-rw-r--r-- | tests/sync_broadcast.rs | 136 |
1 files changed, 134 insertions, 2 deletions
diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs index 9ef7927..cd66924 100644 --- a/tests/sync_broadcast.rs +++ b/tests/sync_broadcast.rs @@ -2,6 +2,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "sync")] +#[cfg(tokio_wasm_not_wasi)] +use wasm_bindgen_test::wasm_bindgen_test as test; + use tokio::sync::broadcast; use tokio_test::task; use tokio_test::{ @@ -44,7 +47,7 @@ macro_rules! assert_closed { ($e:expr) => { match assert_err!($e) { broadcast::error::TryRecvError::Closed => {} - _ => panic!("did not lag"), + _ => panic!("is not closed"), } }; } @@ -273,12 +276,14 @@ fn send_no_rx() { #[test] #[should_panic] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn zero_capacity() { broadcast::channel::<()>(0); } #[test] #[should_panic] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn capacity_too_big() { use std::usize; @@ -286,7 +291,8 @@ fn capacity_too_big() { } #[test] -#[cfg(not(target_os = "android"))] +#[cfg(panic = "unwind")] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn panic_in_clone() { use std::panic::{self, AssertUnwindSafe}; @@ -452,6 +458,132 @@ fn lagging_receiver_recovers_after_wrap_open() { assert_empty!(rx); } +#[test] +fn receiver_len_with_lagged() { + let (tx, mut rx) = broadcast::channel(3); + + tx.send(10).unwrap(); + tx.send(20).unwrap(); + tx.send(30).unwrap(); + tx.send(40).unwrap(); + + assert_eq!(rx.len(), 4); + assert_eq!(assert_recv!(rx), 10); + + tx.send(50).unwrap(); + tx.send(60).unwrap(); + + assert_eq!(rx.len(), 5); + assert_lagged!(rx.try_recv(), 1); +} + fn is_closed(err: broadcast::error::RecvError) -> bool { matches!(err, broadcast::error::RecvError::Closed) } + +#[test] +fn resubscribe_points_to_tail() { + let (tx, mut rx) = broadcast::channel(3); + tx.send(1).unwrap(); + + let mut rx_resub = rx.resubscribe(); + + // verify we're one behind at the start + assert_empty!(rx_resub); + assert_eq!(assert_recv!(rx), 1); + + // verify we do not affect rx + tx.send(2).unwrap(); + assert_eq!(assert_recv!(rx_resub), 2); + tx.send(3).unwrap(); + assert_eq!(assert_recv!(rx), 2); + assert_eq!(assert_recv!(rx), 3); + assert_empty!(rx); + + assert_eq!(assert_recv!(rx_resub), 3); + assert_empty!(rx_resub); +} + +#[test] +fn resubscribe_lagged() { + let (tx, mut rx) = broadcast::channel(1); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + let mut rx_resub = rx.resubscribe(); + assert_lagged!(rx.try_recv(), 1); + assert_empty!(rx_resub); + + assert_eq!(assert_recv!(rx), 2); + assert_empty!(rx); + assert_empty!(rx_resub); +} + +#[test] +fn resubscribe_to_closed_channel() { + let (tx, rx) = tokio::sync::broadcast::channel::<u32>(2); + drop(tx); + + let mut rx_resub = rx.resubscribe(); + assert_closed!(rx_resub.try_recv()); +} + +#[test] +fn sender_len() { + let (tx, mut rx1) = broadcast::channel(4); + let mut rx2 = tx.subscribe(); + + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); + + tx.send(1).unwrap(); + tx.send(2).unwrap(); + tx.send(3).unwrap(); + + assert_eq!(tx.len(), 3); + assert!(!tx.is_empty()); + + assert_recv!(rx1); + assert_recv!(rx1); + + assert_eq!(tx.len(), 3); + assert!(!tx.is_empty()); + + assert_recv!(rx2); + + assert_eq!(tx.len(), 2); + assert!(!tx.is_empty()); + + tx.send(4).unwrap(); + tx.send(5).unwrap(); + tx.send(6).unwrap(); + + assert_eq!(tx.len(), 4); + assert!(!tx.is_empty()); +} + +#[test] +#[cfg(not(tokio_wasm_not_wasi))] +fn sender_len_random() { + use rand::Rng; + + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + for _ in 0..1000 { + match rand::thread_rng().gen_range(0..4) { + 0 => { + let _ = rx1.try_recv(); + } + 1 => { + let _ = rx2.try_recv(); + } + _ => { + tx.send(0).unwrap(); + } + } + + let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16); + assert_eq!(tx.len(), expected_len); + } +} |