aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2020-11-18 15:38:25 -0800
committerHaibo Huang <hhb@google.com>2020-11-18 15:38:25 -0800
commit142a0689ca6b6326a51c74aa475a460e035438cd (patch)
tree1cce8a9def09a004d9b03d019713eace7571659e /tests
parentd45ebac89067bf60e5226c2a3378bf47186fd82f (diff)
downloadtokio-142a0689ca6b6326a51c74aa475a460e035438cd.tar.gz
Upgrade rust/crates/tokio to 0.3.4
Test: make Change-Id: I6b2c5610921e4208538c51c23566e0f864fe0ad4
Diffstat (limited to 'tests')
-rw-r--r--tests/io_async_fd.rs15
-rw-r--r--tests/sync_notify.rs34
-rw-r--r--tests/tcp_stream.rs236
-rw-r--r--tests/time_sleep.rs1
-rw-r--r--tests/udp.rs90
5 files changed, 373 insertions, 3 deletions
diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs
index 0303eff..f8dc65f 100644
--- a/tests/io_async_fd.rs
+++ b/tests/io_async_fd.rs
@@ -304,6 +304,15 @@ async fn drop_closes() {
}
#[tokio::test]
+async fn reregister() {
+ let (a, _b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+ let a = afd_a.into_inner();
+ AsyncFd::new(a).unwrap();
+}
+
+#[tokio::test]
async fn with_poll() {
use std::task::Poll;
@@ -491,10 +500,10 @@ fn driver_shutdown_wakes_currently_pending() {
std::mem::drop(rt);
- // Being awoken by a rt drop does not return an error, currently...
- let _ = futures::executor::block_on(readable).unwrap();
+ // The future was initialized **before** dropping the rt
+ assert_err!(futures::executor::block_on(readable));
- // However, attempting to initiate a readiness wait when the rt is dropped is an error
+ // The future is initialized **after** dropping the rt.
assert_err!(futures::executor::block_on(afd_a.readable()));
}
diff --git a/tests/sync_notify.rs b/tests/sync_notify.rs
index 8c70fe3..8ffe020 100644
--- a/tests/sync_notify.rs
+++ b/tests/sync_notify.rs
@@ -100,3 +100,37 @@ fn notified_multi_notify_drop_one() {
assert!(notified2.is_woken());
assert_ready!(notified2.poll());
}
+
+#[test]
+fn notify_in_drop_after_wake() {
+ use futures::task::ArcWake;
+ use std::future::Future;
+ use std::sync::Arc;
+
+ let notify = Arc::new(Notify::new());
+
+ struct NotifyOnDrop(Arc<Notify>);
+
+ impl ArcWake for NotifyOnDrop {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {}
+ }
+
+ impl Drop for NotifyOnDrop {
+ fn drop(&mut self) {
+ self.0.notify_waiters();
+ }
+ }
+
+ let mut fut = Box::pin(async {
+ notify.notified().await;
+ });
+
+ {
+ let waker = futures::task::waker(Arc::new(NotifyOnDrop(notify.clone())));
+ let mut cx = std::task::Context::from_waker(&waker);
+ assert!(fut.as_mut().poll(&mut cx).is_pending());
+ }
+
+ // Now, notifying **should not** deadlock
+ notify.notify_waiters();
+}
diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs
new file mode 100644
index 0000000..58b06ee
--- /dev/null
+++ b/tests/tcp_stream.rs
@@ -0,0 +1,236 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::try_join;
+use tokio_test::task;
+use tokio_test::{assert_ok, assert_pending, assert_ready_ok};
+
+use std::io;
+use std::task::Poll;
+use std::time::Duration;
+
+use futures::future::poll_fn;
+
+#[tokio::test]
+async fn set_linger() {
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+ let stream = TcpStream::connect(listener.local_addr().unwrap())
+ .await
+ .unwrap();
+
+ assert_ok!(stream.set_linger(Some(Duration::from_secs(1))));
+ assert_eq!(stream.linger().unwrap().unwrap().as_secs(), 1);
+
+ assert_ok!(stream.set_linger(None));
+ assert!(stream.linger().unwrap().is_none());
+}
+
+#[tokio::test]
+async fn try_read_write() {
+ const DATA: &[u8] = b"this is some data to write to the socket";
+
+ // Create listener
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = TcpStream::connect(listener.local_addr().unwrap())
+ .await
+ .unwrap();
+ let (server, _) = listener.accept().await.unwrap();
+ let mut written = DATA.to_vec();
+
+ // Track the server receiving data
+ let mut readable = task::spawn(server.readable());
+ assert_pending!(readable.poll());
+
+ // Write data.
+ client.writable().await.unwrap();
+ assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
+
+ // The task should be notified
+ while !readable.is_woken() {
+ tokio::task::yield_now().await;
+ }
+
+ // Fill the write buffer
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write(DATA) {
+ Ok(n) => written.extend(&DATA[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end
+ let mut read = vec![0; written.len()];
+ let mut i = 0;
+
+ while i < read.len() {
+ server.readable().await.unwrap();
+
+ match server.try_read(&mut read[i..]) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ // Now, we listen for shutdown
+ drop(client);
+
+ loop {
+ let ready = server.ready(Interest::READABLE).await.unwrap();
+
+ if ready.is_read_closed() {
+ return;
+ } else {
+ tokio::task::yield_now().await;
+ }
+ }
+}
+
+#[test]
+fn buffer_not_included_in_future() {
+ use std::mem;
+
+ const N: usize = 4096;
+
+ let fut = async {
+ let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
+
+ loop {
+ stream.readable().await.unwrap();
+
+ let mut buf = [0; N];
+ let n = stream.try_read(&mut buf[..]).unwrap();
+
+ if n == 0 {
+ break;
+ }
+ }
+ };
+
+ let n = mem::size_of_val(&fut);
+ assert!(n < 1000);
+}
+
+macro_rules! assert_readable_by_polling {
+ ($stream:expr) => {
+ assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await);
+ };
+}
+
+macro_rules! assert_not_readable_by_polling {
+ ($stream:expr) => {
+ poll_fn(|cx| {
+ assert_pending!($stream.poll_read_ready(cx));
+ Poll::Ready(())
+ })
+ .await;
+ };
+}
+
+macro_rules! assert_writable_by_polling {
+ ($stream:expr) => {
+ assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await);
+ };
+}
+
+macro_rules! assert_not_writable_by_polling {
+ ($stream:expr) => {
+ poll_fn(|cx| {
+ assert_pending!($stream.poll_write_ready(cx));
+ Poll::Ready(())
+ })
+ .await;
+ };
+}
+
+#[tokio::test]
+async fn poll_read_ready() {
+ let (mut client, mut server) = create_pair().await;
+
+ // Initial state - not readable.
+ assert_not_readable_by_polling!(server);
+
+ // There is data in the buffer - readable.
+ assert_ok!(client.write_all(b"ping").await);
+ assert_readable_by_polling!(server);
+
+ // Readable until calls to `poll_read` return `Poll::Pending`.
+ let mut buf = [0u8; 4];
+ assert_ok!(server.read_exact(&mut buf).await);
+ assert_readable_by_polling!(server);
+ read_until_pending(&mut server);
+ assert_not_readable_by_polling!(server);
+
+ // Detect the client disconnect.
+ drop(client);
+ assert_readable_by_polling!(server);
+}
+
+#[tokio::test]
+async fn poll_write_ready() {
+ let (mut client, server) = create_pair().await;
+
+ // Initial state - writable.
+ assert_writable_by_polling!(client);
+
+ // No space to write - not writable.
+ write_until_pending(&mut client);
+ assert_not_writable_by_polling!(client);
+
+ // Detect the server disconnect.
+ drop(server);
+ assert_writable_by_polling!(client);
+}
+
+async fn create_pair() -> (TcpStream, TcpStream) {
+ let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(listener.local_addr());
+ let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept()));
+ (client, server)
+}
+
+fn read_until_pending(stream: &mut TcpStream) {
+ let mut buf = vec![0u8; 1024 * 1024];
+ loop {
+ match stream.try_read(&mut buf) {
+ Ok(_) => (),
+ Err(err) => {
+ assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
+ break;
+ }
+ }
+ }
+}
+
+fn write_until_pending(stream: &mut TcpStream) {
+ let buf = vec![0u8; 1024 * 1024];
+ loop {
+ match stream.try_write(&buf) {
+ Ok(_) => (),
+ Err(err) => {
+ assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
+ break;
+ }
+ }
+ }
+}
diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs
index 87d69de..955d833 100644
--- a/tests/time_sleep.rs
+++ b/tests/time_sleep.rs
@@ -182,6 +182,7 @@ const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
#[tokio::test]
async fn exactly_max() {
// TODO: this should not panic but `time::ms()` is acting up
+ // If fixed, make sure to update documentation on `time::sleep` too.
time::sleep(ms(MAX_DURATION)).await;
}
diff --git a/tests/udp.rs b/tests/udp.rs
index 8b79cb8..291267e 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -2,6 +2,7 @@
#![cfg(feature = "full")]
use futures::future::poll_fn;
+use std::io;
use std::sync::Arc;
use tokio::{io::ReadBuf, net::UdpSocket};
@@ -238,6 +239,8 @@ async fn try_send_spawn() {
.await
.unwrap();
+ sender.writable().await.unwrap();
+
let sent = &sender
.try_send_to(MSG, receiver.local_addr().unwrap())
.unwrap();
@@ -263,3 +266,90 @@ async fn try_send_spawn() {
assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
}
+
+#[tokio::test]
+async fn try_send_recv() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Connect the two
+ client.connect(server.local_addr().unwrap()).await.unwrap();
+ server.connect(client.local_addr().unwrap()).await.unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send(b"hello world") {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = [0; 512];
+
+ match server.try_recv(&mut buf) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
+
+#[tokio::test]
+async fn try_send_to_recv_from() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let saddr = server.local_addr().unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let caddr = client.local_addr().unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send_to(b"hello world", saddr) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = [0; 512];
+
+ match server.try_recv_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr, caddr);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}