aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/compat.rs43
-rw-r--r--tests/io_sync_bridge.rs12
-rw-r--r--tests/length_delimited.rs91
-rw-r--r--tests/mpsc.rs23
-rw-r--r--tests/task_join_map.rs24
-rw-r--r--tests/task_tracker.rs178
-rw-r--r--tests/time_delay_queue.rs63
-rw-r--r--tests/udp.rs7
8 files changed, 392 insertions, 49 deletions
diff --git a/tests/compat.rs b/tests/compat.rs
new file mode 100644
index 0000000..278ebfc
--- /dev/null
+++ b/tests/compat.rs
@@ -0,0 +1,43 @@
+#![cfg(all(feature = "compat"))]
+#![cfg(not(target_os = "wasi"))] // WASI does not support all fs operations
+#![warn(rust_2018_idioms)]
+
+use futures_io::SeekFrom;
+use futures_util::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
+use tempfile::NamedTempFile;
+use tokio::fs::OpenOptions;
+use tokio_util::compat::TokioAsyncWriteCompatExt;
+
+#[tokio::test]
+async fn compat_file_seek() -> futures_util::io::Result<()> {
+ let temp_file = NamedTempFile::new()?;
+ let mut file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create(true)
+ .open(temp_file)
+ .await?
+ .compat_write();
+
+ file.write_all(&[0, 1, 2, 3, 4, 5]).await?;
+ file.write_all(&[6, 7]).await?;
+
+ assert_eq!(file.stream_position().await?, 8);
+
+ // Modify elements at position 2.
+ assert_eq!(file.seek(SeekFrom::Start(2)).await?, 2);
+ file.write_all(&[8, 9]).await?;
+
+ file.flush().await?;
+
+ // Verify we still have 8 elements.
+ assert_eq!(file.seek(SeekFrom::End(0)).await?, 8);
+ // Seek back to the start of the file to read and verify contents.
+ file.seek(SeekFrom::Start(0)).await?;
+
+ let mut buf = Vec::new();
+ let num_bytes = file.read_to_end(&mut buf).await?;
+ assert_eq!(&buf[..num_bytes], &[0, 1, 8, 9, 4, 5, 6, 7]);
+
+ Ok(())
+}
diff --git a/tests/io_sync_bridge.rs b/tests/io_sync_bridge.rs
index 76bbd0b..50d0e89 100644
--- a/tests/io_sync_bridge.rs
+++ b/tests/io_sync_bridge.rs
@@ -44,6 +44,18 @@ async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> {
}
#[tokio::test]
+async fn test_into_inner() -> Result<(), Box<dyn Error>> {
+ let mut buf = Vec::new();
+ SyncIoBridge::new(tokio::io::empty())
+ .into_inner()
+ .read_to_end(&mut buf)
+ .await
+ .unwrap();
+ assert_eq!(buf.len(), 0);
+ Ok(())
+}
+
+#[tokio::test]
async fn test_shutdown() -> Result<(), Box<dyn Error>> {
let (s1, mut s2) = tokio::io::duplex(1024);
let (_rh, wh) = tokio::io::split(s1);
diff --git a/tests/length_delimited.rs b/tests/length_delimited.rs
index 126e41b..ed5590f 100644
--- a/tests/length_delimited.rs
+++ b/tests/length_delimited.rs
@@ -12,7 +12,6 @@ use futures::{pin_mut, Sink, Stream};
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
-use std::task::Poll::*;
use std::task::{Context, Poll};
macro_rules! mock {
@@ -39,10 +38,10 @@ macro_rules! assert_next_eq {
macro_rules! assert_next_pending {
($io:ident) => {{
task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
- Ready(Some(Ok(v))) => panic!("value = {:?}", v),
- Ready(Some(Err(e))) => panic!("error = {:?}", e),
- Ready(None) => panic!("done"),
- Pending => {}
+ Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
+ Poll::Ready(Some(Err(e))) => panic!("error = {:?}", e),
+ Poll::Ready(None) => panic!("done"),
+ Poll::Pending => {}
});
}};
}
@@ -50,10 +49,10 @@ macro_rules! assert_next_pending {
macro_rules! assert_next_err {
($io:ident) => {{
task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
- Ready(Some(Ok(v))) => panic!("value = {:?}", v),
- Ready(Some(Err(_))) => {}
- Ready(None) => panic!("done"),
- Pending => panic!("pending"),
+ Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
+ Poll::Ready(Some(Err(_))) => {}
+ Poll::Ready(None) => panic!("done"),
+ Poll::Pending => panic!("pending"),
});
}};
}
@@ -186,11 +185,11 @@ fn read_single_frame_multi_packet_wait() {
let io = FramedRead::new(
mock! {
data(b"\x00\x00"),
- Pending,
+ Poll::Pending,
data(b"\x00\x09abc"),
- Pending,
+ Poll::Pending,
data(b"defghi"),
- Pending,
+ Poll::Pending,
},
LengthDelimitedCodec::new(),
);
@@ -208,15 +207,15 @@ fn read_multi_frame_multi_packet_wait() {
let io = FramedRead::new(
mock! {
data(b"\x00\x00"),
- Pending,
+ Poll::Pending,
data(b"\x00\x09abc"),
- Pending,
+ Poll::Pending,
data(b"defghi"),
- Pending,
+ Poll::Pending,
data(b"\x00\x00\x00\x0312"),
- Pending,
+ Poll::Pending,
data(b"3\x00\x00\x00\x0bhello world"),
- Pending,
+ Poll::Pending,
},
LengthDelimitedCodec::new(),
);
@@ -250,9 +249,9 @@ fn read_incomplete_head() {
fn read_incomplete_head_multi() {
let io = FramedRead::new(
mock! {
- Pending,
+ Poll::Pending,
data(b"\x00"),
- Pending,
+ Poll::Pending,
},
LengthDelimitedCodec::new(),
);
@@ -268,9 +267,9 @@ fn read_incomplete_payload() {
let io = FramedRead::new(
mock! {
data(b"\x00\x00\x00\x09ab"),
- Pending,
+ Poll::Pending,
data(b"cd"),
- Pending,
+ Poll::Pending,
},
LengthDelimitedCodec::new(),
);
@@ -310,7 +309,7 @@ fn read_update_max_frame_len_at_rest() {
fn read_update_max_frame_len_in_flight() {
let io = length_delimited::Builder::new().new_read(mock! {
data(b"\x00\x00\x00\x09abcd"),
- Pending,
+ Poll::Pending,
data(b"efghi"),
data(b"\x00\x00\x00\x09abcdefghi"),
});
@@ -533,9 +532,9 @@ fn write_single_multi_frame_multi_packet() {
fn write_single_frame_would_block() {
let io = FramedWrite::new(
mock! {
- Pending,
+ Poll::Pending,
data(b"\x00\x00"),
- Pending,
+ Poll::Pending,
data(b"\x00\x09"),
data(b"abcdefghi"),
flush(),
@@ -640,7 +639,7 @@ fn write_update_max_frame_len_in_flight() {
let io = length_delimited::Builder::new().new_write(mock! {
data(b"\x00\x00\x00\x06"),
data(b"ab"),
- Pending,
+ Poll::Pending,
data(b"cdef"),
flush(),
});
@@ -701,8 +700,6 @@ enum Op {
Flush,
}
-use self::Op::*;
-
impl AsyncRead for Mock {
fn poll_read(
mut self: Pin<&mut Self>,
@@ -710,15 +707,15 @@ impl AsyncRead for Mock {
dst: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match self.calls.pop_front() {
- Some(Ready(Ok(Op::Data(data)))) => {
+ Some(Poll::Ready(Ok(Op::Data(data)))) => {
debug_assert!(dst.remaining() >= data.len());
dst.put_slice(&data);
- Ready(Ok(()))
+ Poll::Ready(Ok(()))
}
- Some(Ready(Ok(_))) => panic!(),
- Some(Ready(Err(e))) => Ready(Err(e)),
- Some(Pending) => Pending,
- None => Ready(Ok(())),
+ Some(Poll::Ready(Ok(_))) => panic!(),
+ Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
+ Some(Poll::Pending) => Poll::Pending,
+ None => Poll::Ready(Ok(())),
}
}
}
@@ -730,31 +727,31 @@ impl AsyncWrite for Mock {
src: &[u8],
) -> Poll<Result<usize, io::Error>> {
match self.calls.pop_front() {
- Some(Ready(Ok(Op::Data(data)))) => {
+ Some(Poll::Ready(Ok(Op::Data(data)))) => {
let len = data.len();
assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
assert_eq!(&data[..], &src[..len]);
- Ready(Ok(len))
+ Poll::Ready(Ok(len))
}
- Some(Ready(Ok(_))) => panic!(),
- Some(Ready(Err(e))) => Ready(Err(e)),
- Some(Pending) => Pending,
- None => Ready(Ok(0)),
+ Some(Poll::Ready(Ok(_))) => panic!(),
+ Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
+ Some(Poll::Pending) => Poll::Pending,
+ None => Poll::Ready(Ok(0)),
}
}
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match self.calls.pop_front() {
- Some(Ready(Ok(Op::Flush))) => Ready(Ok(())),
- Some(Ready(Ok(_))) => panic!(),
- Some(Ready(Err(e))) => Ready(Err(e)),
- Some(Pending) => Pending,
- None => Ready(Ok(())),
+ Some(Poll::Ready(Ok(Op::Flush))) => Poll::Ready(Ok(())),
+ Some(Poll::Ready(Ok(_))) => panic!(),
+ Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
+ Some(Poll::Pending) => Poll::Pending,
+ None => Poll::Ready(Ok(())),
}
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
- Ready(Ok(()))
+ Poll::Ready(Ok(()))
}
}
@@ -771,9 +768,9 @@ impl From<Vec<u8>> for Op {
}
fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
- Ready(Ok(bytes.into()))
+ Poll::Ready(Ok(bytes.into()))
}
fn flush() -> Poll<io::Result<Op>> {
- Ready(Ok(Flush))
+ Poll::Ready(Ok(Op::Flush))
}
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
index a3c164d..74b83c2 100644
--- a/tests/mpsc.rs
+++ b/tests/mpsc.rs
@@ -28,6 +28,29 @@ async fn simple() {
}
#[tokio::test]
+async fn simple_ref() {
+ let v = vec![1, 2, 3i32];
+
+ let (send, mut recv) = channel(3);
+ let mut send = PollSender::new(send);
+
+ for vi in v.iter() {
+ let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
+ assert_ready_ok!(reserve.poll());
+ send.send_item(vi).unwrap();
+ }
+
+ let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
+ assert_pending!(reserve.poll());
+
+ assert_eq!(*recv.recv().await.unwrap(), 1);
+ assert!(reserve.is_woken());
+ assert_ready_ok!(reserve.poll());
+ drop(recv);
+ send.send_item(&42).unwrap();
+}
+
+#[tokio::test]
async fn repeated_poll_reserve() {
let (send, mut recv) = channel::<i32>(1);
let mut send = PollSender::new(send);
diff --git a/tests/task_join_map.rs b/tests/task_join_map.rs
index cef08b2..1ab5f9b 100644
--- a/tests/task_join_map.rs
+++ b/tests/task_join_map.rs
@@ -109,6 +109,30 @@ async fn alternating() {
}
}
+#[tokio::test]
+async fn test_keys() {
+ use std::collections::HashSet;
+
+ let mut map = JoinMap::new();
+
+ assert_eq!(map.len(), 0);
+ map.spawn(1, async {});
+ assert_eq!(map.len(), 1);
+ map.spawn(2, async {});
+ assert_eq!(map.len(), 2);
+
+ let keys = map.keys().collect::<HashSet<&u32>>();
+ assert!(keys.contains(&1));
+ assert!(keys.contains(&2));
+
+ let _ = map.join_next().await.unwrap();
+ let _ = map.join_next().await.unwrap();
+
+ assert_eq!(map.len(), 0);
+ let keys = map.keys().collect::<HashSet<&u32>>();
+ assert!(keys.is_empty());
+}
+
#[tokio::test(start_paused = true)]
async fn abort_by_key() {
let mut map = JoinMap::new();
diff --git a/tests/task_tracker.rs b/tests/task_tracker.rs
new file mode 100644
index 0000000..f0eb244
--- /dev/null
+++ b/tests/task_tracker.rs
@@ -0,0 +1,178 @@
+#![warn(rust_2018_idioms)]
+
+use tokio_test::{assert_pending, assert_ready, task};
+use tokio_util::task::TaskTracker;
+
+#[test]
+fn open_close() {
+ let tracker = TaskTracker::new();
+ assert!(!tracker.is_closed());
+ assert!(tracker.is_empty());
+ assert_eq!(tracker.len(), 0);
+
+ tracker.close();
+ assert!(tracker.is_closed());
+ assert!(tracker.is_empty());
+ assert_eq!(tracker.len(), 0);
+
+ tracker.reopen();
+ assert!(!tracker.is_closed());
+ tracker.reopen();
+ assert!(!tracker.is_closed());
+
+ assert!(tracker.is_empty());
+ assert_eq!(tracker.len(), 0);
+
+ tracker.close();
+ assert!(tracker.is_closed());
+ tracker.close();
+ assert!(tracker.is_closed());
+
+ assert!(tracker.is_empty());
+ assert_eq!(tracker.len(), 0);
+}
+
+#[test]
+fn token_len() {
+ let tracker = TaskTracker::new();
+
+ let mut tokens = Vec::new();
+ for i in 0..10 {
+ assert_eq!(tracker.len(), i);
+ tokens.push(tracker.token());
+ }
+
+ assert!(!tracker.is_empty());
+ assert_eq!(tracker.len(), 10);
+
+ for (i, token) in tokens.into_iter().enumerate() {
+ drop(token);
+ assert_eq!(tracker.len(), 9 - i);
+ }
+}
+
+#[test]
+fn notify_immediately() {
+ let tracker = TaskTracker::new();
+ tracker.close();
+
+ let mut wait = task::spawn(tracker.wait());
+ assert_ready!(wait.poll());
+}
+
+#[test]
+fn notify_immediately_on_reopen() {
+ let tracker = TaskTracker::new();
+ tracker.close();
+
+ let mut wait = task::spawn(tracker.wait());
+ tracker.reopen();
+ assert_ready!(wait.poll());
+}
+
+#[test]
+fn notify_on_close() {
+ let tracker = TaskTracker::new();
+
+ let mut wait = task::spawn(tracker.wait());
+
+ assert_pending!(wait.poll());
+ tracker.close();
+ assert_ready!(wait.poll());
+}
+
+#[test]
+fn notify_on_close_reopen() {
+ let tracker = TaskTracker::new();
+
+ let mut wait = task::spawn(tracker.wait());
+
+ assert_pending!(wait.poll());
+ tracker.close();
+ tracker.reopen();
+ assert_ready!(wait.poll());
+}
+
+#[test]
+fn notify_on_last_task() {
+ let tracker = TaskTracker::new();
+ tracker.close();
+ let token = tracker.token();
+
+ let mut wait = task::spawn(tracker.wait());
+ assert_pending!(wait.poll());
+ drop(token);
+ assert_ready!(wait.poll());
+}
+
+#[test]
+fn notify_on_last_task_respawn() {
+ let tracker = TaskTracker::new();
+ tracker.close();
+ let token = tracker.token();
+
+ let mut wait = task::spawn(tracker.wait());
+ assert_pending!(wait.poll());
+ drop(token);
+ let token2 = tracker.token();
+ assert_ready!(wait.poll());
+ drop(token2);
+}
+
+#[test]
+fn no_notify_on_respawn_if_open() {
+ let tracker = TaskTracker::new();
+ let token = tracker.token();
+
+ let mut wait = task::spawn(tracker.wait());
+ assert_pending!(wait.poll());
+ drop(token);
+ let token2 = tracker.token();
+ assert_pending!(wait.poll());
+ drop(token2);
+}
+
+#[test]
+fn close_during_exit() {
+ const ITERS: usize = 5;
+
+ for close_spot in 0..=ITERS {
+ let tracker = TaskTracker::new();
+ let tokens: Vec<_> = (0..ITERS).map(|_| tracker.token()).collect();
+
+ let mut wait = task::spawn(tracker.wait());
+
+ for (i, token) in tokens.into_iter().enumerate() {
+ assert_pending!(wait.poll());
+ if i == close_spot {
+ tracker.close();
+ assert_pending!(wait.poll());
+ }
+ drop(token);
+ }
+
+ if close_spot == ITERS {
+ assert_pending!(wait.poll());
+ tracker.close();
+ }
+
+ assert_ready!(wait.poll());
+ }
+}
+
+#[test]
+fn notify_many() {
+ let tracker = TaskTracker::new();
+
+ let mut waits: Vec<_> = (0..10).map(|_| task::spawn(tracker.wait())).collect();
+
+ for wait in &mut waits {
+ assert_pending!(wait.poll());
+ }
+
+ tracker.close();
+
+ for wait in &mut waits {
+ assert_ready!(wait.poll());
+ }
+}
diff --git a/tests/time_delay_queue.rs b/tests/time_delay_queue.rs
index 9ceae34..9b7b6cc 100644
--- a/tests/time_delay_queue.rs
+++ b/tests/time_delay_queue.rs
@@ -2,6 +2,7 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use futures::StreamExt;
use tokio::time::{self, sleep, sleep_until, Duration, Instant};
use tokio_test::{assert_pending, assert_ready, task};
use tokio_util::time::DelayQueue;
@@ -257,6 +258,10 @@ async fn reset_twice() {
#[tokio::test]
async fn repeatedly_reset_entry_inserted_as_expired() {
time::pause();
+
+ // Instants before the start of the test seem to break in wasm.
+ time::sleep(ms(1000)).await;
+
let mut queue = task::spawn(DelayQueue::new());
let now = Instant::now();
@@ -556,6 +561,10 @@ async fn reset_later_after_slot_starts() {
#[tokio::test]
async fn reset_inserted_expired() {
time::pause();
+
+ // Instants before the start of the test seem to break in wasm.
+ time::sleep(ms(1000)).await;
+
let mut queue = task::spawn(DelayQueue::new());
let now = Instant::now();
@@ -778,6 +787,22 @@ async fn compact_change_deadline() {
assert!(entry.is_none());
}
+#[tokio::test(start_paused = true)]
+async fn item_expiry_greater_than_wheel() {
+ // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted.
+ let mut queue = DelayQueue::new();
+ for _ in 0..2 {
+ tokio::time::advance(Duration::from_millis(1 << 35)).await;
+ queue.insert(0, Duration::from_millis(0));
+ queue.next().await;
+ }
+ // This should not panic
+ let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+ queue.insert(1, Duration::from_millis(1));
+ }));
+ assert!(no_panic.is_ok());
+}
+
#[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
#[tokio::test(start_paused = true)]
async fn remove_after_compact() {
@@ -815,6 +840,44 @@ async fn remove_after_compact_poll() {
assert!(panic.is_err());
}
+#[tokio::test(start_paused = true)]
+async fn peek() {
+ let mut queue = task::spawn(DelayQueue::new());
+
+ let now = Instant::now();
+
+ let key = queue.insert_at("foo", now + ms(5));
+ let key2 = queue.insert_at("bar", now);
+ let key3 = queue.insert_at("baz", now + ms(10));
+
+ assert_eq!(queue.peek(), Some(key2));
+
+ sleep(ms(6)).await;
+
+ assert_eq!(queue.peek(), Some(key2));
+
+ let entry = assert_ready_some!(poll!(queue));
+ assert_eq!(entry.get_ref(), &"bar");
+
+ assert_eq!(queue.peek(), Some(key));
+
+ let entry = assert_ready_some!(poll!(queue));
+ assert_eq!(entry.get_ref(), &"foo");
+
+ assert_eq!(queue.peek(), Some(key3));
+
+ assert_pending!(poll!(queue));
+
+ sleep(ms(5)).await;
+
+ assert_eq!(queue.peek(), Some(key3));
+
+ let entry = assert_ready_some!(poll!(queue));
+ assert_eq!(entry.get_ref(), &"baz");
+
+ assert!(queue.peek().is_none());
+}
+
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}
diff --git a/tests/udp.rs b/tests/udp.rs
index 1b99806..db726a3 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -13,7 +13,10 @@ use futures::sink::SinkExt;
use std::io;
use std::sync::Arc;
-#[cfg_attr(any(target_os = "macos", target_os = "ios"), allow(unused_assignments))]
+#[cfg_attr(
+ any(target_os = "macos", target_os = "ios", target_os = "tvos"),
+ allow(unused_assignments)
+)]
#[tokio::test]
async fn send_framed_byte_codec() -> std::io::Result<()> {
let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
@@ -41,7 +44,7 @@ async fn send_framed_byte_codec() -> std::io::Result<()> {
b_soc = b.into_inner();
}
- #[cfg(not(any(target_os = "macos", target_os = "ios")))]
+ #[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "tvos")))]
// test sending & receiving an empty message
{
let mut a = UdpFramed::new(a_soc, ByteCodec);