diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/compat.rs | 43 | ||||
-rw-r--r-- | tests/io_sync_bridge.rs | 12 | ||||
-rw-r--r-- | tests/length_delimited.rs | 91 | ||||
-rw-r--r-- | tests/mpsc.rs | 23 | ||||
-rw-r--r-- | tests/task_join_map.rs | 24 | ||||
-rw-r--r-- | tests/task_tracker.rs | 178 | ||||
-rw-r--r-- | tests/time_delay_queue.rs | 63 | ||||
-rw-r--r-- | tests/udp.rs | 7 |
8 files changed, 392 insertions, 49 deletions
diff --git a/tests/compat.rs b/tests/compat.rs new file mode 100644 index 0000000..278ebfc --- /dev/null +++ b/tests/compat.rs @@ -0,0 +1,43 @@ +#![cfg(all(feature = "compat"))] +#![cfg(not(target_os = "wasi"))] // WASI does not support all fs operations +#![warn(rust_2018_idioms)] + +use futures_io::SeekFrom; +use futures_util::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use tempfile::NamedTempFile; +use tokio::fs::OpenOptions; +use tokio_util::compat::TokioAsyncWriteCompatExt; + +#[tokio::test] +async fn compat_file_seek() -> futures_util::io::Result<()> { + let temp_file = NamedTempFile::new()?; + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(temp_file) + .await? + .compat_write(); + + file.write_all(&[0, 1, 2, 3, 4, 5]).await?; + file.write_all(&[6, 7]).await?; + + assert_eq!(file.stream_position().await?, 8); + + // Modify elements at position 2. + assert_eq!(file.seek(SeekFrom::Start(2)).await?, 2); + file.write_all(&[8, 9]).await?; + + file.flush().await?; + + // Verify we still have 8 elements. + assert_eq!(file.seek(SeekFrom::End(0)).await?, 8); + // Seek back to the start of the file to read and verify contents. + file.seek(SeekFrom::Start(0)).await?; + + let mut buf = Vec::new(); + let num_bytes = file.read_to_end(&mut buf).await?; + assert_eq!(&buf[..num_bytes], &[0, 1, 8, 9, 4, 5, 6, 7]); + + Ok(()) +} diff --git a/tests/io_sync_bridge.rs b/tests/io_sync_bridge.rs index 76bbd0b..50d0e89 100644 --- a/tests/io_sync_bridge.rs +++ b/tests/io_sync_bridge.rs @@ -44,6 +44,18 @@ async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> { } #[tokio::test] +async fn test_into_inner() -> Result<(), Box<dyn Error>> { + let mut buf = Vec::new(); + SyncIoBridge::new(tokio::io::empty()) + .into_inner() + .read_to_end(&mut buf) + .await + .unwrap(); + assert_eq!(buf.len(), 0); + Ok(()) +} + +#[tokio::test] async fn test_shutdown() -> Result<(), Box<dyn Error>> { let (s1, mut s2) = tokio::io::duplex(1024); let (_rh, wh) = tokio::io::split(s1); diff --git a/tests/length_delimited.rs b/tests/length_delimited.rs index 126e41b..ed5590f 100644 --- a/tests/length_delimited.rs +++ b/tests/length_delimited.rs @@ -12,7 +12,6 @@ use futures::{pin_mut, Sink, Stream}; use std::collections::VecDeque; use std::io; use std::pin::Pin; -use std::task::Poll::*; use std::task::{Context, Poll}; macro_rules! mock { @@ -39,10 +38,10 @@ macro_rules! assert_next_eq { macro_rules! assert_next_pending { ($io:ident) => {{ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) { - Ready(Some(Ok(v))) => panic!("value = {:?}", v), - Ready(Some(Err(e))) => panic!("error = {:?}", e), - Ready(None) => panic!("done"), - Pending => {} + Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v), + Poll::Ready(Some(Err(e))) => panic!("error = {:?}", e), + Poll::Ready(None) => panic!("done"), + Poll::Pending => {} }); }}; } @@ -50,10 +49,10 @@ macro_rules! assert_next_pending { macro_rules! assert_next_err { ($io:ident) => {{ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) { - Ready(Some(Ok(v))) => panic!("value = {:?}", v), - Ready(Some(Err(_))) => {} - Ready(None) => panic!("done"), - Pending => panic!("pending"), + Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v), + Poll::Ready(Some(Err(_))) => {} + Poll::Ready(None) => panic!("done"), + Poll::Pending => panic!("pending"), }); }}; } @@ -186,11 +185,11 @@ fn read_single_frame_multi_packet_wait() { let io = FramedRead::new( mock! { data(b"\x00\x00"), - Pending, + Poll::Pending, data(b"\x00\x09abc"), - Pending, + Poll::Pending, data(b"defghi"), - Pending, + Poll::Pending, }, LengthDelimitedCodec::new(), ); @@ -208,15 +207,15 @@ fn read_multi_frame_multi_packet_wait() { let io = FramedRead::new( mock! { data(b"\x00\x00"), - Pending, + Poll::Pending, data(b"\x00\x09abc"), - Pending, + Poll::Pending, data(b"defghi"), - Pending, + Poll::Pending, data(b"\x00\x00\x00\x0312"), - Pending, + Poll::Pending, data(b"3\x00\x00\x00\x0bhello world"), - Pending, + Poll::Pending, }, LengthDelimitedCodec::new(), ); @@ -250,9 +249,9 @@ fn read_incomplete_head() { fn read_incomplete_head_multi() { let io = FramedRead::new( mock! { - Pending, + Poll::Pending, data(b"\x00"), - Pending, + Poll::Pending, }, LengthDelimitedCodec::new(), ); @@ -268,9 +267,9 @@ fn read_incomplete_payload() { let io = FramedRead::new( mock! { data(b"\x00\x00\x00\x09ab"), - Pending, + Poll::Pending, data(b"cd"), - Pending, + Poll::Pending, }, LengthDelimitedCodec::new(), ); @@ -310,7 +309,7 @@ fn read_update_max_frame_len_at_rest() { fn read_update_max_frame_len_in_flight() { let io = length_delimited::Builder::new().new_read(mock! { data(b"\x00\x00\x00\x09abcd"), - Pending, + Poll::Pending, data(b"efghi"), data(b"\x00\x00\x00\x09abcdefghi"), }); @@ -533,9 +532,9 @@ fn write_single_multi_frame_multi_packet() { fn write_single_frame_would_block() { let io = FramedWrite::new( mock! { - Pending, + Poll::Pending, data(b"\x00\x00"), - Pending, + Poll::Pending, data(b"\x00\x09"), data(b"abcdefghi"), flush(), @@ -640,7 +639,7 @@ fn write_update_max_frame_len_in_flight() { let io = length_delimited::Builder::new().new_write(mock! { data(b"\x00\x00\x00\x06"), data(b"ab"), - Pending, + Poll::Pending, data(b"cdef"), flush(), }); @@ -701,8 +700,6 @@ enum Op { Flush, } -use self::Op::*; - impl AsyncRead for Mock { fn poll_read( mut self: Pin<&mut Self>, @@ -710,15 +707,15 @@ impl AsyncRead for Mock { dst: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { match self.calls.pop_front() { - Some(Ready(Ok(Op::Data(data)))) => { + Some(Poll::Ready(Ok(Op::Data(data)))) => { debug_assert!(dst.remaining() >= data.len()); dst.put_slice(&data); - Ready(Ok(())) + Poll::Ready(Ok(())) } - Some(Ready(Ok(_))) => panic!(), - Some(Ready(Err(e))) => Ready(Err(e)), - Some(Pending) => Pending, - None => Ready(Ok(())), + Some(Poll::Ready(Ok(_))) => panic!(), + Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)), + Some(Poll::Pending) => Poll::Pending, + None => Poll::Ready(Ok(())), } } } @@ -730,31 +727,31 @@ impl AsyncWrite for Mock { src: &[u8], ) -> Poll<Result<usize, io::Error>> { match self.calls.pop_front() { - Some(Ready(Ok(Op::Data(data)))) => { + Some(Poll::Ready(Ok(Op::Data(data)))) => { let len = data.len(); assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src); assert_eq!(&data[..], &src[..len]); - Ready(Ok(len)) + Poll::Ready(Ok(len)) } - Some(Ready(Ok(_))) => panic!(), - Some(Ready(Err(e))) => Ready(Err(e)), - Some(Pending) => Pending, - None => Ready(Ok(0)), + Some(Poll::Ready(Ok(_))) => panic!(), + Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)), + Some(Poll::Pending) => Poll::Pending, + None => Poll::Ready(Ok(0)), } } fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { match self.calls.pop_front() { - Some(Ready(Ok(Op::Flush))) => Ready(Ok(())), - Some(Ready(Ok(_))) => panic!(), - Some(Ready(Err(e))) => Ready(Err(e)), - Some(Pending) => Pending, - None => Ready(Ok(())), + Some(Poll::Ready(Ok(Op::Flush))) => Poll::Ready(Ok(())), + Some(Poll::Ready(Ok(_))) => panic!(), + Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)), + Some(Poll::Pending) => Poll::Pending, + None => Poll::Ready(Ok(())), } } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - Ready(Ok(())) + Poll::Ready(Ok(())) } } @@ -771,9 +768,9 @@ impl From<Vec<u8>> for Op { } fn data(bytes: &[u8]) -> Poll<io::Result<Op>> { - Ready(Ok(bytes.into())) + Poll::Ready(Ok(bytes.into())) } fn flush() -> Poll<io::Result<Op>> { - Ready(Ok(Flush)) + Poll::Ready(Ok(Op::Flush)) } diff --git a/tests/mpsc.rs b/tests/mpsc.rs index a3c164d..74b83c2 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -28,6 +28,29 @@ async fn simple() { } #[tokio::test] +async fn simple_ref() { + let v = vec![1, 2, 3i32]; + + let (send, mut recv) = channel(3); + let mut send = PollSender::new(send); + + for vi in v.iter() { + let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); + assert_ready_ok!(reserve.poll()); + send.send_item(vi).unwrap(); + } + + let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); + assert_pending!(reserve.poll()); + + assert_eq!(*recv.recv().await.unwrap(), 1); + assert!(reserve.is_woken()); + assert_ready_ok!(reserve.poll()); + drop(recv); + send.send_item(&42).unwrap(); +} + +#[tokio::test] async fn repeated_poll_reserve() { let (send, mut recv) = channel::<i32>(1); let mut send = PollSender::new(send); diff --git a/tests/task_join_map.rs b/tests/task_join_map.rs index cef08b2..1ab5f9b 100644 --- a/tests/task_join_map.rs +++ b/tests/task_join_map.rs @@ -109,6 +109,30 @@ async fn alternating() { } } +#[tokio::test] +async fn test_keys() { + use std::collections::HashSet; + + let mut map = JoinMap::new(); + + assert_eq!(map.len(), 0); + map.spawn(1, async {}); + assert_eq!(map.len(), 1); + map.spawn(2, async {}); + assert_eq!(map.len(), 2); + + let keys = map.keys().collect::<HashSet<&u32>>(); + assert!(keys.contains(&1)); + assert!(keys.contains(&2)); + + let _ = map.join_next().await.unwrap(); + let _ = map.join_next().await.unwrap(); + + assert_eq!(map.len(), 0); + let keys = map.keys().collect::<HashSet<&u32>>(); + assert!(keys.is_empty()); +} + #[tokio::test(start_paused = true)] async fn abort_by_key() { let mut map = JoinMap::new(); diff --git a/tests/task_tracker.rs b/tests/task_tracker.rs new file mode 100644 index 0000000..f0eb244 --- /dev/null +++ b/tests/task_tracker.rs @@ -0,0 +1,178 @@ +#![warn(rust_2018_idioms)] + +use tokio_test::{assert_pending, assert_ready, task}; +use tokio_util::task::TaskTracker; + +#[test] +fn open_close() { + let tracker = TaskTracker::new(); + assert!(!tracker.is_closed()); + assert!(tracker.is_empty()); + assert_eq!(tracker.len(), 0); + + tracker.close(); + assert!(tracker.is_closed()); + assert!(tracker.is_empty()); + assert_eq!(tracker.len(), 0); + + tracker.reopen(); + assert!(!tracker.is_closed()); + tracker.reopen(); + assert!(!tracker.is_closed()); + + assert!(tracker.is_empty()); + assert_eq!(tracker.len(), 0); + + tracker.close(); + assert!(tracker.is_closed()); + tracker.close(); + assert!(tracker.is_closed()); + + assert!(tracker.is_empty()); + assert_eq!(tracker.len(), 0); +} + +#[test] +fn token_len() { + let tracker = TaskTracker::new(); + + let mut tokens = Vec::new(); + for i in 0..10 { + assert_eq!(tracker.len(), i); + tokens.push(tracker.token()); + } + + assert!(!tracker.is_empty()); + assert_eq!(tracker.len(), 10); + + for (i, token) in tokens.into_iter().enumerate() { + drop(token); + assert_eq!(tracker.len(), 9 - i); + } +} + +#[test] +fn notify_immediately() { + let tracker = TaskTracker::new(); + tracker.close(); + + let mut wait = task::spawn(tracker.wait()); + assert_ready!(wait.poll()); +} + +#[test] +fn notify_immediately_on_reopen() { + let tracker = TaskTracker::new(); + tracker.close(); + + let mut wait = task::spawn(tracker.wait()); + tracker.reopen(); + assert_ready!(wait.poll()); +} + +#[test] +fn notify_on_close() { + let tracker = TaskTracker::new(); + + let mut wait = task::spawn(tracker.wait()); + + assert_pending!(wait.poll()); + tracker.close(); + assert_ready!(wait.poll()); +} + +#[test] +fn notify_on_close_reopen() { + let tracker = TaskTracker::new(); + + let mut wait = task::spawn(tracker.wait()); + + assert_pending!(wait.poll()); + tracker.close(); + tracker.reopen(); + assert_ready!(wait.poll()); +} + +#[test] +fn notify_on_last_task() { + let tracker = TaskTracker::new(); + tracker.close(); + let token = tracker.token(); + + let mut wait = task::spawn(tracker.wait()); + assert_pending!(wait.poll()); + drop(token); + assert_ready!(wait.poll()); +} + +#[test] +fn notify_on_last_task_respawn() { + let tracker = TaskTracker::new(); + tracker.close(); + let token = tracker.token(); + + let mut wait = task::spawn(tracker.wait()); + assert_pending!(wait.poll()); + drop(token); + let token2 = tracker.token(); + assert_ready!(wait.poll()); + drop(token2); +} + +#[test] +fn no_notify_on_respawn_if_open() { + let tracker = TaskTracker::new(); + let token = tracker.token(); + + let mut wait = task::spawn(tracker.wait()); + assert_pending!(wait.poll()); + drop(token); + let token2 = tracker.token(); + assert_pending!(wait.poll()); + drop(token2); +} + +#[test] +fn close_during_exit() { + const ITERS: usize = 5; + + for close_spot in 0..=ITERS { + let tracker = TaskTracker::new(); + let tokens: Vec<_> = (0..ITERS).map(|_| tracker.token()).collect(); + + let mut wait = task::spawn(tracker.wait()); + + for (i, token) in tokens.into_iter().enumerate() { + assert_pending!(wait.poll()); + if i == close_spot { + tracker.close(); + assert_pending!(wait.poll()); + } + drop(token); + } + + if close_spot == ITERS { + assert_pending!(wait.poll()); + tracker.close(); + } + + assert_ready!(wait.poll()); + } +} + +#[test] +fn notify_many() { + let tracker = TaskTracker::new(); + + let mut waits: Vec<_> = (0..10).map(|_| task::spawn(tracker.wait())).collect(); + + for wait in &mut waits { + assert_pending!(wait.poll()); + } + + tracker.close(); + + for wait in &mut waits { + assert_ready!(wait.poll()); + } +} diff --git a/tests/time_delay_queue.rs b/tests/time_delay_queue.rs index 9ceae34..9b7b6cc 100644 --- a/tests/time_delay_queue.rs +++ b/tests/time_delay_queue.rs @@ -2,6 +2,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use futures::StreamExt; use tokio::time::{self, sleep, sleep_until, Duration, Instant}; use tokio_test::{assert_pending, assert_ready, task}; use tokio_util::time::DelayQueue; @@ -257,6 +258,10 @@ async fn reset_twice() { #[tokio::test] async fn repeatedly_reset_entry_inserted_as_expired() { time::pause(); + + // Instants before the start of the test seem to break in wasm. + time::sleep(ms(1000)).await; + let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); @@ -556,6 +561,10 @@ async fn reset_later_after_slot_starts() { #[tokio::test] async fn reset_inserted_expired() { time::pause(); + + // Instants before the start of the test seem to break in wasm. + time::sleep(ms(1000)).await; + let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); @@ -778,6 +787,22 @@ async fn compact_change_deadline() { assert!(entry.is_none()); } +#[tokio::test(start_paused = true)] +async fn item_expiry_greater_than_wheel() { + // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted. + let mut queue = DelayQueue::new(); + for _ in 0..2 { + tokio::time::advance(Duration::from_millis(1 << 35)).await; + queue.insert(0, Duration::from_millis(0)); + queue.next().await; + } + // This should not panic + let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + queue.insert(1, Duration::from_millis(1)); + })); + assert!(no_panic.is_ok()); +} + #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")] #[tokio::test(start_paused = true)] async fn remove_after_compact() { @@ -815,6 +840,44 @@ async fn remove_after_compact_poll() { assert!(panic.is_err()); } +#[tokio::test(start_paused = true)] +async fn peek() { + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let key = queue.insert_at("foo", now + ms(5)); + let key2 = queue.insert_at("bar", now); + let key3 = queue.insert_at("baz", now + ms(10)); + + assert_eq!(queue.peek(), Some(key2)); + + sleep(ms(6)).await; + + assert_eq!(queue.peek(), Some(key2)); + + let entry = assert_ready_some!(poll!(queue)); + assert_eq!(entry.get_ref(), &"bar"); + + assert_eq!(queue.peek(), Some(key)); + + let entry = assert_ready_some!(poll!(queue)); + assert_eq!(entry.get_ref(), &"foo"); + + assert_eq!(queue.peek(), Some(key3)); + + assert_pending!(poll!(queue)); + + sleep(ms(5)).await; + + assert_eq!(queue.peek(), Some(key3)); + + let entry = assert_ready_some!(poll!(queue)); + assert_eq!(entry.get_ref(), &"baz"); + + assert!(queue.peek().is_none()); +} + fn ms(n: u64) -> Duration { Duration::from_millis(n) } diff --git a/tests/udp.rs b/tests/udp.rs index 1b99806..db726a3 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -13,7 +13,10 @@ use futures::sink::SinkExt; use std::io; use std::sync::Arc; -#[cfg_attr(any(target_os = "macos", target_os = "ios"), allow(unused_assignments))] +#[cfg_attr( + any(target_os = "macos", target_os = "ios", target_os = "tvos"), + allow(unused_assignments) +)] #[tokio::test] async fn send_framed_byte_codec() -> std::io::Result<()> { let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?; @@ -41,7 +44,7 @@ async fn send_framed_byte_codec() -> std::io::Result<()> { b_soc = b.into_inner(); } - #[cfg(not(any(target_os = "macos", target_os = "ios")))] + #[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "tvos")))] // test sending & receiving an empty message { let mut a = UdpFramed::new(a_soc, ByteCodec); |