From 5fe87985ba723ee4d9532495587d7114e4b6e143 Mon Sep 17 00:00:00 2001 From: Joel Galenson Date: Thu, 30 Sep 2021 08:55:02 -0700 Subject: Upgrade rust/crates/tokio to 1.12.0 Test: make Change-Id: I4b0bd405c0b615f886e5a6606e0bf7c0ac7c6699 --- tests/fs_file.rs | 20 ++- tests/io_async_fd.rs | 18 +- tests/io_fill_buf.rs | 34 ++++ tests/io_poll_aio.rs | 375 ++++++++++++++++++++++++++++++++++++++++++ tests/process_kill_on_drop.rs | 12 +- tests/sync_mpsc.rs | 119 +++++++++++++- tests/sync_watch.rs | 15 ++ 7 files changed, 568 insertions(+), 25 deletions(-) create mode 100644 tests/io_fill_buf.rs create mode 100644 tests/io_poll_aio.rs (limited to 'tests') diff --git a/tests/fs_file.rs b/tests/fs_file.rs index bf2f1d7..f645e61 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -1,12 +1,11 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; -use tokio_test::task; - use std::io::prelude::*; use tempfile::NamedTempFile; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; +use tokio_test::task; const HELLO: &[u8] = b"hello world..."; @@ -50,6 +49,19 @@ async fn basic_write_and_shutdown() { assert_eq!(file, HELLO); } +#[tokio::test] +async fn rewind_seek_position() { + let tempfile = tempfile(); + + let mut file = File::create(tempfile.path()).await.unwrap(); + + file.seek(SeekFrom::Current(10)).await.unwrap(); + + file.rewind().await.unwrap(); + + assert_eq!(file.stream_position().await.unwrap(), 0); +} + #[tokio::test] async fn coop() { let mut tempfile = tempfile(); diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs index dc21e42..5a6875e 100644 --- a/tests/io_async_fd.rs +++ b/tests/io_async_fd.rs @@ -15,7 +15,7 @@ use std::{ use nix::unistd::{close, read, write}; -use futures::{poll, FutureExt}; +use futures::poll; use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard}; use tokio_test::{assert_err, assert_pending}; @@ -163,10 +163,11 @@ async fn initially_writable() { afd_a.writable().await.unwrap().clear_ready(); afd_b.writable().await.unwrap().clear_ready(); - futures::select_biased! { - _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {}, - _ = afd_a.readable().fuse() => panic!("Unexpected readable state"), - _ = afd_b.readable().fuse() => panic!("Unexpected readable state"), + tokio::select! { + biased; + _ = tokio::time::sleep(Duration::from_millis(10)) => {}, + _ = afd_a.readable() => panic!("Unexpected readable state"), + _ = afd_b.readable() => panic!("Unexpected readable state"), } } @@ -353,12 +354,13 @@ async fn multiple_waiters() { futures::future::pending::<()>().await; }; - futures::select_biased! { - guard = afd_a.readable().fuse() => { + tokio::select! { + biased; + guard = afd_a.readable() => { tokio::task::yield_now().await; guard.unwrap().clear_ready() }, - _ = notify_barrier.fuse() => unreachable!(), + _ = notify_barrier => unreachable!(), } std::mem::drop(afd_a); diff --git a/tests/io_fill_buf.rs b/tests/io_fill_buf.rs new file mode 100644 index 0000000..0b2ebd7 --- /dev/null +++ b/tests/io_fill_buf.rs @@ -0,0 +1,34 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tempfile::NamedTempFile; +use tokio::fs::File; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio_test::assert_ok; + +#[tokio::test] +async fn fill_buf_file() { + let file = NamedTempFile::new().unwrap(); + + assert_ok!(std::fs::write(file.path(), b"hello")); + + let file = assert_ok!(File::open(file.path()).await); + let mut file = BufReader::new(file); + + let mut contents = Vec::new(); + + loop { + let consumed = { + let buffer = assert_ok!(file.fill_buf().await); + if buffer.is_empty() { + break; + } + contents.extend_from_slice(buffer); + buffer.len() + }; + + file.consume(consumed); + } + + assert_eq!(contents, b"hello"); +} diff --git a/tests/io_poll_aio.rs b/tests/io_poll_aio.rs new file mode 100644 index 0000000..f044af5 --- /dev/null +++ b/tests/io_poll_aio.rs @@ -0,0 +1,375 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(target_os = "freebsd", feature = "net"))] + +use mio_aio::{AioCb, AioFsyncMode, LioCb}; +use std::{ + future::Future, + mem, + os::unix::io::{AsRawFd, RawFd}, + pin::Pin, + task::{Context, Poll}, +}; +use tempfile::tempfile; +use tokio::io::bsd::{Aio, AioSource}; +use tokio_test::assert_pending; + +mod aio { + use super::*; + + /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource + struct WrappedAioCb<'a>(AioCb<'a>); + impl<'a> AioSource for WrappedAioCb<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + self.0.register_raw(kq, token) + } + fn deregister(&mut self) { + self.0.deregister_raw() + } + } + + /// A very crude implementation of an AIO-based future + struct FsyncFut(Aio>); + + impl Future for FsyncFut { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + // At this point, we could clear readiness. But there's no + // point, since we're about to drop the Aio. + let result = (*self.0).0.aio_return(); + match result { + Ok(_) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(e.into())), + } + } + } + } + } + + /// Low-level AIO Source + /// + /// An example bypassing mio_aio and Nix to demonstrate how the kevent + /// registration actually works, under the hood. + struct LlSource(Pin>); + + impl AioSource for LlSource { + fn register(&mut self, kq: RawFd, token: usize) { + let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + sev.sigev_notify = libc::SIGEV_KEVENT; + sev.sigev_signo = kq; + sev.sigev_value = libc::sigval { + sival_ptr: token as *mut libc::c_void, + }; + self.0.aio_sigevent = sev; + } + + fn deregister(&mut self) { + unsafe { + self.0.aio_sigevent = mem::zeroed(); + } + } + } + + struct LlFut(Aio); + + impl Future for LlFut { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; + assert_eq!(0, r); + Poll::Ready(Ok(())) + } + } + } + } + + /// A very simple object that can implement AioSource and can be reused. + /// + /// mio_aio normally assumes that each AioCb will be consumed on completion. + /// This somewhat contrived example shows how an Aio object can be reused + /// anyway. + struct ReusableFsyncSource { + aiocb: Pin>>, + fd: RawFd, + token: usize, + } + impl ReusableFsyncSource { + fn fsync(&mut self) { + self.aiocb.register_raw(self.fd, self.token); + self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap(); + } + fn new(aiocb: AioCb<'static>) -> Self { + ReusableFsyncSource { + aiocb: Box::pin(aiocb), + fd: 0, + token: 0, + } + } + fn reset(&mut self, aiocb: AioCb<'static>) { + self.aiocb = Box::pin(aiocb); + } + } + impl AioSource for ReusableFsyncSource { + fn register(&mut self, kq: RawFd, token: usize) { + self.fd = kq; + self.token = token; + } + fn deregister(&mut self) { + self.fd = 0; + } + } + + struct ReusableFsyncFut<'a>(&'a mut Aio); + impl<'a> Future for ReusableFsyncFut<'a> { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(ev)) => { + // Since this future uses a reusable Aio, we must clear + // its readiness here. That makes the future + // non-idempotent; the caller can't poll it repeatedly after + // it has already returned Ready. But that's ok; most + // futures behave this way. + self.0.clear_ready(ev); + let result = (*self.0).aiocb.aio_return(); + match result { + Ok(_) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(e.into())), + } + } + } + } + } + + #[tokio::test] + async fn fsync() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let aiocb = AioCb::from_fd(fd, 0); + let source = WrappedAioCb(aiocb); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap(); + let fut = FsyncFut(poll_aio); + fut.await.unwrap(); + } + + #[tokio::test] + async fn ll_fsync() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + aiocb.aio_fildes = fd; + let source = LlSource(Box::pin(aiocb)); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + let r = unsafe { + let p = (*poll_aio).0.as_mut().get_unchecked_mut(); + libc::aio_fsync(libc::O_SYNC, p) + }; + assert_eq!(0, r); + let fut = LlFut(poll_aio); + fut.await.unwrap(); + } + + /// A suitably crafted future type can reuse an Aio object + #[tokio::test] + async fn reuse() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let aiocb0 = AioCb::from_fd(fd, 0); + let source = ReusableFsyncSource::new(aiocb0); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + poll_aio.fsync(); + let fut0 = ReusableFsyncFut(&mut poll_aio); + fut0.await.unwrap(); + + let aiocb1 = AioCb::from_fd(fd, 0); + poll_aio.reset(aiocb1); + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + assert_pending!(poll_aio.poll_ready(&mut ctx)); + poll_aio.fsync(); + let fut1 = ReusableFsyncFut(&mut poll_aio); + fut1.await.unwrap(); + } +} + +mod lio { + use super::*; + + struct WrappedLioCb<'a>(LioCb<'a>); + impl<'a> AioSource for WrappedLioCb<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + self.0.register_raw(kq, token) + } + fn deregister(&mut self) { + self.0.deregister_raw() + } + } + + /// A very crude lio_listio-based Future + struct LioFut(Option>>); + + impl Future for LioFut { + type Output = std::io::Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.as_mut().unwrap().poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + // At this point, we could clear readiness. But there's no + // point, since we're about to drop the Aio. + let r = self.0.take().unwrap().into_inner().0.into_results(|iter| { + iter.map(|lr| lr.result.unwrap()).collect::>() + }); + Poll::Ready(Ok(r)) + } + } + } + } + + /// Minimal example demonstrating reuse of an Aio object with lio + /// readiness. mio_aio::LioCb actually does something similar under the + /// hood. + struct ReusableLioSource { + liocb: Option>, + fd: RawFd, + token: usize, + } + impl ReusableLioSource { + fn new(liocb: LioCb<'static>) -> Self { + ReusableLioSource { + liocb: Some(liocb), + fd: 0, + token: 0, + } + } + fn reset(&mut self, liocb: LioCb<'static>) { + self.liocb = Some(liocb); + } + fn submit(&mut self) { + self.liocb + .as_mut() + .unwrap() + .register_raw(self.fd, self.token); + self.liocb.as_mut().unwrap().submit().unwrap(); + } + } + impl AioSource for ReusableLioSource { + fn register(&mut self, kq: RawFd, token: usize) { + self.fd = kq; + self.token = token; + } + fn deregister(&mut self) { + self.fd = 0; + } + } + struct ReusableLioFut<'a>(&'a mut Aio); + impl<'a> Future for ReusableLioFut<'a> { + type Output = std::io::Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(ev)) => { + // Since this future uses a reusable Aio, we must clear + // its readiness here. That makes the future + // non-idempotent; the caller can't poll it repeatedly after + // it has already returned Ready. But that's ok; most + // futures behave this way. + self.0.clear_ready(ev); + let r = (*self.0).liocb.take().unwrap().into_results(|iter| { + iter.map(|lr| lr.result.unwrap()).collect::>() + }); + Poll::Ready(Ok(r)) + } + } + } + } + + /// An lio_listio operation with one write element + #[tokio::test] + async fn onewrite() { + const WBUF: &[u8] = b"abcdef"; + let f = tempfile().unwrap(); + + let mut builder = mio_aio::LioCbBuilder::with_capacity(1); + builder = builder.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb = builder.finish(); + let source = WrappedLioCb(liocb); + let mut poll_aio = Aio::new_for_lio(source).unwrap(); + + // Send the operation to the kernel + (*poll_aio).0.submit().unwrap(); + let fut = LioFut(Some(poll_aio)); + let v = fut.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + } + + /// A suitably crafted future type can reuse an Aio object + #[tokio::test] + async fn reuse() { + const WBUF: &[u8] = b"abcdef"; + let f = tempfile().unwrap(); + + let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1); + builder0 = builder0.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb0 = builder0.finish(); + let source = ReusableLioSource::new(liocb0); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + poll_aio.submit(); + let fut0 = ReusableLioFut(&mut poll_aio); + let v = fut0.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + + // Now reuse the same Aio + let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1); + builder1 = builder1.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb1 = builder1.finish(); + poll_aio.reset(liocb1); + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + assert_pending!(poll_aio.poll_ready(&mut ctx)); + poll_aio.submit(); + let fut1 = ReusableLioFut(&mut poll_aio); + let v = fut1.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + } +} diff --git a/tests/process_kill_on_drop.rs b/tests/process_kill_on_drop.rs index 00f5c6d..658e4ad 100644 --- a/tests/process_kill_on_drop.rs +++ b/tests/process_kill_on_drop.rs @@ -1,6 +1,7 @@ #![cfg(all(unix, feature = "process"))] #![warn(rust_2018_idioms)] +use std::io::ErrorKind; use std::process::Stdio; use std::time::Duration; use tokio::io::AsyncReadExt; @@ -24,11 +25,12 @@ async fn kill_on_drop() { ", ]); - let mut child = cmd - .kill_on_drop(true) - .stdout(Stdio::piped()) - .spawn() - .unwrap(); + let e = cmd.kill_on_drop(true).stdout(Stdio::piped()).spawn(); + if e.is_err() && e.as_ref().unwrap_err().kind() == ErrorKind::NotFound { + println!("bash not available; skipping test"); + return; + } + let mut child = e.unwrap(); sleep(Duration::from_secs(2)).await; diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs index cd43ad4..1947d26 100644 --- a/tests/sync_mpsc.rs +++ b/tests/sync_mpsc.rs @@ -5,7 +5,7 @@ use std::thread; use tokio::runtime::Runtime; use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -327,6 +327,27 @@ async fn try_send_fail() { assert!(rx.recv().await.is_none()); } +#[tokio::test] +async fn try_send_fail_with_try_recv() { + let (tx, mut rx) = mpsc::channel(1); + + tx.try_send("hello").unwrap(); + + // This should fail + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } + + assert_eq!(rx.try_recv(), Ok("hello")); + + assert_ok!(tx.try_send("goodbye")); + drop(tx); + + assert_eq!(rx.try_recv(), Ok("goodbye")); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + #[tokio::test] async fn try_reserve_fails() { let (tx, mut rx) = mpsc::channel(1); @@ -389,13 +410,15 @@ fn dropping_rx_closes_channel_for_try() { drop(rx); - { - let err = assert_err!(tx.try_send(msg.clone())); - match err { - TrySendError::Closed(..) => {} - _ => panic!(), - } - } + assert!(matches!( + tx.try_send(msg.clone()), + Err(TrySendError::Closed(_)) + )); + assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_)))); + assert!(matches!( + tx.try_reserve_owned(), + Err(TrySendError::Closed(_)) + )); assert_eq!(1, Arc::strong_count(&msg)); } @@ -494,3 +517,83 @@ async fn permit_available_not_acquired_close() { drop(permit2); assert!(rx.recv().await.is_none()); } + +#[test] +fn try_recv_bounded() { + let (tx, mut rx) = mpsc::channel(5); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert!(tx.try_send("hello").is_err()); + + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert_eq!(Ok("hello"), rx.try_recv()); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert!(tx.try_send("hello").is_err()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + drop(tx); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + +#[test] +fn try_recv_unbounded() { + for num in 0..100 { + let (tx, mut rx) = mpsc::unbounded_channel(); + + for i in 0..num { + tx.send(i).unwrap(); + } + + for i in 0..num { + assert_eq!(rx.try_recv(), Ok(i)); + } + + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } +} + +#[test] +fn try_recv_close_while_empty_bounded() { + let (tx, mut rx) = mpsc::channel::<()>(5); + + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + drop(tx); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + +#[test] +fn try_recv_close_while_empty_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + drop(tx); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs index a2a276d..b7bbaf7 100644 --- a/tests/sync_watch.rs +++ b/tests/sync_watch.rs @@ -186,3 +186,18 @@ fn borrow_and_update() { assert_eq!(*rx.borrow_and_update(), "three"); assert_ready!(spawn(rx.changed()).poll()).unwrap_err(); } + +#[test] +fn reopened_after_subscribe() { + let (tx, rx) = watch::channel("one"); + assert!(!tx.is_closed()); + + drop(rx); + assert!(tx.is_closed()); + + let rx = tx.subscribe(); + assert!(!tx.is_closed()); + + drop(rx); + assert!(tx.is_closed()); +} -- cgit v1.2.3