aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-08-17 08:33:38 -0700
committerJoel Galenson <jgalenson@google.com>2021-08-17 08:40:48 -0700
commit642961436a727d51930e5839e3dbfee04ba4af95 (patch)
tree9da006d6d1c0e4667e8d848673b13cc7d2bb62ca /tests
parent1c33108b3901dd464f81acf08b5268ec294b3876 (diff)
downloadtokio-642961436a727d51930e5839e3dbfee04ba4af95.tar.gz
Upgrade rust/crates/tokio to 1.10.0
Test: make Change-Id: I4ec984178af20297aae0ed51f0b1c6410876a51b
Diffstat (limited to 'tests')
-rw-r--r--tests/async_send_sync.rs849
-rw-r--r--tests/fs_file_mocked.rs780
-rw-r--r--tests/io_async_fd.rs17
-rw-r--r--tests/io_buf_reader.rs33
-rw-r--r--tests/io_buf_writer.rs286
-rw-r--r--tests/io_copy.rs53
-rw-r--r--tests/io_copy_bidirectional.rs4
-rw-r--r--tests/io_split.rs8
-rw-r--r--tests/io_write_all_buf.rs2
-rw-r--r--tests/macros_select.rs21
-rw-r--r--tests/named_pipe.rs181
-rw-r--r--tests/no_rt.rs2
-rw-r--r--tests/process_arg0.rs13
-rw-r--r--tests/process_raw_handle.rs23
-rw-r--r--tests/support/io_vec.rs45
-rw-r--r--tests/support/mock_file.rs295
-rw-r--r--tests/support/mock_pool.rs66
-rw-r--r--tests/sync_mutex.rs6
-rw-r--r--tests/sync_mutex_owned.rs6
-rw-r--r--tests/sync_once_cell.rs6
-rw-r--r--tests/sync_rwlock.rs4
-rw-r--r--tests/sync_watch.rs17
-rw-r--r--tests/task_abort.rs155
-rw-r--r--tests/task_builder.rs67
-rw-r--r--tests/task_local_set.rs41
-rw-r--r--tests/tcp_into_split.rs2
-rw-r--r--tests/time_interval.rs165
-rw-r--r--tests/time_rt.rs8
-rw-r--r--tests/time_sleep.rs4
-rw-r--r--tests/uds_datagram.rs18
-rw-r--r--tests/uds_stream.rs30
31 files changed, 1692 insertions, 1515 deletions
diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs
index 01e6081..aa14970 100644
--- a/tests/async_send_sync.rs
+++ b/tests/async_send_sync.rs
@@ -4,13 +4,30 @@
use std::cell::Cell;
use std::future::Future;
-use std::io::{Cursor, SeekFrom};
+use std::io::SeekFrom;
use std::net::SocketAddr;
use std::pin::Pin;
use std::rc::Rc;
use tokio::net::TcpStream;
use tokio::time::{Duration, Instant};
+// The names of these structs behaves better when sorted.
+// Send: Yes, Sync: Yes
+#[derive(Clone)]
+struct YY {}
+
+// Send: Yes, Sync: No
+#[derive(Clone)]
+struct YN {
+ _value: Cell<u8>,
+}
+
+// Send: No, Sync: No
+#[derive(Clone)]
+struct NN {
+ _value: Rc<u8>,
+}
+
#[allow(dead_code)]
type BoxFutureSync<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + Sync>>;
#[allow(dead_code)]
@@ -19,11 +36,11 @@ type BoxFutureSend<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> +
type BoxFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T>>>;
#[allow(dead_code)]
-type BoxAsyncRead = std::pin::Pin<Box<dyn tokio::io::AsyncBufRead>>;
+type BoxAsyncRead = std::pin::Pin<Box<dyn tokio::io::AsyncBufRead + Send + Sync>>;
#[allow(dead_code)]
-type BoxAsyncSeek = std::pin::Pin<Box<dyn tokio::io::AsyncSeek>>;
+type BoxAsyncSeek = std::pin::Pin<Box<dyn tokio::io::AsyncSeek + Send + Sync>>;
#[allow(dead_code)]
-type BoxAsyncWrite = std::pin::Pin<Box<dyn tokio::io::AsyncWrite>>;
+type BoxAsyncWrite = std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Sync>>;
#[allow(dead_code)]
fn require_send<T: Send>(_t: &T) {}
@@ -59,310 +76,594 @@ macro_rules! into_todo {
x
}};
}
-macro_rules! assert_value {
- ($type:ty: Send & Sync) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f: $type = todo!();
- require_send(&f);
- require_sync(&f);
- };
- };
- ($type:ty: !Send & Sync) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f: $type = todo!();
- AmbiguousIfSend::some_item(&f);
- require_sync(&f);
- };
- };
- ($type:ty: Send & !Sync) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f: $type = todo!();
- require_send(&f);
- AmbiguousIfSync::some_item(&f);
- };
+
+macro_rules! async_assert_fn_send {
+ (Send & $(!)?Sync & $(!)?Unpin, $value:expr) => {
+ require_send(&$value);
};
- ($type:ty: !Send & !Sync) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f: $type = todo!();
- AmbiguousIfSend::some_item(&f);
- AmbiguousIfSync::some_item(&f);
- };
- };
- ($type:ty: Unpin) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f: $type = todo!();
- require_unpin(&f);
- };
+ (!Send & $(!)?Sync & $(!)?Unpin, $value:expr) => {
+ AmbiguousIfSend::some_item(&$value);
};
}
-macro_rules! async_assert_fn {
- ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
- require_send(&f);
- require_sync(&f);
- };
+macro_rules! async_assert_fn_sync {
+ ($(!)?Send & Sync & $(!)?Unpin, $value:expr) => {
+ require_sync(&$value);
};
- ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
- require_send(&f);
- AmbiguousIfSync::some_item(&f);
- };
+ ($(!)?Send & !Sync & $(!)?Unpin, $value:expr) => {
+ AmbiguousIfSync::some_item(&$value);
};
- ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
- AmbiguousIfSend::some_item(&f);
- require_sync(&f);
- };
+}
+macro_rules! async_assert_fn_unpin {
+ ($(!)?Send & $(!)?Sync & Unpin, $value:expr) => {
+ require_unpin(&$value);
};
- ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => {
- #[allow(unreachable_code)]
- #[allow(unused_variables)]
- const _: fn() = || {
- let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
- AmbiguousIfSend::some_item(&f);
- AmbiguousIfSync::some_item(&f);
- };
+ ($(!)?Send & $(!)?Sync & !Unpin, $value:expr) => {
+ AmbiguousIfUnpin::some_item(&$value);
};
- ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => {
+}
+
+macro_rules! async_assert_fn {
+ ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): $($tok:tt)*) => {
#[allow(unreachable_code)]
#[allow(unused_variables)]
const _: fn() = || {
let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
- AmbiguousIfUnpin::some_item(&f);
+ async_assert_fn_send!($($tok)*, f);
+ async_assert_fn_sync!($($tok)*, f);
+ async_assert_fn_unpin!($($tok)*, f);
};
};
- ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => {
+}
+macro_rules! assert_value {
+ ($type:ty: $($tok:tt)*) => {
#[allow(unreachable_code)]
#[allow(unused_variables)]
const _: fn() = || {
- let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
- require_unpin(&f);
+ let f: $type = todo!();
+ async_assert_fn_send!($($tok)*, f);
+ async_assert_fn_sync!($($tok)*, f);
+ async_assert_fn_unpin!($($tok)*, f);
};
};
}
-async_assert_fn!(tokio::io::copy(&mut TcpStream, &mut TcpStream): Send & Sync);
-async_assert_fn!(tokio::io::empty(): Send & Sync);
-async_assert_fn!(tokio::io::repeat(u8): Send & Sync);
-async_assert_fn!(tokio::io::sink(): Send & Sync);
-async_assert_fn!(tokio::io::split(TcpStream): Send & Sync);
-async_assert_fn!(tokio::io::stderr(): Send & Sync);
-async_assert_fn!(tokio::io::stdin(): Send & Sync);
-async_assert_fn!(tokio::io::stdout(): Send & Sync);
-async_assert_fn!(tokio::io::Split<Cursor<Vec<u8>>>::next_segment(_): Send & Sync);
-
-async_assert_fn!(tokio::fs::canonicalize(&str): Send & Sync);
-async_assert_fn!(tokio::fs::copy(&str, &str): Send & Sync);
-async_assert_fn!(tokio::fs::create_dir(&str): Send & Sync);
-async_assert_fn!(tokio::fs::create_dir_all(&str): Send & Sync);
-async_assert_fn!(tokio::fs::hard_link(&str, &str): Send & Sync);
-async_assert_fn!(tokio::fs::metadata(&str): Send & Sync);
-async_assert_fn!(tokio::fs::read(&str): Send & Sync);
-async_assert_fn!(tokio::fs::read_dir(&str): Send & Sync);
-async_assert_fn!(tokio::fs::read_link(&str): Send & Sync);
-async_assert_fn!(tokio::fs::read_to_string(&str): Send & Sync);
-async_assert_fn!(tokio::fs::remove_dir(&str): Send & Sync);
-async_assert_fn!(tokio::fs::remove_dir_all(&str): Send & Sync);
-async_assert_fn!(tokio::fs::remove_file(&str): Send & Sync);
-async_assert_fn!(tokio::fs::rename(&str, &str): Send & Sync);
-async_assert_fn!(tokio::fs::set_permissions(&str, std::fs::Permissions): Send & Sync);
-async_assert_fn!(tokio::fs::symlink_metadata(&str): Send & Sync);
-async_assert_fn!(tokio::fs::write(&str, Vec<u8>): Send & Sync);
-async_assert_fn!(tokio::fs::ReadDir::next_entry(_): Send & Sync);
-async_assert_fn!(tokio::fs::OpenOptions::open(_, &str): Send & Sync);
-async_assert_fn!(tokio::fs::DirEntry::metadata(_): Send & Sync);
-async_assert_fn!(tokio::fs::DirEntry::file_type(_): Send & Sync);
+assert_value!(tokio::fs::DirBuilder: Send & Sync & Unpin);
+assert_value!(tokio::fs::DirEntry: Send & Sync & Unpin);
+assert_value!(tokio::fs::File: Send & Sync & Unpin);
+assert_value!(tokio::fs::OpenOptions: Send & Sync & Unpin);
+assert_value!(tokio::fs::ReadDir: Send & Sync & Unpin);
-async_assert_fn!(tokio::fs::File::open(&str): Send & Sync);
-async_assert_fn!(tokio::fs::File::create(&str): Send & Sync);
-async_assert_fn!(tokio::fs::File::sync_all(_): Send & Sync);
-async_assert_fn!(tokio::fs::File::sync_data(_): Send & Sync);
-async_assert_fn!(tokio::fs::File::set_len(_, u64): Send & Sync);
-async_assert_fn!(tokio::fs::File::metadata(_): Send & Sync);
-async_assert_fn!(tokio::fs::File::try_clone(_): Send & Sync);
-async_assert_fn!(tokio::fs::File::into_std(_): Send & Sync);
-async_assert_fn!(tokio::fs::File::set_permissions(_, std::fs::Permissions): Send & Sync);
+async_assert_fn!(tokio::fs::canonicalize(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::copy(&str, &str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::create_dir(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::create_dir_all(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::hard_link(&str, &str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::metadata(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::read(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::read_dir(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::read_link(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::read_to_string(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::remove_dir(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::remove_dir_all(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::remove_file(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::rename(&str, &str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::set_permissions(&str, std::fs::Permissions): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::symlink_metadata(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::write(&str, Vec<u8>): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::ReadDir::next_entry(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::OpenOptions::open(_, &str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::DirBuilder::create(_, &str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::DirEntry::metadata(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::DirEntry::file_type(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::open(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::create(&str): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::sync_all(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::sync_data(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::set_len(_, u64): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::metadata(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::try_clone(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::into_std(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::fs::File::set_permissions(_, std::fs::Permissions): Send & Sync & !Unpin);
-async_assert_fn!(tokio::net::lookup_host(SocketAddr): Send & Sync);
-async_assert_fn!(tokio::net::TcpListener::bind(SocketAddr): Send & Sync);
-async_assert_fn!(tokio::net::TcpListener::accept(_): Send & Sync);
-async_assert_fn!(tokio::net::TcpStream::connect(SocketAddr): Send & Sync);
-async_assert_fn!(tokio::net::TcpStream::peek(_, &mut [u8]): Send & Sync);
-async_assert_fn!(tokio::net::tcp::ReadHalf::peek(_, &mut [u8]): Send & Sync);
-async_assert_fn!(tokio::net::UdpSocket::bind(SocketAddr): Send & Sync);
-async_assert_fn!(tokio::net::UdpSocket::connect(_, SocketAddr): Send & Sync);
-async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync);
-async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync);
-async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync);
-async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync);
+assert_value!(tokio::net::TcpListener: Send & Sync & Unpin);
+assert_value!(tokio::net::TcpSocket: Send & Sync & Unpin);
+assert_value!(tokio::net::TcpStream: Send & Sync & Unpin);
+assert_value!(tokio::net::UdpSocket: Send & Sync & Unpin);
+assert_value!(tokio::net::tcp::OwnedReadHalf: Send & Sync & Unpin);
+assert_value!(tokio::net::tcp::OwnedWriteHalf: Send & Sync & Unpin);
+assert_value!(tokio::net::tcp::ReadHalf<'_>: Send & Sync & Unpin);
+assert_value!(tokio::net::tcp::ReuniteError: Send & Sync & Unpin);
+assert_value!(tokio::net::tcp::WriteHalf<'_>: Send & Sync & Unpin);
+async_assert_fn!(tokio::net::TcpListener::accept(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::TcpListener::bind(SocketAddr): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::TcpStream::connect(SocketAddr): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::TcpStream::peek(_, &mut [u8]): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::TcpStream::readable(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::TcpStream::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::TcpStream::writable(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::bind(SocketAddr): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::connect(_, SocketAddr): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::peek_from(_, &mut [u8]): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::readable(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::UdpSocket::writable(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::lookup_host(SocketAddr): Send & Sync & !Unpin);
+async_assert_fn!(tokio::net::tcp::ReadHalf::peek(_, &mut [u8]): Send & Sync & !Unpin);
#[cfg(unix)]
mod unix_datagram {
use super::*;
- async_assert_fn!(tokio::net::UnixListener::bind(&str): Send & Sync);
- async_assert_fn!(tokio::net::UnixListener::accept(_): Send & Sync);
- async_assert_fn!(tokio::net::UnixDatagram::send(_, &[u8]): Send & Sync);
- async_assert_fn!(tokio::net::UnixDatagram::recv(_, &mut [u8]): Send & Sync);
- async_assert_fn!(tokio::net::UnixDatagram::send_to(_, &[u8], &str): Send & Sync);
- async_assert_fn!(tokio::net::UnixDatagram::recv_from(_, &mut [u8]): Send & Sync);
- async_assert_fn!(tokio::net::UnixStream::connect(&str): Send & Sync);
+ use tokio::net::*;
+ assert_value!(UnixDatagram: Send & Sync & Unpin);
+ assert_value!(UnixListener: Send & Sync & Unpin);
+ assert_value!(UnixStream: Send & Sync & Unpin);
+ assert_value!(unix::OwnedReadHalf: Send & Sync & Unpin);
+ assert_value!(unix::OwnedWriteHalf: Send & Sync & Unpin);
+ assert_value!(unix::ReadHalf<'_>: Send & Sync & Unpin);
+ assert_value!(unix::ReuniteError: Send & Sync & Unpin);
+ assert_value!(unix::SocketAddr: Send & Sync & Unpin);
+ assert_value!(unix::UCred: Send & Sync & Unpin);
+ assert_value!(unix::WriteHalf<'_>: Send & Sync & Unpin);
+ async_assert_fn!(UnixDatagram::readable(_): Send & Sync & !Unpin);
+ async_assert_fn!(UnixDatagram::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
+ async_assert_fn!(UnixDatagram::recv(_, &mut [u8]): Send & Sync & !Unpin);
+ async_assert_fn!(UnixDatagram::recv_from(_, &mut [u8]): Send & Sync & !Unpin);
+ async_assert_fn!(UnixDatagram::send(_, &[u8]): Send & Sync & !Unpin);
+ async_assert_fn!(UnixDatagram::send_to(_, &[u8], &str): Send & Sync & !Unpin);
+ async_assert_fn!(UnixDatagram::writable(_): Send & Sync & !Unpin);
+ async_assert_fn!(UnixListener::accept(_): Send & Sync & !Unpin);
+ async_assert_fn!(UnixStream::connect(&str): Send & Sync & !Unpin);
+ async_assert_fn!(UnixStream::readable(_): Send & Sync & !Unpin);
+ async_assert_fn!(UnixStream::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
+ async_assert_fn!(UnixStream::writable(_): Send & Sync & !Unpin);
}
-async_assert_fn!(tokio::process::Child::wait_with_output(_): Send & Sync);
-async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync);
-#[cfg(unix)]
-async_assert_fn!(tokio::signal::unix::Signal::recv(_): Send & Sync);
+#[cfg(windows)]
+mod windows_named_pipe {
+ use super::*;
+ use tokio::net::windows::named_pipe::*;
+ assert_value!(ClientOptions: Send & Sync & Unpin);
+ assert_value!(NamedPipeClient: Send & Sync & Unpin);
+ assert_value!(NamedPipeServer: Send & Sync & Unpin);
+ assert_value!(PipeEnd: Send & Sync & Unpin);
+ assert_value!(PipeInfo: Send & Sync & Unpin);
+ assert_value!(PipeMode: Send & Sync & Unpin);
+ assert_value!(ServerOptions: Send & Sync & Unpin);
+ async_assert_fn!(NamedPipeClient::readable(_): Send & Sync & !Unpin);
+ async_assert_fn!(NamedPipeClient::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
+ async_assert_fn!(NamedPipeClient::writable(_): Send & Sync & !Unpin);
+ async_assert_fn!(NamedPipeServer::connect(_): Send & Sync & !Unpin);
+ async_assert_fn!(NamedPipeServer::readable(_): Send & Sync & !Unpin);
+ async_assert_fn!(NamedPipeServer::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
+ async_assert_fn!(NamedPipeServer::writable(_): Send & Sync & !Unpin);
+}
-async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync);
-async_assert_fn!(tokio::sync::Mutex<u8>::lock(_): Send & Sync);
-async_assert_fn!(tokio::sync::Mutex<Cell<u8>>::lock(_): Send & Sync);
-async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::Mutex<u8>::lock_owned(_): Send & Sync);
-async_assert_fn!(tokio::sync::Mutex<Cell<u8>>::lock_owned(_): Send & Sync);
-async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock_owned(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync);
-async_assert_fn!(tokio::sync::RwLock<u8>::read(_): Send & Sync);
-async_assert_fn!(tokio::sync::RwLock<u8>::write(_): Send & Sync);
-async_assert_fn!(tokio::sync::RwLock<Cell<u8>>::read(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::RwLock<Cell<u8>>::write(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::RwLock<Rc<u8>>::read(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::RwLock<Rc<u8>>::write(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::Semaphore::acquire(_): Send & Sync);
+assert_value!(tokio::process::Child: Send & Sync & Unpin);
+assert_value!(tokio::process::ChildStderr: Send & Sync & Unpin);
+assert_value!(tokio::process::ChildStdin: Send & Sync & Unpin);
+assert_value!(tokio::process::ChildStdout: Send & Sync & Unpin);
+assert_value!(tokio::process::Command: Send & Sync & Unpin);
+async_assert_fn!(tokio::process::Child::kill(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::process::Child::wait(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::process::Child::wait_with_output(_): Send & Sync & !Unpin);
-async_assert_fn!(tokio::sync::broadcast::Receiver<u8>::recv(_): Send & Sync);
-async_assert_fn!(tokio::sync::broadcast::Receiver<Cell<u8>>::recv(_): Send & Sync);
-async_assert_fn!(tokio::sync::broadcast::Receiver<Rc<u8>>::recv(_): !Send & !Sync);
+async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync & !Unpin);
+#[cfg(unix)]
+mod unix_signal {
+ use super::*;
+ assert_value!(tokio::signal::unix::Signal: Send & Sync & Unpin);
+ assert_value!(tokio::signal::unix::SignalKind: Send & Sync & Unpin);
+ async_assert_fn!(tokio::signal::unix::Signal::recv(_): Send & Sync & !Unpin);
+}
+#[cfg(windows)]
+mod windows_signal {
+ use super::*;
+ assert_value!(tokio::signal::windows::CtrlC: Send & Sync & Unpin);
+ assert_value!(tokio::signal::windows::CtrlBreak: Send & Sync & Unpin);
+ async_assert_fn!(tokio::signal::windows::CtrlC::recv(_): Send & Sync & !Unpin);
+ async_assert_fn!(tokio::signal::windows::CtrlBreak::recv(_): Send & Sync & !Unpin);
+}
-async_assert_fn!(tokio::sync::mpsc::Receiver<u8>::recv(_): Send & Sync);
-async_assert_fn!(tokio::sync::mpsc::Receiver<Cell<u8>>::recv(_): Send & Sync);
-async_assert_fn!(tokio::sync::mpsc::Receiver<Rc<u8>>::recv(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::mpsc::Sender<u8>::send(_, u8): Send & Sync);
-async_assert_fn!(tokio::sync::mpsc::Sender<Cell<u8>>::send(_, Cell<u8>): Send & !Sync);
-async_assert_fn!(tokio::sync::mpsc::Sender<Rc<u8>>::send(_, Rc<u8>): !Send & !Sync);
+assert_value!(tokio::sync::AcquireError: Send & Sync & Unpin);
+assert_value!(tokio::sync::Barrier: Send & Sync & Unpin);
+assert_value!(tokio::sync::BarrierWaitResult: Send & Sync & Unpin);
+assert_value!(tokio::sync::MappedMutexGuard<'_, NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::MappedMutexGuard<'_, YN>: Send & !Sync & Unpin);
+assert_value!(tokio::sync::MappedMutexGuard<'_, YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::Mutex<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::Mutex<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::Mutex<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::MutexGuard<'_, NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::MutexGuard<'_, YN>: Send & !Sync & Unpin);
+assert_value!(tokio::sync::MutexGuard<'_, YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::Notify: Send & Sync & Unpin);
+assert_value!(tokio::sync::OnceCell<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::OnceCell<YN>: Send & !Sync & Unpin);
+assert_value!(tokio::sync::OnceCell<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::OwnedMutexGuard<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::OwnedMutexGuard<YN>: Send & !Sync & Unpin);
+assert_value!(tokio::sync::OwnedMutexGuard<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockReadGuard<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockReadGuard<YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockReadGuard<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockWriteGuard<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockWriteGuard<YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::OwnedRwLockWriteGuard<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::OwnedSemaphorePermit: Send & Sync & Unpin);
+assert_value!(tokio::sync::RwLock<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::RwLock<YN>: Send & !Sync & Unpin);
+assert_value!(tokio::sync::RwLock<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::RwLockMappedWriteGuard<'_, NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::RwLockMappedWriteGuard<'_, YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::RwLockMappedWriteGuard<'_, YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::RwLockReadGuard<'_, NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::RwLockReadGuard<'_, YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::RwLockReadGuard<'_, YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::RwLockWriteGuard<'_, NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::RwLockWriteGuard<'_, YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::RwLockWriteGuard<'_, YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::Semaphore: Send & Sync & Unpin);
+assert_value!(tokio::sync::SemaphorePermit<'_>: Send & Sync & Unpin);
+assert_value!(tokio::sync::TryAcquireError: Send & Sync & Unpin);
+assert_value!(tokio::sync::TryLockError: Send & Sync & Unpin);
+assert_value!(tokio::sync::broadcast::Receiver<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::broadcast::Receiver<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::broadcast::Receiver<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::broadcast::Sender<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::broadcast::Sender<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::broadcast::Sender<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::futures::Notified<'_>: Send & Sync & !Unpin);
+assert_value!(tokio::sync::mpsc::OwnedPermit<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::OwnedPermit<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::OwnedPermit<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Permit<'_, NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Permit<'_, YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Permit<'_, YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Receiver<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Receiver<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Receiver<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Sender<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Sender<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::Sender<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::UnboundedReceiver<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::UnboundedReceiver<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::UnboundedReceiver<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::UnboundedSender<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::UnboundedSender<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::UnboundedSender<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::SendError<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::SendError<YN>: Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::SendError<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::SendTimeoutError<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::SendTimeoutError<YN>: Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::SendTimeoutError<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::TrySendError<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::TrySendError<YN>: Send & !Sync & Unpin);
+assert_value!(tokio::sync::mpsc::error::TrySendError<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::oneshot::Receiver<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::oneshot::Receiver<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::oneshot::Receiver<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::oneshot::Sender<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::oneshot::Sender<YN>: Send & Sync & Unpin);
+assert_value!(tokio::sync::oneshot::Sender<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::watch::Receiver<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::watch::Receiver<YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::watch::Receiver<YY>: Send & Sync & Unpin);
+assert_value!(tokio::sync::watch::Ref<'_, NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::watch::Ref<'_, YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::watch::Ref<'_, YY>: !Send & Sync & Unpin);
+assert_value!(tokio::sync::watch::Sender<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::watch::Sender<YN>: !Send & !Sync & Unpin);
+assert_value!(tokio::sync::watch::Sender<YY>: Send & Sync & Unpin);
+async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Mutex<NN>::lock(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::Mutex<NN>::lock_owned(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::Mutex<YN>::lock(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Mutex<YN>::lock_owned(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Mutex<YY>::lock(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Mutex<YY>::lock_owned(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = NN> + Send + Sync>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = NN> + Send>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = NN>>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<NN>> + Send + Sync>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<NN>> + Send>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<NN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<NN>>>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YN> + Send + Sync>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YN> + Send>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YN>>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YN>> + Send + Sync>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YN>> + Send>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YN>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YN>>>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YY> + Send + Sync>>): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YY> + Send>>): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_init( _, fn() -> Pin<Box<dyn Future<Output = YY>>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YY>> + Send + Sync>>): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YY>> + Send>>): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::OnceCell<YY>::get_or_try_init( _, fn() -> Pin<Box<dyn Future<Output = std::io::Result<YY>>>>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::RwLock<NN>::read(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::RwLock<NN>::write(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::RwLock<YN>::read(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::RwLock<YN>::write(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::RwLock<YY>::read(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::RwLock<YY>::write(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Semaphore::acquire(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Semaphore::acquire_many(_, u32): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Semaphore::acquire_many_owned(_, u32): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::Semaphore::acquire_owned(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::broadcast::Receiver<NN>::recv(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::broadcast::Receiver<YN>::recv(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::broadcast::Receiver<YY>::recv(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Receiver<NN>::recv(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Receiver<YN>::recv(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Receiver<YY>::recv(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<NN>::closed(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<NN>::reserve(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<NN>::reserve_owned(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<NN>::send(_, NN): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<NN>::send_timeout(_, NN, Duration): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YN>::closed(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YN>::reserve(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YN>::reserve_owned(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YN>::send(_, YN): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YN>::send_timeout(_, YN, Duration): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YY>::closed(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YY>::reserve(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YY>::reserve_owned(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YY>::send(_, YY): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::Sender<YY>::send_timeout(_, YY, Duration): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<NN>::recv(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<YN>::recv(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<YY>::recv(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::UnboundedSender<NN>::closed(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::UnboundedSender<YN>::closed(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::mpsc::UnboundedSender<YY>::closed(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::oneshot::Sender<NN>::closed(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::oneshot::Sender<YN>::closed(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::oneshot::Sender<YY>::closed(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::watch::Receiver<NN>::changed(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::watch::Receiver<YN>::changed(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::watch::Receiver<YY>::changed(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::sync::watch::Sender<NN>::closed(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::watch::Sender<YN>::closed(_): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::sync::watch::Sender<YY>::closed(_): Send & Sync & !Unpin);
-async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<u8>::recv(_): Send & Sync);
-async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Cell<u8>>::recv(_): Send & Sync);
-async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Rc<u8>>::recv(_): !Send & !Sync);
+async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSync<()>): Send & Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSend<()>): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFuture<()>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSync<()>): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSend<()>): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFuture<()>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSync<()>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSend<()>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFuture<()>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::LocalSet::run_until(_, BoxFutureSync<()>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::task::unconstrained(BoxFuture<()>): !Send & !Sync & Unpin);
+async_assert_fn!(tokio::task::unconstrained(BoxFutureSend<()>): Send & !Sync & Unpin);
+async_assert_fn!(tokio::task::unconstrained(BoxFutureSync<()>): Send & Sync & Unpin);
+assert_value!(tokio::task::LocalSet: !Send & !Sync & Unpin);
+assert_value!(tokio::task::JoinHandle<YY>: Send & Sync & Unpin);
+assert_value!(tokio::task::JoinHandle<YN>: Send & Sync & Unpin);
+assert_value!(tokio::task::JoinHandle<NN>: !Send & !Sync & Unpin);
+assert_value!(tokio::task::JoinError: Send & Sync & Unpin);
-async_assert_fn!(tokio::sync::watch::Receiver<u8>::changed(_): Send & Sync);
-async_assert_fn!(tokio::sync::watch::Sender<u8>::closed(_): Send & Sync);
-async_assert_fn!(tokio::sync::watch::Sender<Cell<u8>>::closed(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::watch::Sender<Rc<u8>>::closed(_): !Send & !Sync);
+assert_value!(tokio::runtime::Builder: Send & Sync & Unpin);
+assert_value!(tokio::runtime::EnterGuard<'_>: Send & Sync & Unpin);
+assert_value!(tokio::runtime::Handle: Send & Sync & Unpin);
+assert_value!(tokio::runtime::Runtime: Send & Sync & Unpin);
-async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = u8> + Send + Sync>>): Send & Sync);
-async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = u8> + Send>>): Send & !Sync);
-async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = u8>>>): !Send & !Sync);
-async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): !Send & !Sync);
-async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): !Send & !Sync);
-async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>>>>): !Send & !Sync);
-async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): !Send & !Sync);
-async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): !Send & !Sync);
-async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
- _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>>>>): !Send & !Sync);
-assert_value!(tokio::sync::OnceCell<u8>: Send & Sync);
-assert_value!(tokio::sync::OnceCell<Cell<u8>>: Send & !Sync);
-assert_value!(tokio::sync::OnceCell<Rc<u8>>: !Send & !Sync);
+assert_value!(tokio::time::Interval: Send & Sync & Unpin);
+assert_value!(tokio::time::Instant: Send & Sync & Unpin);
+assert_value!(tokio::time::Sleep: Send & Sync & !Unpin);
+assert_value!(tokio::time::Timeout<BoxFutureSync<()>>: Send & Sync & !Unpin);
+assert_value!(tokio::time::Timeout<BoxFutureSend<()>>: Send & !Sync & !Unpin);
+assert_value!(tokio::time::Timeout<BoxFuture<()>>: !Send & !Sync & !Unpin);
+assert_value!(tokio::time::error::Elapsed: Send & Sync & Unpin);
+assert_value!(tokio::time::error::Error: Send & Sync & Unpin);
+async_assert_fn!(tokio::time::advance(Duration): Send & Sync & !Unpin);
+async_assert_fn!(tokio::time::sleep(Duration): Send & Sync & !Unpin);
+async_assert_fn!(tokio::time::sleep_until(Instant): Send & Sync & !Unpin);
+async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSync<()>): Send & Sync & !Unpin);
+async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSend<()>): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSync<()>): Send & Sync & !Unpin);
+async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sync & !Unpin);
+async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync & !Unpin);
+async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync & !Unpin);
-async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSync<()>): Send & Sync);
-async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSend<()>): Send & !Sync);
-async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFuture<()>): !Send & !Sync);
-async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSync<()>): Send & !Sync);
-async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSend<()>): Send & !Sync);
-async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFuture<()>): !Send & !Sync);
-async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSync<()>): !Send & !Sync);
-async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSend<()>): !Send & !Sync);
-async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFuture<()>): !Send & !Sync);
-async_assert_fn!(tokio::task::LocalSet::run_until(_, BoxFutureSync<()>): !Send & !Sync);
-assert_value!(tokio::task::LocalSet: !Send & !Sync);
+assert_value!(tokio::io::BufReader<TcpStream>: Send & Sync & Unpin);
+assert_value!(tokio::io::BufStream<TcpStream>: Send & Sync & Unpin);
+assert_value!(tokio::io::BufWriter<TcpStream>: Send & Sync & Unpin);
+assert_value!(tokio::io::DuplexStream: Send & Sync & Unpin);
+assert_value!(tokio::io::Empty: Send & Sync & Unpin);
+assert_value!(tokio::io::Interest: Send & Sync & Unpin);
+assert_value!(tokio::io::Lines<TcpStream>: Send & Sync & Unpin);
+assert_value!(tokio::io::ReadBuf<'_>: Send & Sync & Unpin);
+assert_value!(tokio::io::ReadHalf<TcpStream>: Send & Sync & Unpin);
+assert_value!(tokio::io::Ready: Send & Sync & Unpin);
+assert_value!(tokio::io::Repeat: Send & Sync & Unpin);
+assert_value!(tokio::io::Sink: Send & Sync & Unpin);
+assert_value!(tokio::io::Split<TcpStream>: Send & Sync & Unpin);
+assert_value!(tokio::io::Stderr: Send & Sync & Unpin);
+assert_value!(tokio::io::Stdin: Send & Sync & Unpin);
+assert_value!(tokio::io::Stdout: Send & Sync & Unpin);
+assert_value!(tokio::io::Take<TcpStream>: Send & Sync & Unpin);
+assert_value!(tokio::io::WriteHalf<TcpStream>: Send & Sync & Unpin);
+async_assert_fn!(tokio::io::copy(&mut TcpStream, &mut TcpStream): Send & Sync & !Unpin);
+async_assert_fn!(
+ tokio::io::copy_bidirectional(&mut TcpStream, &mut TcpStream): Send & Sync & !Unpin
+);
+async_assert_fn!(tokio::io::copy_buf(&mut tokio::io::BufReader<TcpStream>, &mut TcpStream): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::empty(): Send & Sync & Unpin);
+async_assert_fn!(tokio::io::repeat(u8): Send & Sync & Unpin);
+async_assert_fn!(tokio::io::sink(): Send & Sync & Unpin);
+async_assert_fn!(tokio::io::split(TcpStream): Send & Sync & Unpin);
+async_assert_fn!(tokio::io::stderr(): Send & Sync & Unpin);
+async_assert_fn!(tokio::io::stdin(): Send & Sync & Unpin);
+async_assert_fn!(tokio::io::stdout(): Send & Sync & Unpin);
+async_assert_fn!(tokio::io::Split<tokio::io::BufReader<TcpStream>>::next_segment(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::Lines<tokio::io::BufReader<TcpStream>>::next_line(_): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &mut Vec<u8>): Send & Sync & !Unpin);
+async_assert_fn!(
+ tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): Send & Sync & !Unpin
+);
+async_assert_fn!(tokio::io::AsyncBufReadExt::fill_buf(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_buf(&mut BoxAsyncRead, &mut Vec<u8>): Send & Sync & !Unpin);
+async_assert_fn!(
+ tokio::io::AsyncReadExt::read_exact(&mut BoxAsyncRead, &mut [u8]): Send & Sync & !Unpin
+);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u8(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i8(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u16(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i16(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u32(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i32(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u64(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i64(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u128(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i128(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_f32(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_f64(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u16_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i16_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u32_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i32_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u64_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i64_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_u128_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_i128_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_f32_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_f64_le(&mut BoxAsyncRead): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncReadExt::read_to_end(&mut BoxAsyncRead, &mut Vec<u8>): Send & Sync & !Unpin);
+async_assert_fn!(
+ tokio::io::AsyncReadExt::read_to_string(&mut BoxAsyncRead, &mut String): Send & Sync & !Unpin
+);
+async_assert_fn!(tokio::io::AsyncSeekExt::seek(&mut BoxAsyncSeek, SeekFrom): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncSeekExt::stream_position(&mut BoxAsyncSeek): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncWriteExt::write(&mut BoxAsyncWrite, &[u8]): Send & Sync & !Unpin);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_vectored(&mut BoxAsyncWrite, _): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_buf(&mut BoxAsyncWrite, &mut bytes::Bytes): Send
+ & Sync
+ & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_all_buf(&mut BoxAsyncWrite, &mut bytes::Bytes): Send
+ & Sync
+ & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_all(&mut BoxAsyncWrite, &[u8]): Send & Sync & !Unpin
+);
+async_assert_fn!(tokio::io::AsyncWriteExt::write_u8(&mut BoxAsyncWrite, u8): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncWriteExt::write_i8(&mut BoxAsyncWrite, i8): Send & Sync & !Unpin);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_u16(&mut BoxAsyncWrite, u16): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_i16(&mut BoxAsyncWrite, i16): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_u32(&mut BoxAsyncWrite, u32): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_i32(&mut BoxAsyncWrite, i32): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_u64(&mut BoxAsyncWrite, u64): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_i64(&mut BoxAsyncWrite, i64): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_u128(&mut BoxAsyncWrite, u128): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_i128(&mut BoxAsyncWrite, i128): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_f32(&mut BoxAsyncWrite, f32): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_f64(&mut BoxAsyncWrite, f64): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_u16_le(&mut BoxAsyncWrite, u16): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_i16_le(&mut BoxAsyncWrite, i16): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_u32_le(&mut BoxAsyncWrite, u32): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_i32_le(&mut BoxAsyncWrite, i32): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_u64_le(&mut BoxAsyncWrite, u64): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_i64_le(&mut BoxAsyncWrite, i64): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_u128_le(&mut BoxAsyncWrite, u128): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_i128_le(&mut BoxAsyncWrite, i128): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_f32_le(&mut BoxAsyncWrite, f32): Send & Sync & !Unpin
+);
+async_assert_fn!(
+ tokio::io::AsyncWriteExt::write_f64_le(&mut BoxAsyncWrite, f64): Send & Sync & !Unpin
+);
+async_assert_fn!(tokio::io::AsyncWriteExt::flush(&mut BoxAsyncWrite): Send & Sync & !Unpin);
+async_assert_fn!(tokio::io::AsyncWriteExt::shutdown(&mut BoxAsyncWrite): Send & Sync & !Unpin);
-async_assert_fn!(tokio::time::advance(Duration): Send & Sync);
-async_assert_fn!(tokio::time::sleep(Duration): Send & Sync);
-async_assert_fn!(tokio::time::sleep_until(Instant): Send & Sync);
-async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSync<()>): Send & Sync);
-async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSend<()>): Send & !Sync);
-async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Send & !Sync);
-async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSync<()>): Send & Sync);
-async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sync);
-async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync);
-async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync);
+#[cfg(unix)]
+mod unix_asyncfd {
+ use super::*;
+ use tokio::io::unix::*;
-assert_value!(tokio::time::Interval: Unpin);
-async_assert_fn!(tokio::time::sleep(Duration): !Unpin);
-async_assert_fn!(tokio::time::sleep_until(Instant): !Unpin);
-async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Unpin);
-async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Unpin);
-async_assert_fn!(tokio::time::Interval::tick(_): !Unpin);
-async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &mut Vec<u8>): !Unpin);
-async_assert_fn!(tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_exact(&mut BoxAsyncRead, &mut [u8]): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u8(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i8(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u16(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i16(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u32(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i32(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u64(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i64(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u128(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i128(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u16_le(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i16_le(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u32_le(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i32_le(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u64_le(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i64_le(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_u128_le(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_i128_le(&mut BoxAsyncRead): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_to_end(&mut BoxAsyncRead, &mut Vec<u8>): !Unpin);
-async_assert_fn!(tokio::io::AsyncReadExt::read_to_string(&mut BoxAsyncRead, &mut String): !Unpin);
-async_assert_fn!(tokio::io::AsyncSeekExt::seek(&mut BoxAsyncSeek, SeekFrom): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write(&mut BoxAsyncWrite, &[u8]): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_all(&mut BoxAsyncWrite, &[u8]): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u8(&mut BoxAsyncWrite, u8): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i8(&mut BoxAsyncWrite, i8): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u16(&mut BoxAsyncWrite, u16): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i16(&mut BoxAsyncWrite, i16): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u32(&mut BoxAsyncWrite, u32): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i32(&mut BoxAsyncWrite, i32): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u64(&mut BoxAsyncWrite, u64): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i64(&mut BoxAsyncWrite, i64): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u128(&mut BoxAsyncWrite, u128): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i128(&mut BoxAsyncWrite, i128): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u16_le(&mut BoxAsyncWrite, u16): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i16_le(&mut BoxAsyncWrite, i16): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u32_le(&mut BoxAsyncWrite, u32): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i32_le(&mut BoxAsyncWrite, i32): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u64_le(&mut BoxAsyncWrite, u64): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i64_le(&mut BoxAsyncWrite, i64): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_u128_le(&mut BoxAsyncWrite, u128): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::write_i128_le(&mut BoxAsyncWrite, i128): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::flush(&mut BoxAsyncWrite): !Unpin);
-async_assert_fn!(tokio::io::AsyncWriteExt::shutdown(&mut BoxAsyncWrite): !Unpin);
+ struct ImplsFd<T> {
+ _t: T,
+ }
+ impl<T> std::os::unix::io::AsRawFd for ImplsFd<T> {
+ fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
+ unreachable!()
+ }
+ }
+
+ assert_value!(AsyncFd<ImplsFd<YY>>: Send & Sync & Unpin);
+ assert_value!(AsyncFd<ImplsFd<YN>>: Send & !Sync & Unpin);
+ assert_value!(AsyncFd<ImplsFd<NN>>: !Send & !Sync & Unpin);
+ assert_value!(AsyncFdReadyGuard<'_, ImplsFd<YY>>: Send & Sync & Unpin);
+ assert_value!(AsyncFdReadyGuard<'_, ImplsFd<YN>>: !Send & !Sync & Unpin);
+ assert_value!(AsyncFdReadyGuard<'_, ImplsFd<NN>>: !Send & !Sync & Unpin);
+ assert_value!(AsyncFdReadyMutGuard<'_, ImplsFd<YY>>: Send & Sync & Unpin);
+ assert_value!(AsyncFdReadyMutGuard<'_, ImplsFd<YN>>: Send & !Sync & Unpin);
+ assert_value!(AsyncFdReadyMutGuard<'_, ImplsFd<NN>>: !Send & !Sync & Unpin);
+ assert_value!(TryIoError: Send & Sync & Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<YY>>::readable(_): Send & Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<YY>>::readable_mut(_): Send & Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<YY>>::writable(_): Send & Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<YY>>::writable_mut(_): Send & Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<YN>>::readable(_): !Send & !Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<YN>>::readable_mut(_): Send & !Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<YN>>::writable(_): !Send & !Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<YN>>::writable_mut(_): Send & !Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<NN>>::readable(_): !Send & !Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<NN>>::readable_mut(_): !Send & !Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<NN>>::writable(_): !Send & !Sync & !Unpin);
+ async_assert_fn!(AsyncFd<ImplsFd<NN>>::writable_mut(_): !Send & !Sync & !Unpin);
+}
diff --git a/tests/fs_file_mocked.rs b/tests/fs_file_mocked.rs
deleted file mode 100644
index 7771532..0000000
--- a/tests/fs_file_mocked.rs
+++ /dev/null
@@ -1,780 +0,0 @@
-#![warn(rust_2018_idioms)]
-#![cfg(feature = "full")]
-
-macro_rules! ready {
- ($e:expr $(,)?) => {
- match $e {
- std::task::Poll::Ready(t) => t,
- std::task::Poll::Pending => return std::task::Poll::Pending,
- }
- };
-}
-
-#[macro_export]
-macro_rules! cfg_fs {
- ($($item:item)*) => { $($item)* }
-}
-
-#[macro_export]
-macro_rules! cfg_io_std {
- ($($item:item)*) => { $($item)* }
-}
-
-use futures::future;
-
-// Load source
-#[allow(warnings)]
-#[path = "../src/fs/file.rs"]
-mod file;
-use file::File;
-
-#[allow(warnings)]
-#[path = "../src/io/blocking.rs"]
-mod blocking;
-
-// Load mocked types
-mod support {
- pub(crate) mod mock_file;
- pub(crate) mod mock_pool;
-}
-pub(crate) use support::mock_pool as pool;
-
-// Place them where the source expects them
-pub(crate) mod io {
- pub(crate) use tokio::io::*;
-
- pub(crate) use crate::blocking;
-
- pub(crate) mod sys {
- pub(crate) use crate::support::mock_pool::{run, Blocking};
- }
-}
-pub(crate) mod fs {
- pub(crate) mod sys {
- pub(crate) use crate::support::mock_file::File;
- pub(crate) use crate::support::mock_pool::{run, Blocking};
- }
-
- pub(crate) use crate::support::mock_pool::asyncify;
-}
-pub(crate) mod sync {
- pub(crate) use tokio::sync::Mutex;
-}
-use fs::sys;
-
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
-use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
-
-use std::io::SeekFrom;
-
-const HELLO: &[u8] = b"hello world...";
-const FOO: &[u8] = b"foo bar baz...";
-
-#[test]
-fn open_read() {
- let (mock, file) = sys::File::mock();
- mock.read(HELLO);
-
- let mut file = File::from_std(file);
-
- let mut buf = [0; 1024];
- let mut t = task::spawn(file.read(&mut buf));
-
- assert_eq!(0, pool::len());
- assert_pending!(t.poll());
-
- assert_eq!(1, mock.remaining());
- assert_eq!(1, pool::len());
-
- pool::run_one();
-
- assert_eq!(0, mock.remaining());
- assert!(t.is_woken());
-
- let n = assert_ready_ok!(t.poll());
- assert_eq!(n, HELLO.len());
- assert_eq!(&buf[..n], HELLO);
-}
-
-#[test]
-fn read_twice_before_dispatch() {
- let (mock, file) = sys::File::mock();
- mock.read(HELLO);
-
- let mut file = File::from_std(file);
-
- let mut buf = [0; 1024];
- let mut t = task::spawn(file.read(&mut buf));
-
- assert_pending!(t.poll());
- assert_pending!(t.poll());
-
- assert_eq!(pool::len(), 1);
- pool::run_one();
-
- assert!(t.is_woken());
-
- let n = assert_ready_ok!(t.poll());
- assert_eq!(&buf[..n], HELLO);
-}
-
-#[test]
-fn read_with_smaller_buf() {
- let (mock, file) = sys::File::mock();
- mock.read(HELLO);
-
- let mut file = File::from_std(file);
-
- {
- let mut buf = [0; 32];
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
- }
-
- pool::run_one();
-
- {
- let mut buf = [0; 4];
- let mut t = task::spawn(file.read(&mut buf));
- let n = assert_ready_ok!(t.poll());
- assert_eq!(n, 4);
- assert_eq!(&buf[..], &HELLO[..n]);
- }
-
- // Calling again immediately succeeds with the rest of the buffer
- let mut buf = [0; 32];
- let mut t = task::spawn(file.read(&mut buf));
- let n = assert_ready_ok!(t.poll());
- assert_eq!(n, 10);
- assert_eq!(&buf[..n], &HELLO[4..]);
-
- assert_eq!(0, pool::len());
-}
-
-#[test]
-fn read_with_bigger_buf() {
- let (mock, file) = sys::File::mock();
- mock.read(&HELLO[..4]).read(&HELLO[4..]);
-
- let mut file = File::from_std(file);
-
- {
- let mut buf = [0; 4];
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
- }
-
- pool::run_one();
-
- {
- let mut buf = [0; 32];
- let mut t = task::spawn(file.read(&mut buf));
- let n = assert_ready_ok!(t.poll());
- assert_eq!(n, 4);
- assert_eq!(&buf[..n], &HELLO[..n]);
- }
-
- // Calling again immediately succeeds with the rest of the buffer
- let mut buf = [0; 32];
- let mut t = task::spawn(file.read(&mut buf));
-
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
-
- let n = assert_ready_ok!(t.poll());
- assert_eq!(n, 10);
- assert_eq!(&buf[..n], &HELLO[4..]);
-
- assert_eq!(0, pool::len());
-}
-
-#[test]
-fn read_err_then_read_success() {
- let (mock, file) = sys::File::mock();
- mock.read_err().read(&HELLO);
-
- let mut file = File::from_std(file);
-
- {
- let mut buf = [0; 32];
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
-
- pool::run_one();
-
- assert_ready_err!(t.poll());
- }
-
- {
- let mut buf = [0; 32];
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
-
- pool::run_one();
-
- let n = assert_ready_ok!(t.poll());
-
- assert_eq!(n, HELLO.len());
- assert_eq!(&buf[..n], HELLO);
- }
-}
-
-#[test]
-fn open_write() {
- let (mock, file) = sys::File::mock();
- mock.write(HELLO);
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.write(HELLO));
-
- assert_eq!(0, pool::len());
- assert_ready_ok!(t.poll());
-
- assert_eq!(1, mock.remaining());
- assert_eq!(1, pool::len());
-
- pool::run_one();
-
- assert_eq!(0, mock.remaining());
- assert!(!t.is_woken());
-
- let mut t = task::spawn(file.flush());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn flush_while_idle() {
- let (_mock, file) = sys::File::mock();
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.flush());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn read_with_buffer_larger_than_max() {
- // Chunks
- let chunk_a = 16 * 1024;
- let chunk_b = chunk_a * 2;
- let chunk_c = chunk_a * 3;
- let chunk_d = chunk_a * 4;
-
- assert_eq!(chunk_d / 1024, 64);
-
- let mut data = vec![];
- for i in 0..(chunk_d - 1) {
- data.push((i % 151) as u8);
- }
-
- let (mock, file) = sys::File::mock();
- mock.read(&data[0..chunk_a])
- .read(&data[chunk_a..chunk_b])
- .read(&data[chunk_b..chunk_c])
- .read(&data[chunk_c..]);
-
- let mut file = File::from_std(file);
-
- let mut actual = vec![0; chunk_d];
- let mut pos = 0;
-
- while pos < data.len() {
- let mut t = task::spawn(file.read(&mut actual[pos..]));
-
- assert_pending!(t.poll());
- pool::run_one();
- assert!(t.is_woken());
-
- let n = assert_ready_ok!(t.poll());
- assert!(n <= chunk_a);
-
- pos += n;
- }
-
- assert_eq!(mock.remaining(), 0);
- assert_eq!(data, &actual[..data.len()]);
-}
-
-#[test]
-fn write_with_buffer_larger_than_max() {
- // Chunks
- let chunk_a = 16 * 1024;
- let chunk_b = chunk_a * 2;
- let chunk_c = chunk_a * 3;
- let chunk_d = chunk_a * 4;
-
- assert_eq!(chunk_d / 1024, 64);
-
- let mut data = vec![];
- for i in 0..(chunk_d - 1) {
- data.push((i % 151) as u8);
- }
-
- let (mock, file) = sys::File::mock();
- mock.write(&data[0..chunk_a])
- .write(&data[chunk_a..chunk_b])
- .write(&data[chunk_b..chunk_c])
- .write(&data[chunk_c..]);
-
- let mut file = File::from_std(file);
-
- let mut rem = &data[..];
-
- let mut first = true;
-
- while !rem.is_empty() {
- let mut task = task::spawn(file.write(rem));
-
- if !first {
- assert_pending!(task.poll());
- pool::run_one();
- assert!(task.is_woken());
- }
-
- first = false;
-
- let n = assert_ready_ok!(task.poll());
-
- rem = &rem[n..];
- }
-
- pool::run_one();
-
- assert_eq!(mock.remaining(), 0);
-}
-
-#[test]
-fn write_twice_before_dispatch() {
- let (mock, file) = sys::File::mock();
- mock.write(HELLO).write(FOO);
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- let mut t = task::spawn(file.write(FOO));
- assert_pending!(t.poll());
-
- assert_eq!(pool::len(), 1);
- pool::run_one();
-
- assert!(t.is_woken());
-
- assert_ready_ok!(t.poll());
-
- let mut t = task::spawn(file.flush());
- assert_pending!(t.poll());
-
- assert_eq!(pool::len(), 1);
- pool::run_one();
-
- assert!(t.is_woken());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn incomplete_read_followed_by_write() {
- let (mock, file) = sys::File::mock();
- mock.read(HELLO)
- .seek_current_ok(-(HELLO.len() as i64), 0)
- .write(FOO);
-
- let mut file = File::from_std(file);
-
- let mut buf = [0; 32];
-
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
-
- pool::run_one();
-
- let mut t = task::spawn(file.write(FOO));
- assert_ready_ok!(t.poll());
-
- assert_eq!(pool::len(), 1);
- pool::run_one();
-
- let mut t = task::spawn(file.flush());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn incomplete_partial_read_followed_by_write() {
- let (mock, file) = sys::File::mock();
- mock.read(HELLO).seek_current_ok(-10, 0).write(FOO);
-
- let mut file = File::from_std(file);
-
- let mut buf = [0; 32];
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
-
- pool::run_one();
-
- let mut buf = [0; 4];
- let mut t = task::spawn(file.read(&mut buf));
- assert_ready_ok!(t.poll());
-
- let mut t = task::spawn(file.write(FOO));
- assert_ready_ok!(t.poll());
-
- assert_eq!(pool::len(), 1);
- pool::run_one();
-
- let mut t = task::spawn(file.flush());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn incomplete_read_followed_by_flush() {
- let (mock, file) = sys::File::mock();
- mock.read(HELLO)
- .seek_current_ok(-(HELLO.len() as i64), 0)
- .write(FOO);
-
- let mut file = File::from_std(file);
-
- let mut buf = [0; 32];
-
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
-
- pool::run_one();
-
- let mut t = task::spawn(file.flush());
- assert_ready_ok!(t.poll());
-
- let mut t = task::spawn(file.write(FOO));
- assert_ready_ok!(t.poll());
-
- pool::run_one();
-}
-
-#[test]
-fn incomplete_flush_followed_by_write() {
- let (mock, file) = sys::File::mock();
- mock.write(HELLO).write(FOO);
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.write(HELLO));
- let n = assert_ready_ok!(t.poll());
- assert_eq!(n, HELLO.len());
-
- let mut t = task::spawn(file.flush());
- assert_pending!(t.poll());
-
- // TODO: Move under write
- pool::run_one();
-
- let mut t = task::spawn(file.write(FOO));
- assert_ready_ok!(t.poll());
-
- pool::run_one();
-
- let mut t = task::spawn(file.flush());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn read_err() {
- let (mock, file) = sys::File::mock();
- mock.read_err();
-
- let mut file = File::from_std(file);
-
- let mut buf = [0; 1024];
- let mut t = task::spawn(file.read(&mut buf));
-
- assert_pending!(t.poll());
-
- pool::run_one();
- assert!(t.is_woken());
-
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn write_write_err() {
- let (mock, file) = sys::File::mock();
- mock.write_err();
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- pool::run_one();
-
- let mut t = task::spawn(file.write(FOO));
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn write_read_write_err() {
- let (mock, file) = sys::File::mock();
- mock.write_err().read(HELLO);
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- pool::run_one();
-
- let mut buf = [0; 1024];
- let mut t = task::spawn(file.read(&mut buf));
-
- assert_pending!(t.poll());
-
- pool::run_one();
-
- let mut t = task::spawn(file.write(FOO));
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn write_read_flush_err() {
- let (mock, file) = sys::File::mock();
- mock.write_err().read(HELLO);
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- pool::run_one();
-
- let mut buf = [0; 1024];
- let mut t = task::spawn(file.read(&mut buf));
-
- assert_pending!(t.poll());
-
- pool::run_one();
-
- let mut t = task::spawn(file.flush());
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn write_seek_write_err() {
- let (mock, file) = sys::File::mock();
- mock.write_err().seek_start_ok(0);
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- pool::run_one();
-
- {
- let mut t = task::spawn(file.seek(SeekFrom::Start(0)));
- assert_pending!(t.poll());
- }
-
- pool::run_one();
-
- let mut t = task::spawn(file.write(FOO));
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn write_seek_flush_err() {
- let (mock, file) = sys::File::mock();
- mock.write_err().seek_start_ok(0);
-
- let mut file = File::from_std(file);
-
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- pool::run_one();
-
- {
- let mut t = task::spawn(file.seek(SeekFrom::Start(0)));
- assert_pending!(t.poll());
- }
-
- pool::run_one();
-
- let mut t = task::spawn(file.flush());
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn sync_all_ordered_after_write() {
- let (mock, file) = sys::File::mock();
- mock.write(HELLO).sync_all();
-
- let mut file = File::from_std(file);
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- let mut t = task::spawn(file.sync_all());
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn sync_all_err_ordered_after_write() {
- let (mock, file) = sys::File::mock();
- mock.write(HELLO).sync_all_err();
-
- let mut file = File::from_std(file);
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- let mut t = task::spawn(file.sync_all());
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn sync_data_ordered_after_write() {
- let (mock, file) = sys::File::mock();
- mock.write(HELLO).sync_data();
-
- let mut file = File::from_std(file);
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- let mut t = task::spawn(file.sync_data());
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn sync_data_err_ordered_after_write() {
- let (mock, file) = sys::File::mock();
- mock.write(HELLO).sync_data_err();
-
- let mut file = File::from_std(file);
- let mut t = task::spawn(file.write(HELLO));
- assert_ready_ok!(t.poll());
-
- let mut t = task::spawn(file.sync_data());
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
- assert_pending!(t.poll());
-
- assert_eq!(1, pool::len());
- pool::run_one();
-
- assert!(t.is_woken());
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn open_set_len_ok() {
- let (mock, file) = sys::File::mock();
- mock.set_len(123);
-
- let file = File::from_std(file);
- let mut t = task::spawn(file.set_len(123));
-
- assert_pending!(t.poll());
- assert_eq!(1, mock.remaining());
-
- pool::run_one();
- assert_eq!(0, mock.remaining());
-
- assert!(t.is_woken());
- assert_ready_ok!(t.poll());
-}
-
-#[test]
-fn open_set_len_err() {
- let (mock, file) = sys::File::mock();
- mock.set_len_err(123);
-
- let file = File::from_std(file);
- let mut t = task::spawn(file.set_len(123));
-
- assert_pending!(t.poll());
- assert_eq!(1, mock.remaining());
-
- pool::run_one();
- assert_eq!(0, mock.remaining());
-
- assert!(t.is_woken());
- assert_ready_err!(t.poll());
-}
-
-#[test]
-fn partial_read_set_len_ok() {
- let (mock, file) = sys::File::mock();
- mock.read(HELLO)
- .seek_current_ok(-14, 0)
- .set_len(123)
- .read(FOO);
-
- let mut buf = [0; 32];
- let mut file = File::from_std(file);
-
- {
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
- }
-
- pool::run_one();
-
- {
- let mut t = task::spawn(file.set_len(123));
-
- assert_pending!(t.poll());
- pool::run_one();
- assert_ready_ok!(t.poll());
- }
-
- let mut t = task::spawn(file.read(&mut buf));
- assert_pending!(t.poll());
- pool::run_one();
- let n = assert_ready_ok!(t.poll());
-
- assert_eq!(n, FOO.len());
- assert_eq!(&buf[..n], FOO);
-}
diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs
index d1586bb..dc21e42 100644
--- a/tests/io_async_fd.rs
+++ b/tests/io_async_fd.rs
@@ -13,7 +13,6 @@ use std::{
task::{Context, Waker},
};
-use nix::errno::Errno;
use nix::unistd::{close, read, write};
use futures::{poll, FutureExt};
@@ -56,10 +55,6 @@ impl TestWaker {
}
}
-fn is_blocking(e: &nix::Error) -> bool {
- Some(Errno::EAGAIN) == e.as_errno()
-}
-
#[derive(Debug)]
struct FileDescriptor {
fd: RawFd,
@@ -73,11 +68,7 @@ impl AsRawFd for FileDescriptor {
impl Read for &FileDescriptor {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- match read(self.fd, buf) {
- Ok(n) => Ok(n),
- Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
- Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
- }
+ read(self.fd, buf).map_err(io::Error::from)
}
}
@@ -89,11 +80,7 @@ impl Read for FileDescriptor {
impl Write for &FileDescriptor {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- match write(self.fd, buf) {
- Ok(n) => Ok(n),
- Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
- Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
- }
+ write(self.fd, buf).map_err(io::Error::from)
}
fn flush(&mut self) -> io::Result<()> {
diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs
index c72c058..0d3f6ba 100644
--- a/tests/io_buf_reader.rs
+++ b/tests/io_buf_reader.rs
@@ -8,9 +8,11 @@ use std::cmp;
use std::io::{self, Cursor};
use std::pin::Pin;
use tokio::io::{
- AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader,
- ReadBuf, SeekFrom,
+ AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt,
+ BufReader, ReadBuf, SeekFrom,
};
+use tokio_test::task::spawn;
+use tokio_test::{assert_pending, assert_ready};
macro_rules! run_fill_buf {
($reader:expr) => {{
@@ -348,3 +350,30 @@ async fn maybe_pending_seek() {
Pin::new(&mut reader).consume(1);
assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
}
+
+// This tests the AsyncBufReadExt::fill_buf wrapper.
+#[tokio::test]
+async fn test_fill_buf_wrapper() {
+ let (mut write, read) = tokio::io::duplex(16);
+
+ let mut read = BufReader::new(read);
+ write.write_all(b"hello world").await.unwrap();
+
+ assert_eq!(read.fill_buf().await.unwrap(), b"hello world");
+ read.consume(b"hello ".len());
+ assert_eq!(read.fill_buf().await.unwrap(), b"world");
+ assert_eq!(read.fill_buf().await.unwrap(), b"world");
+ read.consume(b"world".len());
+
+ let mut fill = spawn(read.fill_buf());
+ assert_pending!(fill.poll());
+
+ write.write_all(b"foo bar").await.unwrap();
+ assert_eq!(assert_ready!(fill.poll()).unwrap(), b"foo bar");
+ drop(fill);
+
+ drop(write);
+ assert_eq!(read.fill_buf().await.unwrap(), b"foo bar");
+ read.consume(b"foo bar".len());
+ assert_eq!(read.fill_buf().await.unwrap(), b"");
+}
diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs
index 6f4f10a..47a0d46 100644
--- a/tests/io_buf_writer.rs
+++ b/tests/io_buf_writer.rs
@@ -8,6 +8,17 @@ use std::io::{self, Cursor};
use std::pin::Pin;
use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, SeekFrom};
+use futures::future;
+use tokio_test::assert_ok;
+
+use std::cmp;
+use std::io::IoSlice;
+
+mod support {
+ pub(crate) mod io_vec;
+}
+use support::io_vec::IoBufs;
+
struct MaybePending {
inner: Vec<u8>,
ready: bool,
@@ -47,6 +58,14 @@ impl AsyncWrite for MaybePending {
}
}
+async fn write_vectored<W>(writer: &mut W, bufs: &[IoSlice<'_>]) -> io::Result<usize>
+where
+ W: AsyncWrite + Unpin,
+{
+ let mut writer = Pin::new(writer);
+ future::poll_fn(|cx| writer.as_mut().poll_write_vectored(cx, bufs)).await
+}
+
#[tokio::test]
async fn buf_writer() {
let mut writer = BufWriter::with_capacity(2, Vec::new());
@@ -249,3 +268,270 @@ async fn maybe_pending_buf_writer_seek() {
&[0, 1, 8, 9, 4, 5, 6, 7]
);
}
+
+struct MockWriter {
+ data: Vec<u8>,
+ write_len: usize,
+ vectored: bool,
+}
+
+impl MockWriter {
+ fn new(write_len: usize) -> Self {
+ MockWriter {
+ data: Vec::new(),
+ write_len,
+ vectored: false,
+ }
+ }
+
+ fn vectored(write_len: usize) -> Self {
+ MockWriter {
+ data: Vec::new(),
+ write_len,
+ vectored: true,
+ }
+ }
+
+ fn write_up_to(&mut self, buf: &[u8], limit: usize) -> usize {
+ let len = cmp::min(buf.len(), limit);
+ self.data.extend_from_slice(&buf[..len]);
+ len
+ }
+}
+
+impl AsyncWrite for MockWriter {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ let this = self.get_mut();
+ let n = this.write_up_to(buf, this.write_len);
+ Ok(n).into()
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<Result<usize, io::Error>> {
+ let this = self.get_mut();
+ let mut total_written = 0;
+ for buf in bufs {
+ let n = this.write_up_to(buf, this.write_len - total_written);
+ total_written += n;
+ if total_written == this.write_len {
+ break;
+ }
+ }
+ Ok(total_written).into()
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.vectored
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ Ok(()).into()
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ Ok(()).into()
+ }
+}
+
+#[tokio::test]
+async fn write_vectored_empty_on_non_vectored() {
+ let mut w = BufWriter::new(MockWriter::new(4));
+ let n = assert_ok!(write_vectored(&mut w, &[]).await);
+ assert_eq!(n, 0);
+
+ let io_vec = [IoSlice::new(&[]); 3];
+ let n = assert_ok!(write_vectored(&mut w, &io_vec).await);
+ assert_eq!(n, 0);
+
+ assert_ok!(w.flush().await);
+ assert!(w.get_ref().data.is_empty());
+}
+
+#[tokio::test]
+async fn write_vectored_empty_on_vectored() {
+ let mut w = BufWriter::new(MockWriter::vectored(4));
+ let n = assert_ok!(write_vectored(&mut w, &[]).await);
+ assert_eq!(n, 0);
+
+ let io_vec = [IoSlice::new(&[]); 3];
+ let n = assert_ok!(write_vectored(&mut w, &io_vec).await);
+ assert_eq!(n, 0);
+
+ assert_ok!(w.flush().await);
+ assert!(w.get_ref().data.is_empty());
+}
+
+#[tokio::test]
+async fn write_vectored_basic_on_non_vectored() {
+ let msg = b"foo bar baz";
+ let bufs = [
+ IoSlice::new(&msg[0..4]),
+ IoSlice::new(&msg[4..8]),
+ IoSlice::new(&msg[8..]),
+ ];
+ let mut w = BufWriter::new(MockWriter::new(4));
+ let n = assert_ok!(write_vectored(&mut w, &bufs).await);
+ assert_eq!(n, msg.len());
+ assert!(w.buffer() == &msg[..]);
+ assert_ok!(w.flush().await);
+ assert_eq!(w.get_ref().data, msg);
+}
+
+#[tokio::test]
+async fn write_vectored_basic_on_vectored() {
+ let msg = b"foo bar baz";
+ let bufs = [
+ IoSlice::new(&msg[0..4]),
+ IoSlice::new(&msg[4..8]),
+ IoSlice::new(&msg[8..]),
+ ];
+ let mut w = BufWriter::new(MockWriter::vectored(4));
+ let n = assert_ok!(write_vectored(&mut w, &bufs).await);
+ assert_eq!(n, msg.len());
+ assert!(w.buffer() == &msg[..]);
+ assert_ok!(w.flush().await);
+ assert_eq!(w.get_ref().data, msg);
+}
+
+#[tokio::test]
+async fn write_vectored_large_total_on_non_vectored() {
+ let msg = b"foo bar baz";
+ let mut bufs = [
+ IoSlice::new(&msg[0..4]),
+ IoSlice::new(&msg[4..8]),
+ IoSlice::new(&msg[8..]),
+ ];
+ let io_vec = IoBufs::new(&mut bufs);
+ let mut w = BufWriter::with_capacity(8, MockWriter::new(4));
+ let n = assert_ok!(write_vectored(&mut w, &io_vec).await);
+ assert_eq!(n, 8);
+ assert!(w.buffer() == &msg[..8]);
+ let io_vec = io_vec.advance(n);
+ let n = assert_ok!(write_vectored(&mut w, &io_vec).await);
+ assert_eq!(n, 3);
+ assert!(w.get_ref().data.as_slice() == &msg[..8]);
+ assert!(w.buffer() == &msg[8..]);
+}
+
+#[tokio::test]
+async fn write_vectored_large_total_on_vectored() {
+ let msg = b"foo bar baz";
+ let mut bufs = [
+ IoSlice::new(&msg[0..4]),
+ IoSlice::new(&msg[4..8]),
+ IoSlice::new(&msg[8..]),
+ ];
+ let io_vec = IoBufs::new(&mut bufs);
+ let mut w = BufWriter::with_capacity(8, MockWriter::vectored(10));
+ let n = assert_ok!(write_vectored(&mut w, &io_vec).await);
+ assert_eq!(n, 10);
+ assert!(w.buffer().is_empty());
+ let io_vec = io_vec.advance(n);
+ let n = assert_ok!(write_vectored(&mut w, &io_vec).await);
+ assert_eq!(n, 1);
+ assert!(w.get_ref().data.as_slice() == &msg[..10]);
+ assert!(w.buffer() == &msg[10..]);
+}
+
+struct VectoredWriteHarness {
+ writer: BufWriter<MockWriter>,
+ buf_capacity: usize,
+}
+
+impl VectoredWriteHarness {
+ fn new(buf_capacity: usize) -> Self {
+ VectoredWriteHarness {
+ writer: BufWriter::with_capacity(buf_capacity, MockWriter::new(4)),
+ buf_capacity,
+ }
+ }
+
+ fn with_vectored_backend(buf_capacity: usize) -> Self {
+ VectoredWriteHarness {
+ writer: BufWriter::with_capacity(buf_capacity, MockWriter::vectored(4)),
+ buf_capacity,
+ }
+ }
+
+ async fn write_all<'a, 'b>(&mut self, mut io_vec: IoBufs<'a, 'b>) -> usize {
+ let mut total_written = 0;
+ while !io_vec.is_empty() {
+ let n = assert_ok!(write_vectored(&mut self.writer, &io_vec).await);
+ assert!(n != 0);
+ assert!(self.writer.buffer().len() <= self.buf_capacity);
+ total_written += n;
+ io_vec = io_vec.advance(n);
+ }
+ total_written
+ }
+
+ async fn flush(&mut self) -> &[u8] {
+ assert_ok!(self.writer.flush().await);
+ &self.writer.get_ref().data
+ }
+}
+
+#[tokio::test]
+async fn write_vectored_odd_on_non_vectored() {
+ let msg = b"foo bar baz";
+ let mut bufs = [
+ IoSlice::new(&msg[0..4]),
+ IoSlice::new(&[]),
+ IoSlice::new(&msg[4..9]),
+ IoSlice::new(&msg[9..]),
+ ];
+ let mut h = VectoredWriteHarness::new(8);
+ let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await;
+ assert_eq!(bytes_written, msg.len());
+ assert_eq!(h.flush().await, msg);
+}
+
+#[tokio::test]
+async fn write_vectored_odd_on_vectored() {
+ let msg = b"foo bar baz";
+ let mut bufs = [
+ IoSlice::new(&msg[0..4]),
+ IoSlice::new(&[]),
+ IoSlice::new(&msg[4..9]),
+ IoSlice::new(&msg[9..]),
+ ];
+ let mut h = VectoredWriteHarness::with_vectored_backend(8);
+ let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await;
+ assert_eq!(bytes_written, msg.len());
+ assert_eq!(h.flush().await, msg);
+}
+
+#[tokio::test]
+async fn write_vectored_large_slice_on_non_vectored() {
+ let msg = b"foo bar baz";
+ let mut bufs = [
+ IoSlice::new(&[]),
+ IoSlice::new(&msg[..9]),
+ IoSlice::new(&msg[9..]),
+ ];
+ let mut h = VectoredWriteHarness::new(8);
+ let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await;
+ assert_eq!(bytes_written, msg.len());
+ assert_eq!(h.flush().await, msg);
+}
+
+#[tokio::test]
+async fn write_vectored_large_slice_on_vectored() {
+ let msg = b"foo bar baz";
+ let mut bufs = [
+ IoSlice::new(&[]),
+ IoSlice::new(&msg[..9]),
+ IoSlice::new(&msg[9..]),
+ ];
+ let mut h = VectoredWriteHarness::with_vectored_backend(8);
+ let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await;
+ assert_eq!(bytes_written, msg.len());
+ assert_eq!(h.flush().await, msg);
+}
diff --git a/tests/io_copy.rs b/tests/io_copy.rs
index 9ed7995..005e170 100644
--- a/tests/io_copy.rs
+++ b/tests/io_copy.rs
@@ -1,7 +1,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::io::{self, AsyncRead, ReadBuf};
+use bytes::BytesMut;
+use futures::ready;
+use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_test::assert_ok;
use std::pin::Pin;
@@ -34,3 +36,52 @@ async fn copy() {
assert_eq!(n, 11);
assert_eq!(wr, b"hello world");
}
+
+#[tokio::test]
+async fn proxy() {
+ struct BufferedWd {
+ buf: BytesMut,
+ writer: io::DuplexStream,
+ }
+
+ impl AsyncWrite for BufferedWd {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.get_mut().buf.extend_from_slice(buf);
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let this = self.get_mut();
+
+ while !this.buf.is_empty() {
+ let n = ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buf))?;
+ let _ = this.buf.split_to(n);
+ }
+
+ Pin::new(&mut this.writer).poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.writer).poll_shutdown(cx)
+ }
+ }
+
+ let (rd, wd) = io::duplex(1024);
+ let mut rd = rd.take(1024);
+ let mut wd = BufferedWd {
+ buf: BytesMut::new(),
+ writer: wd,
+ };
+
+ // write start bytes
+ assert_ok!(wd.write_all(&[0x42; 512]).await);
+ assert_ok!(wd.flush().await);
+
+ let n = assert_ok!(io::copy(&mut rd, &mut wd).await);
+
+ assert_eq!(n, 1024);
+}
diff --git a/tests/io_copy_bidirectional.rs b/tests/io_copy_bidirectional.rs
index 17c0597..0e82b29 100644
--- a/tests/io_copy_bidirectional.rs
+++ b/tests/io_copy_bidirectional.rs
@@ -26,7 +26,7 @@ async fn block_write(s: &mut TcpStream) -> usize {
result = s.write(&BUF) => {
copied += result.expect("write error")
},
- _ = tokio::time::sleep(Duration::from_millis(100)) => {
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {
break;
}
}
@@ -42,7 +42,7 @@ where
{
// We run the test twice, with streams passed to copy_bidirectional in
// different orders, in order to ensure that the two arguments are
- // interchangable.
+ // interchangeable.
let (a, mut a1) = make_socketpair().await;
let (b, mut b1) = make_socketpair().await;
diff --git a/tests/io_split.rs b/tests/io_split.rs
index db168e9..a012166 100644
--- a/tests/io_split.rs
+++ b/tests/io_split.rs
@@ -50,10 +50,10 @@ fn is_send_and_sync() {
fn split_stream_id() {
let (r1, w1) = split(RW);
let (r2, w2) = split(RW);
- assert_eq!(r1.is_pair_of(&w1), true);
- assert_eq!(r1.is_pair_of(&w2), false);
- assert_eq!(r2.is_pair_of(&w2), true);
- assert_eq!(r2.is_pair_of(&w1), false);
+ assert!(r1.is_pair_of(&w1));
+ assert!(!r1.is_pair_of(&w2));
+ assert!(r2.is_pair_of(&w2));
+ assert!(!r2.is_pair_of(&w1));
}
#[test]
diff --git a/tests/io_write_all_buf.rs b/tests/io_write_all_buf.rs
index b49a58e..7c8b619 100644
--- a/tests/io_write_all_buf.rs
+++ b/tests/io_write_all_buf.rs
@@ -52,7 +52,7 @@ async fn write_all_buf() {
assert_eq!(wr.buf, b"helloworld"[..]);
// expect 4 writes, [hell],[o],[worl],[d]
assert_eq!(wr.cnt, 4);
- assert_eq!(buf.has_remaining(), false);
+ assert!(!buf.has_remaining());
}
#[tokio::test]
diff --git a/tests/macros_select.rs b/tests/macros_select.rs
index a089602..4da88fb 100644
--- a/tests/macros_select.rs
+++ b/tests/macros_select.rs
@@ -360,7 +360,21 @@ async fn use_future_in_if_condition() {
use tokio::time::{self, Duration};
tokio::select! {
- _ = time::sleep(Duration::from_millis(50)), if false => {
+ _ = time::sleep(Duration::from_millis(10)), if false => {
+ panic!("if condition ignored")
+ }
+ _ = async { 1u32 } => {
+ }
+ }
+}
+
+#[tokio::test]
+async fn use_future_in_if_condition_biased() {
+ use tokio::time::{self, Duration};
+
+ tokio::select! {
+ biased;
+ _ = time::sleep(Duration::from_millis(10)), if false => {
panic!("if condition ignored")
}
_ = async { 1u32 } => {
@@ -456,10 +470,7 @@ async fn require_mutable(_: &mut i32) {}
async fn async_noop() {}
async fn async_never() -> ! {
- use tokio::time::Duration;
- loop {
- tokio::time::sleep(Duration::from_millis(10)).await;
- }
+ futures::future::pending().await
}
// From https://github.com/tokio-rs/tokio/issues/2857
diff --git a/tests/named_pipe.rs b/tests/named_pipe.rs
index 3f26767..2055c3c 100644
--- a/tests/named_pipe.rs
+++ b/tests/named_pipe.rs
@@ -126,7 +126,7 @@ async fn test_named_pipe_multi_client() -> io::Result<()> {
}
// Wait for a named pipe to become available.
- time::sleep(Duration::from_millis(50)).await;
+ time::sleep(Duration::from_millis(10)).await;
};
let mut client = BufReader::new(client);
@@ -148,6 +148,185 @@ async fn test_named_pipe_multi_client() -> io::Result<()> {
Ok(())
}
+#[tokio::test]
+async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
+ use tokio::io::Interest;
+
+ const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready";
+ const N: usize = 10;
+
+ // The first server needs to be constructed early so that clients can
+ // be correctly connected. Otherwise calling .wait will cause the client to
+ // error.
+ let mut server = ServerOptions::new().create(PIPE_NAME)?;
+
+ let server = tokio::spawn(async move {
+ for _ in 0..N {
+ // Wait for client to connect.
+ server.connect().await?;
+
+ let inner_server = server;
+
+ // Construct the next server to be connected before sending the one
+ // we already have of onto a task. This ensures that the server
+ // isn't closed (after it's done in the task) before a new one is
+ // available. Otherwise the client might error with
+ // `io::ErrorKind::NotFound`.
+ server = ServerOptions::new().create(PIPE_NAME)?;
+
+ let _ = tokio::spawn(async move {
+ let server = inner_server;
+
+ {
+ let mut read_buf = [0u8; 5];
+ let mut read_buf_cursor = 0;
+
+ loop {
+ server.readable().await?;
+
+ let buf = &mut read_buf[read_buf_cursor..];
+
+ match server.try_read(buf) {
+ Ok(n) => {
+ read_buf_cursor += n;
+
+ if read_buf_cursor == read_buf.len() {
+ break;
+ }
+ }
+ Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ continue;
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ }
+ };
+
+ {
+ let write_buf = b"pong\n";
+ let mut write_buf_cursor = 0;
+
+ loop {
+ server.writable().await?;
+ let buf = &write_buf[write_buf_cursor..];
+
+ match server.try_write(buf) {
+ Ok(n) => {
+ write_buf_cursor += n;
+
+ if write_buf_cursor == write_buf.len() {
+ break;
+ }
+ }
+ Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ continue;
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ }
+ }
+
+ Ok::<_, io::Error>(())
+ });
+ }
+
+ Ok::<_, io::Error>(())
+ });
+
+ let mut clients = Vec::new();
+
+ for _ in 0..N {
+ clients.push(tokio::spawn(async move {
+ // This showcases a generic connect loop.
+ //
+ // We immediately try to create a client, if it's not found or the
+ // pipe is busy we use the specialized wait function on the client
+ // builder.
+ let client = loop {
+ match ClientOptions::new().open(PIPE_NAME) {
+ Ok(client) => break client,
+ Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
+ Err(e) if e.kind() == io::ErrorKind::NotFound => (),
+ Err(e) => return Err(e),
+ }
+
+ // Wait for a named pipe to become available.
+ time::sleep(Duration::from_millis(10)).await;
+ };
+
+ let mut read_buf = [0u8; 5];
+ let mut read_buf_cursor = 0;
+ let write_buf = b"ping\n";
+ let mut write_buf_cursor = 0;
+
+ loop {
+ let mut interest = Interest::READABLE;
+ if write_buf_cursor < write_buf.len() {
+ interest |= Interest::WRITABLE;
+ }
+
+ let ready = client.ready(interest).await?;
+
+ if ready.is_readable() {
+ let buf = &mut read_buf[read_buf_cursor..];
+
+ match client.try_read(buf) {
+ Ok(n) => {
+ read_buf_cursor += n;
+
+ if read_buf_cursor == read_buf.len() {
+ break;
+ }
+ }
+ Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ continue;
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ }
+
+ if ready.is_writable() {
+ let buf = &write_buf[write_buf_cursor..];
+
+ if buf.is_empty() {
+ continue;
+ }
+
+ match client.try_write(buf) {
+ Ok(n) => {
+ write_buf_cursor += n;
+ }
+ Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ continue;
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ }
+ }
+
+ let buf = String::from_utf8_lossy(&read_buf).into_owned();
+
+ Ok::<_, io::Error>(buf)
+ }));
+ }
+
+ for client in clients {
+ let result = client.await?;
+ assert_eq!(result?, "pong\n");
+ }
+
+ server.await??;
+ Ok(())
+}
+
// This tests what happens when a client tries to disconnect.
#[tokio::test]
async fn test_named_pipe_mode_message() -> io::Result<()> {
diff --git a/tests/no_rt.rs b/tests/no_rt.rs
index 8437b80..6845850 100644
--- a/tests/no_rt.rs
+++ b/tests/no_rt.rs
@@ -26,7 +26,7 @@ fn panics_when_no_reactor() {
async fn timeout_value() {
let (_tx, rx) = oneshot::channel::<()>();
- let dur = Duration::from_millis(20);
+ let dur = Duration::from_millis(10);
let _ = timeout(dur, rx).await;
}
diff --git a/tests/process_arg0.rs b/tests/process_arg0.rs
new file mode 100644
index 0000000..4fabea0
--- /dev/null
+++ b/tests/process_arg0.rs
@@ -0,0 +1,13 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "full", unix))]
+
+use tokio::process::Command;
+
+#[tokio::test]
+async fn arg0() {
+ let mut cmd = Command::new("sh");
+ cmd.arg0("test_string").arg("-c").arg("echo $0");
+
+ let output = cmd.output().await.unwrap();
+ assert_eq!(output.stdout, b"test_string\n");
+}
diff --git a/tests/process_raw_handle.rs b/tests/process_raw_handle.rs
new file mode 100644
index 0000000..727e66d
--- /dev/null
+++ b/tests/process_raw_handle.rs
@@ -0,0 +1,23 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+#![cfg(windows)]
+
+use tokio::process::Command;
+use winapi::um::processthreadsapi::GetProcessId;
+
+#[tokio::test]
+async fn obtain_raw_handle() {
+ let mut cmd = Command::new("cmd");
+ cmd.kill_on_drop(true);
+ cmd.arg("/c");
+ cmd.arg("pause");
+
+ let child = cmd.spawn().unwrap();
+
+ let orig_id = child.id().expect("missing id");
+ assert!(orig_id > 0);
+
+ let handle = child.raw_handle().expect("process stopped");
+ let handled_id = unsafe { GetProcessId(handle as _) };
+ assert_eq!(handled_id, orig_id);
+}
diff --git a/tests/support/io_vec.rs b/tests/support/io_vec.rs
new file mode 100644
index 0000000..4ea47c7
--- /dev/null
+++ b/tests/support/io_vec.rs
@@ -0,0 +1,45 @@
+use std::io::IoSlice;
+use std::ops::Deref;
+use std::slice;
+
+pub struct IoBufs<'a, 'b>(&'b mut [IoSlice<'a>]);
+
+impl<'a, 'b> IoBufs<'a, 'b> {
+ pub fn new(slices: &'b mut [IoSlice<'a>]) -> Self {
+ IoBufs(slices)
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+
+ pub fn advance(mut self, n: usize) -> IoBufs<'a, 'b> {
+ let mut to_remove = 0;
+ let mut remaining_len = n;
+ for slice in self.0.iter() {
+ if remaining_len < slice.len() {
+ break;
+ } else {
+ remaining_len -= slice.len();
+ to_remove += 1;
+ }
+ }
+ self.0 = self.0.split_at_mut(to_remove).1;
+ if let Some(slice) = self.0.first_mut() {
+ let tail = &slice[remaining_len..];
+ // Safety: recasts slice to the original lifetime
+ let tail = unsafe { slice::from_raw_parts(tail.as_ptr(), tail.len()) };
+ *slice = IoSlice::new(tail);
+ } else if remaining_len != 0 {
+ panic!("advance past the end of the slice vector");
+ }
+ self
+ }
+}
+
+impl<'a, 'b> Deref for IoBufs<'a, 'b> {
+ type Target = [IoSlice<'a>];
+ fn deref(&self) -> &[IoSlice<'a>] {
+ self.0
+ }
+}
diff --git a/tests/support/mock_file.rs b/tests/support/mock_file.rs
deleted file mode 100644
index 1ce326b..0000000
--- a/tests/support/mock_file.rs
+++ /dev/null
@@ -1,295 +0,0 @@
-#![allow(clippy::unnecessary_operation)]
-
-use std::collections::VecDeque;
-use std::fmt;
-use std::fs::{Metadata, Permissions};
-use std::io;
-use std::io::prelude::*;
-use std::io::SeekFrom;
-use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
-
-pub struct File {
- shared: Arc<Mutex<Shared>>,
-}
-
-pub struct Handle {
- shared: Arc<Mutex<Shared>>,
-}
-
-struct Shared {
- calls: VecDeque<Call>,
-}
-
-#[derive(Debug)]
-enum Call {
- Read(io::Result<Vec<u8>>),
- Write(io::Result<Vec<u8>>),
- Seek(SeekFrom, io::Result<u64>),
- SyncAll(io::Result<()>),
- SyncData(io::Result<()>),
- SetLen(u64, io::Result<()>),
-}
-
-impl Handle {
- pub fn read(&self, data: &[u8]) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls.push_back(Call::Read(Ok(data.to_owned())));
- self
- }
-
- pub fn read_err(&self) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls
- .push_back(Call::Read(Err(io::ErrorKind::Other.into())));
- self
- }
-
- pub fn write(&self, data: &[u8]) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls.push_back(Call::Write(Ok(data.to_owned())));
- self
- }
-
- pub fn write_err(&self) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls
- .push_back(Call::Write(Err(io::ErrorKind::Other.into())));
- self
- }
-
- pub fn seek_start_ok(&self, offset: u64) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls
- .push_back(Call::Seek(SeekFrom::Start(offset), Ok(offset)));
- self
- }
-
- pub fn seek_current_ok(&self, offset: i64, ret: u64) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls
- .push_back(Call::Seek(SeekFrom::Current(offset), Ok(ret)));
- self
- }
-
- pub fn sync_all(&self) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls.push_back(Call::SyncAll(Ok(())));
- self
- }
-
- pub fn sync_all_err(&self) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls
- .push_back(Call::SyncAll(Err(io::ErrorKind::Other.into())));
- self
- }
-
- pub fn sync_data(&self) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls.push_back(Call::SyncData(Ok(())));
- self
- }
-
- pub fn sync_data_err(&self) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls
- .push_back(Call::SyncData(Err(io::ErrorKind::Other.into())));
- self
- }
-
- pub fn set_len(&self, size: u64) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls.push_back(Call::SetLen(size, Ok(())));
- self
- }
-
- pub fn set_len_err(&self, size: u64) -> &Self {
- let mut s = self.shared.lock().unwrap();
- s.calls
- .push_back(Call::SetLen(size, Err(io::ErrorKind::Other.into())));
- self
- }
-
- pub fn remaining(&self) -> usize {
- let s = self.shared.lock().unwrap();
- s.calls.len()
- }
-}
-
-impl Drop for Handle {
- fn drop(&mut self) {
- if !std::thread::panicking() {
- let s = self.shared.lock().unwrap();
- assert_eq!(0, s.calls.len());
- }
- }
-}
-
-impl File {
- pub fn open(_: PathBuf) -> io::Result<File> {
- unimplemented!();
- }
-
- pub fn create(_: PathBuf) -> io::Result<File> {
- unimplemented!();
- }
-
- pub fn mock() -> (Handle, File) {
- let shared = Arc::new(Mutex::new(Shared {
- calls: VecDeque::new(),
- }));
-
- let handle = Handle {
- shared: shared.clone(),
- };
- let file = File { shared };
-
- (handle, file)
- }
-
- pub fn sync_all(&self) -> io::Result<()> {
- use self::Call::*;
-
- let mut s = self.shared.lock().unwrap();
-
- match s.calls.pop_front() {
- Some(SyncAll(ret)) => ret,
- Some(op) => panic!("expected next call to be {:?}; was sync_all", op),
- None => panic!("did not expect call"),
- }
- }
-
- pub fn sync_data(&self) -> io::Result<()> {
- use self::Call::*;
-
- let mut s = self.shared.lock().unwrap();
-
- match s.calls.pop_front() {
- Some(SyncData(ret)) => ret,
- Some(op) => panic!("expected next call to be {:?}; was sync_all", op),
- None => panic!("did not expect call"),
- }
- }
-
- pub fn set_len(&self, size: u64) -> io::Result<()> {
- use self::Call::*;
-
- let mut s = self.shared.lock().unwrap();
-
- match s.calls.pop_front() {
- Some(SetLen(arg, ret)) => {
- assert_eq!(arg, size);
- ret
- }
- Some(op) => panic!("expected next call to be {:?}; was sync_all", op),
- None => panic!("did not expect call"),
- }
- }
-
- pub fn metadata(&self) -> io::Result<Metadata> {
- unimplemented!();
- }
-
- pub fn set_permissions(&self, _perm: Permissions) -> io::Result<()> {
- unimplemented!();
- }
-
- pub fn try_clone(&self) -> io::Result<Self> {
- unimplemented!();
- }
-}
-
-impl Read for &'_ File {
- fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
- use self::Call::*;
-
- let mut s = self.shared.lock().unwrap();
-
- match s.calls.pop_front() {
- Some(Read(Ok(data))) => {
- assert!(dst.len() >= data.len());
- assert!(dst.len() <= 16 * 1024, "actual = {}", dst.len()); // max buffer
-
- &mut dst[..data.len()].copy_from_slice(&data);
- Ok(data.len())
- }
- Some(Read(Err(e))) => Err(e),
- Some(op) => panic!("expected next call to be {:?}; was a read", op),
- None => panic!("did not expect call"),
- }
- }
-}
-
-impl Write for &'_ File {
- fn write(&mut self, src: &[u8]) -> io::Result<usize> {
- use self::Call::*;
-
- let mut s = self.shared.lock().unwrap();
-
- match s.calls.pop_front() {
- Some(Write(Ok(data))) => {
- assert_eq!(src, &data[..]);
- Ok(src.len())
- }
- Some(Write(Err(e))) => Err(e),
- Some(op) => panic!("expected next call to be {:?}; was write", op),
- None => panic!("did not expect call"),
- }
- }
-
- fn flush(&mut self) -> io::Result<()> {
- Ok(())
- }
-}
-
-impl Seek for &'_ File {
- fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
- use self::Call::*;
-
- let mut s = self.shared.lock().unwrap();
-
- match s.calls.pop_front() {
- Some(Seek(expect, res)) => {
- assert_eq!(expect, pos);
- res
- }
- Some(op) => panic!("expected call {:?}; was `seek`", op),
- None => panic!("did not expect call; was `seek`"),
- }
- }
-}
-
-impl fmt::Debug for File {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("mock::File").finish()
- }
-}
-
-#[cfg(unix)]
-impl std::os::unix::io::AsRawFd for File {
- fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
- unimplemented!();
- }
-}
-
-#[cfg(unix)]
-impl std::os::unix::io::FromRawFd for File {
- unsafe fn from_raw_fd(_: std::os::unix::io::RawFd) -> Self {
- unimplemented!();
- }
-}
-
-#[cfg(windows)]
-impl std::os::windows::io::AsRawHandle for File {
- fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
- unimplemented!();
- }
-}
-
-#[cfg(windows)]
-impl std::os::windows::io::FromRawHandle for File {
- unsafe fn from_raw_handle(_: std::os::windows::io::RawHandle) -> Self {
- unimplemented!();
- }
-}
diff --git a/tests/support/mock_pool.rs b/tests/support/mock_pool.rs
deleted file mode 100644
index e1fdb42..0000000
--- a/tests/support/mock_pool.rs
+++ /dev/null
@@ -1,66 +0,0 @@
-use tokio::sync::oneshot;
-
-use std::cell::RefCell;
-use std::collections::VecDeque;
-use std::future::Future;
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-thread_local! {
- static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new())
-}
-
-#[derive(Debug)]
-pub(crate) struct Blocking<T> {
- rx: oneshot::Receiver<T>,
-}
-
-pub(crate) fn run<F, R>(f: F) -> Blocking<R>
-where
- F: FnOnce() -> R + Send + 'static,
- R: Send + 'static,
-{
- let (tx, rx) = oneshot::channel();
- let task = Box::new(move || {
- let _ = tx.send(f());
- });
-
- QUEUE.with(|cell| cell.borrow_mut().push_back(task));
-
- Blocking { rx }
-}
-
-impl<T> Future for Blocking<T> {
- type Output = Result<T, io::Error>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- use std::task::Poll::*;
-
- match Pin::new(&mut self.rx).poll(cx) {
- Ready(Ok(v)) => Ready(Ok(v)),
- Ready(Err(e)) => panic!("error = {:?}", e),
- Pending => Pending,
- }
- }
-}
-
-pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T>
-where
- F: FnOnce() -> io::Result<T> + Send + 'static,
- T: Send + 'static,
-{
- run(f).await?
-}
-
-pub(crate) fn len() -> usize {
- QUEUE.with(|cell| cell.borrow().len())
-}
-
-pub(crate) fn run_one() {
- let task = QUEUE
- .with(|cell| cell.borrow_mut().pop_front())
- .expect("expected task to run, but none ready");
-
- task();
-}
diff --git a/tests/sync_mutex.rs b/tests/sync_mutex.rs
index 0ddb203..090db94 100644
--- a/tests/sync_mutex.rs
+++ b/tests/sync_mutex.rs
@@ -139,12 +139,12 @@ fn try_lock() {
let m: Mutex<usize> = Mutex::new(0);
{
let g1 = m.try_lock();
- assert_eq!(g1.is_ok(), true);
+ assert!(g1.is_ok());
let g2 = m.try_lock();
- assert_eq!(g2.is_ok(), false);
+ assert!(!g2.is_ok());
}
let g3 = m.try_lock();
- assert_eq!(g3.is_ok(), true);
+ assert!(g3.is_ok());
}
#[tokio::test]
diff --git a/tests/sync_mutex_owned.rs b/tests/sync_mutex_owned.rs
index 0f1399c..898bf35 100644
--- a/tests/sync_mutex_owned.rs
+++ b/tests/sync_mutex_owned.rs
@@ -106,12 +106,12 @@ fn try_lock_owned() {
let m: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));
{
let g1 = m.clone().try_lock_owned();
- assert_eq!(g1.is_ok(), true);
+ assert!(g1.is_ok());
let g2 = m.clone().try_lock_owned();
- assert_eq!(g2.is_ok(), false);
+ assert!(!g2.is_ok());
}
let g3 = m.try_lock_owned();
- assert_eq!(g3.is_ok(), true);
+ assert!(g3.is_ok());
}
#[tokio::test]
diff --git a/tests/sync_once_cell.rs b/tests/sync_once_cell.rs
index 60f50d2..18eaf93 100644
--- a/tests/sync_once_cell.rs
+++ b/tests/sync_once_cell.rs
@@ -266,3 +266,9 @@ fn drop_into_inner_new_with() {
let count = NUM_DROPS.load(Ordering::Acquire);
assert!(count == 1);
}
+
+#[test]
+fn from() {
+ let cell = OnceCell::from(2);
+ assert_eq!(*cell.get().unwrap(), 2);
+}
diff --git a/tests/sync_rwlock.rs b/tests/sync_rwlock.rs
index e12052b..7d05086 100644
--- a/tests/sync_rwlock.rs
+++ b/tests/sync_rwlock.rs
@@ -50,8 +50,8 @@ fn read_exclusive_pending() {
assert_pending!(t2.poll());
}
-// If the max shared access is reached and subsquent shared access is pending
-// should be made available when one of the shared acesses is dropped
+// If the max shared access is reached and subsequent shared access is pending
+// should be made available when one of the shared accesses is dropped
#[test]
fn exhaust_reading() {
let rwlock = RwLock::with_max_readers(100, 1024);
diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs
index 9dcb0c5..a2a276d 100644
--- a/tests/sync_watch.rs
+++ b/tests/sync_watch.rs
@@ -169,3 +169,20 @@ fn poll_close() {
assert!(tx.send("two").is_err());
}
+
+#[test]
+fn borrow_and_update() {
+ let (tx, mut rx) = watch::channel("one");
+
+ tx.send("two").unwrap();
+ assert_ready!(spawn(rx.changed()).poll()).unwrap();
+ assert_pending!(spawn(rx.changed()).poll());
+
+ tx.send("three").unwrap();
+ assert_eq!(*rx.borrow_and_update(), "three");
+ assert_pending!(spawn(rx.changed()).poll());
+
+ drop(tx);
+ assert_eq!(*rx.borrow_and_update(), "three");
+ assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
+}
diff --git a/tests/task_abort.rs b/tests/task_abort.rs
index 1d72ac3..06c61dc 100644
--- a/tests/task_abort.rs
+++ b/tests/task_abort.rs
@@ -1,11 +1,25 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use std::sync::Arc;
+use std::thread::sleep;
+use tokio::time::Duration;
+
+use tokio::runtime::Builder;
+
+struct PanicOnDrop;
+
+impl Drop for PanicOnDrop {
+ fn drop(&mut self) {
+ panic!("Well what did you expect would happen...");
+ }
+}
+
/// Checks that a suspended task can be aborted without panicking as reported in
/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
#[test]
fn test_abort_without_panic_3157() {
- let rt = tokio::runtime::Builder::new_multi_thread()
+ let rt = Builder::new_multi_thread()
.enable_time()
.worker_threads(1)
.build()
@@ -14,11 +28,11 @@ fn test_abort_without_panic_3157() {
rt.block_on(async move {
let handle = tokio::spawn(async move {
println!("task started");
- tokio::time::sleep(std::time::Duration::new(100, 0)).await
+ tokio::time::sleep(Duration::new(100, 0)).await
});
// wait for task to sleep.
- tokio::time::sleep(std::time::Duration::new(1, 0)).await;
+ tokio::time::sleep(Duration::from_millis(10)).await;
handle.abort();
let _ = handle.await;
@@ -41,9 +55,7 @@ fn test_abort_without_panic_3662() {
}
}
- let rt = tokio::runtime::Builder::new_current_thread()
- .build()
- .unwrap();
+ let rt = Builder::new_current_thread().build().unwrap();
rt.block_on(async move {
let drop_flag = Arc::new(AtomicBool::new(false));
@@ -62,18 +74,16 @@ fn test_abort_without_panic_3662() {
// This runs in a separate thread so it doesn't have immediate
// thread-local access to the executor. It does however transition
// the underlying task to be completed, which will cause it to be
- // dropped (in this thread no less).
+ // dropped (but not in this thread).
assert!(!drop_flag2.load(Ordering::SeqCst));
j.abort();
- // TODO: is this guaranteed at this point?
- // assert!(drop_flag2.load(Ordering::SeqCst));
j
})
.join()
.unwrap();
- assert!(drop_flag.load(Ordering::SeqCst));
let result = task.await;
+ assert!(drop_flag.load(Ordering::SeqCst));
assert!(result.unwrap_err().is_cancelled());
// Note: We do the following to trigger a deferred task cleanup.
@@ -82,7 +92,7 @@ fn test_abort_without_panic_3662() {
// `Inner::block_on` of `basic_scheduler.rs`.
//
// We cause the cleanup to happen by having a poll return Pending once
- // so that the scheduler can go into the "auxilliary tasks" mode, at
+ // so that the scheduler can go into the "auxiliary tasks" mode, at
// which point the task is removed from the scheduler.
let i = tokio::spawn(async move {
tokio::task::yield_now().await;
@@ -91,3 +101,126 @@ fn test_abort_without_panic_3662() {
i.await.unwrap();
});
}
+
+/// Checks that a suspended LocalSet task can be aborted from a remote thread
+/// without panicking and without running the tasks destructor on the wrong thread.
+/// <https://github.com/tokio-rs/tokio/issues/3929>
+#[test]
+fn remote_abort_local_set_3929() {
+ struct DropCheck {
+ created_on: std::thread::ThreadId,
+ not_send: std::marker::PhantomData<*const ()>,
+ }
+
+ impl DropCheck {
+ fn new() -> Self {
+ Self {
+ created_on: std::thread::current().id(),
+ not_send: std::marker::PhantomData,
+ }
+ }
+ }
+ impl Drop for DropCheck {
+ fn drop(&mut self) {
+ if std::thread::current().id() != self.created_on {
+ panic!("non-Send value dropped in another thread!");
+ }
+ }
+ }
+
+ let rt = Builder::new_current_thread().build().unwrap();
+ let local = tokio::task::LocalSet::new();
+
+ let check = DropCheck::new();
+ let jh = local.spawn_local(async move {
+ futures::future::pending::<()>().await;
+ drop(check);
+ });
+
+ let jh2 = std::thread::spawn(move || {
+ sleep(Duration::from_millis(10));
+ jh.abort();
+ });
+
+ rt.block_on(local);
+ jh2.join().unwrap();
+}
+
+/// Checks that a suspended task can be aborted even if the `JoinHandle` is immediately dropped.
+/// issue #3964: <https://github.com/tokio-rs/tokio/issues/3964>.
+#[test]
+fn test_abort_wakes_task_3964() {
+ let rt = Builder::new_current_thread().enable_time().build().unwrap();
+
+ rt.block_on(async move {
+ let notify_dropped = Arc::new(());
+ let weak_notify_dropped = Arc::downgrade(&notify_dropped);
+
+ let handle = tokio::spawn(async move {
+ // Make sure the Arc is moved into the task
+ let _notify_dropped = notify_dropped;
+ println!("task started");
+ tokio::time::sleep(Duration::new(100, 0)).await
+ });
+
+ // wait for task to sleep.
+ tokio::time::sleep(Duration::from_millis(10)).await;
+
+ handle.abort();
+ drop(handle);
+
+ // wait for task to abort.
+ tokio::time::sleep(Duration::from_millis(10)).await;
+
+ // Check that the Arc has been dropped.
+ assert!(weak_notify_dropped.upgrade().is_none());
+ });
+}
+
+/// Checks that aborting a task whose destructor panics does not allow the
+/// panic to escape the task.
+#[test]
+#[cfg(not(target_os = "android"))]
+fn test_abort_task_that_panics_on_drop_contained() {
+ let rt = Builder::new_current_thread().enable_time().build().unwrap();
+
+ rt.block_on(async move {
+ let handle = tokio::spawn(async move {
+ // Make sure the Arc is moved into the task
+ let _panic_dropped = PanicOnDrop;
+ println!("task started");
+ tokio::time::sleep(Duration::new(100, 0)).await
+ });
+
+ // wait for task to sleep.
+ tokio::time::sleep(Duration::from_millis(10)).await;
+
+ handle.abort();
+ drop(handle);
+
+ // wait for task to abort.
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ });
+}
+
+/// Checks that aborting a task whose destructor panics has the expected result.
+#[test]
+#[cfg(not(target_os = "android"))]
+fn test_abort_task_that_panics_on_drop_returned() {
+ let rt = Builder::new_current_thread().enable_time().build().unwrap();
+
+ rt.block_on(async move {
+ let handle = tokio::spawn(async move {
+ // Make sure the Arc is moved into the task
+ let _panic_dropped = PanicOnDrop;
+ println!("task started");
+ tokio::time::sleep(Duration::new(100, 0)).await
+ });
+
+ // wait for task to sleep.
+ tokio::time::sleep(Duration::from_millis(10)).await;
+
+ handle.abort();
+ assert!(handle.await.unwrap_err().is_panic());
+ });
+}
diff --git a/tests/task_builder.rs b/tests/task_builder.rs
new file mode 100644
index 0000000..1499abf
--- /dev/null
+++ b/tests/task_builder.rs
@@ -0,0 +1,67 @@
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+mod tests {
+ use std::rc::Rc;
+ use tokio::{
+ task::{Builder, LocalSet},
+ test,
+ };
+
+ #[test]
+ async fn spawn_with_name() {
+ let result = Builder::new()
+ .name("name")
+ .spawn(async { "task executed" })
+ .await;
+
+ assert_eq!(result.unwrap(), "task executed");
+ }
+
+ #[test]
+ async fn spawn_blocking_with_name() {
+ let result = Builder::new()
+ .name("name")
+ .spawn_blocking(|| "task executed")
+ .await;
+
+ assert_eq!(result.unwrap(), "task executed");
+ }
+
+ #[test]
+ async fn spawn_local_with_name() {
+ let unsend_data = Rc::new("task executed");
+ let result = LocalSet::new()
+ .run_until(async move {
+ Builder::new()
+ .name("name")
+ .spawn_local(async move { unsend_data })
+ .await
+ })
+ .await;
+
+ assert_eq!(*result.unwrap(), "task executed");
+ }
+
+ #[test]
+ async fn spawn_without_name() {
+ let result = Builder::new().spawn(async { "task executed" }).await;
+
+ assert_eq!(result.unwrap(), "task executed");
+ }
+
+ #[test]
+ async fn spawn_blocking_without_name() {
+ let result = Builder::new().spawn_blocking(|| "task executed").await;
+
+ assert_eq!(result.unwrap(), "task executed");
+ }
+
+ #[test]
+ async fn spawn_local_without_name() {
+ let unsend_data = Rc::new("task executed");
+ let result = LocalSet::new()
+ .run_until(async move { Builder::new().spawn_local(async move { unsend_data }).await })
+ .await;
+
+ assert_eq!(*result.unwrap(), "task executed");
+ }
+}
diff --git a/tests/task_local_set.rs b/tests/task_local_set.rs
index 8513609..f8a35d0 100644
--- a/tests/task_local_set.rs
+++ b/tests/task_local_set.rs
@@ -67,11 +67,11 @@ async fn localset_future_timers() {
let local = LocalSet::new();
local.spawn_local(async move {
- time::sleep(Duration::from_millis(10)).await;
+ time::sleep(Duration::from_millis(5)).await;
RAN1.store(true, Ordering::SeqCst);
});
local.spawn_local(async move {
- time::sleep(Duration::from_millis(20)).await;
+ time::sleep(Duration::from_millis(10)).await;
RAN2.store(true, Ordering::SeqCst);
});
local.await;
@@ -299,9 +299,7 @@ fn drop_cancels_tasks() {
let _rc2 = rc2;
started_tx.send(()).unwrap();
- loop {
- time::sleep(Duration::from_secs(3600)).await;
- }
+ futures::future::pending::<()>().await;
});
local.block_on(&rt, async {
@@ -334,7 +332,7 @@ fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
// something we can easily make assertions about, we'll run it in a
// thread. When the test thread finishes, it will send a message on a
// channel to this thread. We'll wait for that message with a fairly
- // generous timeout, and if we don't recieve it, we assume the test
+ // generous timeout, and if we don't receive it, we assume the test
// thread has hung.
//
// Note that it should definitely complete in under a minute, but just
@@ -400,13 +398,32 @@ fn local_tasks_wake_join_all() {
});
}
-#[tokio::test]
-async fn local_tasks_are_polled_after_tick() {
+#[test]
+fn local_tasks_are_polled_after_tick() {
+ // This test depends on timing, so we run it up to five times.
+ for _ in 0..4 {
+ let res = std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner);
+ if res.is_ok() {
+ // success
+ return;
+ }
+ }
+
+ // Test failed 4 times. Try one more time without catching panics. If it
+ // fails again, the test fails.
+ local_tasks_are_polled_after_tick_inner();
+}
+
+#[tokio::main(flavor = "current_thread")]
+async fn local_tasks_are_polled_after_tick_inner() {
// Reproduces issues #1899 and #1900
static RX1: AtomicUsize = AtomicUsize::new(0);
static RX2: AtomicUsize = AtomicUsize::new(0);
- static EXPECTED: usize = 500;
+ const EXPECTED: usize = 500;
+
+ RX1.store(0, SeqCst);
+ RX2.store(0, SeqCst);
let (tx, mut rx) = mpsc::unbounded_channel();
@@ -416,7 +433,7 @@ async fn local_tasks_are_polled_after_tick() {
.run_until(async {
let task2 = task::spawn(async move {
// Wait a bit
- time::sleep(Duration::from_millis(100)).await;
+ time::sleep(Duration::from_millis(10)).await;
let mut oneshots = Vec::with_capacity(EXPECTED);
@@ -427,13 +444,13 @@ async fn local_tasks_are_polled_after_tick() {
tx.send(oneshot_rx).unwrap();
}
- time::sleep(Duration::from_millis(100)).await;
+ time::sleep(Duration::from_millis(10)).await;
for tx in oneshots.drain(..) {
tx.send(()).unwrap();
}
- time::sleep(Duration::from_millis(300)).await;
+ time::sleep(Duration::from_millis(20)).await;
let rx1 = RX1.load(SeqCst);
let rx2 = RX2.load(SeqCst);
println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2);
diff --git a/tests/tcp_into_split.rs b/tests/tcp_into_split.rs
index b4bb2ee..2e06643 100644
--- a/tests/tcp_into_split.rs
+++ b/tests/tcp_into_split.rs
@@ -116,7 +116,7 @@ async fn drop_write() -> Result<()> {
// drop it while the read is in progress
std::thread::spawn(move || {
- thread::sleep(std::time::Duration::from_millis(50));
+ thread::sleep(std::time::Duration::from_millis(10));
drop(write_half);
});
diff --git a/tests/time_interval.rs b/tests/time_interval.rs
index a3c7f08..5f7bf55 100644
--- a/tests/time_interval.rs
+++ b/tests/time_interval.rs
@@ -1,56 +1,173 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::time::{self, Duration, Instant};
+use tokio::time::{self, Duration, Instant, MissedTickBehavior};
use tokio_test::{assert_pending, assert_ready_eq, task};
-use std::future::Future;
use std::task::Poll;
+// Takes the `Interval` task, `start` variable, and optional time deltas
+// For each time delta, it polls the `Interval` and asserts that the result is
+// equal to `start` + the specific time delta. Then it asserts that the
+// `Interval` is pending.
+macro_rules! check_interval_poll {
+ ($i:ident, $start:ident, $($delta:expr),*$(,)?) => {
+ $(
+ assert_ready_eq!(poll_next(&mut $i), $start + ms($delta));
+ )*
+ assert_pending!(poll_next(&mut $i));
+ };
+ ($i:ident, $start:ident) => {
+ check_interval_poll!($i, $start,);
+ };
+}
+
#[tokio::test]
#[should_panic]
async fn interval_zero_duration() {
let _ = time::interval_at(Instant::now(), ms(0));
}
-#[tokio::test]
-async fn usage() {
- time::pause();
+// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
+// Actual ticks: | work -----| delay | work | work | work -| work -----|
+// Poll behavior: | | | | | | | |
+// | | | | | | | |
+// Ready(s) | | Ready(s + 2p) | | | |
+// Pending | Ready(s + 3p) | | |
+// Ready(s + p) Ready(s + 4p) | |
+// Ready(s + 5p) |
+// Ready(s + 6p)
+#[tokio::test(start_paused = true)]
+async fn burst() {
+ let start = Instant::now();
+
+ // This is necessary because the timer is only so granular, and in order for
+ // all our ticks to resolve, the time needs to be 1ms ahead of what we
+ // expect, so that the runtime will see that it is time to resolve the timer
+ time::advance(ms(1)).await;
+
+ let mut i = task::spawn(time::interval_at(start, ms(300)));
+
+ check_interval_poll!(i, start, 0);
+
+ time::advance(ms(100)).await;
+ check_interval_poll!(i, start);
+
+ time::advance(ms(200)).await;
+ check_interval_poll!(i, start, 300);
+
+ time::advance(ms(650)).await;
+ check_interval_poll!(i, start, 600, 900);
+
+ time::advance(ms(200)).await;
+ check_interval_poll!(i, start);
+
+ time::advance(ms(100)).await;
+ check_interval_poll!(i, start, 1200);
+
+ time::advance(ms(250)).await;
+ check_interval_poll!(i, start, 1500);
+
+ time::advance(ms(300)).await;
+ check_interval_poll!(i, start, 1800);
+}
+// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
+// Actual ticks: | work -----| delay | work -----| work -----| work -----|
+// Poll behavior: | | | | | | | |
+// | | | | | | | |
+// Ready(s) | | Ready(s + 2p) | | | |
+// Pending | Pending | | |
+// Ready(s + p) Ready(s + 2p + d) | |
+// Ready(s + 3p + d) |
+// Ready(s + 4p + d)
+#[tokio::test(start_paused = true)]
+async fn delay() {
let start = Instant::now();
- // TODO: Skip this
+ // This is necessary because the timer is only so granular, and in order for
+ // all our ticks to resolve, the time needs to be 1ms ahead of what we
+ // expect, so that the runtime will see that it is time to resolve the timer
time::advance(ms(1)).await;
let mut i = task::spawn(time::interval_at(start, ms(300)));
+ i.set_missed_tick_behavior(MissedTickBehavior::Delay);
- assert_ready_eq!(poll_next(&mut i), start);
- assert_pending!(poll_next(&mut i));
+ check_interval_poll!(i, start, 0);
time::advance(ms(100)).await;
- assert_pending!(poll_next(&mut i));
+ check_interval_poll!(i, start);
time::advance(ms(200)).await;
- assert_ready_eq!(poll_next(&mut i), start + ms(300));
- assert_pending!(poll_next(&mut i));
+ check_interval_poll!(i, start, 300);
+
+ time::advance(ms(650)).await;
+ check_interval_poll!(i, start, 600);
+
+ time::advance(ms(100)).await;
+ check_interval_poll!(i, start);
+
+ // We have to add one here for the same reason as is above.
+ // Because `Interval` has reset its timer according to `Instant::now()`,
+ // we have to go forward 1 more millisecond than is expected so that the
+ // runtime realizes that it's time to resolve the timer.
+ time::advance(ms(201)).await;
+ // We add one because when using the `Delay` behavior, `Interval`
+ // adds the `period` from `Instant::now()`, which will always be off by one
+ // because we have to advance time by 1 (see above).
+ check_interval_poll!(i, start, 1251);
+
+ time::advance(ms(300)).await;
+ // Again, we add one.
+ check_interval_poll!(i, start, 1551);
+
+ time::advance(ms(300)).await;
+ check_interval_poll!(i, start, 1851);
+}
+
+// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
+// Actual ticks: | work -----| delay | work ---| work -----| work -----|
+// Poll behavior: | | | | | | |
+// | | | | | | |
+// Ready(s) | | Ready(s + 2p) | | |
+// Pending | Ready(s + 4p) | |
+// Ready(s + p) Ready(s + 5p) |
+// Ready(s + 6p)
+#[tokio::test(start_paused = true)]
+async fn skip() {
+ let start = Instant::now();
+
+ // This is necessary because the timer is only so granular, and in order for
+ // all our ticks to resolve, the time needs to be 1ms ahead of what we
+ // expect, so that the runtime will see that it is time to resolve the timer
+ time::advance(ms(1)).await;
+
+ let mut i = task::spawn(time::interval_at(start, ms(300)));
+ i.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+ check_interval_poll!(i, start, 0);
+
+ time::advance(ms(100)).await;
+ check_interval_poll!(i, start);
+
+ time::advance(ms(200)).await;
+ check_interval_poll!(i, start, 300);
+
+ time::advance(ms(650)).await;
+ check_interval_poll!(i, start, 600);
+
+ time::advance(ms(250)).await;
+ check_interval_poll!(i, start, 1200);
- time::advance(ms(400)).await;
- assert_ready_eq!(poll_next(&mut i), start + ms(600));
- assert_pending!(poll_next(&mut i));
+ time::advance(ms(300)).await;
+ check_interval_poll!(i, start, 1500);
- time::advance(ms(500)).await;
- assert_ready_eq!(poll_next(&mut i), start + ms(900));
- assert_ready_eq!(poll_next(&mut i), start + ms(1200));
- assert_pending!(poll_next(&mut i));
+ time::advance(ms(300)).await;
+ check_interval_poll!(i, start, 1800);
}
fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
- interval.enter(|cx, mut interval| {
- tokio::pin! {
- let fut = interval.tick();
- }
- fut.poll(cx)
- })
+ interval.enter(|cx, mut interval| interval.poll_tick(cx))
}
fn ms(n: u64) -> Duration {
diff --git a/tests/time_rt.rs b/tests/time_rt.rs
index 0775343..23367be 100644
--- a/tests/time_rt.rs
+++ b/tests/time_rt.rs
@@ -13,7 +13,7 @@ fn timer_with_threaded_runtime() {
let (tx, rx) = mpsc::channel();
rt.spawn(async move {
- let when = Instant::now() + Duration::from_millis(100);
+ let when = Instant::now() + Duration::from_millis(10);
sleep_until(when).await;
assert!(Instant::now() >= when);
@@ -32,7 +32,7 @@ fn timer_with_basic_scheduler() {
let (tx, rx) = mpsc::channel();
rt.block_on(async move {
- let when = Instant::now() + Duration::from_millis(100);
+ let when = Instant::now() + Duration::from_millis(10);
sleep_until(when).await;
assert!(Instant::now() >= when);
@@ -67,7 +67,7 @@ async fn starving() {
}
}
- let when = Instant::now() + Duration::from_millis(20);
+ let when = Instant::now() + Duration::from_millis(10);
let starve = Starve(Box::pin(sleep_until(when)), 0);
starve.await;
@@ -81,7 +81,7 @@ async fn timeout_value() {
let (_tx, rx) = oneshot::channel::<()>();
let now = Instant::now();
- let dur = Duration::from_millis(20);
+ let dur = Duration::from_millis(10);
let res = timeout(dur, rx).await;
assert!(res.is_err());
diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs
index 9c04d22..20477d2 100644
--- a/tests/time_sleep.rs
+++ b/tests/time_sleep.rs
@@ -24,7 +24,7 @@ async fn immediate_sleep() {
async fn is_elapsed() {
time::pause();
- let sleep = time::sleep(Duration::from_millis(50));
+ let sleep = time::sleep(Duration::from_millis(10));
tokio::pin!(sleep);
@@ -349,7 +349,7 @@ async fn drop_from_wake() {
assert!(
!panicked.load(Ordering::SeqCst),
- "paniced when dropping timers"
+ "panicked when dropping timers"
);
#[derive(Clone)]
diff --git a/tests/uds_datagram.rs b/tests/uds_datagram.rs
index 10314be..4d28468 100644
--- a/tests/uds_datagram.rs
+++ b/tests/uds_datagram.rs
@@ -87,9 +87,12 @@ async fn try_send_recv_never_block() -> io::Result<()> {
dgram1.writable().await.unwrap();
match dgram1.try_send(payload) {
- Err(err) => match err.kind() {
- io::ErrorKind::WouldBlock | io::ErrorKind::Other => break,
- _ => unreachable!("unexpected error {:?}", err),
+ Err(err) => match (err.kind(), err.raw_os_error()) {
+ (io::ErrorKind::WouldBlock, _) => break,
+ (_, Some(libc::ENOBUFS)) => break,
+ _ => {
+ panic!("unexpected error {:?}", err);
+ }
},
Ok(len) => {
assert_eq!(len, payload.len());
@@ -291,9 +294,12 @@ async fn try_recv_buf_never_block() -> io::Result<()> {
dgram1.writable().await.unwrap();
match dgram1.try_send(payload) {
- Err(err) => match err.kind() {
- io::ErrorKind::WouldBlock | io::ErrorKind::Other => break,
- _ => unreachable!("unexpected error {:?}", err),
+ Err(err) => match (err.kind(), err.raw_os_error()) {
+ (io::ErrorKind::WouldBlock, _) => break,
+ (_, Some(libc::ENOBUFS)) => break,
+ _ => {
+ panic!("unexpected error {:?}", err);
+ }
},
Ok(len) => {
assert_eq!(len, payload.len());
diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs
index 2754e84..5f1b4cf 100644
--- a/tests/uds_stream.rs
+++ b/tests/uds_stream.rs
@@ -379,3 +379,33 @@ async fn try_read_buf() -> std::io::Result<()> {
Ok(())
}
+
+// https://github.com/tokio-rs/tokio/issues/3879
+#[tokio::test]
+#[cfg(not(target_os = "macos"))]
+async fn epollhup() -> io::Result<()> {
+ let dir = tempfile::Builder::new()
+ .prefix("tokio-uds-tests")
+ .tempdir()
+ .unwrap();
+ let sock_path = dir.path().join("connect.sock");
+
+ let listener = UnixListener::bind(&sock_path)?;
+ let connect = UnixStream::connect(&sock_path);
+ tokio::pin!(connect);
+
+ // Poll `connect` once.
+ poll_fn(|cx| {
+ use std::future::Future;
+
+ assert_pending!(connect.as_mut().poll(cx));
+ Poll::Ready(())
+ })
+ .await;
+
+ drop(listener);
+
+ let err = connect.await.unwrap_err();
+ assert_eq!(err.kind(), io::ErrorKind::ConnectionReset);
+ Ok(())
+}