aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-09-30 08:55:02 -0700
committerJoel Galenson <jgalenson@google.com>2021-09-30 08:55:40 -0700
commit5fe87985ba723ee4d9532495587d7114e4b6e143 (patch)
tree71a18fec0599d209bd7c1b95140dc75566fa3788 /tests
parentd61267ffdfea9ed9be38e805f8e3ff78e384005f (diff)
downloadtokio-5fe87985ba723ee4d9532495587d7114e4b6e143.tar.gz
Upgrade rust/crates/tokio to 1.12.0
Test: make Change-Id: I4b0bd405c0b615f886e5a6606e0bf7c0ac7c6699
Diffstat (limited to 'tests')
-rw-r--r--tests/fs_file.rs20
-rw-r--r--tests/io_async_fd.rs18
-rw-r--r--tests/io_fill_buf.rs34
-rw-r--r--tests/io_poll_aio.rs375
-rw-r--r--tests/process_kill_on_drop.rs12
-rw-r--r--tests/sync_mpsc.rs119
-rw-r--r--tests/sync_watch.rs15
7 files changed, 568 insertions, 25 deletions
diff --git a/tests/fs_file.rs b/tests/fs_file.rs
index bf2f1d7..f645e61 100644
--- a/tests/fs_file.rs
+++ b/tests/fs_file.rs
@@ -1,12 +1,11 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::fs::File;
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
-use tokio_test::task;
-
use std::io::prelude::*;
use tempfile::NamedTempFile;
+use tokio::fs::File;
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
+use tokio_test::task;
const HELLO: &[u8] = b"hello world...";
@@ -51,6 +50,19 @@ async fn basic_write_and_shutdown() {
}
#[tokio::test]
+async fn rewind_seek_position() {
+ let tempfile = tempfile();
+
+ let mut file = File::create(tempfile.path()).await.unwrap();
+
+ file.seek(SeekFrom::Current(10)).await.unwrap();
+
+ file.rewind().await.unwrap();
+
+ assert_eq!(file.stream_position().await.unwrap(), 0);
+}
+
+#[tokio::test]
async fn coop() {
let mut tempfile = tempfile();
tempfile.write_all(HELLO).unwrap();
diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs
index dc21e42..5a6875e 100644
--- a/tests/io_async_fd.rs
+++ b/tests/io_async_fd.rs
@@ -15,7 +15,7 @@ use std::{
use nix::unistd::{close, read, write};
-use futures::{poll, FutureExt};
+use futures::poll;
use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio_test::{assert_err, assert_pending};
@@ -163,10 +163,11 @@ async fn initially_writable() {
afd_a.writable().await.unwrap().clear_ready();
afd_b.writable().await.unwrap().clear_ready();
- futures::select_biased! {
- _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
- _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
- _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
+ tokio::select! {
+ biased;
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {},
+ _ = afd_a.readable() => panic!("Unexpected readable state"),
+ _ = afd_b.readable() => panic!("Unexpected readable state"),
}
}
@@ -353,12 +354,13 @@ async fn multiple_waiters() {
futures::future::pending::<()>().await;
};
- futures::select_biased! {
- guard = afd_a.readable().fuse() => {
+ tokio::select! {
+ biased;
+ guard = afd_a.readable() => {
tokio::task::yield_now().await;
guard.unwrap().clear_ready()
},
- _ = notify_barrier.fuse() => unreachable!(),
+ _ = notify_barrier => unreachable!(),
}
std::mem::drop(afd_a);
diff --git a/tests/io_fill_buf.rs b/tests/io_fill_buf.rs
new file mode 100644
index 0000000..0b2ebd7
--- /dev/null
+++ b/tests/io_fill_buf.rs
@@ -0,0 +1,34 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tempfile::NamedTempFile;
+use tokio::fs::File;
+use tokio::io::{AsyncBufReadExt, BufReader};
+use tokio_test::assert_ok;
+
+#[tokio::test]
+async fn fill_buf_file() {
+ let file = NamedTempFile::new().unwrap();
+
+ assert_ok!(std::fs::write(file.path(), b"hello"));
+
+ let file = assert_ok!(File::open(file.path()).await);
+ let mut file = BufReader::new(file);
+
+ let mut contents = Vec::new();
+
+ loop {
+ let consumed = {
+ let buffer = assert_ok!(file.fill_buf().await);
+ if buffer.is_empty() {
+ break;
+ }
+ contents.extend_from_slice(buffer);
+ buffer.len()
+ };
+
+ file.consume(consumed);
+ }
+
+ assert_eq!(contents, b"hello");
+}
diff --git a/tests/io_poll_aio.rs b/tests/io_poll_aio.rs
new file mode 100644
index 0000000..f044af5
--- /dev/null
+++ b/tests/io_poll_aio.rs
@@ -0,0 +1,375 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(target_os = "freebsd", feature = "net"))]
+
+use mio_aio::{AioCb, AioFsyncMode, LioCb};
+use std::{
+ future::Future,
+ mem,
+ os::unix::io::{AsRawFd, RawFd},
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tempfile::tempfile;
+use tokio::io::bsd::{Aio, AioSource};
+use tokio_test::assert_pending;
+
+mod aio {
+ use super::*;
+
+ /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
+ struct WrappedAioCb<'a>(AioCb<'a>);
+ impl<'a> AioSource for WrappedAioCb<'a> {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ self.0.register_raw(kq, token)
+ }
+ fn deregister(&mut self) {
+ self.0.deregister_raw()
+ }
+ }
+
+ /// A very crude implementation of an AIO-based future
+ struct FsyncFut(Aio<WrappedAioCb<'static>>);
+
+ impl Future for FsyncFut {
+ type Output = std::io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(_ev)) => {
+ // At this point, we could clear readiness. But there's no
+ // point, since we're about to drop the Aio.
+ let result = (*self.0).0.aio_return();
+ match result {
+ Ok(_) => Poll::Ready(Ok(())),
+ Err(e) => Poll::Ready(Err(e.into())),
+ }
+ }
+ }
+ }
+ }
+
+ /// Low-level AIO Source
+ ///
+ /// An example bypassing mio_aio and Nix to demonstrate how the kevent
+ /// registration actually works, under the hood.
+ struct LlSource(Pin<Box<libc::aiocb>>);
+
+ impl AioSource for LlSource {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
+ sev.sigev_notify = libc::SIGEV_KEVENT;
+ sev.sigev_signo = kq;
+ sev.sigev_value = libc::sigval {
+ sival_ptr: token as *mut libc::c_void,
+ };
+ self.0.aio_sigevent = sev;
+ }
+
+ fn deregister(&mut self) {
+ unsafe {
+ self.0.aio_sigevent = mem::zeroed();
+ }
+ }
+ }
+
+ struct LlFut(Aio<LlSource>);
+
+ impl Future for LlFut {
+ type Output = std::io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(_ev)) => {
+ let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
+ assert_eq!(0, r);
+ Poll::Ready(Ok(()))
+ }
+ }
+ }
+ }
+
+ /// A very simple object that can implement AioSource and can be reused.
+ ///
+ /// mio_aio normally assumes that each AioCb will be consumed on completion.
+ /// This somewhat contrived example shows how an Aio object can be reused
+ /// anyway.
+ struct ReusableFsyncSource {
+ aiocb: Pin<Box<AioCb<'static>>>,
+ fd: RawFd,
+ token: usize,
+ }
+ impl ReusableFsyncSource {
+ fn fsync(&mut self) {
+ self.aiocb.register_raw(self.fd, self.token);
+ self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
+ }
+ fn new(aiocb: AioCb<'static>) -> Self {
+ ReusableFsyncSource {
+ aiocb: Box::pin(aiocb),
+ fd: 0,
+ token: 0,
+ }
+ }
+ fn reset(&mut self, aiocb: AioCb<'static>) {
+ self.aiocb = Box::pin(aiocb);
+ }
+ }
+ impl AioSource for ReusableFsyncSource {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ self.fd = kq;
+ self.token = token;
+ }
+ fn deregister(&mut self) {
+ self.fd = 0;
+ }
+ }
+
+ struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);
+ impl<'a> Future for ReusableFsyncFut<'a> {
+ type Output = std::io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(ev)) => {
+ // Since this future uses a reusable Aio, we must clear
+ // its readiness here. That makes the future
+ // non-idempotent; the caller can't poll it repeatedly after
+ // it has already returned Ready. But that's ok; most
+ // futures behave this way.
+ self.0.clear_ready(ev);
+ let result = (*self.0).aiocb.aio_return();
+ match result {
+ Ok(_) => Poll::Ready(Ok(())),
+ Err(e) => Poll::Ready(Err(e.into())),
+ }
+ }
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn fsync() {
+ let f = tempfile().unwrap();
+ let fd = f.as_raw_fd();
+ let aiocb = AioCb::from_fd(fd, 0);
+ let source = WrappedAioCb(aiocb);
+ let mut poll_aio = Aio::new_for_aio(source).unwrap();
+ (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
+ let fut = FsyncFut(poll_aio);
+ fut.await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn ll_fsync() {
+ let f = tempfile().unwrap();
+ let fd = f.as_raw_fd();
+ let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
+ aiocb.aio_fildes = fd;
+ let source = LlSource(Box::pin(aiocb));
+ let mut poll_aio = Aio::new_for_aio(source).unwrap();
+ let r = unsafe {
+ let p = (*poll_aio).0.as_mut().get_unchecked_mut();
+ libc::aio_fsync(libc::O_SYNC, p)
+ };
+ assert_eq!(0, r);
+ let fut = LlFut(poll_aio);
+ fut.await.unwrap();
+ }
+
+ /// A suitably crafted future type can reuse an Aio object
+ #[tokio::test]
+ async fn reuse() {
+ let f = tempfile().unwrap();
+ let fd = f.as_raw_fd();
+ let aiocb0 = AioCb::from_fd(fd, 0);
+ let source = ReusableFsyncSource::new(aiocb0);
+ let mut poll_aio = Aio::new_for_aio(source).unwrap();
+ poll_aio.fsync();
+ let fut0 = ReusableFsyncFut(&mut poll_aio);
+ fut0.await.unwrap();
+
+ let aiocb1 = AioCb::from_fd(fd, 0);
+ poll_aio.reset(aiocb1);
+ let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
+ assert_pending!(poll_aio.poll_ready(&mut ctx));
+ poll_aio.fsync();
+ let fut1 = ReusableFsyncFut(&mut poll_aio);
+ fut1.await.unwrap();
+ }
+}
+
+mod lio {
+ use super::*;
+
+ struct WrappedLioCb<'a>(LioCb<'a>);
+ impl<'a> AioSource for WrappedLioCb<'a> {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ self.0.register_raw(kq, token)
+ }
+ fn deregister(&mut self) {
+ self.0.deregister_raw()
+ }
+ }
+
+ /// A very crude lio_listio-based Future
+ struct LioFut(Option<Aio<WrappedLioCb<'static>>>);
+
+ impl Future for LioFut {
+ type Output = std::io::Result<Vec<isize>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(_ev)) => {
+ // At this point, we could clear readiness. But there's no
+ // point, since we're about to drop the Aio.
+ let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
+ iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
+ });
+ Poll::Ready(Ok(r))
+ }
+ }
+ }
+ }
+
+ /// Minimal example demonstrating reuse of an Aio object with lio
+ /// readiness. mio_aio::LioCb actually does something similar under the
+ /// hood.
+ struct ReusableLioSource {
+ liocb: Option<LioCb<'static>>,
+ fd: RawFd,
+ token: usize,
+ }
+ impl ReusableLioSource {
+ fn new(liocb: LioCb<'static>) -> Self {
+ ReusableLioSource {
+ liocb: Some(liocb),
+ fd: 0,
+ token: 0,
+ }
+ }
+ fn reset(&mut self, liocb: LioCb<'static>) {
+ self.liocb = Some(liocb);
+ }
+ fn submit(&mut self) {
+ self.liocb
+ .as_mut()
+ .unwrap()
+ .register_raw(self.fd, self.token);
+ self.liocb.as_mut().unwrap().submit().unwrap();
+ }
+ }
+ impl AioSource for ReusableLioSource {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ self.fd = kq;
+ self.token = token;
+ }
+ fn deregister(&mut self) {
+ self.fd = 0;
+ }
+ }
+ struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);
+ impl<'a> Future for ReusableLioFut<'a> {
+ type Output = std::io::Result<Vec<isize>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(ev)) => {
+ // Since this future uses a reusable Aio, we must clear
+ // its readiness here. That makes the future
+ // non-idempotent; the caller can't poll it repeatedly after
+ // it has already returned Ready. But that's ok; most
+ // futures behave this way.
+ self.0.clear_ready(ev);
+ let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
+ iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
+ });
+ Poll::Ready(Ok(r))
+ }
+ }
+ }
+ }
+
+ /// An lio_listio operation with one write element
+ #[tokio::test]
+ async fn onewrite() {
+ const WBUF: &[u8] = b"abcdef";
+ let f = tempfile().unwrap();
+
+ let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
+ builder = builder.emplace_slice(
+ f.as_raw_fd(),
+ 0,
+ &WBUF[..],
+ 0,
+ mio_aio::LioOpcode::LIO_WRITE,
+ );
+ let liocb = builder.finish();
+ let source = WrappedLioCb(liocb);
+ let mut poll_aio = Aio::new_for_lio(source).unwrap();
+
+ // Send the operation to the kernel
+ (*poll_aio).0.submit().unwrap();
+ let fut = LioFut(Some(poll_aio));
+ let v = fut.await.unwrap();
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0] as usize, WBUF.len());
+ }
+
+ /// A suitably crafted future type can reuse an Aio object
+ #[tokio::test]
+ async fn reuse() {
+ const WBUF: &[u8] = b"abcdef";
+ let f = tempfile().unwrap();
+
+ let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
+ builder0 = builder0.emplace_slice(
+ f.as_raw_fd(),
+ 0,
+ &WBUF[..],
+ 0,
+ mio_aio::LioOpcode::LIO_WRITE,
+ );
+ let liocb0 = builder0.finish();
+ let source = ReusableLioSource::new(liocb0);
+ let mut poll_aio = Aio::new_for_aio(source).unwrap();
+ poll_aio.submit();
+ let fut0 = ReusableLioFut(&mut poll_aio);
+ let v = fut0.await.unwrap();
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0] as usize, WBUF.len());
+
+ // Now reuse the same Aio
+ let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
+ builder1 = builder1.emplace_slice(
+ f.as_raw_fd(),
+ 0,
+ &WBUF[..],
+ 0,
+ mio_aio::LioOpcode::LIO_WRITE,
+ );
+ let liocb1 = builder1.finish();
+ poll_aio.reset(liocb1);
+ let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
+ assert_pending!(poll_aio.poll_ready(&mut ctx));
+ poll_aio.submit();
+ let fut1 = ReusableLioFut(&mut poll_aio);
+ let v = fut1.await.unwrap();
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0] as usize, WBUF.len());
+ }
+}
diff --git a/tests/process_kill_on_drop.rs b/tests/process_kill_on_drop.rs
index 00f5c6d..658e4ad 100644
--- a/tests/process_kill_on_drop.rs
+++ b/tests/process_kill_on_drop.rs
@@ -1,6 +1,7 @@
#![cfg(all(unix, feature = "process"))]
#![warn(rust_2018_idioms)]
+use std::io::ErrorKind;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::AsyncReadExt;
@@ -24,11 +25,12 @@ async fn kill_on_drop() {
",
]);
- let mut child = cmd
- .kill_on_drop(true)
- .stdout(Stdio::piped())
- .spawn()
- .unwrap();
+ let e = cmd.kill_on_drop(true).stdout(Stdio::piped()).spawn();
+ if e.is_err() && e.as_ref().unwrap_err().kind() == ErrorKind::NotFound {
+ println!("bash not available; skipping test");
+ return;
+ }
+ let mut child = e.unwrap();
sleep(Duration::from_secs(2)).await;
diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs
index cd43ad4..1947d26 100644
--- a/tests/sync_mpsc.rs
+++ b/tests/sync_mpsc.rs
@@ -5,7 +5,7 @@
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@@ -328,6 +328,27 @@ async fn try_send_fail() {
}
#[tokio::test]
+async fn try_send_fail_with_try_recv() {
+ let (tx, mut rx) = mpsc::channel(1);
+
+ tx.try_send("hello").unwrap();
+
+ // This should fail
+ match assert_err!(tx.try_send("fail")) {
+ TrySendError::Full(..) => {}
+ _ => panic!(),
+ }
+
+ assert_eq!(rx.try_recv(), Ok("hello"));
+
+ assert_ok!(tx.try_send("goodbye"));
+ drop(tx);
+
+ assert_eq!(rx.try_recv(), Ok("goodbye"));
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+}
+
+#[tokio::test]
async fn try_reserve_fails() {
let (tx, mut rx) = mpsc::channel(1);
@@ -389,13 +410,15 @@ fn dropping_rx_closes_channel_for_try() {
drop(rx);
- {
- let err = assert_err!(tx.try_send(msg.clone()));
- match err {
- TrySendError::Closed(..) => {}
- _ => panic!(),
- }
- }
+ assert!(matches!(
+ tx.try_send(msg.clone()),
+ Err(TrySendError::Closed(_))
+ ));
+ assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_))));
+ assert!(matches!(
+ tx.try_reserve_owned(),
+ Err(TrySendError::Closed(_))
+ ));
assert_eq!(1, Arc::strong_count(&msg));
}
@@ -494,3 +517,83 @@ async fn permit_available_not_acquired_close() {
drop(permit2);
assert!(rx.recv().await.is_none());
}
+
+#[test]
+fn try_recv_bounded() {
+ let (tx, mut rx) = mpsc::channel(5);
+
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ assert!(tx.try_send("hello").is_err());
+
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ assert_eq!(Ok("hello"), rx.try_recv());
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ assert!(tx.try_send("hello").is_err());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ drop(tx);
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+}
+
+#[test]
+fn try_recv_unbounded() {
+ for num in 0..100 {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ for i in 0..num {
+ tx.send(i).unwrap();
+ }
+
+ for i in 0..num {
+ assert_eq!(rx.try_recv(), Ok(i));
+ }
+
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+ drop(tx);
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+ }
+}
+
+#[test]
+fn try_recv_close_while_empty_bounded() {
+ let (tx, mut rx) = mpsc::channel::<()>(5);
+
+ assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+ drop(tx);
+ assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+}
+
+#[test]
+fn try_recv_close_while_empty_unbounded() {
+ let (tx, mut rx) = mpsc::unbounded_channel::<()>();
+
+ assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+ drop(tx);
+ assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+}
diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs
index a2a276d..b7bbaf7 100644
--- a/tests/sync_watch.rs
+++ b/tests/sync_watch.rs
@@ -186,3 +186,18 @@ fn borrow_and_update() {
assert_eq!(*rx.borrow_and_update(), "three");
assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
}
+
+#[test]
+fn reopened_after_subscribe() {
+ let (tx, rx) = watch::channel("one");
+ assert!(!tx.is_closed());
+
+ drop(rx);
+ assert!(tx.is_closed());
+
+ let rx = tx.subscribe();
+ assert!(!tx.is_closed());
+
+ drop(rx);
+ assert!(tx.is_closed());
+}