diff options
author | Haibo Huang <hhb@google.com> | 2020-11-18 15:38:25 -0800 |
---|---|---|
committer | Haibo Huang <hhb@google.com> | 2020-11-18 15:38:25 -0800 |
commit | 142a0689ca6b6326a51c74aa475a460e035438cd (patch) | |
tree | 1cce8a9def09a004d9b03d019713eace7571659e /tests | |
parent | d45ebac89067bf60e5226c2a3378bf47186fd82f (diff) | |
download | tokio-142a0689ca6b6326a51c74aa475a460e035438cd.tar.gz |
Upgrade rust/crates/tokio to 0.3.4
Test: make
Change-Id: I6b2c5610921e4208538c51c23566e0f864fe0ad4
Diffstat (limited to 'tests')
-rw-r--r-- | tests/io_async_fd.rs | 15 | ||||
-rw-r--r-- | tests/sync_notify.rs | 34 | ||||
-rw-r--r-- | tests/tcp_stream.rs | 236 | ||||
-rw-r--r-- | tests/time_sleep.rs | 1 | ||||
-rw-r--r-- | tests/udp.rs | 90 |
5 files changed, 373 insertions, 3 deletions
diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs index 0303eff..f8dc65f 100644 --- a/tests/io_async_fd.rs +++ b/tests/io_async_fd.rs @@ -304,6 +304,15 @@ async fn drop_closes() { } #[tokio::test] +async fn reregister() { + let (a, _b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + let a = afd_a.into_inner(); + AsyncFd::new(a).unwrap(); +} + +#[tokio::test] async fn with_poll() { use std::task::Poll; @@ -491,10 +500,10 @@ fn driver_shutdown_wakes_currently_pending() { std::mem::drop(rt); - // Being awoken by a rt drop does not return an error, currently... - let _ = futures::executor::block_on(readable).unwrap(); + // The future was initialized **before** dropping the rt + assert_err!(futures::executor::block_on(readable)); - // However, attempting to initiate a readiness wait when the rt is dropped is an error + // The future is initialized **after** dropping the rt. assert_err!(futures::executor::block_on(afd_a.readable())); } diff --git a/tests/sync_notify.rs b/tests/sync_notify.rs index 8c70fe3..8ffe020 100644 --- a/tests/sync_notify.rs +++ b/tests/sync_notify.rs @@ -100,3 +100,37 @@ fn notified_multi_notify_drop_one() { assert!(notified2.is_woken()); assert_ready!(notified2.poll()); } + +#[test] +fn notify_in_drop_after_wake() { + use futures::task::ArcWake; + use std::future::Future; + use std::sync::Arc; + + let notify = Arc::new(Notify::new()); + + struct NotifyOnDrop(Arc<Notify>); + + impl ArcWake for NotifyOnDrop { + fn wake_by_ref(_arc_self: &Arc<Self>) {} + } + + impl Drop for NotifyOnDrop { + fn drop(&mut self) { + self.0.notify_waiters(); + } + } + + let mut fut = Box::pin(async { + notify.notified().await; + }); + + { + let waker = futures::task::waker(Arc::new(NotifyOnDrop(notify.clone()))); + let mut cx = std::task::Context::from_waker(&waker); + assert!(fut.as_mut().poll(&mut cx).is_pending()); + } + + // Now, notifying **should not** deadlock + notify.notify_waiters(); +} diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs new file mode 100644 index 0000000..58b06ee --- /dev/null +++ b/tests/tcp_stream.rs @@ -0,0 +1,236 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::try_join; +use tokio_test::task; +use tokio_test::{assert_ok, assert_pending, assert_ready_ok}; + +use std::io; +use std::task::Poll; +use std::time::Duration; + +use futures::future::poll_fn; + +#[tokio::test] +async fn set_linger() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + + let stream = TcpStream::connect(listener.local_addr().unwrap()) + .await + .unwrap(); + + assert_ok!(stream.set_linger(Some(Duration::from_secs(1)))); + assert_eq!(stream.linger().unwrap().unwrap().as_secs(), 1); + + assert_ok!(stream.set_linger(None)); + assert!(stream.linger().unwrap().is_none()); +} + +#[tokio::test] +async fn try_read_write() { + const DATA: &[u8] = b"this is some data to write to the socket"; + + // Create listener + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + + // Create socket pair + let client = TcpStream::connect(listener.local_addr().unwrap()) + .await + .unwrap(); + let (server, _) = listener.accept().await.unwrap(); + let mut written = DATA.to_vec(); + + // Track the server receiving data + let mut readable = task::spawn(server.readable()); + assert_pending!(readable.poll()); + + // Write data. + client.writable().await.unwrap(); + assert_eq!(DATA.len(), client.try_write(DATA).unwrap()); + + // The task should be notified + while !readable.is_woken() { + tokio::task::yield_now().await; + } + + // Fill the write buffer + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write(DATA) { + Ok(n) => written.extend(&DATA[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await.unwrap(); + + match server.try_read(&mut read[i..]) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + + // Now, we listen for shutdown + drop(client); + + loop { + let ready = server.ready(Interest::READABLE).await.unwrap(); + + if ready.is_read_closed() { + return; + } else { + tokio::task::yield_now().await; + } + } +} + +#[test] +fn buffer_not_included_in_future() { + use std::mem; + + const N: usize = 4096; + + let fut = async { + let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); + + loop { + stream.readable().await.unwrap(); + + let mut buf = [0; N]; + let n = stream.try_read(&mut buf[..]).unwrap(); + + if n == 0 { + break; + } + } + }; + + let n = mem::size_of_val(&fut); + assert!(n < 1000); +} + +macro_rules! assert_readable_by_polling { + ($stream:expr) => { + assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await); + }; +} + +macro_rules! assert_not_readable_by_polling { + ($stream:expr) => { + poll_fn(|cx| { + assert_pending!($stream.poll_read_ready(cx)); + Poll::Ready(()) + }) + .await; + }; +} + +macro_rules! assert_writable_by_polling { + ($stream:expr) => { + assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await); + }; +} + +macro_rules! assert_not_writable_by_polling { + ($stream:expr) => { + poll_fn(|cx| { + assert_pending!($stream.poll_write_ready(cx)); + Poll::Ready(()) + }) + .await; + }; +} + +#[tokio::test] +async fn poll_read_ready() { + let (mut client, mut server) = create_pair().await; + + // Initial state - not readable. + assert_not_readable_by_polling!(server); + + // There is data in the buffer - readable. + assert_ok!(client.write_all(b"ping").await); + assert_readable_by_polling!(server); + + // Readable until calls to `poll_read` return `Poll::Pending`. + let mut buf = [0u8; 4]; + assert_ok!(server.read_exact(&mut buf).await); + assert_readable_by_polling!(server); + read_until_pending(&mut server); + assert_not_readable_by_polling!(server); + + // Detect the client disconnect. + drop(client); + assert_readable_by_polling!(server); +} + +#[tokio::test] +async fn poll_write_ready() { + let (mut client, server) = create_pair().await; + + // Initial state - writable. + assert_writable_by_polling!(client); + + // No space to write - not writable. + write_until_pending(&mut client); + assert_not_writable_by_polling!(client); + + // Detect the server disconnect. + drop(server); + assert_writable_by_polling!(client); +} + +async fn create_pair() -> (TcpStream, TcpStream) { + let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept())); + (client, server) +} + +fn read_until_pending(stream: &mut TcpStream) { + let mut buf = vec![0u8; 1024 * 1024]; + loop { + match stream.try_read(&mut buf) { + Ok(_) => (), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } +} + +fn write_until_pending(stream: &mut TcpStream) { + let buf = vec![0u8; 1024 * 1024]; + loop { + match stream.try_write(&buf) { + Ok(_) => (), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } +} diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs index 87d69de..955d833 100644 --- a/tests/time_sleep.rs +++ b/tests/time_sleep.rs @@ -182,6 +182,7 @@ const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; #[tokio::test] async fn exactly_max() { // TODO: this should not panic but `time::ms()` is acting up + // If fixed, make sure to update documentation on `time::sleep` too. time::sleep(ms(MAX_DURATION)).await; } diff --git a/tests/udp.rs b/tests/udp.rs index 8b79cb8..291267e 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -2,6 +2,7 @@ #![cfg(feature = "full")] use futures::future::poll_fn; +use std::io; use std::sync::Arc; use tokio::{io::ReadBuf, net::UdpSocket}; @@ -238,6 +239,8 @@ async fn try_send_spawn() { .await .unwrap(); + sender.writable().await.unwrap(); + let sent = &sender .try_send_to(MSG, receiver.local_addr().unwrap()) .unwrap(); @@ -263,3 +266,90 @@ async fn try_send_spawn() { assert_eq!(received, MSG_LEN * 2 + MSG2_LEN); } + +#[tokio::test] +async fn try_send_recv() { + // Create listener + let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + // Create socket pair + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + // Connect the two + client.connect(server.local_addr().unwrap()).await.unwrap(); + server.connect(client.local_addr().unwrap()).await.unwrap(); + + for _ in 0..5 { + loop { + client.writable().await.unwrap(); + + match client.try_send(b"hello world") { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await.unwrap(); + + let mut buf = [0; 512]; + + match server.try_recv(&mut buf) { + Ok(n) => { + assert_eq!(n, 11); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } +} + +#[tokio::test] +async fn try_send_to_recv_from() { + // Create listener + let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let saddr = server.local_addr().unwrap(); + + // Create socket pair + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let caddr = client.local_addr().unwrap(); + + for _ in 0..5 { + loop { + client.writable().await.unwrap(); + + match client.try_send_to(b"hello world", saddr) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await.unwrap(); + + let mut buf = [0; 512]; + + match server.try_recv_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr, caddr); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } +} |