aboutsummaryrefslogtreecommitdiff
path: root/tests/sync_broadcast.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sync_broadcast.rs')
-rw-r--r--tests/sync_broadcast.rs136
1 files changed, 134 insertions, 2 deletions
diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs
index 9ef7927..cd66924 100644
--- a/tests/sync_broadcast.rs
+++ b/tests/sync_broadcast.rs
@@ -2,6 +2,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]
+#[cfg(tokio_wasm_not_wasi)]
+use wasm_bindgen_test::wasm_bindgen_test as test;
+
use tokio::sync::broadcast;
use tokio_test::task;
use tokio_test::{
@@ -44,7 +47,7 @@ macro_rules! assert_closed {
($e:expr) => {
match assert_err!($e) {
broadcast::error::TryRecvError::Closed => {}
- _ => panic!("did not lag"),
+ _ => panic!("is not closed"),
}
};
}
@@ -273,12 +276,14 @@ fn send_no_rx() {
#[test]
#[should_panic]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn zero_capacity() {
broadcast::channel::<()>(0);
}
#[test]
#[should_panic]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn capacity_too_big() {
use std::usize;
@@ -286,7 +291,8 @@ fn capacity_too_big() {
}
#[test]
-#[cfg(not(target_os = "android"))]
+#[cfg(panic = "unwind")]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn panic_in_clone() {
use std::panic::{self, AssertUnwindSafe};
@@ -452,6 +458,132 @@ fn lagging_receiver_recovers_after_wrap_open() {
assert_empty!(rx);
}
+#[test]
+fn receiver_len_with_lagged() {
+ let (tx, mut rx) = broadcast::channel(3);
+
+ tx.send(10).unwrap();
+ tx.send(20).unwrap();
+ tx.send(30).unwrap();
+ tx.send(40).unwrap();
+
+ assert_eq!(rx.len(), 4);
+ assert_eq!(assert_recv!(rx), 10);
+
+ tx.send(50).unwrap();
+ tx.send(60).unwrap();
+
+ assert_eq!(rx.len(), 5);
+ assert_lagged!(rx.try_recv(), 1);
+}
+
fn is_closed(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Closed)
}
+
+#[test]
+fn resubscribe_points_to_tail() {
+ let (tx, mut rx) = broadcast::channel(3);
+ tx.send(1).unwrap();
+
+ let mut rx_resub = rx.resubscribe();
+
+ // verify we're one behind at the start
+ assert_empty!(rx_resub);
+ assert_eq!(assert_recv!(rx), 1);
+
+ // verify we do not affect rx
+ tx.send(2).unwrap();
+ assert_eq!(assert_recv!(rx_resub), 2);
+ tx.send(3).unwrap();
+ assert_eq!(assert_recv!(rx), 2);
+ assert_eq!(assert_recv!(rx), 3);
+ assert_empty!(rx);
+
+ assert_eq!(assert_recv!(rx_resub), 3);
+ assert_empty!(rx_resub);
+}
+
+#[test]
+fn resubscribe_lagged() {
+ let (tx, mut rx) = broadcast::channel(1);
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+
+ let mut rx_resub = rx.resubscribe();
+ assert_lagged!(rx.try_recv(), 1);
+ assert_empty!(rx_resub);
+
+ assert_eq!(assert_recv!(rx), 2);
+ assert_empty!(rx);
+ assert_empty!(rx_resub);
+}
+
+#[test]
+fn resubscribe_to_closed_channel() {
+ let (tx, rx) = tokio::sync::broadcast::channel::<u32>(2);
+ drop(tx);
+
+ let mut rx_resub = rx.resubscribe();
+ assert_closed!(rx_resub.try_recv());
+}
+
+#[test]
+fn sender_len() {
+ let (tx, mut rx1) = broadcast::channel(4);
+ let mut rx2 = tx.subscribe();
+
+ assert_eq!(tx.len(), 0);
+ assert!(tx.is_empty());
+
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ tx.send(3).unwrap();
+
+ assert_eq!(tx.len(), 3);
+ assert!(!tx.is_empty());
+
+ assert_recv!(rx1);
+ assert_recv!(rx1);
+
+ assert_eq!(tx.len(), 3);
+ assert!(!tx.is_empty());
+
+ assert_recv!(rx2);
+
+ assert_eq!(tx.len(), 2);
+ assert!(!tx.is_empty());
+
+ tx.send(4).unwrap();
+ tx.send(5).unwrap();
+ tx.send(6).unwrap();
+
+ assert_eq!(tx.len(), 4);
+ assert!(!tx.is_empty());
+}
+
+#[test]
+#[cfg(not(tokio_wasm_not_wasi))]
+fn sender_len_random() {
+ use rand::Rng;
+
+ let (tx, mut rx1) = broadcast::channel(16);
+ let mut rx2 = tx.subscribe();
+
+ for _ in 0..1000 {
+ match rand::thread_rng().gen_range(0..4) {
+ 0 => {
+ let _ = rx1.try_recv();
+ }
+ 1 => {
+ let _ = rx2.try_recv();
+ }
+ _ => {
+ tx.send(0).unwrap();
+ }
+ }
+
+ let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16);
+ assert_eq!(tx.len(), expected_len);
+ }
+}