diff options
author | Joel Galenson <jgalenson@google.com> | 2021-08-17 08:33:38 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-08-17 08:40:48 -0700 |
commit | 642961436a727d51930e5839e3dbfee04ba4af95 (patch) | |
tree | 9da006d6d1c0e4667e8d848673b13cc7d2bb62ca /tests | |
parent | 1c33108b3901dd464f81acf08b5268ec294b3876 (diff) | |
download | tokio-642961436a727d51930e5839e3dbfee04ba4af95.tar.gz |
Upgrade rust/crates/tokio to 1.10.0
Test: make
Change-Id: I4ec984178af20297aae0ed51f0b1c6410876a51b
Diffstat (limited to 'tests')
31 files changed, 1692 insertions, 1515 deletions
diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs index 01e6081..aa14970 100644 --- a/tests/async_send_sync.rs +++ b/tests/async_send_sync.rs @@ -4,13 +4,30 @@ use std::cell::Cell; use std::future::Future; -use std::io::{Cursor, SeekFrom}; +use std::io::SeekFrom; use std::net::SocketAddr; use std::pin::Pin; use std::rc::Rc; use tokio::net::TcpStream; use tokio::time::{Duration, Instant}; +// The names of these structs behaves better when sorted. +// Send: Yes, Sync: Yes +#[derive(Clone)] +struct YY {} + +// Send: Yes, Sync: No +#[derive(Clone)] +struct YN { + _value: Cell<u8>, +} + +// Send: No, Sync: No +#[derive(Clone)] +struct NN { + _value: Rc<u8>, +} + #[allow(dead_code)] type BoxFutureSync<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + Sync>>; #[allow(dead_code)] @@ -19,11 +36,11 @@ type BoxFutureSend<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + type BoxFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T>>>; #[allow(dead_code)] -type BoxAsyncRead = std::pin::Pin<Box<dyn tokio::io::AsyncBufRead>>; +type BoxAsyncRead = std::pin::Pin<Box<dyn tokio::io::AsyncBufRead + Send + Sync>>; #[allow(dead_code)] -type BoxAsyncSeek = std::pin::Pin<Box<dyn tokio::io::AsyncSeek>>; +type BoxAsyncSeek = std::pin::Pin<Box<dyn tokio::io::AsyncSeek + Send + Sync>>; #[allow(dead_code)] -type BoxAsyncWrite = std::pin::Pin<Box<dyn tokio::io::AsyncWrite>>; +type BoxAsyncWrite = std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Sync>>; #[allow(dead_code)] fn require_send<T: Send>(_t: &T) {} @@ -59,310 +76,594 @@ macro_rules! into_todo { x }}; } -macro_rules! assert_value { - ($type:ty: Send & Sync) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f: $type = todo!(); - require_send(&f); - require_sync(&f); - }; - }; - ($type:ty: !Send & Sync) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f: $type = todo!(); - AmbiguousIfSend::some_item(&f); - require_sync(&f); - }; - }; - ($type:ty: Send & !Sync) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f: $type = todo!(); - require_send(&f); - AmbiguousIfSync::some_item(&f); - }; + +macro_rules! async_assert_fn_send { + (Send & $(!)?Sync & $(!)?Unpin, $value:expr) => { + require_send(&$value); }; - ($type:ty: !Send & !Sync) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f: $type = todo!(); - AmbiguousIfSend::some_item(&f); - AmbiguousIfSync::some_item(&f); - }; - }; - ($type:ty: Unpin) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f: $type = todo!(); - require_unpin(&f); - }; + (!Send & $(!)?Sync & $(!)?Unpin, $value:expr) => { + AmbiguousIfSend::some_item(&$value); }; } -macro_rules! async_assert_fn { - ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); - require_send(&f); - require_sync(&f); - }; +macro_rules! async_assert_fn_sync { + ($(!)?Send & Sync & $(!)?Unpin, $value:expr) => { + require_sync(&$value); }; - ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); - require_send(&f); - AmbiguousIfSync::some_item(&f); - }; + ($(!)?Send & !Sync & $(!)?Unpin, $value:expr) => { + AmbiguousIfSync::some_item(&$value); }; - ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); - AmbiguousIfSend::some_item(&f); - require_sync(&f); - }; +} +macro_rules! async_assert_fn_unpin { + ($(!)?Send & $(!)?Sync & Unpin, $value:expr) => { + require_unpin(&$value); }; - ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => { - #[allow(unreachable_code)] - #[allow(unused_variables)] - const _: fn() = || { - let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); - AmbiguousIfSend::some_item(&f); - AmbiguousIfSync::some_item(&f); - }; + ($(!)?Send & $(!)?Sync & !Unpin, $value:expr) => { + AmbiguousIfUnpin::some_item(&$value); }; - ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => { +} + +macro_rules! async_assert_fn { + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): $($tok:tt)*) => { #[allow(unreachable_code)] #[allow(unused_variables)] const _: fn() = || { let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); - AmbiguousIfUnpin::some_item(&f); + async_assert_fn_send!($($tok)*, f); + async_assert_fn_sync!($($tok)*, f); + async_assert_fn_unpin!($($tok)*, f); }; }; - ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => { +} +macro_rules! assert_value { + ($type:ty: $($tok:tt)*) => { #[allow(unreachable_code)] #[allow(unused_variables)] const _: fn() = || { - let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); - require_unpin(&f); + let f: $type = todo!(); + async_assert_fn_send!($($tok)*, f); + async_assert_fn_sync!($($tok)*, f); + async_assert_fn_unpin!($($tok)*, f); }; }; } -async_assert_fn!(tokio::io::copy(&mut TcpStream, &mut TcpStream): Send & Sync); -async_assert_fn!(tokio::io::empty(): Send & Sync); -async_assert_fn!(tokio::io::repeat(u8): Send & Sync); -async_assert_fn!(tokio::io::sink(): Send & Sync); -async_assert_fn!(tokio::io::split(TcpStream): Send & Sync); -async_assert_fn!(tokio::io::stderr(): Send & Sync); -async_assert_fn!(tokio::io::stdin(): Send & Sync); -async_assert_fn!(tokio::io::stdout(): Send & Sync); -async_assert_fn!(tokio::io::Split<Cursor<Vec<u8>>>::next_segment(_): Send & Sync); - -async_assert_fn!(tokio::fs::canonicalize(&str): Send & Sync); -async_assert_fn!(tokio::fs::copy(&str, &str): Send & Sync); -async_assert_fn!(tokio::fs::create_dir(&str): Send & Sync); -async_assert_fn!(tokio::fs::create_dir_all(&str): Send & Sync); -async_assert_fn!(tokio::fs::hard_link(&str, &str): Send & Sync); -async_assert_fn!(tokio::fs::metadata(&str): Send & Sync); -async_assert_fn!(tokio::fs::read(&str): Send & Sync); -async_assert_fn!(tokio::fs::read_dir(&str): Send & Sync); -async_assert_fn!(tokio::fs::read_link(&str): Send & Sync); -async_assert_fn!(tokio::fs::read_to_string(&str): Send & Sync); -async_assert_fn!(tokio::fs::remove_dir(&str): Send & Sync); -async_assert_fn!(tokio::fs::remove_dir_all(&str): Send & Sync); -async_assert_fn!(tokio::fs::remove_file(&str): Send & Sync); -async_assert_fn!(tokio::fs::rename(&str, &str): Send & Sync); -async_assert_fn!(tokio::fs::set_permissions(&str, std::fs::Permissions): Send & Sync); -async_assert_fn!(tokio::fs::symlink_metadata(&str): Send & Sync); -async_assert_fn!(tokio::fs::write(&str, Vec<u8>): Send & Sync); -async_assert_fn!(tokio::fs::ReadDir::next_entry(_): Send & Sync); -async_assert_fn!(tokio::fs::OpenOptions::open(_, &str): Send & Sync); -async_assert_fn!(tokio::fs::DirEntry::metadata(_): Send & Sync); -async_assert_fn!(tokio::fs::DirEntry::file_type(_): Send & Sync); +assert_value!(tokio::fs::DirBuilder: Send & Sync & Unpin); +assert_value!(tokio::fs::DirEntry: Send & Sync & Unpin); +assert_value!(tokio::fs::File: Send & Sync & Unpin); +assert_value!(tokio::fs::OpenOptions: Send & Sync & Unpin); +assert_value!(tokio::fs::ReadDir: Send & Sync & Unpin); -async_assert_fn!(tokio::fs::File::open(&str): Send & Sync); -async_assert_fn!(tokio::fs::File::create(&str): Send & Sync); -async_assert_fn!(tokio::fs::File::sync_all(_): Send & Sync); -async_assert_fn!(tokio::fs::File::sync_data(_): Send & Sync); -async_assert_fn!(tokio::fs::File::set_len(_, u64): Send & Sync); -async_assert_fn!(tokio::fs::File::metadata(_): Send & Sync); -async_assert_fn!(tokio::fs::File::try_clone(_): Send & Sync); -async_assert_fn!(tokio::fs::File::into_std(_): Send & Sync); -async_assert_fn!(tokio::fs::File::set_permissions(_, std::fs::Permissions): Send & Sync); +async_assert_fn!(tokio::fs::canonicalize(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::copy(&str, &str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::create_dir(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::create_dir_all(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::hard_link(&str, &str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::metadata(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::read(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::read_dir(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::read_link(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::read_to_string(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::remove_dir(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::remove_dir_all(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::remove_file(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::rename(&str, &str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::set_permissions(&str, std::fs::Permissions): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::symlink_metadata(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::write(&str, Vec<u8>): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::ReadDir::next_entry(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::OpenOptions::open(_, &str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::DirBuilder::create(_, &str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::DirEntry::metadata(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::DirEntry::file_type(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::open(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::create(&str): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::sync_all(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::sync_data(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::set_len(_, u64): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::metadata(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::try_clone(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::into_std(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::fs::File::set_permissions(_, std::fs::Permissions): Send & Sync & !Unpin); -async_assert_fn!(tokio::net::lookup_host(SocketAddr): Send & Sync); -async_assert_fn!(tokio::net::TcpListener::bind(SocketAddr): Send & Sync); -async_assert_fn!(tokio::net::TcpListener::accept(_): Send & Sync); -async_assert_fn!(tokio::net::TcpStream::connect(SocketAddr): Send & Sync); -async_assert_fn!(tokio::net::TcpStream::peek(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::tcp::ReadHalf::peek(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::UdpSocket::bind(SocketAddr): Send & Sync); -async_assert_fn!(tokio::net::UdpSocket::connect(_, SocketAddr): Send & Sync); -async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync); -async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync); -async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync); +assert_value!(tokio::net::TcpListener: Send & Sync & Unpin); +assert_value!(tokio::net::TcpSocket: Send & Sync & Unpin); +assert_value!(tokio::net::TcpStream: Send & Sync & Unpin); +assert_value!(tokio::net::UdpSocket: Send & Sync & Unpin); +assert_value!(tokio::net::tcp::OwnedReadHalf: Send & Sync & Unpin); +assert_value!(tokio::net::tcp::OwnedWriteHalf: Send & Sync & Unpin); +assert_value!(tokio::net::tcp::ReadHalf<'_>: Send & Sync & Unpin); +assert_value!(tokio::net::tcp::ReuniteError: Send & Sync & Unpin); +assert_value!(tokio::net::tcp::WriteHalf<'_>: Send & Sync & Unpin); +async_assert_fn!(tokio::net::TcpListener::accept(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::TcpListener::bind(SocketAddr): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::TcpStream::connect(SocketAddr): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::TcpStream::peek(_, &mut [u8]): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::TcpStream::readable(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::TcpStream::ready(_, tokio::io::Interest): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::TcpStream::writable(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::bind(SocketAddr): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::connect(_, SocketAddr): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::peek_from(_, &mut [u8]): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::readable(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::ready(_, tokio::io::Interest): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::UdpSocket::writable(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::lookup_host(SocketAddr): Send & Sync & !Unpin); +async_assert_fn!(tokio::net::tcp::ReadHalf::peek(_, &mut [u8]): Send & Sync & !Unpin); #[cfg(unix)] mod unix_datagram { use super::*; - async_assert_fn!(tokio::net::UnixListener::bind(&str): Send & Sync); - async_assert_fn!(tokio::net::UnixListener::accept(_): Send & Sync); - async_assert_fn!(tokio::net::UnixDatagram::send(_, &[u8]): Send & Sync); - async_assert_fn!(tokio::net::UnixDatagram::recv(_, &mut [u8]): Send & Sync); - async_assert_fn!(tokio::net::UnixDatagram::send_to(_, &[u8], &str): Send & Sync); - async_assert_fn!(tokio::net::UnixDatagram::recv_from(_, &mut [u8]): Send & Sync); - async_assert_fn!(tokio::net::UnixStream::connect(&str): Send & Sync); + use tokio::net::*; + assert_value!(UnixDatagram: Send & Sync & Unpin); + assert_value!(UnixListener: Send & Sync & Unpin); + assert_value!(UnixStream: Send & Sync & Unpin); + assert_value!(unix::OwnedReadHalf: Send & Sync & Unpin); + assert_value!(unix::OwnedWriteHalf: Send & Sync & Unpin); + assert_value!(unix::ReadHalf<'_>: Send & Sync & Unpin); + assert_value!(unix::ReuniteError: Send & Sync & Unpin); + assert_value!(unix::SocketAddr: Send & Sync & Unpin); + assert_value!(unix::UCred: Send & Sync & Unpin); + assert_value!(unix::WriteHalf<'_>: Send & Sync & Unpin); + async_assert_fn!(UnixDatagram::readable(_): Send & Sync & !Unpin); + async_assert_fn!(UnixDatagram::ready(_, tokio::io::Interest): Send & Sync & !Unpin); + async_assert_fn!(UnixDatagram::recv(_, &mut [u8]): Send & Sync & !Unpin); + async_assert_fn!(UnixDatagram::recv_from(_, &mut [u8]): Send & Sync & !Unpin); + async_assert_fn!(UnixDatagram::send(_, &[u8]): Send & Sync & !Unpin); + async_assert_fn!(UnixDatagram::send_to(_, &[u8], &str): Send & Sync & !Unpin); + async_assert_fn!(UnixDatagram::writable(_): Send & Sync & !Unpin); + async_assert_fn!(UnixListener::accept(_): Send & Sync & !Unpin); + async_assert_fn!(UnixStream::connect(&str): Send & Sync & !Unpin); + async_assert_fn!(UnixStream::readable(_): Send & Sync & !Unpin); + async_assert_fn!(UnixStream::ready(_, tokio::io::Interest): Send & Sync & !Unpin); + async_assert_fn!(UnixStream::writable(_): Send & Sync & !Unpin); } -async_assert_fn!(tokio::process::Child::wait_with_output(_): Send & Sync); -async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync); -#[cfg(unix)] -async_assert_fn!(tokio::signal::unix::Signal::recv(_): Send & Sync); +#[cfg(windows)] +mod windows_named_pipe { + use super::*; + use tokio::net::windows::named_pipe::*; + assert_value!(ClientOptions: Send & Sync & Unpin); + assert_value!(NamedPipeClient: Send & Sync & Unpin); + assert_value!(NamedPipeServer: Send & Sync & Unpin); + assert_value!(PipeEnd: Send & Sync & Unpin); + assert_value!(PipeInfo: Send & Sync & Unpin); + assert_value!(PipeMode: Send & Sync & Unpin); + assert_value!(ServerOptions: Send & Sync & Unpin); + async_assert_fn!(NamedPipeClient::readable(_): Send & Sync & !Unpin); + async_assert_fn!(NamedPipeClient::ready(_, tokio::io::Interest): Send & Sync & !Unpin); + async_assert_fn!(NamedPipeClient::writable(_): Send & Sync & !Unpin); + async_assert_fn!(NamedPipeServer::connect(_): Send & Sync & !Unpin); + async_assert_fn!(NamedPipeServer::readable(_): Send & Sync & !Unpin); + async_assert_fn!(NamedPipeServer::ready(_, tokio::io::Interest): Send & Sync & !Unpin); + async_assert_fn!(NamedPipeServer::writable(_): Send & Sync & !Unpin); +} -async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync); -async_assert_fn!(tokio::sync::Mutex<u8>::lock(_): Send & Sync); -async_assert_fn!(tokio::sync::Mutex<Cell<u8>>::lock(_): Send & Sync); -async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock(_): !Send & !Sync); -async_assert_fn!(tokio::sync::Mutex<u8>::lock_owned(_): Send & Sync); -async_assert_fn!(tokio::sync::Mutex<Cell<u8>>::lock_owned(_): Send & Sync); -async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock_owned(_): !Send & !Sync); -async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync); -async_assert_fn!(tokio::sync::RwLock<u8>::read(_): Send & Sync); -async_assert_fn!(tokio::sync::RwLock<u8>::write(_): Send & Sync); -async_assert_fn!(tokio::sync::RwLock<Cell<u8>>::read(_): !Send & !Sync); -async_assert_fn!(tokio::sync::RwLock<Cell<u8>>::write(_): !Send & !Sync); -async_assert_fn!(tokio::sync::RwLock<Rc<u8>>::read(_): !Send & !Sync); -async_assert_fn!(tokio::sync::RwLock<Rc<u8>>::write(_): !Send & !Sync); -async_assert_fn!(tokio::sync::Semaphore::acquire(_): Send & Sync); +assert_value!(tokio::process::Child: Send & Sync & Unpin); +assert_value!(tokio::process::ChildStderr: Send & Sync & Unpin); +assert_value!(tokio::process::ChildStdin: Send & Sync & Unpin); +assert_value!(tokio::process::ChildStdout: Send & Sync & Unpin); +assert_value!(tokio::process::Command: Send & Sync & Unpin); +async_assert_fn!(tokio::process::Child::kill(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::process::Child::wait(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::process::Child::wait_with_output(_): Send & Sync & !Unpin); -async_assert_fn!(tokio::sync::broadcast::Receiver<u8>::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::broadcast::Receiver<Cell<u8>>::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::broadcast::Receiver<Rc<u8>>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync & !Unpin); +#[cfg(unix)] +mod unix_signal { + use super::*; + assert_value!(tokio::signal::unix::Signal: Send & Sync & Unpin); + assert_value!(tokio::signal::unix::SignalKind: Send & Sync & Unpin); + async_assert_fn!(tokio::signal::unix::Signal::recv(_): Send & Sync & !Unpin); +} +#[cfg(windows)] +mod windows_signal { + use super::*; + assert_value!(tokio::signal::windows::CtrlC: Send & Sync & Unpin); + assert_value!(tokio::signal::windows::CtrlBreak: Send & Sync & Unpin); + async_assert_fn!(tokio::signal::windows::CtrlC::recv(_): Send & Sync & !Unpin); + async_assert_fn!(tokio::signal::windows::CtrlBreak::recv(_): Send & Sync & !Unpin); +} -async_assert_fn!(tokio::sync::mpsc::Receiver<u8>::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::mpsc::Receiver<Cell<u8>>::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::mpsc::Receiver<Rc<u8>>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::mpsc::Sender<u8>::send(_, u8): Send & Sync); -async_assert_fn!(tokio::sync::mpsc::Sender<Cell<u8>>::send(_, Cell<u8>): Send & !Sync); -async_assert_fn!(tokio::sync::mpsc::Sender<Rc<u8>>::send(_, Rc<u8>): !Send & !Sync); +assert_value!(tokio::sync::AcquireError: Send & Sync & Unpin); +assert_value!(tokio::sync::Barrier: Send & Sync & Unpin); +assert_value!(tokio::sync::BarrierWaitResult: Send & Sync & Unpin); +assert_value!(tokio::sync::MappedMutexGuard<'_, NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::MappedMutexGuard<'_, YN>: Send & !Sync & Unpin); +assert_value!(tokio::sync::MappedMutexGuard<'_, YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::Mutex<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::Mutex<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::Mutex<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::MutexGuard<'_, NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::MutexGuard<'_, YN>: Send & !Sync & Unpin); +assert_value!(tokio::sync::MutexGuard<'_, YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::Notify: Send & Sync & Unpin); +assert_value!(tokio::sync::OnceCell<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::OnceCell<YN>: Send & !Sync & Unpin); +assert_value!(tokio::sync::OnceCell<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::OwnedMutexGuard<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::OwnedMutexGuard<YN>: Send & !Sync & Unpin); +assert_value!(tokio::sync::OwnedMutexGuard<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockReadGuard<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockReadGuard<YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockReadGuard<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockWriteGuard<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockWriteGuard<YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::OwnedRwLockWriteGuard<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::OwnedSemaphorePermit: Send & Sync & Unpin); +assert_value!(tokio::sync::RwLock<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::RwLock<YN>: Send & !Sync & Unpin); +assert_value!(tokio::sync::RwLock<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::RwLockMappedWriteGuard<'_, NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::RwLockMappedWriteGuard<'_, YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::RwLockMappedWriteGuard<'_, YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::RwLockReadGuard<'_, NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::RwLockReadGuard<'_, YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::RwLockReadGuard<'_, YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::RwLockWriteGuard<'_, NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::RwLockWriteGuard<'_, YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::RwLockWriteGuard<'_, YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::Semaphore: Send & Sync & Unpin); +assert_value!(tokio::sync::SemaphorePermit<'_>: Send & Sync & Unpin); +assert_value!(tokio::sync::TryAcquireError: Send & Sync & Unpin); +assert_value!(tokio::sync::TryLockError: Send & Sync & Unpin); +assert_value!(tokio::sync::broadcast::Receiver<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::broadcast::Receiver<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::broadcast::Receiver<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::broadcast::Sender<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::broadcast::Sender<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::broadcast::Sender<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::futures::Notified<'_>: Send & Sync & !Unpin); +assert_value!(tokio::sync::mpsc::OwnedPermit<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::OwnedPermit<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::OwnedPermit<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::Permit<'_, NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::Permit<'_, YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::Permit<'_, YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::Receiver<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::Receiver<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::Receiver<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::Sender<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::Sender<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::Sender<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::UnboundedReceiver<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::UnboundedReceiver<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::UnboundedReceiver<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::UnboundedSender<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::UnboundedSender<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::UnboundedSender<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::SendError<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::SendError<YN>: Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::SendError<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::SendTimeoutError<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::SendTimeoutError<YN>: Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::SendTimeoutError<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::TrySendError<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::TrySendError<YN>: Send & !Sync & Unpin); +assert_value!(tokio::sync::mpsc::error::TrySendError<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::oneshot::Receiver<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::oneshot::Receiver<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::oneshot::Receiver<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::oneshot::Sender<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::oneshot::Sender<YN>: Send & Sync & Unpin); +assert_value!(tokio::sync::oneshot::Sender<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::watch::Receiver<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::watch::Receiver<YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::watch::Receiver<YY>: Send & Sync & Unpin); +assert_value!(tokio::sync::watch::Ref<'_, NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::watch::Ref<'_, YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::watch::Ref<'_, YY>: !Send & Sync & Unpin); +assert_value!(tokio::sync::watch::Sender<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::watch::Sender<YN>: !Send & !Sync & Unpin); +assert_value!(tokio::sync::watch::Sender<YY>: Send & Sync & Unpin); +async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Mutex<NN>::lock(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::Mutex<NN>::lock_owned(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::Mutex<YN>::lock(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Mutex<YN>::lock_owned(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Mutex<YY>::lock(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Mutex<YY>::lock_owned(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = NN> + Send + Sync>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = NN> + Send>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = NN>>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<NN>> + Send + Sync>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<NN>> + Send>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<NN>>>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YN> + Send + Sync>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YN> + Send>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YN>>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YN>> + Send + Sync>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YN>> + Send>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YN>>>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YY> + Send + Sync>>): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YY> + Send>>): Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YY>>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YY>> + Send + Sync>>): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YY>> + Send>>): Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YY>>>>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::RwLock<NN>::read(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::RwLock<NN>::write(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::RwLock<YN>::read(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::RwLock<YN>::write(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::RwLock<YY>::read(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::RwLock<YY>::write(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Semaphore::acquire(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Semaphore::acquire_many(_, u32): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Semaphore::acquire_many_owned(_, u32): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::Semaphore::acquire_owned(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::broadcast::Receiver<NN>::recv(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::broadcast::Receiver<YN>::recv(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::broadcast::Receiver<YY>::recv(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Receiver<NN>::recv(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Receiver<YN>::recv(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Receiver<YY>::recv(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<NN>::closed(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<NN>::reserve(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<NN>::reserve_owned(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<NN>::send(_, NN): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<NN>::send_timeout(_, NN, Duration): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YN>::closed(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YN>::reserve(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YN>::reserve_owned(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YN>::send(_, YN): Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YN>::send_timeout(_, YN, Duration): Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YY>::closed(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YY>::reserve(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YY>::reserve_owned(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YY>::send(_, YY): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::Sender<YY>::send_timeout(_, YY, Duration): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<NN>::recv(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<YN>::recv(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<YY>::recv(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::UnboundedSender<NN>::closed(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::UnboundedSender<YN>::closed(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::mpsc::UnboundedSender<YY>::closed(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::oneshot::Sender<NN>::closed(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::oneshot::Sender<YN>::closed(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::oneshot::Sender<YY>::closed(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::watch::Receiver<NN>::changed(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::watch::Receiver<YN>::changed(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::watch::Receiver<YY>::changed(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::sync::watch::Sender<NN>::closed(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::watch::Sender<YN>::closed(_): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::sync::watch::Sender<YY>::closed(_): Send & Sync & !Unpin); -async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<u8>::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Cell<u8>>::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Rc<u8>>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSync<()>): Send & Sync & !Unpin); +async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSend<()>): Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFuture<()>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSync<()>): Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSend<()>): Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFuture<()>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSync<()>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSend<()>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFuture<()>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::LocalSet::run_until(_, BoxFutureSync<()>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::task::unconstrained(BoxFuture<()>): !Send & !Sync & Unpin); +async_assert_fn!(tokio::task::unconstrained(BoxFutureSend<()>): Send & !Sync & Unpin); +async_assert_fn!(tokio::task::unconstrained(BoxFutureSync<()>): Send & Sync & Unpin); +assert_value!(tokio::task::LocalSet: !Send & !Sync & Unpin); +assert_value!(tokio::task::JoinHandle<YY>: Send & Sync & Unpin); +assert_value!(tokio::task::JoinHandle<YN>: Send & Sync & Unpin); +assert_value!(tokio::task::JoinHandle<NN>: !Send & !Sync & Unpin); +assert_value!(tokio::task::JoinError: Send & Sync & Unpin); -async_assert_fn!(tokio::sync::watch::Receiver<u8>::changed(_): Send & Sync); -async_assert_fn!(tokio::sync::watch::Sender<u8>::closed(_): Send & Sync); -async_assert_fn!(tokio::sync::watch::Sender<Cell<u8>>::closed(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Sender<Rc<u8>>::closed(_): !Send & !Sync); +assert_value!(tokio::runtime::Builder: Send & Sync & Unpin); +assert_value!(tokio::runtime::EnterGuard<'_>: Send & Sync & Unpin); +assert_value!(tokio::runtime::Handle: Send & Sync & Unpin); +assert_value!(tokio::runtime::Runtime: Send & Sync & Unpin); -async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = u8> + Send + Sync>>): Send & Sync); -async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = u8> + Send>>): Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = u8>>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>>>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init( - _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>>>>): !Send & !Sync); -assert_value!(tokio::sync::OnceCell<u8>: Send & Sync); -assert_value!(tokio::sync::OnceCell<Cell<u8>>: Send & !Sync); -assert_value!(tokio::sync::OnceCell<Rc<u8>>: !Send & !Sync); +assert_value!(tokio::time::Interval: Send & Sync & Unpin); +assert_value!(tokio::time::Instant: Send & Sync & Unpin); +assert_value!(tokio::time::Sleep: Send & Sync & !Unpin); +assert_value!(tokio::time::Timeout<BoxFutureSync<()>>: Send & Sync & !Unpin); +assert_value!(tokio::time::Timeout<BoxFutureSend<()>>: Send & !Sync & !Unpin); +assert_value!(tokio::time::Timeout<BoxFuture<()>>: !Send & !Sync & !Unpin); +assert_value!(tokio::time::error::Elapsed: Send & Sync & Unpin); +assert_value!(tokio::time::error::Error: Send & Sync & Unpin); +async_assert_fn!(tokio::time::advance(Duration): Send & Sync & !Unpin); +async_assert_fn!(tokio::time::sleep(Duration): Send & Sync & !Unpin); +async_assert_fn!(tokio::time::sleep_until(Instant): Send & Sync & !Unpin); +async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSync<()>): Send & Sync & !Unpin); +async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSend<()>): Send & !Sync & !Unpin); +async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSync<()>): Send & Sync & !Unpin); +async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sync & !Unpin); +async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync & !Unpin); +async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync & !Unpin); -async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSync<()>): Send & Sync); -async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSend<()>): Send & !Sync); -async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFuture<()>): !Send & !Sync); -async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSync<()>): Send & !Sync); -async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSend<()>): Send & !Sync); -async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFuture<()>): !Send & !Sync); -async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSync<()>): !Send & !Sync); -async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSend<()>): !Send & !Sync); -async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFuture<()>): !Send & !Sync); -async_assert_fn!(tokio::task::LocalSet::run_until(_, BoxFutureSync<()>): !Send & !Sync); -assert_value!(tokio::task::LocalSet: !Send & !Sync); +assert_value!(tokio::io::BufReader<TcpStream>: Send & Sync & Unpin); +assert_value!(tokio::io::BufStream<TcpStream>: Send & Sync & Unpin); +assert_value!(tokio::io::BufWriter<TcpStream>: Send & Sync & Unpin); +assert_value!(tokio::io::DuplexStream: Send & Sync & Unpin); +assert_value!(tokio::io::Empty: Send & Sync & Unpin); +assert_value!(tokio::io::Interest: Send & Sync & Unpin); +assert_value!(tokio::io::Lines<TcpStream>: Send & Sync & Unpin); +assert_value!(tokio::io::ReadBuf<'_>: Send & Sync & Unpin); +assert_value!(tokio::io::ReadHalf<TcpStream>: Send & Sync & Unpin); +assert_value!(tokio::io::Ready: Send & Sync & Unpin); +assert_value!(tokio::io::Repeat: Send & Sync & Unpin); +assert_value!(tokio::io::Sink: Send & Sync & Unpin); +assert_value!(tokio::io::Split<TcpStream>: Send & Sync & Unpin); +assert_value!(tokio::io::Stderr: Send & Sync & Unpin); +assert_value!(tokio::io::Stdin: Send & Sync & Unpin); +assert_value!(tokio::io::Stdout: Send & Sync & Unpin); +assert_value!(tokio::io::Take<TcpStream>: Send & Sync & Unpin); +assert_value!(tokio::io::WriteHalf<TcpStream>: Send & Sync & Unpin); +async_assert_fn!(tokio::io::copy(&mut TcpStream, &mut TcpStream): Send & Sync & !Unpin); +async_assert_fn!( + tokio::io::copy_bidirectional(&mut TcpStream, &mut TcpStream): Send & Sync & !Unpin +); +async_assert_fn!(tokio::io::copy_buf(&mut tokio::io::BufReader<TcpStream>, &mut TcpStream): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::empty(): Send & Sync & Unpin); +async_assert_fn!(tokio::io::repeat(u8): Send & Sync & Unpin); +async_assert_fn!(tokio::io::sink(): Send & Sync & Unpin); +async_assert_fn!(tokio::io::split(TcpStream): Send & Sync & Unpin); +async_assert_fn!(tokio::io::stderr(): Send & Sync & Unpin); +async_assert_fn!(tokio::io::stdin(): Send & Sync & Unpin); +async_assert_fn!(tokio::io::stdout(): Send & Sync & Unpin); +async_assert_fn!(tokio::io::Split<tokio::io::BufReader<TcpStream>>::next_segment(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::Lines<tokio::io::BufReader<TcpStream>>::next_line(_): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &mut Vec<u8>): Send & Sync & !Unpin); +async_assert_fn!( + tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): Send & Sync & !Unpin +); +async_assert_fn!(tokio::io::AsyncBufReadExt::fill_buf(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_buf(&mut BoxAsyncRead, &mut Vec<u8>): Send & Sync & !Unpin); +async_assert_fn!( + tokio::io::AsyncReadExt::read_exact(&mut BoxAsyncRead, &mut [u8]): Send & Sync & !Unpin +); +async_assert_fn!(tokio::io::AsyncReadExt::read_u8(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i8(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u16(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i16(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u32(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i32(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u64(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i64(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u128(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i128(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_f32(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_f64(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u16_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i16_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u32_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i32_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u64_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i64_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u128_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i128_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_f32_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_f64_le(&mut BoxAsyncRead): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_to_end(&mut BoxAsyncRead, &mut Vec<u8>): Send & Sync & !Unpin); +async_assert_fn!( + tokio::io::AsyncReadExt::read_to_string(&mut BoxAsyncRead, &mut String): Send & Sync & !Unpin +); +async_assert_fn!(tokio::io::AsyncSeekExt::seek(&mut BoxAsyncSeek, SeekFrom): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncSeekExt::stream_position(&mut BoxAsyncSeek): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write(&mut BoxAsyncWrite, &[u8]): Send & Sync & !Unpin); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_vectored(&mut BoxAsyncWrite, _): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_buf(&mut BoxAsyncWrite, &mut bytes::Bytes): Send + & Sync + & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_all_buf(&mut BoxAsyncWrite, &mut bytes::Bytes): Send + & Sync + & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_all(&mut BoxAsyncWrite, &[u8]): Send & Sync & !Unpin +); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u8(&mut BoxAsyncWrite, u8): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i8(&mut BoxAsyncWrite, i8): Send & Sync & !Unpin); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_u16(&mut BoxAsyncWrite, u16): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_i16(&mut BoxAsyncWrite, i16): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_u32(&mut BoxAsyncWrite, u32): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_i32(&mut BoxAsyncWrite, i32): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_u64(&mut BoxAsyncWrite, u64): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_i64(&mut BoxAsyncWrite, i64): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_u128(&mut BoxAsyncWrite, u128): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_i128(&mut BoxAsyncWrite, i128): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_f32(&mut BoxAsyncWrite, f32): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_f64(&mut BoxAsyncWrite, f64): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_u16_le(&mut BoxAsyncWrite, u16): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_i16_le(&mut BoxAsyncWrite, i16): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_u32_le(&mut BoxAsyncWrite, u32): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_i32_le(&mut BoxAsyncWrite, i32): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_u64_le(&mut BoxAsyncWrite, u64): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_i64_le(&mut BoxAsyncWrite, i64): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_u128_le(&mut BoxAsyncWrite, u128): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_i128_le(&mut BoxAsyncWrite, i128): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_f32_le(&mut BoxAsyncWrite, f32): Send & Sync & !Unpin +); +async_assert_fn!( + tokio::io::AsyncWriteExt::write_f64_le(&mut BoxAsyncWrite, f64): Send & Sync & !Unpin +); +async_assert_fn!(tokio::io::AsyncWriteExt::flush(&mut BoxAsyncWrite): Send & Sync & !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::shutdown(&mut BoxAsyncWrite): Send & Sync & !Unpin); -async_assert_fn!(tokio::time::advance(Duration): Send & Sync); -async_assert_fn!(tokio::time::sleep(Duration): Send & Sync); -async_assert_fn!(tokio::time::sleep_until(Instant): Send & Sync); -async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSync<()>): Send & Sync); -async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSend<()>): Send & !Sync); -async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Send & !Sync); -async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSync<()>): Send & Sync); -async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sync); -async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync); -async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync); +#[cfg(unix)] +mod unix_asyncfd { + use super::*; + use tokio::io::unix::*; -assert_value!(tokio::time::Interval: Unpin); -async_assert_fn!(tokio::time::sleep(Duration): !Unpin); -async_assert_fn!(tokio::time::sleep_until(Instant): !Unpin); -async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Unpin); -async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Unpin); -async_assert_fn!(tokio::time::Interval::tick(_): !Unpin); -async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &mut Vec<u8>): !Unpin); -async_assert_fn!(tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_exact(&mut BoxAsyncRead, &mut [u8]): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u8(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i8(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u16(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i16(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u32(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i32(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u64(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i64(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u128(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i128(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u16_le(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i16_le(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u32_le(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i32_le(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u64_le(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i64_le(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_u128_le(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_i128_le(&mut BoxAsyncRead): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_to_end(&mut BoxAsyncRead, &mut Vec<u8>): !Unpin); -async_assert_fn!(tokio::io::AsyncReadExt::read_to_string(&mut BoxAsyncRead, &mut String): !Unpin); -async_assert_fn!(tokio::io::AsyncSeekExt::seek(&mut BoxAsyncSeek, SeekFrom): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write(&mut BoxAsyncWrite, &[u8]): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_all(&mut BoxAsyncWrite, &[u8]): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u8(&mut BoxAsyncWrite, u8): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i8(&mut BoxAsyncWrite, i8): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u16(&mut BoxAsyncWrite, u16): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i16(&mut BoxAsyncWrite, i16): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u32(&mut BoxAsyncWrite, u32): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i32(&mut BoxAsyncWrite, i32): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u64(&mut BoxAsyncWrite, u64): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i64(&mut BoxAsyncWrite, i64): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u128(&mut BoxAsyncWrite, u128): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i128(&mut BoxAsyncWrite, i128): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u16_le(&mut BoxAsyncWrite, u16): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i16_le(&mut BoxAsyncWrite, i16): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u32_le(&mut BoxAsyncWrite, u32): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i32_le(&mut BoxAsyncWrite, i32): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u64_le(&mut BoxAsyncWrite, u64): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i64_le(&mut BoxAsyncWrite, i64): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_u128_le(&mut BoxAsyncWrite, u128): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::write_i128_le(&mut BoxAsyncWrite, i128): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::flush(&mut BoxAsyncWrite): !Unpin); -async_assert_fn!(tokio::io::AsyncWriteExt::shutdown(&mut BoxAsyncWrite): !Unpin); + struct ImplsFd<T> { + _t: T, + } + impl<T> std::os::unix::io::AsRawFd for ImplsFd<T> { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + unreachable!() + } + } + + assert_value!(AsyncFd<ImplsFd<YY>>: Send & Sync & Unpin); + assert_value!(AsyncFd<ImplsFd<YN>>: Send & !Sync & Unpin); + assert_value!(AsyncFd<ImplsFd<NN>>: !Send & !Sync & Unpin); + assert_value!(AsyncFdReadyGuard<'_, ImplsFd<YY>>: Send & Sync & Unpin); + assert_value!(AsyncFdReadyGuard<'_, ImplsFd<YN>>: !Send & !Sync & Unpin); + assert_value!(AsyncFdReadyGuard<'_, ImplsFd<NN>>: !Send & !Sync & Unpin); + assert_value!(AsyncFdReadyMutGuard<'_, ImplsFd<YY>>: Send & Sync & Unpin); + assert_value!(AsyncFdReadyMutGuard<'_, ImplsFd<YN>>: Send & !Sync & Unpin); + assert_value!(AsyncFdReadyMutGuard<'_, ImplsFd<NN>>: !Send & !Sync & Unpin); + assert_value!(TryIoError: Send & Sync & Unpin); + async_assert_fn!(AsyncFd<ImplsFd<YY>>::readable(_): Send & Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<YY>>::readable_mut(_): Send & Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<YY>>::writable(_): Send & Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<YY>>::writable_mut(_): Send & Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<YN>>::readable(_): !Send & !Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<YN>>::readable_mut(_): Send & !Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<YN>>::writable(_): !Send & !Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<YN>>::writable_mut(_): Send & !Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<NN>>::readable(_): !Send & !Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<NN>>::readable_mut(_): !Send & !Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<NN>>::writable(_): !Send & !Sync & !Unpin); + async_assert_fn!(AsyncFd<ImplsFd<NN>>::writable_mut(_): !Send & !Sync & !Unpin); +} diff --git a/tests/fs_file_mocked.rs b/tests/fs_file_mocked.rs deleted file mode 100644 index 7771532..0000000 --- a/tests/fs_file_mocked.rs +++ /dev/null @@ -1,780 +0,0 @@ -#![warn(rust_2018_idioms)] -#![cfg(feature = "full")] - -macro_rules! ready { - ($e:expr $(,)?) => { - match $e { - std::task::Poll::Ready(t) => t, - std::task::Poll::Pending => return std::task::Poll::Pending, - } - }; -} - -#[macro_export] -macro_rules! cfg_fs { - ($($item:item)*) => { $($item)* } -} - -#[macro_export] -macro_rules! cfg_io_std { - ($($item:item)*) => { $($item)* } -} - -use futures::future; - -// Load source -#[allow(warnings)] -#[path = "../src/fs/file.rs"] -mod file; -use file::File; - -#[allow(warnings)] -#[path = "../src/io/blocking.rs"] -mod blocking; - -// Load mocked types -mod support { - pub(crate) mod mock_file; - pub(crate) mod mock_pool; -} -pub(crate) use support::mock_pool as pool; - -// Place them where the source expects them -pub(crate) mod io { - pub(crate) use tokio::io::*; - - pub(crate) use crate::blocking; - - pub(crate) mod sys { - pub(crate) use crate::support::mock_pool::{run, Blocking}; - } -} -pub(crate) mod fs { - pub(crate) mod sys { - pub(crate) use crate::support::mock_file::File; - pub(crate) use crate::support::mock_pool::{run, Blocking}; - } - - pub(crate) use crate::support::mock_pool::asyncify; -} -pub(crate) mod sync { - pub(crate) use tokio::sync::Mutex; -} -use fs::sys; - -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; -use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; - -use std::io::SeekFrom; - -const HELLO: &[u8] = b"hello world..."; -const FOO: &[u8] = b"foo bar baz..."; - -#[test] -fn open_read() { - let (mock, file) = sys::File::mock(); - mock.read(HELLO); - - let mut file = File::from_std(file); - - let mut buf = [0; 1024]; - let mut t = task::spawn(file.read(&mut buf)); - - assert_eq!(0, pool::len()); - assert_pending!(t.poll()); - - assert_eq!(1, mock.remaining()); - assert_eq!(1, pool::len()); - - pool::run_one(); - - assert_eq!(0, mock.remaining()); - assert!(t.is_woken()); - - let n = assert_ready_ok!(t.poll()); - assert_eq!(n, HELLO.len()); - assert_eq!(&buf[..n], HELLO); -} - -#[test] -fn read_twice_before_dispatch() { - let (mock, file) = sys::File::mock(); - mock.read(HELLO); - - let mut file = File::from_std(file); - - let mut buf = [0; 1024]; - let mut t = task::spawn(file.read(&mut buf)); - - assert_pending!(t.poll()); - assert_pending!(t.poll()); - - assert_eq!(pool::len(), 1); - pool::run_one(); - - assert!(t.is_woken()); - - let n = assert_ready_ok!(t.poll()); - assert_eq!(&buf[..n], HELLO); -} - -#[test] -fn read_with_smaller_buf() { - let (mock, file) = sys::File::mock(); - mock.read(HELLO); - - let mut file = File::from_std(file); - - { - let mut buf = [0; 32]; - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - } - - pool::run_one(); - - { - let mut buf = [0; 4]; - let mut t = task::spawn(file.read(&mut buf)); - let n = assert_ready_ok!(t.poll()); - assert_eq!(n, 4); - assert_eq!(&buf[..], &HELLO[..n]); - } - - // Calling again immediately succeeds with the rest of the buffer - let mut buf = [0; 32]; - let mut t = task::spawn(file.read(&mut buf)); - let n = assert_ready_ok!(t.poll()); - assert_eq!(n, 10); - assert_eq!(&buf[..n], &HELLO[4..]); - - assert_eq!(0, pool::len()); -} - -#[test] -fn read_with_bigger_buf() { - let (mock, file) = sys::File::mock(); - mock.read(&HELLO[..4]).read(&HELLO[4..]); - - let mut file = File::from_std(file); - - { - let mut buf = [0; 4]; - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - } - - pool::run_one(); - - { - let mut buf = [0; 32]; - let mut t = task::spawn(file.read(&mut buf)); - let n = assert_ready_ok!(t.poll()); - assert_eq!(n, 4); - assert_eq!(&buf[..n], &HELLO[..n]); - } - - // Calling again immediately succeeds with the rest of the buffer - let mut buf = [0; 32]; - let mut t = task::spawn(file.read(&mut buf)); - - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - - let n = assert_ready_ok!(t.poll()); - assert_eq!(n, 10); - assert_eq!(&buf[..n], &HELLO[4..]); - - assert_eq!(0, pool::len()); -} - -#[test] -fn read_err_then_read_success() { - let (mock, file) = sys::File::mock(); - mock.read_err().read(&HELLO); - - let mut file = File::from_std(file); - - { - let mut buf = [0; 32]; - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - - pool::run_one(); - - assert_ready_err!(t.poll()); - } - - { - let mut buf = [0; 32]; - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - - pool::run_one(); - - let n = assert_ready_ok!(t.poll()); - - assert_eq!(n, HELLO.len()); - assert_eq!(&buf[..n], HELLO); - } -} - -#[test] -fn open_write() { - let (mock, file) = sys::File::mock(); - mock.write(HELLO); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.write(HELLO)); - - assert_eq!(0, pool::len()); - assert_ready_ok!(t.poll()); - - assert_eq!(1, mock.remaining()); - assert_eq!(1, pool::len()); - - pool::run_one(); - - assert_eq!(0, mock.remaining()); - assert!(!t.is_woken()); - - let mut t = task::spawn(file.flush()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn flush_while_idle() { - let (_mock, file) = sys::File::mock(); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.flush()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn read_with_buffer_larger_than_max() { - // Chunks - let chunk_a = 16 * 1024; - let chunk_b = chunk_a * 2; - let chunk_c = chunk_a * 3; - let chunk_d = chunk_a * 4; - - assert_eq!(chunk_d / 1024, 64); - - let mut data = vec![]; - for i in 0..(chunk_d - 1) { - data.push((i % 151) as u8); - } - - let (mock, file) = sys::File::mock(); - mock.read(&data[0..chunk_a]) - .read(&data[chunk_a..chunk_b]) - .read(&data[chunk_b..chunk_c]) - .read(&data[chunk_c..]); - - let mut file = File::from_std(file); - - let mut actual = vec![0; chunk_d]; - let mut pos = 0; - - while pos < data.len() { - let mut t = task::spawn(file.read(&mut actual[pos..])); - - assert_pending!(t.poll()); - pool::run_one(); - assert!(t.is_woken()); - - let n = assert_ready_ok!(t.poll()); - assert!(n <= chunk_a); - - pos += n; - } - - assert_eq!(mock.remaining(), 0); - assert_eq!(data, &actual[..data.len()]); -} - -#[test] -fn write_with_buffer_larger_than_max() { - // Chunks - let chunk_a = 16 * 1024; - let chunk_b = chunk_a * 2; - let chunk_c = chunk_a * 3; - let chunk_d = chunk_a * 4; - - assert_eq!(chunk_d / 1024, 64); - - let mut data = vec![]; - for i in 0..(chunk_d - 1) { - data.push((i % 151) as u8); - } - - let (mock, file) = sys::File::mock(); - mock.write(&data[0..chunk_a]) - .write(&data[chunk_a..chunk_b]) - .write(&data[chunk_b..chunk_c]) - .write(&data[chunk_c..]); - - let mut file = File::from_std(file); - - let mut rem = &data[..]; - - let mut first = true; - - while !rem.is_empty() { - let mut task = task::spawn(file.write(rem)); - - if !first { - assert_pending!(task.poll()); - pool::run_one(); - assert!(task.is_woken()); - } - - first = false; - - let n = assert_ready_ok!(task.poll()); - - rem = &rem[n..]; - } - - pool::run_one(); - - assert_eq!(mock.remaining(), 0); -} - -#[test] -fn write_twice_before_dispatch() { - let (mock, file) = sys::File::mock(); - mock.write(HELLO).write(FOO); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - let mut t = task::spawn(file.write(FOO)); - assert_pending!(t.poll()); - - assert_eq!(pool::len(), 1); - pool::run_one(); - - assert!(t.is_woken()); - - assert_ready_ok!(t.poll()); - - let mut t = task::spawn(file.flush()); - assert_pending!(t.poll()); - - assert_eq!(pool::len(), 1); - pool::run_one(); - - assert!(t.is_woken()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn incomplete_read_followed_by_write() { - let (mock, file) = sys::File::mock(); - mock.read(HELLO) - .seek_current_ok(-(HELLO.len() as i64), 0) - .write(FOO); - - let mut file = File::from_std(file); - - let mut buf = [0; 32]; - - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - - pool::run_one(); - - let mut t = task::spawn(file.write(FOO)); - assert_ready_ok!(t.poll()); - - assert_eq!(pool::len(), 1); - pool::run_one(); - - let mut t = task::spawn(file.flush()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn incomplete_partial_read_followed_by_write() { - let (mock, file) = sys::File::mock(); - mock.read(HELLO).seek_current_ok(-10, 0).write(FOO); - - let mut file = File::from_std(file); - - let mut buf = [0; 32]; - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - - pool::run_one(); - - let mut buf = [0; 4]; - let mut t = task::spawn(file.read(&mut buf)); - assert_ready_ok!(t.poll()); - - let mut t = task::spawn(file.write(FOO)); - assert_ready_ok!(t.poll()); - - assert_eq!(pool::len(), 1); - pool::run_one(); - - let mut t = task::spawn(file.flush()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn incomplete_read_followed_by_flush() { - let (mock, file) = sys::File::mock(); - mock.read(HELLO) - .seek_current_ok(-(HELLO.len() as i64), 0) - .write(FOO); - - let mut file = File::from_std(file); - - let mut buf = [0; 32]; - - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - - pool::run_one(); - - let mut t = task::spawn(file.flush()); - assert_ready_ok!(t.poll()); - - let mut t = task::spawn(file.write(FOO)); - assert_ready_ok!(t.poll()); - - pool::run_one(); -} - -#[test] -fn incomplete_flush_followed_by_write() { - let (mock, file) = sys::File::mock(); - mock.write(HELLO).write(FOO); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.write(HELLO)); - let n = assert_ready_ok!(t.poll()); - assert_eq!(n, HELLO.len()); - - let mut t = task::spawn(file.flush()); - assert_pending!(t.poll()); - - // TODO: Move under write - pool::run_one(); - - let mut t = task::spawn(file.write(FOO)); - assert_ready_ok!(t.poll()); - - pool::run_one(); - - let mut t = task::spawn(file.flush()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn read_err() { - let (mock, file) = sys::File::mock(); - mock.read_err(); - - let mut file = File::from_std(file); - - let mut buf = [0; 1024]; - let mut t = task::spawn(file.read(&mut buf)); - - assert_pending!(t.poll()); - - pool::run_one(); - assert!(t.is_woken()); - - assert_ready_err!(t.poll()); -} - -#[test] -fn write_write_err() { - let (mock, file) = sys::File::mock(); - mock.write_err(); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - pool::run_one(); - - let mut t = task::spawn(file.write(FOO)); - assert_ready_err!(t.poll()); -} - -#[test] -fn write_read_write_err() { - let (mock, file) = sys::File::mock(); - mock.write_err().read(HELLO); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - pool::run_one(); - - let mut buf = [0; 1024]; - let mut t = task::spawn(file.read(&mut buf)); - - assert_pending!(t.poll()); - - pool::run_one(); - - let mut t = task::spawn(file.write(FOO)); - assert_ready_err!(t.poll()); -} - -#[test] -fn write_read_flush_err() { - let (mock, file) = sys::File::mock(); - mock.write_err().read(HELLO); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - pool::run_one(); - - let mut buf = [0; 1024]; - let mut t = task::spawn(file.read(&mut buf)); - - assert_pending!(t.poll()); - - pool::run_one(); - - let mut t = task::spawn(file.flush()); - assert_ready_err!(t.poll()); -} - -#[test] -fn write_seek_write_err() { - let (mock, file) = sys::File::mock(); - mock.write_err().seek_start_ok(0); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - pool::run_one(); - - { - let mut t = task::spawn(file.seek(SeekFrom::Start(0))); - assert_pending!(t.poll()); - } - - pool::run_one(); - - let mut t = task::spawn(file.write(FOO)); - assert_ready_err!(t.poll()); -} - -#[test] -fn write_seek_flush_err() { - let (mock, file) = sys::File::mock(); - mock.write_err().seek_start_ok(0); - - let mut file = File::from_std(file); - - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - pool::run_one(); - - { - let mut t = task::spawn(file.seek(SeekFrom::Start(0))); - assert_pending!(t.poll()); - } - - pool::run_one(); - - let mut t = task::spawn(file.flush()); - assert_ready_err!(t.poll()); -} - -#[test] -fn sync_all_ordered_after_write() { - let (mock, file) = sys::File::mock(); - mock.write(HELLO).sync_all(); - - let mut file = File::from_std(file); - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - let mut t = task::spawn(file.sync_all()); - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn sync_all_err_ordered_after_write() { - let (mock, file) = sys::File::mock(); - mock.write(HELLO).sync_all_err(); - - let mut file = File::from_std(file); - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - let mut t = task::spawn(file.sync_all()); - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - assert_ready_err!(t.poll()); -} - -#[test] -fn sync_data_ordered_after_write() { - let (mock, file) = sys::File::mock(); - mock.write(HELLO).sync_data(); - - let mut file = File::from_std(file); - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - let mut t = task::spawn(file.sync_data()); - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn sync_data_err_ordered_after_write() { - let (mock, file) = sys::File::mock(); - mock.write(HELLO).sync_data_err(); - - let mut file = File::from_std(file); - let mut t = task::spawn(file.write(HELLO)); - assert_ready_ok!(t.poll()); - - let mut t = task::spawn(file.sync_data()); - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - assert_pending!(t.poll()); - - assert_eq!(1, pool::len()); - pool::run_one(); - - assert!(t.is_woken()); - assert_ready_err!(t.poll()); -} - -#[test] -fn open_set_len_ok() { - let (mock, file) = sys::File::mock(); - mock.set_len(123); - - let file = File::from_std(file); - let mut t = task::spawn(file.set_len(123)); - - assert_pending!(t.poll()); - assert_eq!(1, mock.remaining()); - - pool::run_one(); - assert_eq!(0, mock.remaining()); - - assert!(t.is_woken()); - assert_ready_ok!(t.poll()); -} - -#[test] -fn open_set_len_err() { - let (mock, file) = sys::File::mock(); - mock.set_len_err(123); - - let file = File::from_std(file); - let mut t = task::spawn(file.set_len(123)); - - assert_pending!(t.poll()); - assert_eq!(1, mock.remaining()); - - pool::run_one(); - assert_eq!(0, mock.remaining()); - - assert!(t.is_woken()); - assert_ready_err!(t.poll()); -} - -#[test] -fn partial_read_set_len_ok() { - let (mock, file) = sys::File::mock(); - mock.read(HELLO) - .seek_current_ok(-14, 0) - .set_len(123) - .read(FOO); - - let mut buf = [0; 32]; - let mut file = File::from_std(file); - - { - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - } - - pool::run_one(); - - { - let mut t = task::spawn(file.set_len(123)); - - assert_pending!(t.poll()); - pool::run_one(); - assert_ready_ok!(t.poll()); - } - - let mut t = task::spawn(file.read(&mut buf)); - assert_pending!(t.poll()); - pool::run_one(); - let n = assert_ready_ok!(t.poll()); - - assert_eq!(n, FOO.len()); - assert_eq!(&buf[..n], FOO); -} diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs index d1586bb..dc21e42 100644 --- a/tests/io_async_fd.rs +++ b/tests/io_async_fd.rs @@ -13,7 +13,6 @@ use std::{ task::{Context, Waker}, }; -use nix::errno::Errno; use nix::unistd::{close, read, write}; use futures::{poll, FutureExt}; @@ -56,10 +55,6 @@ impl TestWaker { } } -fn is_blocking(e: &nix::Error) -> bool { - Some(Errno::EAGAIN) == e.as_errno() -} - #[derive(Debug)] struct FileDescriptor { fd: RawFd, @@ -73,11 +68,7 @@ impl AsRawFd for FileDescriptor { impl Read for &FileDescriptor { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - match read(self.fd, buf) { - Ok(n) => Ok(n), - Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()), - Err(e) => Err(io::Error::new(ErrorKind::Other, e)), - } + read(self.fd, buf).map_err(io::Error::from) } } @@ -89,11 +80,7 @@ impl Read for FileDescriptor { impl Write for &FileDescriptor { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - match write(self.fd, buf) { - Ok(n) => Ok(n), - Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()), - Err(e) => Err(io::Error::new(ErrorKind::Other, e)), - } + write(self.fd, buf).map_err(io::Error::from) } fn flush(&mut self) -> io::Result<()> { diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs index c72c058..0d3f6ba 100644 --- a/tests/io_buf_reader.rs +++ b/tests/io_buf_reader.rs @@ -8,9 +8,11 @@ use std::cmp; use std::io::{self, Cursor}; use std::pin::Pin; use tokio::io::{ - AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader, - ReadBuf, SeekFrom, + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, + BufReader, ReadBuf, SeekFrom, }; +use tokio_test::task::spawn; +use tokio_test::{assert_pending, assert_ready}; macro_rules! run_fill_buf { ($reader:expr) => {{ @@ -348,3 +350,30 @@ async fn maybe_pending_seek() { Pin::new(&mut reader).consume(1); assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); } + +// This tests the AsyncBufReadExt::fill_buf wrapper. +#[tokio::test] +async fn test_fill_buf_wrapper() { + let (mut write, read) = tokio::io::duplex(16); + + let mut read = BufReader::new(read); + write.write_all(b"hello world").await.unwrap(); + + assert_eq!(read.fill_buf().await.unwrap(), b"hello world"); + read.consume(b"hello ".len()); + assert_eq!(read.fill_buf().await.unwrap(), b"world"); + assert_eq!(read.fill_buf().await.unwrap(), b"world"); + read.consume(b"world".len()); + + let mut fill = spawn(read.fill_buf()); + assert_pending!(fill.poll()); + + write.write_all(b"foo bar").await.unwrap(); + assert_eq!(assert_ready!(fill.poll()).unwrap(), b"foo bar"); + drop(fill); + + drop(write); + assert_eq!(read.fill_buf().await.unwrap(), b"foo bar"); + read.consume(b"foo bar".len()); + assert_eq!(read.fill_buf().await.unwrap(), b""); +} diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs index 6f4f10a..47a0d46 100644 --- a/tests/io_buf_writer.rs +++ b/tests/io_buf_writer.rs @@ -8,6 +8,17 @@ use std::io::{self, Cursor}; use std::pin::Pin; use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, SeekFrom}; +use futures::future; +use tokio_test::assert_ok; + +use std::cmp; +use std::io::IoSlice; + +mod support { + pub(crate) mod io_vec; +} +use support::io_vec::IoBufs; + struct MaybePending { inner: Vec<u8>, ready: bool, @@ -47,6 +58,14 @@ impl AsyncWrite for MaybePending { } } +async fn write_vectored<W>(writer: &mut W, bufs: &[IoSlice<'_>]) -> io::Result<usize> +where + W: AsyncWrite + Unpin, +{ + let mut writer = Pin::new(writer); + future::poll_fn(|cx| writer.as_mut().poll_write_vectored(cx, bufs)).await +} + #[tokio::test] async fn buf_writer() { let mut writer = BufWriter::with_capacity(2, Vec::new()); @@ -249,3 +268,270 @@ async fn maybe_pending_buf_writer_seek() { &[0, 1, 8, 9, 4, 5, 6, 7] ); } + +struct MockWriter { + data: Vec<u8>, + write_len: usize, + vectored: bool, +} + +impl MockWriter { + fn new(write_len: usize) -> Self { + MockWriter { + data: Vec::new(), + write_len, + vectored: false, + } + } + + fn vectored(write_len: usize) -> Self { + MockWriter { + data: Vec::new(), + write_len, + vectored: true, + } + } + + fn write_up_to(&mut self, buf: &[u8], limit: usize) -> usize { + let len = cmp::min(buf.len(), limit); + self.data.extend_from_slice(&buf[..len]); + len + } +} + +impl AsyncWrite for MockWriter { + fn poll_write( + self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, io::Error>> { + let this = self.get_mut(); + let n = this.write_up_to(buf, this.write_len); + Ok(n).into() + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + _: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<Result<usize, io::Error>> { + let this = self.get_mut(); + let mut total_written = 0; + for buf in bufs { + let n = this.write_up_to(buf, this.write_len - total_written); + total_written += n; + if total_written == this.write_len { + break; + } + } + Ok(total_written).into() + } + + fn is_write_vectored(&self) -> bool { + self.vectored + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Ok(()).into() + } +} + +#[tokio::test] +async fn write_vectored_empty_on_non_vectored() { + let mut w = BufWriter::new(MockWriter::new(4)); + let n = assert_ok!(write_vectored(&mut w, &[]).await); + assert_eq!(n, 0); + + let io_vec = [IoSlice::new(&[]); 3]; + let n = assert_ok!(write_vectored(&mut w, &io_vec).await); + assert_eq!(n, 0); + + assert_ok!(w.flush().await); + assert!(w.get_ref().data.is_empty()); +} + +#[tokio::test] +async fn write_vectored_empty_on_vectored() { + let mut w = BufWriter::new(MockWriter::vectored(4)); + let n = assert_ok!(write_vectored(&mut w, &[]).await); + assert_eq!(n, 0); + + let io_vec = [IoSlice::new(&[]); 3]; + let n = assert_ok!(write_vectored(&mut w, &io_vec).await); + assert_eq!(n, 0); + + assert_ok!(w.flush().await); + assert!(w.get_ref().data.is_empty()); +} + +#[tokio::test] +async fn write_vectored_basic_on_non_vectored() { + let msg = b"foo bar baz"; + let bufs = [ + IoSlice::new(&msg[0..4]), + IoSlice::new(&msg[4..8]), + IoSlice::new(&msg[8..]), + ]; + let mut w = BufWriter::new(MockWriter::new(4)); + let n = assert_ok!(write_vectored(&mut w, &bufs).await); + assert_eq!(n, msg.len()); + assert!(w.buffer() == &msg[..]); + assert_ok!(w.flush().await); + assert_eq!(w.get_ref().data, msg); +} + +#[tokio::test] +async fn write_vectored_basic_on_vectored() { + let msg = b"foo bar baz"; + let bufs = [ + IoSlice::new(&msg[0..4]), + IoSlice::new(&msg[4..8]), + IoSlice::new(&msg[8..]), + ]; + let mut w = BufWriter::new(MockWriter::vectored(4)); + let n = assert_ok!(write_vectored(&mut w, &bufs).await); + assert_eq!(n, msg.len()); + assert!(w.buffer() == &msg[..]); + assert_ok!(w.flush().await); + assert_eq!(w.get_ref().data, msg); +} + +#[tokio::test] +async fn write_vectored_large_total_on_non_vectored() { + let msg = b"foo bar baz"; + let mut bufs = [ + IoSlice::new(&msg[0..4]), + IoSlice::new(&msg[4..8]), + IoSlice::new(&msg[8..]), + ]; + let io_vec = IoBufs::new(&mut bufs); + let mut w = BufWriter::with_capacity(8, MockWriter::new(4)); + let n = assert_ok!(write_vectored(&mut w, &io_vec).await); + assert_eq!(n, 8); + assert!(w.buffer() == &msg[..8]); + let io_vec = io_vec.advance(n); + let n = assert_ok!(write_vectored(&mut w, &io_vec).await); + assert_eq!(n, 3); + assert!(w.get_ref().data.as_slice() == &msg[..8]); + assert!(w.buffer() == &msg[8..]); +} + +#[tokio::test] +async fn write_vectored_large_total_on_vectored() { + let msg = b"foo bar baz"; + let mut bufs = [ + IoSlice::new(&msg[0..4]), + IoSlice::new(&msg[4..8]), + IoSlice::new(&msg[8..]), + ]; + let io_vec = IoBufs::new(&mut bufs); + let mut w = BufWriter::with_capacity(8, MockWriter::vectored(10)); + let n = assert_ok!(write_vectored(&mut w, &io_vec).await); + assert_eq!(n, 10); + assert!(w.buffer().is_empty()); + let io_vec = io_vec.advance(n); + let n = assert_ok!(write_vectored(&mut w, &io_vec).await); + assert_eq!(n, 1); + assert!(w.get_ref().data.as_slice() == &msg[..10]); + assert!(w.buffer() == &msg[10..]); +} + +struct VectoredWriteHarness { + writer: BufWriter<MockWriter>, + buf_capacity: usize, +} + +impl VectoredWriteHarness { + fn new(buf_capacity: usize) -> Self { + VectoredWriteHarness { + writer: BufWriter::with_capacity(buf_capacity, MockWriter::new(4)), + buf_capacity, + } + } + + fn with_vectored_backend(buf_capacity: usize) -> Self { + VectoredWriteHarness { + writer: BufWriter::with_capacity(buf_capacity, MockWriter::vectored(4)), + buf_capacity, + } + } + + async fn write_all<'a, 'b>(&mut self, mut io_vec: IoBufs<'a, 'b>) -> usize { + let mut total_written = 0; + while !io_vec.is_empty() { + let n = assert_ok!(write_vectored(&mut self.writer, &io_vec).await); + assert!(n != 0); + assert!(self.writer.buffer().len() <= self.buf_capacity); + total_written += n; + io_vec = io_vec.advance(n); + } + total_written + } + + async fn flush(&mut self) -> &[u8] { + assert_ok!(self.writer.flush().await); + &self.writer.get_ref().data + } +} + +#[tokio::test] +async fn write_vectored_odd_on_non_vectored() { + let msg = b"foo bar baz"; + let mut bufs = [ + IoSlice::new(&msg[0..4]), + IoSlice::new(&[]), + IoSlice::new(&msg[4..9]), + IoSlice::new(&msg[9..]), + ]; + let mut h = VectoredWriteHarness::new(8); + let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await; + assert_eq!(bytes_written, msg.len()); + assert_eq!(h.flush().await, msg); +} + +#[tokio::test] +async fn write_vectored_odd_on_vectored() { + let msg = b"foo bar baz"; + let mut bufs = [ + IoSlice::new(&msg[0..4]), + IoSlice::new(&[]), + IoSlice::new(&msg[4..9]), + IoSlice::new(&msg[9..]), + ]; + let mut h = VectoredWriteHarness::with_vectored_backend(8); + let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await; + assert_eq!(bytes_written, msg.len()); + assert_eq!(h.flush().await, msg); +} + +#[tokio::test] +async fn write_vectored_large_slice_on_non_vectored() { + let msg = b"foo bar baz"; + let mut bufs = [ + IoSlice::new(&[]), + IoSlice::new(&msg[..9]), + IoSlice::new(&msg[9..]), + ]; + let mut h = VectoredWriteHarness::new(8); + let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await; + assert_eq!(bytes_written, msg.len()); + assert_eq!(h.flush().await, msg); +} + +#[tokio::test] +async fn write_vectored_large_slice_on_vectored() { + let msg = b"foo bar baz"; + let mut bufs = [ + IoSlice::new(&[]), + IoSlice::new(&msg[..9]), + IoSlice::new(&msg[9..]), + ]; + let mut h = VectoredWriteHarness::with_vectored_backend(8); + let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await; + assert_eq!(bytes_written, msg.len()); + assert_eq!(h.flush().await, msg); +} diff --git a/tests/io_copy.rs b/tests/io_copy.rs index 9ed7995..005e170 100644 --- a/tests/io_copy.rs +++ b/tests/io_copy.rs @@ -1,7 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::io::{self, AsyncRead, ReadBuf}; +use bytes::BytesMut; +use futures::ready; +use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio_test::assert_ok; use std::pin::Pin; @@ -34,3 +36,52 @@ async fn copy() { assert_eq!(n, 11); assert_eq!(wr, b"hello world"); } + +#[tokio::test] +async fn proxy() { + struct BufferedWd { + buf: BytesMut, + writer: io::DuplexStream, + } + + impl AsyncWrite for BufferedWd { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.get_mut().buf.extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + let this = self.get_mut(); + + while !this.buf.is_empty() { + let n = ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buf))?; + let _ = this.buf.split_to(n); + } + + Pin::new(&mut this.writer).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.writer).poll_shutdown(cx) + } + } + + let (rd, wd) = io::duplex(1024); + let mut rd = rd.take(1024); + let mut wd = BufferedWd { + buf: BytesMut::new(), + writer: wd, + }; + + // write start bytes + assert_ok!(wd.write_all(&[0x42; 512]).await); + assert_ok!(wd.flush().await); + + let n = assert_ok!(io::copy(&mut rd, &mut wd).await); + + assert_eq!(n, 1024); +} diff --git a/tests/io_copy_bidirectional.rs b/tests/io_copy_bidirectional.rs index 17c0597..0e82b29 100644 --- a/tests/io_copy_bidirectional.rs +++ b/tests/io_copy_bidirectional.rs @@ -26,7 +26,7 @@ async fn block_write(s: &mut TcpStream) -> usize { result = s.write(&BUF) => { copied += result.expect("write error") }, - _ = tokio::time::sleep(Duration::from_millis(100)) => { + _ = tokio::time::sleep(Duration::from_millis(10)) => { break; } } @@ -42,7 +42,7 @@ where { // We run the test twice, with streams passed to copy_bidirectional in // different orders, in order to ensure that the two arguments are - // interchangable. + // interchangeable. let (a, mut a1) = make_socketpair().await; let (b, mut b1) = make_socketpair().await; diff --git a/tests/io_split.rs b/tests/io_split.rs index db168e9..a012166 100644 --- a/tests/io_split.rs +++ b/tests/io_split.rs @@ -50,10 +50,10 @@ fn is_send_and_sync() { fn split_stream_id() { let (r1, w1) = split(RW); let (r2, w2) = split(RW); - assert_eq!(r1.is_pair_of(&w1), true); - assert_eq!(r1.is_pair_of(&w2), false); - assert_eq!(r2.is_pair_of(&w2), true); - assert_eq!(r2.is_pair_of(&w1), false); + assert!(r1.is_pair_of(&w1)); + assert!(!r1.is_pair_of(&w2)); + assert!(r2.is_pair_of(&w2)); + assert!(!r2.is_pair_of(&w1)); } #[test] diff --git a/tests/io_write_all_buf.rs b/tests/io_write_all_buf.rs index b49a58e..7c8b619 100644 --- a/tests/io_write_all_buf.rs +++ b/tests/io_write_all_buf.rs @@ -52,7 +52,7 @@ async fn write_all_buf() { assert_eq!(wr.buf, b"helloworld"[..]); // expect 4 writes, [hell],[o],[worl],[d] assert_eq!(wr.cnt, 4); - assert_eq!(buf.has_remaining(), false); + assert!(!buf.has_remaining()); } #[tokio::test] diff --git a/tests/macros_select.rs b/tests/macros_select.rs index a089602..4da88fb 100644 --- a/tests/macros_select.rs +++ b/tests/macros_select.rs @@ -360,7 +360,21 @@ async fn use_future_in_if_condition() { use tokio::time::{self, Duration}; tokio::select! { - _ = time::sleep(Duration::from_millis(50)), if false => { + _ = time::sleep(Duration::from_millis(10)), if false => { + panic!("if condition ignored") + } + _ = async { 1u32 } => { + } + } +} + +#[tokio::test] +async fn use_future_in_if_condition_biased() { + use tokio::time::{self, Duration}; + + tokio::select! { + biased; + _ = time::sleep(Duration::from_millis(10)), if false => { panic!("if condition ignored") } _ = async { 1u32 } => { @@ -456,10 +470,7 @@ async fn require_mutable(_: &mut i32) {} async fn async_noop() {} async fn async_never() -> ! { - use tokio::time::Duration; - loop { - tokio::time::sleep(Duration::from_millis(10)).await; - } + futures::future::pending().await } // From https://github.com/tokio-rs/tokio/issues/2857 diff --git a/tests/named_pipe.rs b/tests/named_pipe.rs index 3f26767..2055c3c 100644 --- a/tests/named_pipe.rs +++ b/tests/named_pipe.rs @@ -126,7 +126,7 @@ async fn test_named_pipe_multi_client() -> io::Result<()> { } // Wait for a named pipe to become available. - time::sleep(Duration::from_millis(50)).await; + time::sleep(Duration::from_millis(10)).await; }; let mut client = BufReader::new(client); @@ -148,6 +148,185 @@ async fn test_named_pipe_multi_client() -> io::Result<()> { Ok(()) } +#[tokio::test] +async fn test_named_pipe_multi_client_ready() -> io::Result<()> { + use tokio::io::Interest; + + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready"; + const N: usize = 10; + + // The first server needs to be constructed early so that clients can + // be correctly connected. Otherwise calling .wait will cause the client to + // error. + let mut server = ServerOptions::new().create(PIPE_NAME)?; + + let server = tokio::spawn(async move { + for _ in 0..N { + // Wait for client to connect. + server.connect().await?; + + let inner_server = server; + + // Construct the next server to be connected before sending the one + // we already have of onto a task. This ensures that the server + // isn't closed (after it's done in the task) before a new one is + // available. Otherwise the client might error with + // `io::ErrorKind::NotFound`. + server = ServerOptions::new().create(PIPE_NAME)?; + + let _ = tokio::spawn(async move { + let server = inner_server; + + { + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + + loop { + server.readable().await?; + + let buf = &mut read_buf[read_buf_cursor..]; + + match server.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + }; + + { + let write_buf = b"pong\n"; + let mut write_buf_cursor = 0; + + loop { + server.writable().await?; + let buf = &write_buf[write_buf_cursor..]; + + match server.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + + if write_buf_cursor == write_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + Ok::<_, io::Error>(()) + }); + } + + Ok::<_, io::Error>(()) + }); + + let mut clients = Vec::new(); + + for _ in 0..N { + clients.push(tokio::spawn(async move { + // This showcases a generic connect loop. + // + // We immediately try to create a client, if it's not found or the + // pipe is busy we use the specialized wait function on the client + // builder. + let client = loop { + match ClientOptions::new().open(PIPE_NAME) { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (), + Err(e) if e.kind() == io::ErrorKind::NotFound => (), + Err(e) => return Err(e), + } + + // Wait for a named pipe to become available. + time::sleep(Duration::from_millis(10)).await; + }; + + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + let write_buf = b"ping\n"; + let mut write_buf_cursor = 0; + + loop { + let mut interest = Interest::READABLE; + if write_buf_cursor < write_buf.len() { + interest |= Interest::WRITABLE; + } + + let ready = client.ready(interest).await?; + + if ready.is_readable() { + let buf = &mut read_buf[read_buf_cursor..]; + + match client.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + + if ready.is_writable() { + let buf = &write_buf[write_buf_cursor..]; + + if buf.is_empty() { + continue; + } + + match client.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + let buf = String::from_utf8_lossy(&read_buf).into_owned(); + + Ok::<_, io::Error>(buf) + })); + } + + for client in clients { + let result = client.await?; + assert_eq!(result?, "pong\n"); + } + + server.await??; + Ok(()) +} + // This tests what happens when a client tries to disconnect. #[tokio::test] async fn test_named_pipe_mode_message() -> io::Result<()> { diff --git a/tests/no_rt.rs b/tests/no_rt.rs index 8437b80..6845850 100644 --- a/tests/no_rt.rs +++ b/tests/no_rt.rs @@ -26,7 +26,7 @@ fn panics_when_no_reactor() { async fn timeout_value() { let (_tx, rx) = oneshot::channel::<()>(); - let dur = Duration::from_millis(20); + let dur = Duration::from_millis(10); let _ = timeout(dur, rx).await; } diff --git a/tests/process_arg0.rs b/tests/process_arg0.rs new file mode 100644 index 0000000..4fabea0 --- /dev/null +++ b/tests/process_arg0.rs @@ -0,0 +1,13 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", unix))] + +use tokio::process::Command; + +#[tokio::test] +async fn arg0() { + let mut cmd = Command::new("sh"); + cmd.arg0("test_string").arg("-c").arg("echo $0"); + + let output = cmd.output().await.unwrap(); + assert_eq!(output.stdout, b"test_string\n"); +} diff --git a/tests/process_raw_handle.rs b/tests/process_raw_handle.rs new file mode 100644 index 0000000..727e66d --- /dev/null +++ b/tests/process_raw_handle.rs @@ -0,0 +1,23 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(windows)] + +use tokio::process::Command; +use winapi::um::processthreadsapi::GetProcessId; + +#[tokio::test] +async fn obtain_raw_handle() { + let mut cmd = Command::new("cmd"); + cmd.kill_on_drop(true); + cmd.arg("/c"); + cmd.arg("pause"); + + let child = cmd.spawn().unwrap(); + + let orig_id = child.id().expect("missing id"); + assert!(orig_id > 0); + + let handle = child.raw_handle().expect("process stopped"); + let handled_id = unsafe { GetProcessId(handle as _) }; + assert_eq!(handled_id, orig_id); +} diff --git a/tests/support/io_vec.rs b/tests/support/io_vec.rs new file mode 100644 index 0000000..4ea47c7 --- /dev/null +++ b/tests/support/io_vec.rs @@ -0,0 +1,45 @@ +use std::io::IoSlice; +use std::ops::Deref; +use std::slice; + +pub struct IoBufs<'a, 'b>(&'b mut [IoSlice<'a>]); + +impl<'a, 'b> IoBufs<'a, 'b> { + pub fn new(slices: &'b mut [IoSlice<'a>]) -> Self { + IoBufs(slices) + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn advance(mut self, n: usize) -> IoBufs<'a, 'b> { + let mut to_remove = 0; + let mut remaining_len = n; + for slice in self.0.iter() { + if remaining_len < slice.len() { + break; + } else { + remaining_len -= slice.len(); + to_remove += 1; + } + } + self.0 = self.0.split_at_mut(to_remove).1; + if let Some(slice) = self.0.first_mut() { + let tail = &slice[remaining_len..]; + // Safety: recasts slice to the original lifetime + let tail = unsafe { slice::from_raw_parts(tail.as_ptr(), tail.len()) }; + *slice = IoSlice::new(tail); + } else if remaining_len != 0 { + panic!("advance past the end of the slice vector"); + } + self + } +} + +impl<'a, 'b> Deref for IoBufs<'a, 'b> { + type Target = [IoSlice<'a>]; + fn deref(&self) -> &[IoSlice<'a>] { + self.0 + } +} diff --git a/tests/support/mock_file.rs b/tests/support/mock_file.rs deleted file mode 100644 index 1ce326b..0000000 --- a/tests/support/mock_file.rs +++ /dev/null @@ -1,295 +0,0 @@ -#![allow(clippy::unnecessary_operation)] - -use std::collections::VecDeque; -use std::fmt; -use std::fs::{Metadata, Permissions}; -use std::io; -use std::io::prelude::*; -use std::io::SeekFrom; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; - -pub struct File { - shared: Arc<Mutex<Shared>>, -} - -pub struct Handle { - shared: Arc<Mutex<Shared>>, -} - -struct Shared { - calls: VecDeque<Call>, -} - -#[derive(Debug)] -enum Call { - Read(io::Result<Vec<u8>>), - Write(io::Result<Vec<u8>>), - Seek(SeekFrom, io::Result<u64>), - SyncAll(io::Result<()>), - SyncData(io::Result<()>), - SetLen(u64, io::Result<()>), -} - -impl Handle { - pub fn read(&self, data: &[u8]) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls.push_back(Call::Read(Ok(data.to_owned()))); - self - } - - pub fn read_err(&self) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls - .push_back(Call::Read(Err(io::ErrorKind::Other.into()))); - self - } - - pub fn write(&self, data: &[u8]) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls.push_back(Call::Write(Ok(data.to_owned()))); - self - } - - pub fn write_err(&self) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls - .push_back(Call::Write(Err(io::ErrorKind::Other.into()))); - self - } - - pub fn seek_start_ok(&self, offset: u64) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls - .push_back(Call::Seek(SeekFrom::Start(offset), Ok(offset))); - self - } - - pub fn seek_current_ok(&self, offset: i64, ret: u64) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls - .push_back(Call::Seek(SeekFrom::Current(offset), Ok(ret))); - self - } - - pub fn sync_all(&self) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls.push_back(Call::SyncAll(Ok(()))); - self - } - - pub fn sync_all_err(&self) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls - .push_back(Call::SyncAll(Err(io::ErrorKind::Other.into()))); - self - } - - pub fn sync_data(&self) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls.push_back(Call::SyncData(Ok(()))); - self - } - - pub fn sync_data_err(&self) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls - .push_back(Call::SyncData(Err(io::ErrorKind::Other.into()))); - self - } - - pub fn set_len(&self, size: u64) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls.push_back(Call::SetLen(size, Ok(()))); - self - } - - pub fn set_len_err(&self, size: u64) -> &Self { - let mut s = self.shared.lock().unwrap(); - s.calls - .push_back(Call::SetLen(size, Err(io::ErrorKind::Other.into()))); - self - } - - pub fn remaining(&self) -> usize { - let s = self.shared.lock().unwrap(); - s.calls.len() - } -} - -impl Drop for Handle { - fn drop(&mut self) { - if !std::thread::panicking() { - let s = self.shared.lock().unwrap(); - assert_eq!(0, s.calls.len()); - } - } -} - -impl File { - pub fn open(_: PathBuf) -> io::Result<File> { - unimplemented!(); - } - - pub fn create(_: PathBuf) -> io::Result<File> { - unimplemented!(); - } - - pub fn mock() -> (Handle, File) { - let shared = Arc::new(Mutex::new(Shared { - calls: VecDeque::new(), - })); - - let handle = Handle { - shared: shared.clone(), - }; - let file = File { shared }; - - (handle, file) - } - - pub fn sync_all(&self) -> io::Result<()> { - use self::Call::*; - - let mut s = self.shared.lock().unwrap(); - - match s.calls.pop_front() { - Some(SyncAll(ret)) => ret, - Some(op) => panic!("expected next call to be {:?}; was sync_all", op), - None => panic!("did not expect call"), - } - } - - pub fn sync_data(&self) -> io::Result<()> { - use self::Call::*; - - let mut s = self.shared.lock().unwrap(); - - match s.calls.pop_front() { - Some(SyncData(ret)) => ret, - Some(op) => panic!("expected next call to be {:?}; was sync_all", op), - None => panic!("did not expect call"), - } - } - - pub fn set_len(&self, size: u64) -> io::Result<()> { - use self::Call::*; - - let mut s = self.shared.lock().unwrap(); - - match s.calls.pop_front() { - Some(SetLen(arg, ret)) => { - assert_eq!(arg, size); - ret - } - Some(op) => panic!("expected next call to be {:?}; was sync_all", op), - None => panic!("did not expect call"), - } - } - - pub fn metadata(&self) -> io::Result<Metadata> { - unimplemented!(); - } - - pub fn set_permissions(&self, _perm: Permissions) -> io::Result<()> { - unimplemented!(); - } - - pub fn try_clone(&self) -> io::Result<Self> { - unimplemented!(); - } -} - -impl Read for &'_ File { - fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { - use self::Call::*; - - let mut s = self.shared.lock().unwrap(); - - match s.calls.pop_front() { - Some(Read(Ok(data))) => { - assert!(dst.len() >= data.len()); - assert!(dst.len() <= 16 * 1024, "actual = {}", dst.len()); // max buffer - - &mut dst[..data.len()].copy_from_slice(&data); - Ok(data.len()) - } - Some(Read(Err(e))) => Err(e), - Some(op) => panic!("expected next call to be {:?}; was a read", op), - None => panic!("did not expect call"), - } - } -} - -impl Write for &'_ File { - fn write(&mut self, src: &[u8]) -> io::Result<usize> { - use self::Call::*; - - let mut s = self.shared.lock().unwrap(); - - match s.calls.pop_front() { - Some(Write(Ok(data))) => { - assert_eq!(src, &data[..]); - Ok(src.len()) - } - Some(Write(Err(e))) => Err(e), - Some(op) => panic!("expected next call to be {:?}; was write", op), - None => panic!("did not expect call"), - } - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl Seek for &'_ File { - fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { - use self::Call::*; - - let mut s = self.shared.lock().unwrap(); - - match s.calls.pop_front() { - Some(Seek(expect, res)) => { - assert_eq!(expect, pos); - res - } - Some(op) => panic!("expected call {:?}; was `seek`", op), - None => panic!("did not expect call; was `seek`"), - } - } -} - -impl fmt::Debug for File { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("mock::File").finish() - } -} - -#[cfg(unix)] -impl std::os::unix::io::AsRawFd for File { - fn as_raw_fd(&self) -> std::os::unix::io::RawFd { - unimplemented!(); - } -} - -#[cfg(unix)] -impl std::os::unix::io::FromRawFd for File { - unsafe fn from_raw_fd(_: std::os::unix::io::RawFd) -> Self { - unimplemented!(); - } -} - -#[cfg(windows)] -impl std::os::windows::io::AsRawHandle for File { - fn as_raw_handle(&self) -> std::os::windows::io::RawHandle { - unimplemented!(); - } -} - -#[cfg(windows)] -impl std::os::windows::io::FromRawHandle for File { - unsafe fn from_raw_handle(_: std::os::windows::io::RawHandle) -> Self { - unimplemented!(); - } -} diff --git a/tests/support/mock_pool.rs b/tests/support/mock_pool.rs deleted file mode 100644 index e1fdb42..0000000 --- a/tests/support/mock_pool.rs +++ /dev/null @@ -1,66 +0,0 @@ -use tokio::sync::oneshot; - -use std::cell::RefCell; -use std::collections::VecDeque; -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -thread_local! { - static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new()) -} - -#[derive(Debug)] -pub(crate) struct Blocking<T> { - rx: oneshot::Receiver<T>, -} - -pub(crate) fn run<F, R>(f: F) -> Blocking<R> -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let (tx, rx) = oneshot::channel(); - let task = Box::new(move || { - let _ = tx.send(f()); - }); - - QUEUE.with(|cell| cell.borrow_mut().push_back(task)); - - Blocking { rx } -} - -impl<T> Future for Blocking<T> { - type Output = Result<T, io::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - use std::task::Poll::*; - - match Pin::new(&mut self.rx).poll(cx) { - Ready(Ok(v)) => Ready(Ok(v)), - Ready(Err(e)) => panic!("error = {:?}", e), - Pending => Pending, - } - } -} - -pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T> -where - F: FnOnce() -> io::Result<T> + Send + 'static, - T: Send + 'static, -{ - run(f).await? -} - -pub(crate) fn len() -> usize { - QUEUE.with(|cell| cell.borrow().len()) -} - -pub(crate) fn run_one() { - let task = QUEUE - .with(|cell| cell.borrow_mut().pop_front()) - .expect("expected task to run, but none ready"); - - task(); -} diff --git a/tests/sync_mutex.rs b/tests/sync_mutex.rs index 0ddb203..090db94 100644 --- a/tests/sync_mutex.rs +++ b/tests/sync_mutex.rs @@ -139,12 +139,12 @@ fn try_lock() { let m: Mutex<usize> = Mutex::new(0); { let g1 = m.try_lock(); - assert_eq!(g1.is_ok(), true); + assert!(g1.is_ok()); let g2 = m.try_lock(); - assert_eq!(g2.is_ok(), false); + assert!(!g2.is_ok()); } let g3 = m.try_lock(); - assert_eq!(g3.is_ok(), true); + assert!(g3.is_ok()); } #[tokio::test] diff --git a/tests/sync_mutex_owned.rs b/tests/sync_mutex_owned.rs index 0f1399c..898bf35 100644 --- a/tests/sync_mutex_owned.rs +++ b/tests/sync_mutex_owned.rs @@ -106,12 +106,12 @@ fn try_lock_owned() { let m: Arc<Mutex<usize>> = Arc::new(Mutex::new(0)); { let g1 = m.clone().try_lock_owned(); - assert_eq!(g1.is_ok(), true); + assert!(g1.is_ok()); let g2 = m.clone().try_lock_owned(); - assert_eq!(g2.is_ok(), false); + assert!(!g2.is_ok()); } let g3 = m.try_lock_owned(); - assert_eq!(g3.is_ok(), true); + assert!(g3.is_ok()); } #[tokio::test] diff --git a/tests/sync_once_cell.rs b/tests/sync_once_cell.rs index 60f50d2..18eaf93 100644 --- a/tests/sync_once_cell.rs +++ b/tests/sync_once_cell.rs @@ -266,3 +266,9 @@ fn drop_into_inner_new_with() { let count = NUM_DROPS.load(Ordering::Acquire); assert!(count == 1); } + +#[test] +fn from() { + let cell = OnceCell::from(2); + assert_eq!(*cell.get().unwrap(), 2); +} diff --git a/tests/sync_rwlock.rs b/tests/sync_rwlock.rs index e12052b..7d05086 100644 --- a/tests/sync_rwlock.rs +++ b/tests/sync_rwlock.rs @@ -50,8 +50,8 @@ fn read_exclusive_pending() { assert_pending!(t2.poll()); } -// If the max shared access is reached and subsquent shared access is pending -// should be made available when one of the shared acesses is dropped +// If the max shared access is reached and subsequent shared access is pending +// should be made available when one of the shared accesses is dropped #[test] fn exhaust_reading() { let rwlock = RwLock::with_max_readers(100, 1024); diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs index 9dcb0c5..a2a276d 100644 --- a/tests/sync_watch.rs +++ b/tests/sync_watch.rs @@ -169,3 +169,20 @@ fn poll_close() { assert!(tx.send("two").is_err()); } + +#[test] +fn borrow_and_update() { + let (tx, mut rx) = watch::channel("one"); + + tx.send("two").unwrap(); + assert_ready!(spawn(rx.changed()).poll()).unwrap(); + assert_pending!(spawn(rx.changed()).poll()); + + tx.send("three").unwrap(); + assert_eq!(*rx.borrow_and_update(), "three"); + assert_pending!(spawn(rx.changed()).poll()); + + drop(tx); + assert_eq!(*rx.borrow_and_update(), "three"); + assert_ready!(spawn(rx.changed()).poll()).unwrap_err(); +} diff --git a/tests/task_abort.rs b/tests/task_abort.rs index 1d72ac3..06c61dc 100644 --- a/tests/task_abort.rs +++ b/tests/task_abort.rs @@ -1,11 +1,25 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::sync::Arc; +use std::thread::sleep; +use tokio::time::Duration; + +use tokio::runtime::Builder; + +struct PanicOnDrop; + +impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("Well what did you expect would happen..."); + } +} + /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>. #[test] fn test_abort_without_panic_3157() { - let rt = tokio::runtime::Builder::new_multi_thread() + let rt = Builder::new_multi_thread() .enable_time() .worker_threads(1) .build() @@ -14,11 +28,11 @@ fn test_abort_without_panic_3157() { rt.block_on(async move { let handle = tokio::spawn(async move { println!("task started"); - tokio::time::sleep(std::time::Duration::new(100, 0)).await + tokio::time::sleep(Duration::new(100, 0)).await }); // wait for task to sleep. - tokio::time::sleep(std::time::Duration::new(1, 0)).await; + tokio::time::sleep(Duration::from_millis(10)).await; handle.abort(); let _ = handle.await; @@ -41,9 +55,7 @@ fn test_abort_without_panic_3662() { } } - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); + let rt = Builder::new_current_thread().build().unwrap(); rt.block_on(async move { let drop_flag = Arc::new(AtomicBool::new(false)); @@ -62,18 +74,16 @@ fn test_abort_without_panic_3662() { // This runs in a separate thread so it doesn't have immediate // thread-local access to the executor. It does however transition // the underlying task to be completed, which will cause it to be - // dropped (in this thread no less). + // dropped (but not in this thread). assert!(!drop_flag2.load(Ordering::SeqCst)); j.abort(); - // TODO: is this guaranteed at this point? - // assert!(drop_flag2.load(Ordering::SeqCst)); j }) .join() .unwrap(); - assert!(drop_flag.load(Ordering::SeqCst)); let result = task.await; + assert!(drop_flag.load(Ordering::SeqCst)); assert!(result.unwrap_err().is_cancelled()); // Note: We do the following to trigger a deferred task cleanup. @@ -82,7 +92,7 @@ fn test_abort_without_panic_3662() { // `Inner::block_on` of `basic_scheduler.rs`. // // We cause the cleanup to happen by having a poll return Pending once - // so that the scheduler can go into the "auxilliary tasks" mode, at + // so that the scheduler can go into the "auxiliary tasks" mode, at // which point the task is removed from the scheduler. let i = tokio::spawn(async move { tokio::task::yield_now().await; @@ -91,3 +101,126 @@ fn test_abort_without_panic_3662() { i.await.unwrap(); }); } + +/// Checks that a suspended LocalSet task can be aborted from a remote thread +/// without panicking and without running the tasks destructor on the wrong thread. +/// <https://github.com/tokio-rs/tokio/issues/3929> +#[test] +fn remote_abort_local_set_3929() { + struct DropCheck { + created_on: std::thread::ThreadId, + not_send: std::marker::PhantomData<*const ()>, + } + + impl DropCheck { + fn new() -> Self { + Self { + created_on: std::thread::current().id(), + not_send: std::marker::PhantomData, + } + } + } + impl Drop for DropCheck { + fn drop(&mut self) { + if std::thread::current().id() != self.created_on { + panic!("non-Send value dropped in another thread!"); + } + } + } + + let rt = Builder::new_current_thread().build().unwrap(); + let local = tokio::task::LocalSet::new(); + + let check = DropCheck::new(); + let jh = local.spawn_local(async move { + futures::future::pending::<()>().await; + drop(check); + }); + + let jh2 = std::thread::spawn(move || { + sleep(Duration::from_millis(10)); + jh.abort(); + }); + + rt.block_on(local); + jh2.join().unwrap(); +} + +/// Checks that a suspended task can be aborted even if the `JoinHandle` is immediately dropped. +/// issue #3964: <https://github.com/tokio-rs/tokio/issues/3964>. +#[test] +fn test_abort_wakes_task_3964() { + let rt = Builder::new_current_thread().enable_time().build().unwrap(); + + rt.block_on(async move { + let notify_dropped = Arc::new(()); + let weak_notify_dropped = Arc::downgrade(¬ify_dropped); + + let handle = tokio::spawn(async move { + // Make sure the Arc is moved into the task + let _notify_dropped = notify_dropped; + println!("task started"); + tokio::time::sleep(Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(Duration::from_millis(10)).await; + + handle.abort(); + drop(handle); + + // wait for task to abort. + tokio::time::sleep(Duration::from_millis(10)).await; + + // Check that the Arc has been dropped. + assert!(weak_notify_dropped.upgrade().is_none()); + }); +} + +/// Checks that aborting a task whose destructor panics does not allow the +/// panic to escape the task. +#[test] +#[cfg(not(target_os = "android"))] +fn test_abort_task_that_panics_on_drop_contained() { + let rt = Builder::new_current_thread().enable_time().build().unwrap(); + + rt.block_on(async move { + let handle = tokio::spawn(async move { + // Make sure the Arc is moved into the task + let _panic_dropped = PanicOnDrop; + println!("task started"); + tokio::time::sleep(Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(Duration::from_millis(10)).await; + + handle.abort(); + drop(handle); + + // wait for task to abort. + tokio::time::sleep(Duration::from_millis(10)).await; + }); +} + +/// Checks that aborting a task whose destructor panics has the expected result. +#[test] +#[cfg(not(target_os = "android"))] +fn test_abort_task_that_panics_on_drop_returned() { + let rt = Builder::new_current_thread().enable_time().build().unwrap(); + + rt.block_on(async move { + let handle = tokio::spawn(async move { + // Make sure the Arc is moved into the task + let _panic_dropped = PanicOnDrop; + println!("task started"); + tokio::time::sleep(Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(Duration::from_millis(10)).await; + + handle.abort(); + assert!(handle.await.unwrap_err().is_panic()); + }); +} diff --git a/tests/task_builder.rs b/tests/task_builder.rs new file mode 100644 index 0000000..1499abf --- /dev/null +++ b/tests/task_builder.rs @@ -0,0 +1,67 @@ +#[cfg(all(tokio_unstable, feature = "tracing"))] +mod tests { + use std::rc::Rc; + use tokio::{ + task::{Builder, LocalSet}, + test, + }; + + #[test] + async fn spawn_with_name() { + let result = Builder::new() + .name("name") + .spawn(async { "task executed" }) + .await; + + assert_eq!(result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_blocking_with_name() { + let result = Builder::new() + .name("name") + .spawn_blocking(|| "task executed") + .await; + + assert_eq!(result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_local_with_name() { + let unsend_data = Rc::new("task executed"); + let result = LocalSet::new() + .run_until(async move { + Builder::new() + .name("name") + .spawn_local(async move { unsend_data }) + .await + }) + .await; + + assert_eq!(*result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_without_name() { + let result = Builder::new().spawn(async { "task executed" }).await; + + assert_eq!(result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_blocking_without_name() { + let result = Builder::new().spawn_blocking(|| "task executed").await; + + assert_eq!(result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_local_without_name() { + let unsend_data = Rc::new("task executed"); + let result = LocalSet::new() + .run_until(async move { Builder::new().spawn_local(async move { unsend_data }).await }) + .await; + + assert_eq!(*result.unwrap(), "task executed"); + } +} diff --git a/tests/task_local_set.rs b/tests/task_local_set.rs index 8513609..f8a35d0 100644 --- a/tests/task_local_set.rs +++ b/tests/task_local_set.rs @@ -67,11 +67,11 @@ async fn localset_future_timers() { let local = LocalSet::new(); local.spawn_local(async move { - time::sleep(Duration::from_millis(10)).await; + time::sleep(Duration::from_millis(5)).await; RAN1.store(true, Ordering::SeqCst); }); local.spawn_local(async move { - time::sleep(Duration::from_millis(20)).await; + time::sleep(Duration::from_millis(10)).await; RAN2.store(true, Ordering::SeqCst); }); local.await; @@ -299,9 +299,7 @@ fn drop_cancels_tasks() { let _rc2 = rc2; started_tx.send(()).unwrap(); - loop { - time::sleep(Duration::from_secs(3600)).await; - } + futures::future::pending::<()>().await; }); local.block_on(&rt, async { @@ -334,7 +332,7 @@ fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) { // something we can easily make assertions about, we'll run it in a // thread. When the test thread finishes, it will send a message on a // channel to this thread. We'll wait for that message with a fairly - // generous timeout, and if we don't recieve it, we assume the test + // generous timeout, and if we don't receive it, we assume the test // thread has hung. // // Note that it should definitely complete in under a minute, but just @@ -400,13 +398,32 @@ fn local_tasks_wake_join_all() { }); } -#[tokio::test] -async fn local_tasks_are_polled_after_tick() { +#[test] +fn local_tasks_are_polled_after_tick() { + // This test depends on timing, so we run it up to five times. + for _ in 0..4 { + let res = std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner); + if res.is_ok() { + // success + return; + } + } + + // Test failed 4 times. Try one more time without catching panics. If it + // fails again, the test fails. + local_tasks_are_polled_after_tick_inner(); +} + +#[tokio::main(flavor = "current_thread")] +async fn local_tasks_are_polled_after_tick_inner() { // Reproduces issues #1899 and #1900 static RX1: AtomicUsize = AtomicUsize::new(0); static RX2: AtomicUsize = AtomicUsize::new(0); - static EXPECTED: usize = 500; + const EXPECTED: usize = 500; + + RX1.store(0, SeqCst); + RX2.store(0, SeqCst); let (tx, mut rx) = mpsc::unbounded_channel(); @@ -416,7 +433,7 @@ async fn local_tasks_are_polled_after_tick() { .run_until(async { let task2 = task::spawn(async move { // Wait a bit - time::sleep(Duration::from_millis(100)).await; + time::sleep(Duration::from_millis(10)).await; let mut oneshots = Vec::with_capacity(EXPECTED); @@ -427,13 +444,13 @@ async fn local_tasks_are_polled_after_tick() { tx.send(oneshot_rx).unwrap(); } - time::sleep(Duration::from_millis(100)).await; + time::sleep(Duration::from_millis(10)).await; for tx in oneshots.drain(..) { tx.send(()).unwrap(); } - time::sleep(Duration::from_millis(300)).await; + time::sleep(Duration::from_millis(20)).await; let rx1 = RX1.load(SeqCst); let rx2 = RX2.load(SeqCst); println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2); diff --git a/tests/tcp_into_split.rs b/tests/tcp_into_split.rs index b4bb2ee..2e06643 100644 --- a/tests/tcp_into_split.rs +++ b/tests/tcp_into_split.rs @@ -116,7 +116,7 @@ async fn drop_write() -> Result<()> { // drop it while the read is in progress std::thread::spawn(move || { - thread::sleep(std::time::Duration::from_millis(50)); + thread::sleep(std::time::Duration::from_millis(10)); drop(write_half); }); diff --git a/tests/time_interval.rs b/tests/time_interval.rs index a3c7f08..5f7bf55 100644 --- a/tests/time_interval.rs +++ b/tests/time_interval.rs @@ -1,56 +1,173 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::time::{self, Duration, Instant}; +use tokio::time::{self, Duration, Instant, MissedTickBehavior}; use tokio_test::{assert_pending, assert_ready_eq, task}; -use std::future::Future; use std::task::Poll; +// Takes the `Interval` task, `start` variable, and optional time deltas +// For each time delta, it polls the `Interval` and asserts that the result is +// equal to `start` + the specific time delta. Then it asserts that the +// `Interval` is pending. +macro_rules! check_interval_poll { + ($i:ident, $start:ident, $($delta:expr),*$(,)?) => { + $( + assert_ready_eq!(poll_next(&mut $i), $start + ms($delta)); + )* + assert_pending!(poll_next(&mut $i)); + }; + ($i:ident, $start:ident) => { + check_interval_poll!($i, $start,); + }; +} + #[tokio::test] #[should_panic] async fn interval_zero_duration() { let _ = time::interval_at(Instant::now(), ms(0)); } -#[tokio::test] -async fn usage() { - time::pause(); +// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | +// Actual ticks: | work -----| delay | work | work | work -| work -----| +// Poll behavior: | | | | | | | | +// | | | | | | | | +// Ready(s) | | Ready(s + 2p) | | | | +// Pending | Ready(s + 3p) | | | +// Ready(s + p) Ready(s + 4p) | | +// Ready(s + 5p) | +// Ready(s + 6p) +#[tokio::test(start_paused = true)] +async fn burst() { + let start = Instant::now(); + + // This is necessary because the timer is only so granular, and in order for + // all our ticks to resolve, the time needs to be 1ms ahead of what we + // expect, so that the runtime will see that it is time to resolve the timer + time::advance(ms(1)).await; + + let mut i = task::spawn(time::interval_at(start, ms(300))); + + check_interval_poll!(i, start, 0); + + time::advance(ms(100)).await; + check_interval_poll!(i, start); + + time::advance(ms(200)).await; + check_interval_poll!(i, start, 300); + + time::advance(ms(650)).await; + check_interval_poll!(i, start, 600, 900); + + time::advance(ms(200)).await; + check_interval_poll!(i, start); + + time::advance(ms(100)).await; + check_interval_poll!(i, start, 1200); + + time::advance(ms(250)).await; + check_interval_poll!(i, start, 1500); + + time::advance(ms(300)).await; + check_interval_poll!(i, start, 1800); +} +// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | +// Actual ticks: | work -----| delay | work -----| work -----| work -----| +// Poll behavior: | | | | | | | | +// | | | | | | | | +// Ready(s) | | Ready(s + 2p) | | | | +// Pending | Pending | | | +// Ready(s + p) Ready(s + 2p + d) | | +// Ready(s + 3p + d) | +// Ready(s + 4p + d) +#[tokio::test(start_paused = true)] +async fn delay() { let start = Instant::now(); - // TODO: Skip this + // This is necessary because the timer is only so granular, and in order for + // all our ticks to resolve, the time needs to be 1ms ahead of what we + // expect, so that the runtime will see that it is time to resolve the timer time::advance(ms(1)).await; let mut i = task::spawn(time::interval_at(start, ms(300))); + i.set_missed_tick_behavior(MissedTickBehavior::Delay); - assert_ready_eq!(poll_next(&mut i), start); - assert_pending!(poll_next(&mut i)); + check_interval_poll!(i, start, 0); time::advance(ms(100)).await; - assert_pending!(poll_next(&mut i)); + check_interval_poll!(i, start); time::advance(ms(200)).await; - assert_ready_eq!(poll_next(&mut i), start + ms(300)); - assert_pending!(poll_next(&mut i)); + check_interval_poll!(i, start, 300); + + time::advance(ms(650)).await; + check_interval_poll!(i, start, 600); + + time::advance(ms(100)).await; + check_interval_poll!(i, start); + + // We have to add one here for the same reason as is above. + // Because `Interval` has reset its timer according to `Instant::now()`, + // we have to go forward 1 more millisecond than is expected so that the + // runtime realizes that it's time to resolve the timer. + time::advance(ms(201)).await; + // We add one because when using the `Delay` behavior, `Interval` + // adds the `period` from `Instant::now()`, which will always be off by one + // because we have to advance time by 1 (see above). + check_interval_poll!(i, start, 1251); + + time::advance(ms(300)).await; + // Again, we add one. + check_interval_poll!(i, start, 1551); + + time::advance(ms(300)).await; + check_interval_poll!(i, start, 1851); +} + +// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | +// Actual ticks: | work -----| delay | work ---| work -----| work -----| +// Poll behavior: | | | | | | | +// | | | | | | | +// Ready(s) | | Ready(s + 2p) | | | +// Pending | Ready(s + 4p) | | +// Ready(s + p) Ready(s + 5p) | +// Ready(s + 6p) +#[tokio::test(start_paused = true)] +async fn skip() { + let start = Instant::now(); + + // This is necessary because the timer is only so granular, and in order for + // all our ticks to resolve, the time needs to be 1ms ahead of what we + // expect, so that the runtime will see that it is time to resolve the timer + time::advance(ms(1)).await; + + let mut i = task::spawn(time::interval_at(start, ms(300))); + i.set_missed_tick_behavior(MissedTickBehavior::Skip); + + check_interval_poll!(i, start, 0); + + time::advance(ms(100)).await; + check_interval_poll!(i, start); + + time::advance(ms(200)).await; + check_interval_poll!(i, start, 300); + + time::advance(ms(650)).await; + check_interval_poll!(i, start, 600); + + time::advance(ms(250)).await; + check_interval_poll!(i, start, 1200); - time::advance(ms(400)).await; - assert_ready_eq!(poll_next(&mut i), start + ms(600)); - assert_pending!(poll_next(&mut i)); + time::advance(ms(300)).await; + check_interval_poll!(i, start, 1500); - time::advance(ms(500)).await; - assert_ready_eq!(poll_next(&mut i), start + ms(900)); - assert_ready_eq!(poll_next(&mut i), start + ms(1200)); - assert_pending!(poll_next(&mut i)); + time::advance(ms(300)).await; + check_interval_poll!(i, start, 1800); } fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> { - interval.enter(|cx, mut interval| { - tokio::pin! { - let fut = interval.tick(); - } - fut.poll(cx) - }) + interval.enter(|cx, mut interval| interval.poll_tick(cx)) } fn ms(n: u64) -> Duration { diff --git a/tests/time_rt.rs b/tests/time_rt.rs index 0775343..23367be 100644 --- a/tests/time_rt.rs +++ b/tests/time_rt.rs @@ -13,7 +13,7 @@ fn timer_with_threaded_runtime() { let (tx, rx) = mpsc::channel(); rt.spawn(async move { - let when = Instant::now() + Duration::from_millis(100); + let when = Instant::now() + Duration::from_millis(10); sleep_until(when).await; assert!(Instant::now() >= when); @@ -32,7 +32,7 @@ fn timer_with_basic_scheduler() { let (tx, rx) = mpsc::channel(); rt.block_on(async move { - let when = Instant::now() + Duration::from_millis(100); + let when = Instant::now() + Duration::from_millis(10); sleep_until(when).await; assert!(Instant::now() >= when); @@ -67,7 +67,7 @@ async fn starving() { } } - let when = Instant::now() + Duration::from_millis(20); + let when = Instant::now() + Duration::from_millis(10); let starve = Starve(Box::pin(sleep_until(when)), 0); starve.await; @@ -81,7 +81,7 @@ async fn timeout_value() { let (_tx, rx) = oneshot::channel::<()>(); let now = Instant::now(); - let dur = Duration::from_millis(20); + let dur = Duration::from_millis(10); let res = timeout(dur, rx).await; assert!(res.is_err()); diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs index 9c04d22..20477d2 100644 --- a/tests/time_sleep.rs +++ b/tests/time_sleep.rs @@ -24,7 +24,7 @@ async fn immediate_sleep() { async fn is_elapsed() { time::pause(); - let sleep = time::sleep(Duration::from_millis(50)); + let sleep = time::sleep(Duration::from_millis(10)); tokio::pin!(sleep); @@ -349,7 +349,7 @@ async fn drop_from_wake() { assert!( !panicked.load(Ordering::SeqCst), - "paniced when dropping timers" + "panicked when dropping timers" ); #[derive(Clone)] diff --git a/tests/uds_datagram.rs b/tests/uds_datagram.rs index 10314be..4d28468 100644 --- a/tests/uds_datagram.rs +++ b/tests/uds_datagram.rs @@ -87,9 +87,12 @@ async fn try_send_recv_never_block() -> io::Result<()> { dgram1.writable().await.unwrap(); match dgram1.try_send(payload) { - Err(err) => match err.kind() { - io::ErrorKind::WouldBlock | io::ErrorKind::Other => break, - _ => unreachable!("unexpected error {:?}", err), + Err(err) => match (err.kind(), err.raw_os_error()) { + (io::ErrorKind::WouldBlock, _) => break, + (_, Some(libc::ENOBUFS)) => break, + _ => { + panic!("unexpected error {:?}", err); + } }, Ok(len) => { assert_eq!(len, payload.len()); @@ -291,9 +294,12 @@ async fn try_recv_buf_never_block() -> io::Result<()> { dgram1.writable().await.unwrap(); match dgram1.try_send(payload) { - Err(err) => match err.kind() { - io::ErrorKind::WouldBlock | io::ErrorKind::Other => break, - _ => unreachable!("unexpected error {:?}", err), + Err(err) => match (err.kind(), err.raw_os_error()) { + (io::ErrorKind::WouldBlock, _) => break, + (_, Some(libc::ENOBUFS)) => break, + _ => { + panic!("unexpected error {:?}", err); + } }, Ok(len) => { assert_eq!(len, payload.len()); diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs index 2754e84..5f1b4cf 100644 --- a/tests/uds_stream.rs +++ b/tests/uds_stream.rs @@ -379,3 +379,33 @@ async fn try_read_buf() -> std::io::Result<()> { Ok(()) } + +// https://github.com/tokio-rs/tokio/issues/3879 +#[tokio::test] +#[cfg(not(target_os = "macos"))] +async fn epollhup() -> io::Result<()> { + let dir = tempfile::Builder::new() + .prefix("tokio-uds-tests") + .tempdir() + .unwrap(); + let sock_path = dir.path().join("connect.sock"); + + let listener = UnixListener::bind(&sock_path)?; + let connect = UnixStream::connect(&sock_path); + tokio::pin!(connect); + + // Poll `connect` once. + poll_fn(|cx| { + use std::future::Future; + + assert_pending!(connect.as_mut().poll(cx)); + Poll::Ready(()) + }) + .await; + + drop(listener); + + let err = connect.await.unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::ConnectionReset); + Ok(()) +} |