diff options
author | Joel Galenson <jgalenson@google.com> | 2021-10-08 19:34:14 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-10-08 19:34:14 +0000 |
commit | 7c69dbe097f4ba0a1a0aa9e67ed50294b68fa3dc (patch) | |
tree | 71a18fec0599d209bd7c1b95140dc75566fa3788 | |
parent | 5995a2b9e75ea42269618222220e58a0d39de6e3 (diff) | |
parent | e16ac718df3b8af3bef9bc0c1b6c9bfb5f8e71e1 (diff) | |
download | tokio-7c69dbe097f4ba0a1a0aa9e67ed50294b68fa3dc.tar.gz |
Merge "Upgrade rust/crates/tokio to 1.12.0" am: e16ac718df
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1833327
Change-Id: If3efbcadf0ba1879fc7b299ef291c17dd49fcf61
61 files changed, 2111 insertions, 254 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 755a957..0577e49 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "dd060b16f54ce196b3891042030dedd3abc017c4" + "sha1": "1ed89aa5cf7e5b9524b9e08a02030d222fd63417" } } @@ -23,7 +23,7 @@ rust_library { host_supported: true, crate_name: "tokio", cargo_env_compat: true, - cargo_pkg_version: "1.10.1", + cargo_pkg_version: "1.12.0", srcs: ["src/lib.rs"], edition: "2018", features: [ diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b41957..16e44e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,80 @@ +# 1.12.0 (September 21, 2021) + +### Fixed + +- mpsc: ensure `try_reserve` error is consistent with `try_send` ([#4119]) +- mpsc: use `spin_loop_hint` instead of `yield_now` ([#4115]) +- sync: make `SendError` field public ([#4097]) + +### Added + +- io: add POSIX AIO on FreeBSD ([#4054]) +- io: add convenience method `AsyncSeekExt::rewind` ([#4107]) +- runtime: add tracing span for `block_on` futures ([#4094]) +- runtime: callback when a worker parks and unparks ([#4070]) +- sync: implement `try_recv` for mpsc channels ([#4113]) + +### Changed + +- macros: run runtime inside `LocalSet` when using macro ([#4027]) + +### Documented + +- docs: clarify CPU-bound tasks on Tokio ([#4105]) +- mpsc: document spurious failures on `poll_recv` ([#4117]) +- mpsc: document that `PollSender` impls `Sink` ([#4110]) +- task: document non-guarantees of `yield_now` ([#4091]) +- time: document paused time details better ([#4061], [#4103]) + +[#4027]: https://github.com/tokio-rs/tokio/pull/4027 +[#4054]: https://github.com/tokio-rs/tokio/pull/4054 +[#4061]: https://github.com/tokio-rs/tokio/pull/4061 +[#4070]: https://github.com/tokio-rs/tokio/pull/4070 +[#4091]: https://github.com/tokio-rs/tokio/pull/4091 +[#4094]: https://github.com/tokio-rs/tokio/pull/4094 +[#4097]: https://github.com/tokio-rs/tokio/pull/4097 +[#4103]: https://github.com/tokio-rs/tokio/pull/4103 +[#4105]: https://github.com/tokio-rs/tokio/pull/4105 +[#4107]: https://github.com/tokio-rs/tokio/pull/4107 +[#4110]: https://github.com/tokio-rs/tokio/pull/4110 +[#4113]: https://github.com/tokio-rs/tokio/pull/4113 +[#4115]: https://github.com/tokio-rs/tokio/pull/4115 +[#4117]: https://github.com/tokio-rs/tokio/pull/4117 +[#4119]: https://github.com/tokio-rs/tokio/pull/4119 + +# 1.11.0 (August 31, 2021) + +### Fixed + + - time: don't panic when Instant is not monotonic ([#4044]) + - io: fix panic in `fill_buf` by not calling `poll_fill_buf` twice ([#4084]) + +### Added + + - watch: add `watch::Sender::subscribe` ([#3800]) + - process: add `from_std` to `ChildStd*` ([#4045]) + - stats: initial work on runtime stats ([#4043]) + +### Changed + + - tracing: change span naming to new console convention ([#4042]) + - io: speed-up waking by using uninitialized array ([#4055], [#4071], [#4075]) + +### Documented + + - time: make Sleep examples easier to find ([#4040]) + +[#3800]: https://github.com/tokio-rs/tokio/pull/3800 +[#4040]: https://github.com/tokio-rs/tokio/pull/4040 +[#4042]: https://github.com/tokio-rs/tokio/pull/4042 +[#4043]: https://github.com/tokio-rs/tokio/pull/4043 +[#4044]: https://github.com/tokio-rs/tokio/pull/4044 +[#4045]: https://github.com/tokio-rs/tokio/pull/4045 +[#4055]: https://github.com/tokio-rs/tokio/pull/4055 +[#4071]: https://github.com/tokio-rs/tokio/pull/4071 +[#4075]: https://github.com/tokio-rs/tokio/pull/4075 +[#4084]: https://github.com/tokio-rs/tokio/pull/4084 + # 1.10.1 (August 24, 2021) ### Fixed @@ -13,11 +13,11 @@ [package] edition = "2018" name = "tokio" -version = "1.10.1" +version = "1.12.0" authors = ["Tokio Contributors <team@tokio.rs>"] description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio/1.10.0/tokio/" +documentation = "https://docs.rs/tokio/1.12.0/tokio/" readme = "README.md" keywords = ["io", "async", "non-blocking", "futures"] categories = ["asynchronous", "network-programming"] @@ -101,12 +101,16 @@ process = ["bytes", "once_cell", "libc", "mio/os-poll", "mio/os-util", "mio/uds" rt = [] rt-multi-thread = ["num_cpus", "rt"] signal = ["once_cell", "libc", "mio/os-poll", "mio/uds", "mio/os-util", "signal-hook-registry", "winapi/consoleapi"] +stats = [] sync = [] test-util = ["rt", "sync", "time"] time = [] [target."cfg(loom)".dev-dependencies.loom] version = "0.5" features = ["futures", "checkpoint"] +[target."cfg(target_os = \"freebsd\")".dev-dependencies.mio-aio] +version = "0.6.0" +features = ["tokio"] [target."cfg(tokio_unstable)".dependencies.tracing] version = "0.1.21" features = ["std"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 90455fb..d2e4696 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -7,12 +7,12 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. -version = "1.10.1" +version = "1.12.0" edition = "2018" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" readme = "README.md" -documentation = "https://docs.rs/tokio/1.10.0/tokio/" +documentation = "https://docs.rs/tokio/1.12.0/tokio/" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" description = """ @@ -47,6 +47,7 @@ io-util = ["memchr", "bytes"] # stdin, stdout, stderr io-std = [] macros = ["tokio-macros"] +stats = [] net = [ "libc", "mio/os-poll", @@ -130,6 +131,9 @@ tempfile = "3.1.0" async-stream = "0.3" socket2 = "0.4" +[target.'cfg(target_os = "freebsd")'.dev-dependencies] +mio-aio = { version = "0.6.0", features = ["tokio"] } + [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.5", features = ["futures", "checkpoint"] } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio/tokio-1.10.1.crate" + value: "https://static.crates.io/crates/tokio/tokio-1.12.0.crate" } - version: "1.10.1" + version: "1.12.0" license_type: NOTICE last_upgrade_date { year: 2021 - month: 8 - day: 25 + month: 9 + day: 30 } } @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.10.1", features = ["full"] } +tokio = { version = "1.12.0", features = ["full"] } ``` Then, on your main.rs: diff --git a/patches/test_fix.patch b/patches/test_fix.patch new file mode 100644 index 0000000..efc8c27 --- /dev/null +++ b/patches/test_fix.patch @@ -0,0 +1,21 @@ +diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs +index a70f49b81a..f8a35d0ede 100644 +--- a/tests/task_local_set.rs ++++ b/tests/task_local_set.rs +@@ -16,16 +16,6 @@ use std::sync::atomic::Ordering::{self, SeqCst}; + use std::sync::atomic::{AtomicBool, AtomicUsize}; + use std::time::Duration; + +-#[tokio::test(flavor = "current_thread")] +-async fn localset_implicit_current_thread() { +- task::spawn_local(async {}).await.unwrap(); +-} +- +-#[tokio::test(flavor = "multi_thread")] +-async fn localset_implicit_multi_thread() { +- task::spawn_local(async {}).await.unwrap(); +-} +- + #[tokio::test(flavor = "current_thread")] + async fn local_basic_scheduler() { + LocalSet::new()
\ No newline at end of file diff --git a/src/doc/mod.rs b/src/doc/mod.rs index 12c2247..3a94934 100644 --- a/src/doc/mod.rs +++ b/src/doc/mod.rs @@ -17,6 +17,7 @@ /// will ever accidentally use it. /// /// [`never` type]: https://doc.rust-lang.org/std/primitive.never.html +#[derive(Debug)] pub enum NotDefinedHere {} pub mod os; diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs new file mode 100644 index 0000000..a765d76 --- /dev/null +++ b/src/io/bsd/poll_aio.rs @@ -0,0 +1,195 @@ +//! Use POSIX AIO futures with Tokio + +use crate::io::driver::{Handle, Interest, ReadyEvent, Registration}; +use mio::event::Source; +use mio::Registry; +use mio::Token; +use std::fmt; +use std::io; +use std::ops::{Deref, DerefMut}; +use std::os::unix::io::AsRawFd; +use std::os::unix::prelude::RawFd; +use std::task::{Context, Poll}; + +/// Like [`mio::event::Source`], but for POSIX AIO only. +/// +/// Tokio's consumer must pass an implementor of this trait to create a +/// [`Aio`] object. +pub trait AioSource { + /// Register this AIO event source with Tokio's reactor + fn register(&mut self, kq: RawFd, token: usize); + + /// Deregister this AIO event source with Tokio's reactor + fn deregister(&mut self); +} + +/// Wrap the user's AioSource in order to implement mio::event::Source, which +/// is what the rest of the crate wants. +struct MioSource<T>(T); + +impl<T: AioSource> Source for MioSource<T> { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: mio::Interest, + ) -> io::Result<()> { + assert!(interests.is_aio() || interests.is_lio()); + self.0.register(registry.as_raw_fd(), usize::from(token)); + Ok(()) + } + + fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { + self.0.deregister(); + Ok(()) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: mio::Interest, + ) -> io::Result<()> { + assert!(interests.is_aio() || interests.is_lio()); + self.0.register(registry.as_raw_fd(), usize::from(token)); + Ok(()) + } +} + +/// Associates a POSIX AIO control block with the reactor that drives it. +/// +/// `Aio`'s wrapped type must implement [`AioSource`] to be driven +/// by the reactor. +/// +/// The wrapped source may be accessed through the `Aio` via the `Deref` and +/// `DerefMut` traits. +/// +/// ## Clearing readiness +/// +/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the +/// Source is not completely ready and must return to the Pending state, +/// [`Aio::clear_ready`] may be used. This can be useful with +/// [`lio_listio`], which may generate a kevent when only a portion of the +/// operations have completed. +/// +/// ## Platforms +/// +/// Only FreeBSD implements POSIX AIO with kqueue notification, so +/// `Aio` is only available for that operating system. +/// +/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html +// Note: Unlike every other kqueue event source, POSIX AIO registers events not +// via kevent(2) but when the aiocb is submitted to the kernel via aio_read, +// aio_write, etc. It needs the kqueue's file descriptor to do that. So +// AsyncFd can't be used for POSIX AIO. +// +// Note that Aio doesn't implement Drop. There's no need. Unlike other +// kqueue sources, simply dropping the object effectively deregisters it. +pub struct Aio<E> { + io: MioSource<E>, + registration: Registration, +} + +// ===== impl Aio ===== + +impl<E: AioSource> Aio<E> { + /// Creates a new `Aio` suitable for use with POSIX AIO functions. + /// + /// It will be associated with the default reactor. The runtime is usually + /// set implicitly when this function is called from a future driven by a + /// Tokio runtime, otherwise runtime can be set explicitly with + /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn new_for_aio(io: E) -> io::Result<Self> { + Self::new_with_interest(io, Interest::AIO) + } + + /// Creates a new `Aio` suitable for use with [`lio_listio`]. + /// + /// It will be associated with the default reactor. The runtime is usually + /// set implicitly when this function is called from a future driven by a + /// Tokio runtime, otherwise runtime can be set explicitly with + /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. + /// + /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + pub fn new_for_lio(io: E) -> io::Result<Self> { + Self::new_with_interest(io, Interest::LIO) + } + + fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { + let mut io = MioSource(io); + let handle = Handle::current(); + let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; + Ok(Self { io, registration }) + } + + /// Indicates to Tokio that the source is no longer ready. The internal + /// readiness flag will be cleared, and tokio will wait for the next + /// edge-triggered readiness notification from the OS. + /// + /// It is critical that this method not be called unless your code + /// _actually observes_ that the source is _not_ ready. The OS must + /// deliver a subsequent notification, or this source will block + /// forever. It is equally critical that you `do` call this method if you + /// resubmit the same structure to the kernel and poll it again. + /// + /// This method is not very useful with AIO readiness, since each `aiocb` + /// structure is typically only used once. It's main use with + /// [`lio_listio`], which will sometimes send notification when only a + /// portion of its elements are complete. In that case, the caller must + /// call `clear_ready` before resubmitting it. + /// + /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + pub fn clear_ready(&self, ev: AioEvent) { + self.registration.clear_readiness(ev.0) + } + + /// Destroy the [`Aio`] and return its inner source. + pub fn into_inner(self) -> E { + self.io.0 + } + + /// Polls for readiness. Either AIO or LIO counts. + /// + /// This method returns: + /// * `Poll::Pending` if the underlying operation is not complete, whether + /// or not it completed successfully. This will be true if the OS is + /// still processing it, or if it has not yet been submitted to the OS. + /// * `Poll::Ready(Ok(_))` if the underlying operation is complete. + /// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does + /// _not_ indicate that the underlying operation encountered an error. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` + /// is scheduled to receive a wakeup when the underlying operation + /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the + /// `Context` passed to the most recent call is scheduled to receive a wakeup. + pub fn poll_ready<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> { + let ev = ready!(self.registration.poll_read_ready(cx))?; + Poll::Ready(Ok(AioEvent(ev))) + } +} + +impl<E: AioSource> Deref for Aio<E> { + type Target = E; + + fn deref(&self) -> &E { + &self.io.0 + } +} + +impl<E: AioSource> DerefMut for Aio<E> { + fn deref_mut(&mut self) -> &mut E { + &mut self.io.0 + } +} + +impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Aio").field("io", &self.io.0).finish() + } +} + +/// Opaque data returned by [`Aio::poll_ready`]. +/// +/// It can be fed back to [`Aio::clear_ready`]. +#[derive(Debug)] +pub struct AioEvent(ReadyEvent); diff --git a/src/io/driver/interest.rs b/src/io/driver/interest.rs index 36951cf..c5b18ed 100644 --- a/src/io/driver/interest.rs +++ b/src/io/driver/interest.rs @@ -14,6 +14,26 @@ use std::ops; pub struct Interest(mio::Interest); impl Interest { + // The non-FreeBSD definitions in this block are active only when + // building documentation. + cfg_aio! { + /// Interest for POSIX AIO + #[cfg(target_os = "freebsd")] + pub const AIO: Interest = Interest(mio::Interest::AIO); + + /// Interest for POSIX AIO + #[cfg(not(target_os = "freebsd"))] + pub const AIO: Interest = Interest(mio::Interest::READABLE); + + /// Interest for POSIX AIO lio_listio events + #[cfg(target_os = "freebsd")] + pub const LIO: Interest = Interest(mio::Interest::LIO); + + /// Interest for POSIX AIO lio_listio events + #[cfg(not(target_os = "freebsd"))] + pub const LIO: Interest = Interest(mio::Interest::READABLE); + } + /// Interest in all readable events. /// /// Readable interest includes read-closed events. diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs index 3aa0cfb..1511884 100644 --- a/src/io/driver/mod.rs +++ b/src/io/driver/mod.rs @@ -51,6 +51,7 @@ pub(crate) struct Handle { inner: Weak<Inner>, } +#[derive(Debug)] pub(crate) struct ReadyEvent { tick: u8, pub(crate) ready: Ready, diff --git a/src/io/driver/ready.rs b/src/io/driver/ready.rs index 2ac01bd..305dc91 100644 --- a/src/io/driver/ready.rs +++ b/src/io/driver/ready.rs @@ -38,6 +38,17 @@ impl Ready { pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { let mut ready = Ready::EMPTY; + #[cfg(all(target_os = "freebsd", feature = "net"))] + { + if event.is_aio() { + ready |= Ready::READABLE; + } + + if event.is_lio() { + ready |= Ready::READABLE; + } + } + if event.is_readable() { ready |= Ready::READABLE; } diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs index 5178010..a265720 100644 --- a/src/io/driver/scheduled_io.rs +++ b/src/io/driver/scheduled_io.rs @@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; use crate::util::slab::Entry; +use crate::util::WakeList; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; @@ -212,10 +213,7 @@ impl ScheduledIo { } fn wake0(&self, ready: Ready, shutdown: bool) { - const NUM_WAKERS: usize = 32; - - let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); - let mut curr = 0; + let mut wakers = WakeList::new(); let mut waiters = self.waiters.lock(); @@ -224,16 +222,14 @@ impl ScheduledIo { // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } // check for AsyncWrite slot if ready.is_writable() { if let Some(waker) = waiters.writer.take() { - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } @@ -241,15 +237,14 @@ impl ScheduledIo { 'outer: loop { let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); - while curr < NUM_WAKERS { + while wakers.can_push() { match iter.next() { Some(waiter) => { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { waiter.is_ready = true; - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } None => { @@ -260,11 +255,7 @@ impl ScheduledIo { drop(waiters); - for waker in wakers.iter_mut().take(curr) { - waker.take().unwrap().wake(); - } - - curr = 0; + wakers.wake_all(); // Acquire the lock again. waiters = self.waiters.lock(); @@ -273,9 +264,7 @@ impl ScheduledIo { // Release the lock before notifying drop(waiters); - for waker in wakers.iter_mut().take(curr) { - waker.take().unwrap().wake(); - } + wakers.wake_all(); } pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { diff --git a/src/io/mod.rs b/src/io/mod.rs index 14a4a63..a5ee108 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -217,6 +217,15 @@ cfg_io_driver_impl! { pub(crate) use poll_evented::PollEvented; } +cfg_aio! { + /// BSD-specific I/O types + pub mod bsd { + mod poll_aio; + + pub use poll_aio::{Aio, AioEvent, AioSource}; + } +} + cfg_net_unix! { mod async_fd; diff --git a/src/io/util/async_seek_ext.rs b/src/io/util/async_seek_ext.rs index 297a4a6..46b3e6c 100644 --- a/src/io/util/async_seek_ext.rs +++ b/src/io/util/async_seek_ext.rs @@ -67,6 +67,16 @@ cfg_io_util! { seek(self, pos) } + /// Creates a future which will rewind to the beginning of the stream. + /// + /// This is convenience method, equivalent to to `self.seek(SeekFrom::Start(0))`. + fn rewind(&mut self) -> Seek<'_, Self> + where + Self: Unpin, + { + self.seek(SeekFrom::Start(0)) + } + /// Creates a future which will return the current seek position from the /// start of the stream. /// diff --git a/src/io/util/fill_buf.rs b/src/io/util/fill_buf.rs index 98ae2ea..3655c01 100644 --- a/src/io/util/fill_buf.rs +++ b/src/io/util/fill_buf.rs @@ -34,15 +34,16 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); - // Due to a limitation in the borrow-checker, we cannot return the value - // directly on Ready. Once Rust starts using the polonius borrow checker, - // this can be simplified. let reader = me.reader.take().expect("Polled after completion."); match Pin::new(&mut *reader).poll_fill_buf(cx) { - Poll::Ready(_) => match Pin::new(reader).poll_fill_buf(cx) { - Poll::Ready(slice) => Poll::Ready(slice), - Poll::Pending => panic!("poll_fill_buf returned Pending while having data"), + Poll::Ready(Ok(slice)) => unsafe { + // Safety: This is necessary only due to a limitation in the + // borrow checker. Once Rust starts using the polonius borrow + // checker, this can be simplified. + let slice = std::mem::transmute::<&[u8], &'a [u8]>(slice); + Poll::Ready(Ok(slice)) }, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), Poll::Pending => { *me.reader = Some(reader); Poll::Pending @@ -16,6 +16,7 @@ attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] #![cfg_attr(docsrs, feature(doc_cfg))] +#![cfg_attr(docsrs, allow(unused_attributes))] //! A runtime for writing reliable network applications without compromising speed. //! @@ -204,9 +205,15 @@ //! ``` //! //! If your code is CPU-bound and you wish to limit the number of threads used -//! to run it, you should run it on another thread pool such as [rayon]. You -//! can use an [`oneshot`] channel to send the result back to Tokio when the -//! rayon task finishes. +//! to run it, you should use a separate thread pool dedicated to CPU bound tasks. +//! For example, you could consider using the [rayon] library for CPU-bound +//! tasks. It is also possible to create an extra Tokio runtime dedicated to +//! CPU-bound tasks, but if you do this, you should be careful that the extra +//! runtime runs _only_ CPU-bound tasks, as IO-bound tasks on that runtime +//! will behave poorly. +//! +//! Hint: If using rayon, you can use a [`oneshot`] channel to send the result back +//! to Tokio when the rayon task finishes. //! //! [rayon]: https://docs.rs/rayon //! [`oneshot`]: crate::sync::oneshot @@ -307,8 +314,9 @@ //! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler. //! - `io-util`: Enables the IO based `Ext` traits. //! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types. -//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`, -//! as well as (on Unix-like systems) `AsyncFd` +//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and +//! `UdpSocket`, as well as (on Unix-like systems) `AsyncFd` and (on +//! FreeBSD) `PollAio`. //! - `time`: Enables `tokio::time` types and allows the schedulers to enable //! the built in timer. //! - `process`: Enables `tokio::process` types. diff --git a/src/loom/std/mod.rs b/src/loom/std/mod.rs index b29cbee..8b6e8bc 100644 --- a/src/loom/std/mod.rs +++ b/src/loom/std/mod.rs @@ -93,4 +93,17 @@ pub(crate) mod sys { } } -pub(crate) use std::thread; +pub(crate) mod thread { + #[inline] + pub(crate) fn yield_now() { + // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead. + #[allow(deprecated)] + std::sync::atomic::spin_loop_hint(); + } + + #[allow(unused_imports)] + pub(crate) use std::thread::{ + current, panicking, park, park_timeout, sleep, spawn, Builder, JoinHandle, LocalKey, + Result, Thread, ThreadId, + }; +} diff --git a/src/macros/cfg.rs b/src/macros/cfg.rs index 7c87522..193bcd7 100644 --- a/src/macros/cfg.rs +++ b/src/macros/cfg.rs @@ -45,6 +45,18 @@ macro_rules! cfg_atomic_waker_impl { } } +macro_rules! cfg_aio { + ($($item:item)*) => { + $( + #[cfg(all(any(docsrs, target_os = "freebsd"), feature = "net"))] + #[cfg_attr(docsrs, + doc(cfg(all(target_os = "freebsd", feature = "net"))) + )] + $item + )* + } +} + macro_rules! cfg_fs { ($($item:item)*) => { $( @@ -162,6 +174,25 @@ macro_rules! cfg_macros { } } +macro_rules! cfg_stats { + ($($item:item)*) => { + $( + #[cfg(all(tokio_unstable, feature = "stats"))] + #[cfg_attr(docsrs, doc(cfg(feature = "stats")))] + $item + )* + } +} + +macro_rules! cfg_not_stats { + ($($item:item)*) => { + $( + #[cfg(not(all(tokio_unstable, feature = "stats")))] + $item + )* + } +} + macro_rules! cfg_net { ($($item:item)*) => { $( @@ -176,7 +207,7 @@ macro_rules! cfg_net_unix { ($($item:item)*) => { $( #[cfg(all(unix, feature = "net"))] - #[cfg_attr(docsrs, doc(cfg(feature = "net")))] + #[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))] $item )* } diff --git a/src/process/mod.rs b/src/process/mod.rs index 42654b1..7a15024 100644 --- a/src/process/mod.rs +++ b/src/process/mod.rs @@ -225,9 +225,9 @@ pub struct Command { pub(crate) struct SpawnedChild { child: imp::Child, - stdin: Option<imp::ChildStdin>, - stdout: Option<imp::ChildStdout>, - stderr: Option<imp::ChildStderr>, + stdin: Option<imp::ChildStdio>, + stdout: Option<imp::ChildStdio>, + stderr: Option<imp::ChildStdio>, } impl Command { @@ -1151,7 +1151,7 @@ impl Child { /// handle of a child process asynchronously. #[derive(Debug)] pub struct ChildStdin { - inner: imp::ChildStdin, + inner: imp::ChildStdio, } /// The standard output stream for spawned children. @@ -1160,7 +1160,7 @@ pub struct ChildStdin { /// handle of a child process asynchronously. #[derive(Debug)] pub struct ChildStdout { - inner: imp::ChildStdout, + inner: imp::ChildStdio, } /// The standard error stream for spawned children. @@ -1169,7 +1169,52 @@ pub struct ChildStdout { /// handle of a child process asynchronously. #[derive(Debug)] pub struct ChildStderr { - inner: imp::ChildStderr, + inner: imp::ChildStdio, +} + +impl ChildStdin { + /// Create an asynchronous `ChildStdin` from a synchronous one. + /// + /// # Errors + /// + /// This method may fail if an error is encountered when setting the pipe to + /// non-blocking mode, or when registering the pipe with the runtime's IO + /// driver. + pub fn from_std(inner: std::process::ChildStdin) -> io::Result<Self> { + Ok(Self { + inner: imp::stdio(inner)?, + }) + } +} + +impl ChildStdout { + /// Create an asynchronous `ChildStderr` from a synchronous one. + /// + /// # Errors + /// + /// This method may fail if an error is encountered when setting the pipe to + /// non-blocking mode, or when registering the pipe with the runtime's IO + /// driver. + pub fn from_std(inner: std::process::ChildStdout) -> io::Result<Self> { + Ok(Self { + inner: imp::stdio(inner)?, + }) + } +} + +impl ChildStderr { + /// Create an asynchronous `ChildStderr` from a synchronous one. + /// + /// # Errors + /// + /// This method may fail if an error is encountered when setting the pipe to + /// non-blocking mode, or when registering the pipe with the runtime's IO + /// driver. + pub fn from_std(inner: std::process::ChildStderr) -> io::Result<Self> { + Ok(Self { + inner: imp::stdio(inner)?, + }) + } } impl AsyncWrite for ChildStdin { diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs index fab63dd..0f379c9 100644 --- a/src/process/unix/mod.rs +++ b/src/process/unix/mod.rs @@ -101,9 +101,9 @@ impl fmt::Debug for Child { pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> { let mut child = cmd.spawn()?; - let stdin = stdio(child.stdin.take())?; - let stdout = stdio(child.stdout.take())?; - let stderr = stdio(child.stderr.take())?; + let stdin = child.stdin.take().map(stdio).transpose()?; + let stdout = child.stdout.take().map(stdio).transpose()?; + let stderr = child.stderr.take().map(stdio).transpose()?; let signal = signal(SignalKind::child())?; @@ -213,9 +213,7 @@ impl Source for Pipe { } } -pub(crate) type ChildStdin = PollEvented<Pipe>; -pub(crate) type ChildStdout = PollEvented<Pipe>; -pub(crate) type ChildStderr = PollEvented<Pipe>; +pub(crate) type ChildStdio = PollEvented<Pipe>; fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> { unsafe { @@ -240,18 +238,13 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> Ok(()) } -fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Pipe>>> +pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>> where T: IntoRawFd, { - let io = match option { - Some(io) => io, - None => return Ok(None), - }; - // Set the fd to nonblocking before we pass it to the event loop let mut pipe = Pipe::from(io); set_nonblocking(&mut pipe, true)?; - Ok(Some(PollEvented::new(pipe)?)) + PollEvented::new(pipe) } diff --git a/src/process/windows.rs b/src/process/windows.rs index 06fc1b6..136d5b0 100644 --- a/src/process/windows.rs +++ b/src/process/windows.rs @@ -67,9 +67,9 @@ unsafe impl Send for Waiting {} pub(crate) fn spawn_child(cmd: &mut StdCommand) -> io::Result<SpawnedChild> { let mut child = cmd.spawn()?; - let stdin = stdio(child.stdin.take()); - let stdout = stdio(child.stdout.take()); - let stderr = stdio(child.stderr.take()); + let stdin = child.stdin.take().map(stdio).transpose()?; + let stdout = child.stdout.take().map(stdio).transpose()?; + let stderr = child.stderr.take().map(stdio).transpose()?; Ok(SpawnedChild { child: Child { @@ -167,20 +167,14 @@ unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) { let _ = complete.take().unwrap().send(()); } -pub(crate) type ChildStdin = PollEvented<NamedPipe>; -pub(crate) type ChildStdout = PollEvented<NamedPipe>; -pub(crate) type ChildStderr = PollEvented<NamedPipe>; +pub(crate) type ChildStdio = PollEvented<NamedPipe>; -fn stdio<T>(option: Option<T>) -> Option<PollEvented<NamedPipe>> +pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<NamedPipe>> where T: IntoRawHandle, { - let io = match option { - Some(io) => io, - None => return None, - }; let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) }; - PollEvented::new(pipe).ok() + PollEvented::new(pipe) } pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> { diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs index fe2e4a8..e37d872 100644 --- a/src/runtime/basic_scheduler.rs +++ b/src/runtime/basic_scheduler.rs @@ -2,7 +2,9 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; +use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::Callback; use crate::sync::notify::Notify; use crate::util::{waker_ref, Wake, WakerRef}; @@ -47,6 +49,14 @@ struct Inner<P: Park> { /// Thread park handle park: P, + + /// Callback for a worker parking itself + before_park: Option<Callback>, + /// Callback for a worker unparking itself + after_unpark: Option<Callback>, + + /// Stats batcher + stats: WorkerStatsBatcher, } #[derive(Clone)] @@ -87,6 +97,9 @@ struct Shared { /// Indicates whether the blocked on thread was woken. woken: AtomicBool, + + /// Keeps track of various runtime stats. + stats: RuntimeStats, } /// Thread-local context. @@ -114,7 +127,11 @@ const REMOTE_FIRST_INTERVAL: u8 = 31; scoped_thread_local!(static CURRENT: Context); impl<P: Park> BasicScheduler<P> { - pub(crate) fn new(park: P) -> BasicScheduler<P> { + pub(crate) fn new( + park: P, + before_park: Option<Callback>, + after_unpark: Option<Callback>, + ) -> BasicScheduler<P> { let unpark = Box::new(park.unpark()); let spawner = Spawner { @@ -123,6 +140,7 @@ impl<P: Park> BasicScheduler<P> { owned: OwnedTasks::new(), unpark: unpark as Box<dyn Unpark>, woken: AtomicBool::new(false), + stats: RuntimeStats::new(1), }), }; @@ -133,6 +151,9 @@ impl<P: Park> BasicScheduler<P> { spawner: spawner.clone(), tick: 0, park, + before_park, + after_unpark, + stats: WorkerStatsBatcher::new(0), })); BasicScheduler { @@ -205,6 +226,7 @@ impl<P: Park> Inner<P> { 'outer: loop { if scheduler.spawner.was_woken() || !polled { polled = true; + scheduler.stats.incr_poll_count(); if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { return v; } @@ -237,8 +259,21 @@ impl<P: Park> Inner<P> { let entry = match entry { Some(entry) => entry, None => { - // Park until the thread is signaled - scheduler.park.park().expect("failed to park"); + if let Some(f) = &scheduler.before_park { + f(); + } + // This check will fail if `before_park` spawns a task for us to run + // instead of parking the thread + if context.tasks.borrow_mut().queue.is_empty() { + // Park until the thread is signaled + scheduler.stats.about_to_park(); + scheduler.stats.submit(&scheduler.spawner.shared.stats); + scheduler.park.park().expect("failed to park"); + scheduler.stats.returned_from_park(); + } + if let Some(f) = &scheduler.after_unpark { + f(); + } // Try polling the `block_on` future next continue 'outer; @@ -247,6 +282,7 @@ impl<P: Park> Inner<P> { match entry { RemoteMsg::Schedule(task) => { + scheduler.stats.incr_poll_count(); let task = context.shared.owned.assert_owner(task); crate::coop::budget(|| task.run()) } @@ -255,6 +291,7 @@ impl<P: Park> Inner<P> { // Yield to the park, this drives the timer and pulls any pending // I/O events. + scheduler.stats.submit(&scheduler.spawner.shared.stats); scheduler .park .park_timeout(Duration::from_millis(0)) @@ -369,6 +406,10 @@ impl Spawner { handle } + pub(crate) fn stats(&self) -> &RuntimeStats { + &self.shared.stats + } + fn pop(&self) -> Option<RemoteMsg> { match self.shared.queue.lock().as_mut() { Some(queue) => queue.pop_front(), diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs index 51bf8c8..91c365f 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/builder.rs @@ -70,6 +70,12 @@ pub struct Builder { /// To run before each worker thread stops pub(super) before_stop: Option<Callback>, + /// To run before each worker thread is parked. + pub(super) before_park: Option<Callback>, + + /// To run after each thread is unparked. + pub(super) after_unpark: Option<Callback>, + /// Customizable keep alive timeout for BlockingPool pub(super) keep_alive: Option<Duration>, } @@ -135,6 +141,8 @@ impl Builder { // No worker thread callbacks after_start: None, before_stop: None, + before_park: None, + after_unpark: None, keep_alive: None, } @@ -374,6 +382,120 @@ impl Builder { self } + /// Executes function `f` just before a thread is parked (goes idle). + /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn) + /// can be called, and may result in this thread being unparked immediately. + /// + /// This can be used to start work only when the executor is idle, or for bookkeeping + /// and monitoring purposes. + /// + /// Note: There can only be one park callback for a runtime; calling this function + /// more than once replaces the last callback defined, rather than adding to it. + /// + /// # Examples + /// + /// ## Multithreaded executor + /// ``` + /// # use std::sync::Arc; + /// # use std::sync::atomic::{AtomicBool, Ordering}; + /// # use tokio::runtime; + /// # use tokio::sync::Barrier; + /// # pub fn main() { + /// let once = AtomicBool::new(true); + /// let barrier = Arc::new(Barrier::new(2)); + /// + /// let runtime = runtime::Builder::new_multi_thread() + /// .worker_threads(1) + /// .on_thread_park({ + /// let barrier = barrier.clone(); + /// move || { + /// let barrier = barrier.clone(); + /// if once.swap(false, Ordering::Relaxed) { + /// tokio::spawn(async move { barrier.wait().await; }); + /// } + /// } + /// }) + /// .build() + /// .unwrap(); + /// + /// runtime.block_on(async { + /// barrier.wait().await; + /// }) + /// # } + /// ``` + /// ## Current thread executor + /// ``` + /// # use std::sync::Arc; + /// # use std::sync::atomic::{AtomicBool, Ordering}; + /// # use tokio::runtime; + /// # use tokio::sync::Barrier; + /// # pub fn main() { + /// let once = AtomicBool::new(true); + /// let barrier = Arc::new(Barrier::new(2)); + /// + /// let runtime = runtime::Builder::new_current_thread() + /// .on_thread_park({ + /// let barrier = barrier.clone(); + /// move || { + /// let barrier = barrier.clone(); + /// if once.swap(false, Ordering::Relaxed) { + /// tokio::spawn(async move { barrier.wait().await; }); + /// } + /// } + /// }) + /// .build() + /// .unwrap(); + /// + /// runtime.block_on(async { + /// barrier.wait().await; + /// }) + /// # } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.before_park = Some(std::sync::Arc::new(f)); + self + } + + /// Executes function `f` just after a thread unparks (starts executing tasks). + /// + /// This is intended for bookkeeping and monitoring use cases; note that work + /// in this callback will increase latencies when the application has allowed one or + /// more runtime threads to go idle. + /// + /// Note: There can only be one unpark callback for a runtime; calling this function + /// more than once replaces the last callback defined, rather than adding to it. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let runtime = runtime::Builder::new_multi_thread() + /// .on_thread_unpark(|| { + /// println!("thread unparking"); + /// }) + /// .build(); + /// + /// runtime.unwrap().block_on(async { + /// tokio::task::yield_now().await; + /// println!("Hello from Tokio!"); + /// }) + /// # } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.after_unpark = Some(std::sync::Arc::new(f)); + self + } + /// Creates the configured `Runtime`. /// /// The returned `Runtime` instance is ready to spawn tasks. @@ -441,7 +563,8 @@ impl Builder { // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. - let scheduler = BasicScheduler::new(driver); + let scheduler = + BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone()); let spawner = Spawner::Basic(scheduler.spawner().clone()); // Blocking pool @@ -546,7 +669,7 @@ cfg_rt_multi_thread! { let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); + let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone()); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool @@ -587,7 +710,9 @@ impl fmt::Debug for Builder { ) .field("thread_stack_size", &self.thread_stack_size) .field("after_start", &self.after_start.as_ref().map(|_| "...")) - .field("before_stop", &self.after_start.as_ref().map(|_| "...")) + .field("before_stop", &self.before_stop.as_ref().map(|_| "...")) + .field("before_park", &self.before_park.as_ref().map(|_| "...")) + .field("after_unpark", &self.after_unpark.as_ref().map(|_| "...")) .finish() } } diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs index ddc170a..bad6a00 100644 --- a/src/runtime/handle.rs +++ b/src/runtime/handle.rs @@ -111,6 +111,14 @@ impl Handle { context::current().ok_or(TryCurrentError(())) } + cfg_stats! { + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats { + self.spawner.stats() + } + } + /// Spawn a future onto the Tokio runtime. /// /// This spawns the given future onto the runtime's executor, usually a @@ -192,20 +200,20 @@ impl Handle { let location = std::panic::Location::caller(); #[cfg(tokio_track_caller)] let span = tracing::trace_span!( - target: "tokio::task", - "task", + target: "tokio::task::blocking", + "runtime.spawn", kind = %"blocking", - function = %std::any::type_name::<F>(), task.name = %name.unwrap_or_default(), + "fn" = %std::any::type_name::<F>(), spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), ); #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( - target: "tokio::task", - "task", + target: "tokio::task::blocking", + "runtime.spawn", kind = %"blocking", task.name = %name.unwrap_or_default(), - function = %std::any::type_name::<F>(), + "fn" = %std::any::type_name::<F>(), ); fut.instrument(span) }; @@ -288,7 +296,11 @@ impl Handle { /// [`tokio::fs`]: crate::fs /// [`tokio::net`]: crate::net /// [`tokio::time`]: crate::time + #[cfg_attr(tokio_track_caller, track_caller)] pub fn block_on<F: Future>(&self, future: F) -> F::Output { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let future = crate::util::trace::task(future, "block_on", None); + // Enter the **runtime** context. This configures spawning, the current I/O driver, ... let _rt_enter = self.enter(); diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 52532ec..ec7d0c0 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -181,6 +181,13 @@ pub(crate) mod enter; pub(crate) mod task; +cfg_stats! { + pub mod stats; +} +cfg_not_stats! { + pub(crate) mod stats; +} + cfg_rt! { mod basic_scheduler; use basic_scheduler::BasicScheduler; @@ -443,7 +450,11 @@ cfg_rt! { /// ``` /// /// [handle]: fn@Handle::block_on + #[cfg_attr(tokio_track_caller, track_caller)] pub fn block_on<F: Future>(&self, future: F) -> F::Output { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let future = crate::util::trace::task(future, "block_on", None); + let _enter = self.enter(); match &self.kind { diff --git a/src/runtime/queue.rs b/src/runtime/queue.rs index c45cb6a..a88dffc 100644 --- a/src/runtime/queue.rs +++ b/src/runtime/queue.rs @@ -3,6 +3,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicU16, AtomicU32}; use crate::loom::sync::Arc; +use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::{self, Inject}; use std::mem::MaybeUninit; @@ -92,6 +93,14 @@ impl<T> Local<T> { !self.inner.is_empty() } + /// Returns false if there are any entries in the queue + /// + /// Separate to is_stealable so that refactors of is_stealable to "protect" + /// some tasks from stealing won't affect this + pub(super) fn has_tasks(&self) -> bool { + !self.inner.is_empty() + } + /// Pushes a task to the back of the local queue, skipping the LIFO slot. pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) { let tail = loop { @@ -288,7 +297,11 @@ impl<T> Steal<T> { } /// Steals half the tasks from self and place them into `dst`. - pub(super) fn steal_into(&self, dst: &mut Local<T>) -> Option<task::Notified<T>> { + pub(super) fn steal_into( + &self, + dst: &mut Local<T>, + stats: &mut WorkerStatsBatcher, + ) -> Option<task::Notified<T>> { // Safety: the caller is the only thread that mutates `dst.tail` and // holds a mutable reference. let dst_tail = unsafe { dst.inner.tail.unsync_load() }; @@ -307,6 +320,7 @@ impl<T> Steal<T> { // Steal the tasks into `dst`'s buffer. This does not yet expose the // tasks in `dst`. let mut n = self.steal_into2(dst, dst_tail); + stats.incr_steal_count(n); if n == 0 { // No tasks were stolen diff --git a/src/runtime/spawner.rs b/src/runtime/spawner.rs index fbcde2c..9a3d465 100644 --- a/src/runtime/spawner.rs +++ b/src/runtime/spawner.rs @@ -1,8 +1,7 @@ -cfg_rt! { - use crate::future::Future; - use crate::runtime::basic_scheduler; - use crate::task::JoinHandle; -} +use crate::future::Future; +use crate::runtime::basic_scheduler; +use crate::runtime::stats::RuntimeStats; +use crate::task::JoinHandle; cfg_rt_multi_thread! { use crate::runtime::thread_pool; @@ -10,7 +9,6 @@ cfg_rt_multi_thread! { #[derive(Debug, Clone)] pub(crate) enum Spawner { - #[cfg(feature = "rt")] Basic(basic_scheduler::Spawner), #[cfg(feature = "rt-multi-thread")] ThreadPool(thread_pool::Spawner), @@ -25,21 +23,25 @@ impl Spawner { } } } -} -cfg_rt! { - impl Spawner { - pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - match self { - #[cfg(feature = "rt")] - Spawner::Basic(spawner) => spawner.spawn(future), - #[cfg(feature = "rt-multi-thread")] - Spawner::ThreadPool(spawner) => spawner.spawn(future), - } + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Spawner::Basic(spawner) => spawner.spawn(future), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.spawn(future), + } + } + + #[cfg_attr(not(all(tokio_unstable, feature = "stats")), allow(dead_code))] + pub(crate) fn stats(&self) -> &RuntimeStats { + match self { + Spawner::Basic(spawner) => spawner.stats(), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.stats(), } } } diff --git a/src/runtime/stats/mock.rs b/src/runtime/stats/mock.rs new file mode 100644 index 0000000..3bda8bf --- /dev/null +++ b/src/runtime/stats/mock.rs @@ -0,0 +1,27 @@ +//! This file contains mocks of the types in src/runtime/stats/stats.rs + +pub(crate) struct RuntimeStats {} + +impl RuntimeStats { + pub(crate) fn new(_worker_threads: usize) -> Self { + Self {} + } +} + +pub(crate) struct WorkerStatsBatcher {} + +impl WorkerStatsBatcher { + pub(crate) fn new(_my_index: usize) -> Self { + Self {} + } + + pub(crate) fn submit(&mut self, _to: &RuntimeStats) {} + + pub(crate) fn about_to_park(&mut self) {} + pub(crate) fn returned_from_park(&mut self) {} + + #[cfg(feature = "rt-multi-thread")] + pub(crate) fn incr_steal_count(&mut self, _by: u16) {} + + pub(crate) fn incr_poll_count(&mut self) {} +} diff --git a/src/runtime/stats/mod.rs b/src/runtime/stats/mod.rs new file mode 100644 index 0000000..5e08e8e --- /dev/null +++ b/src/runtime/stats/mod.rs @@ -0,0 +1,17 @@ +//! This module contains information need to view information about how the +//! runtime is performing. +#![allow(clippy::module_inception)] + +cfg_stats! { + mod stats; + + pub use self::stats::{RuntimeStats, WorkerStats}; + pub(crate) use self::stats::WorkerStatsBatcher; +} + +cfg_not_stats! { + #[path = "mock.rs"] + mod stats; + + pub(crate) use self::stats::{RuntimeStats, WorkerStatsBatcher}; +} diff --git a/src/runtime/stats/stats.rs b/src/runtime/stats/stats.rs new file mode 100644 index 0000000..39a48ae --- /dev/null +++ b/src/runtime/stats/stats.rs @@ -0,0 +1,97 @@ +//! This file contains the types necessary to collect various types of stats. +use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed}; + +/// This type contains methods to retrieve stats from a Tokio runtime. +#[derive(Debug)] +pub struct RuntimeStats { + workers: Box<[WorkerStats]>, +} + +/// This type contains methods to retrieve stats from a worker thread on a Tokio runtime. +#[derive(Debug)] +#[repr(align(128))] +pub struct WorkerStats { + park_count: AtomicU64, + steal_count: AtomicU64, + poll_count: AtomicU64, +} + +impl RuntimeStats { + pub(crate) fn new(worker_threads: usize) -> Self { + let mut workers = Vec::with_capacity(worker_threads); + for _ in 0..worker_threads { + workers.push(WorkerStats { + park_count: AtomicU64::new(0), + steal_count: AtomicU64::new(0), + poll_count: AtomicU64::new(0), + }); + } + + Self { + workers: workers.into_boxed_slice(), + } + } + + /// Returns a slice containing the worker stats for each worker thread. + pub fn workers(&self) -> impl Iterator<Item = &WorkerStats> { + self.workers.iter() + } +} + +impl WorkerStats { + /// Returns the total number of times this worker thread has parked. + pub fn park_count(&self) -> u64 { + self.park_count.load(Relaxed) + } + + /// Returns the number of tasks this worker has stolen from other worker + /// threads. + pub fn steal_count(&self) -> u64 { + self.steal_count.load(Relaxed) + } + + /// Returns the number of times this worker has polled a task. + pub fn poll_count(&self) -> u64 { + self.poll_count.load(Relaxed) + } +} + +pub(crate) struct WorkerStatsBatcher { + my_index: usize, + park_count: u64, + steal_count: u64, + poll_count: u64, +} + +impl WorkerStatsBatcher { + pub(crate) fn new(my_index: usize) -> Self { + Self { + my_index, + park_count: 0, + steal_count: 0, + poll_count: 0, + } + } + pub(crate) fn submit(&mut self, to: &RuntimeStats) { + let worker = &to.workers[self.my_index]; + + worker.park_count.store(self.park_count, Relaxed); + worker.steal_count.store(self.steal_count, Relaxed); + worker.poll_count.store(self.poll_count, Relaxed); + } + + pub(crate) fn about_to_park(&mut self) { + self.park_count += 1; + } + + pub(crate) fn returned_from_park(&mut self) {} + + #[cfg(feature = "rt-multi-thread")] + pub(crate) fn incr_steal_count(&mut self, by: u16) { + self.steal_count += u64::from(by); + } + + pub(crate) fn incr_poll_count(&mut self) { + self.poll_count += 1; + } +} diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs index 2fe40a7..0abbff2 100644 --- a/src/runtime/task/join.rs +++ b/src/runtime/task/join.rs @@ -162,7 +162,7 @@ impl<T> JoinHandle<T> { /// /// Awaiting a cancelled task might complete as usual if the task was /// already completed at the time it was cancelled, but most likely it - /// will complete with a `Err(JoinError::Cancelled)`. + /// will fail with a [cancelled] `JoinError`. /// /// ```rust /// use tokio::time; @@ -190,6 +190,7 @@ impl<T> JoinHandle<T> { /// } /// } /// ``` + /// [cancelled]: method@super::error::JoinError::is_cancelled pub fn abort(&self) { if let Some(raw) = self.raw { raw.remote_abort(); diff --git a/src/runtime/tests/loom_queue.rs b/src/runtime/tests/loom_queue.rs index a1ed171..2cbb0a1 100644 --- a/src/runtime/tests/loom_queue.rs +++ b/src/runtime/tests/loom_queue.rs @@ -1,5 +1,6 @@ use crate::runtime::blocking::NoopSchedule; use crate::runtime::queue; +use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::Inject; use loom::thread; @@ -11,11 +12,12 @@ fn basic() { let inject = Inject::new(); let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..3 { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -65,10 +67,11 @@ fn steal_overflow() { let inject = Inject::new(); let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); let mut n = 0; - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -113,9 +116,10 @@ fn multi_stealer() { const NUM_TASKS: usize = 5; fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); - if steal.steal_into(&mut local).is_none() { + if steal.steal_into(&mut local, &mut stats).is_none() { return 0; } @@ -165,6 +169,7 @@ fn multi_stealer() { #[test] fn chained_steal() { loom::model(|| { + let mut stats = WorkerStatsBatcher::new(0); let (s1, mut l1) = queue::local(); let (s2, mut l2) = queue::local(); let inject = Inject::new(); @@ -180,8 +185,9 @@ fn chained_steal() { // Spawn a task to steal from **our** queue let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); - s1.steal_into(&mut local); + s1.steal_into(&mut local, &mut stats); while local.pop().is_some() {} }); @@ -189,7 +195,7 @@ fn chained_steal() { // Drain our tasks, then attempt to steal while l1.pop().is_some() {} - s2.steal_into(&mut l1); + s2.steal_into(&mut l1, &mut stats); th.join().unwrap(); diff --git a/src/runtime/tests/queue.rs b/src/runtime/tests/queue.rs index 428b002..47f1b01 100644 --- a/src/runtime/tests/queue.rs +++ b/src/runtime/tests/queue.rs @@ -1,4 +1,5 @@ use crate::runtime::queue; +use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::{self, Inject, Schedule, Task}; use std::thread; @@ -44,6 +45,8 @@ fn overflow() { #[test] fn steal_batch() { + let mut stats = WorkerStatsBatcher::new(0); + let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); let inject = Inject::new(); @@ -53,7 +56,7 @@ fn steal_batch() { local1.push_back(task, &inject); } - assert!(steal1.steal_into(&mut local2).is_some()); + assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); for _ in 0..1 { assert!(local2.pop().is_some()); @@ -81,11 +84,12 @@ fn stress1() { let inject = Inject::new(); let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -137,11 +141,12 @@ fn stress2() { let inject = Inject::new(); let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } diff --git a/src/runtime/thread_pool/mod.rs b/src/runtime/thread_pool/mod.rs index 3808aa2..f2e68f6 100644 --- a/src/runtime/thread_pool/mod.rs +++ b/src/runtime/thread_pool/mod.rs @@ -12,8 +12,9 @@ pub(crate) use worker::Launch; pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; +use crate::runtime::stats::RuntimeStats; use crate::runtime::task::JoinHandle; -use crate::runtime::Parker; +use crate::runtime::{Callback, Parker}; use std::fmt; use std::future::Future; @@ -43,8 +44,13 @@ pub(crate) struct Spawner { // ===== impl ThreadPool ===== impl ThreadPool { - pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) { - let (shared, launch) = worker::create(size, parker); + pub(crate) fn new( + size: usize, + parker: Parker, + before_park: Option<Callback>, + after_unpark: Option<Callback>, + ) -> (ThreadPool, Launch) { + let (shared, launch) = worker::create(size, parker, before_park, after_unpark); let spawner = Spawner { shared }; let thread_pool = ThreadPool { spawner }; @@ -99,6 +105,10 @@ impl Spawner { pub(crate) fn shutdown(&mut self) { self.shared.close(); } + + pub(crate) fn stats(&self) -> &RuntimeStats { + self.shared.stats() + } } impl fmt::Debug for Spawner { diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs index f5004c0..44f8db8 100644 --- a/src/runtime/thread_pool/worker.rs +++ b/src/runtime/thread_pool/worker.rs @@ -64,9 +64,10 @@ use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; +use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; use crate::runtime::thread_pool::{AtomicCell, Idle}; -use crate::runtime::{queue, task}; +use crate::runtime::{queue, task, Callback}; use crate::util::FastRand; use std::cell::RefCell; @@ -112,6 +113,9 @@ struct Core { /// borrow checker happy. park: Option<Parker>, + /// Batching stats so they can be submitted to RuntimeStats. + stats: WorkerStatsBatcher, + /// Fast random number generator. rand: FastRand, } @@ -137,6 +141,14 @@ pub(super) struct Shared { /// stolen by a thread that was spawned as part of `block_in_place`. #[allow(clippy::vec_box)] // we're moving an already-boxed value shutdown_cores: Mutex<Vec<Box<Core>>>, + + /// Callback for a worker parking itself + before_park: Option<Callback>, + /// Callback for a worker unparking itself + after_unpark: Option<Callback>, + + /// Collect stats from the runtime. + stats: RuntimeStats, } /// Used to communicate with a worker from other threads. @@ -174,12 +186,17 @@ type Notified = task::Notified<Arc<Shared>>; // Tracks thread-local state scoped_thread_local!(static CURRENT: Context); -pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { +pub(super) fn create( + size: usize, + park: Parker, + before_park: Option<Callback>, + after_unpark: Option<Callback>, +) -> (Arc<Shared>, Launch) { let mut cores = vec![]; let mut remotes = vec![]; // Create the local queues - for _ in 0..size { + for i in 0..size { let (steal, run_queue) = queue::local(); let park = park.clone(); @@ -192,6 +209,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { is_searching: false, is_shutdown: false, park: Some(park), + stats: WorkerStatsBatcher::new(i), rand: FastRand::new(seed()), })); @@ -204,6 +222,9 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { idle: Idle::new(size), owned: OwnedTasks::new(), shutdown_cores: Mutex::new(vec![]), + before_park, + after_unpark, + stats: RuntimeStats::new(size), }); let mut launch = Launch(vec![]); @@ -391,6 +412,7 @@ impl Context { core.transition_from_searching(&self.worker); // Make the core available to the runtime context + core.stats.incr_poll_count(); *self.core.borrow_mut() = Some(core); // Run the task @@ -415,6 +437,7 @@ impl Context { if coop::has_budget_remaining() { // Run the LIFO task, then loop + core.stats.incr_poll_count(); *self.core.borrow_mut() = Some(core); let task = self.worker.shared.owned.assert_owner(task); task.run(); @@ -442,19 +465,26 @@ impl Context { } fn park(&self, mut core: Box<Core>) -> Box<Core> { - core.transition_to_parked(&self.worker); + if let Some(f) = &self.worker.shared.before_park { + f(); + } - while !core.is_shutdown { - core = self.park_timeout(core, None); + if core.transition_to_parked(&self.worker) { + while !core.is_shutdown { + core = self.park_timeout(core, None); - // Run regularly scheduled maintenance - core.maintenance(&self.worker); + // Run regularly scheduled maintenance + core.maintenance(&self.worker); - if core.transition_from_parked(&self.worker) { - return core; + if core.transition_from_parked(&self.worker) { + break; + } } } + if let Some(f) = &self.worker.shared.after_unpark { + f(); + } core } @@ -462,6 +492,8 @@ impl Context { // Take the parker out of core let mut park = core.park.take().expect("park missing"); + core.stats.about_to_park(); + // Store `core` in context *self.core.borrow_mut() = Some(core); @@ -483,6 +515,8 @@ impl Context { self.worker.shared.notify_parked(); } + core.stats.returned_from_park(); + core } } @@ -524,7 +558,10 @@ impl Core { } let target = &worker.shared.remotes[i]; - if let Some(task) = target.steal.steal_into(&mut self.run_queue) { + if let Some(task) = target + .steal + .steal_into(&mut self.run_queue, &mut self.stats) + { return Some(task); } } @@ -551,7 +588,14 @@ impl Core { } /// Prepare the worker state for parking - fn transition_to_parked(&mut self, worker: &Worker) { + /// + /// Returns true if the transition happend, false if there is work to do first + fn transition_to_parked(&mut self, worker: &Worker) -> bool { + // Workers should not park if they have work to do + if self.lifo_slot.is_some() || self.run_queue.has_tasks() { + return false; + } + // When the final worker transitions **out** of searching to parked, it // must check all the queues one last time in case work materialized // between the last work scan and transitioning out of searching. @@ -567,6 +611,8 @@ impl Core { if is_last_searcher { worker.shared.notify_if_work_pending(); } + + true } /// Returns `true` if the transition happened. @@ -590,6 +636,8 @@ impl Core { /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { + self.stats.submit(&worker.shared.stats); + if !self.is_shutdown { // Check if the scheduler has been shutdown self.is_shutdown = worker.inject().is_closed(); @@ -601,6 +649,8 @@ impl Core { fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. worker.shared.owned.close_and_shutdown_all(); + + self.stats.submit(&worker.shared.stats); } /// Shutdown the core @@ -651,6 +701,10 @@ impl Shared { handle } + pub(crate) fn stats(&self) -> &RuntimeStats { + &self.stats + } + pub(super) fn schedule(&self, task: Notified, is_yield: bool) { CURRENT.with(|maybe_cx| { if let Some(cx) = maybe_cx { diff --git a/src/sync/batch_semaphore.rs b/src/sync/batch_semaphore.rs index 872d53e..9b43404 100644 --- a/src/sync/batch_semaphore.rs +++ b/src/sync/batch_semaphore.rs @@ -19,6 +19,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Mutex, MutexGuard}; use crate::util::linked_list::{self, LinkedList}; +use crate::util::WakeList; use std::future::Future; use std::marker::PhantomPinned; @@ -239,12 +240,12 @@ impl Semaphore { /// If `rem` exceeds the number of permits needed by the wait list, the /// remainder are assigned back to the semaphore. fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) { - let mut wakers: [Option<Waker>; 8] = Default::default(); + let mut wakers = WakeList::new(); let mut lock = Some(waiters); let mut is_empty = false; while rem > 0 { let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); - 'inner: for slot in &mut wakers[..] { + 'inner: while wakers.can_push() { // Was the waiter assigned enough permits to wake it? match waiters.queue.last() { Some(waiter) => { @@ -260,7 +261,11 @@ impl Semaphore { } }; let mut waiter = waiters.queue.pop_back().unwrap(); - *slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }; + if let Some(waker) = + unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) } + { + wakers.push(waker); + } } if rem > 0 && is_empty { @@ -283,10 +288,7 @@ impl Semaphore { drop(waiters); // release the lock - wakers - .iter_mut() - .filter_map(Option::take) - .for_each(Waker::wake); + wakers.wake_all(); } assert_eq!(rem, 0); diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs index 7a0873b..6e7b700 100644 --- a/src/sync/mpsc/block.rs +++ b/src/sync/mpsc/block.rs @@ -343,13 +343,7 @@ impl<T> Block<T> { Err(curr) => curr, }; - #[cfg(all(test, loom))] crate::loom::thread::yield_now(); - - // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead. - #[cfg(not(all(test, loom)))] - #[allow(deprecated)] - std::sync::atomic::spin_loop_hint(); } } } diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs index d7af172..bcad84d 100644 --- a/src/sync/mpsc/bounded.rs +++ b/src/sync/mpsc/bounded.rs @@ -1,6 +1,6 @@ use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError}; use crate::sync::mpsc::chan; -use crate::sync::mpsc::error::{SendError, TrySendError}; +use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError}; cfg_time! { use crate::sync::mpsc::error::SendTimeoutError; @@ -14,8 +14,8 @@ use std::task::{Context, Poll}; /// /// Instances are created by the [`channel`](channel) function. /// -/// To use the `Sender` in a poll function, you can use the [`PollSender`] -/// utility. +/// To convert the `Sender` into a `Sink` or use it in a poll function, you can +/// use the [`PollSender`] utility. /// /// [`PollSender`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSender.html pub struct Sender<T> { @@ -187,6 +187,50 @@ impl<T> Receiver<T> { poll_fn(|cx| self.chan.recv(cx)).await } + /// Try to receive the next value for this receiver. + /// + /// This method returns the [`Empty`] error if the channel is currently + /// empty, but there are still outstanding [senders] or [permits]. + /// + /// This method returns the [`Disconnected`] error if the channel is + /// currently empty, and there are no outstanding [senders] or [permits]. + /// + /// Unlike the [`poll_recv`] method, this method will never return an + /// [`Empty`] error spuriously. + /// + /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty + /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected + /// [`poll_recv`]: Self::poll_recv + /// [senders]: crate::sync::mpsc::Sender + /// [permits]: crate::sync::mpsc::Permit + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// use tokio::sync::mpsc::error::TryRecvError; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(100); + /// + /// tx.send("hello").await.unwrap(); + /// + /// assert_eq!(Ok("hello"), rx.try_recv()); + /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + /// + /// tx.send("hello").await.unwrap(); + /// // Drop the last sender, closing the channel. + /// drop(tx); + /// + /// assert_eq!(Ok("hello"), rx.try_recv()); + /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); + /// } + /// ``` + pub fn try_recv(&mut self) -> Result<T, TryRecvError> { + self.chan.try_recv() + } + /// Blocking receive to call outside of asynchronous contexts. /// /// This method returns `None` if the channel has been closed and there are @@ -291,7 +335,7 @@ impl<T> Receiver<T> { /// This method returns: /// /// * `Poll::Pending` if no messages are available but the channel is not - /// closed. + /// closed, or if a spurious failure happens. /// * `Poll::Ready(Some(message))` if a message is available. /// * `Poll::Ready(None)` if the channel has been closed and all messages /// sent before it was closed have been received. @@ -301,6 +345,12 @@ impl<T> Receiver<T> { /// receiver, or when the channel is closed. Note that on multiple calls to /// `poll_recv`, only the `Waker` from the `Context` passed to the most /// recent call is scheduled to receive a wakeup. + /// + /// If this method returns `Poll::Pending` due to a spurious failure, then + /// the `Waker` will be notified when the situation causing the spurious + /// failure has been resolved. Note that receiving such a wakeup does not + /// guarantee that the next call will succeed — it could fail with another + /// spurious failure. pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -811,7 +861,8 @@ impl<T> Sender<T> { pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> { match self.chan.semaphore().0.try_acquire(1) { Ok(_) => {} - Err(_) => return Err(TrySendError::Full(())), + Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())), + Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())), } Ok(Permit { chan: &self.chan }) @@ -875,7 +926,8 @@ impl<T> Sender<T> { pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> { match self.chan.semaphore().0.try_acquire(1) { Ok(_) => {} - Err(_) => return Err(TrySendError::Full(self)), + Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)), + Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)), } Ok(OwnedPermit { diff --git a/src/sync/mpsc/chan.rs b/src/sync/mpsc/chan.rs index 554d022..637ae1f 100644 --- a/src/sync/mpsc/chan.rs +++ b/src/sync/mpsc/chan.rs @@ -2,6 +2,9 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; +use crate::park::thread::CachedParkThread; +use crate::park::Park; +use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::list; use crate::sync::notify::Notify; @@ -263,6 +266,51 @@ impl<T, S: Semaphore> Rx<T, S> { } }) } + + /// Try to receive the next value. + pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> { + use super::list::TryPopResult; + + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + macro_rules! try_recv { + () => { + match rx_fields.list.try_pop(&self.inner.tx) { + TryPopResult::Ok(value) => { + self.inner.semaphore.add_permit(); + return Ok(value); + } + TryPopResult::Closed => return Err(TryRecvError::Disconnected), + TryPopResult::Empty => return Err(TryRecvError::Empty), + TryPopResult::Busy => {} // fall through + } + }; + } + + try_recv!(); + + // If a previous `poll_recv` call has set a waker, we wake it here. + // This allows us to put our own CachedParkThread waker in the + // AtomicWaker slot instead. + // + // This is not a spurious wakeup to `poll_recv` since we just got a + // Busy from `try_pop`, which only happens if there are messages in + // the queue. + self.inner.rx_waker.wake(); + + // Park the thread until the problematic send has completed. + let mut park = CachedParkThread::new(); + let waker = park.unpark().into_waker(); + loop { + self.inner.rx_waker.register_by_ref(&waker); + // It is possible that the problematic send has now completed, + // so we have to check for messages again. + try_recv!(); + park.park().expect("park failed"); + } + }) + } } impl<T, S: Semaphore> Drop for Rx<T, S> { diff --git a/src/sync/mpsc/error.rs b/src/sync/mpsc/error.rs index 0d25ad3..48ca379 100644 --- a/src/sync/mpsc/error.rs +++ b/src/sync/mpsc/error.rs @@ -51,6 +51,30 @@ impl<T> From<SendError<T>> for TrySendError<T> { } } +// ===== TryRecvError ===== + +/// Error returned by `try_recv`. +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum TryRecvError { + /// This **channel** is currently empty, but the **Sender**(s) have not yet + /// disconnected, so data may yet become available. + Empty, + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. + Disconnected, +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryRecvError::Empty => "receiving on an empty channel".fmt(fmt), + TryRecvError::Disconnected => "receiving on a closed channel".fmt(fmt), + } + } +} + +impl Error for TryRecvError {} + // ===== RecvError ===== /// Error returned by `Receiver`. diff --git a/src/sync/mpsc/list.rs b/src/sync/mpsc/list.rs index 5dad2ba..53c34d2 100644 --- a/src/sync/mpsc/list.rs +++ b/src/sync/mpsc/list.rs @@ -13,23 +13,35 @@ pub(crate) struct Tx<T> { /// Tail in the `Block` mpmc list. block_tail: AtomicPtr<Block<T>>, - /// Position to push the next message. This reference a block and offset + /// Position to push the next message. This references a block and offset /// into the block. tail_position: AtomicUsize, } /// List queue receive handle pub(crate) struct Rx<T> { - /// Pointer to the block being processed + /// Pointer to the block being processed. head: NonNull<Block<T>>, - /// Next slot index to process + /// Next slot index to process. index: usize, - /// Pointer to the next block pending release + /// Pointer to the next block pending release. free_head: NonNull<Block<T>>, } +/// Return value of `Rx::try_pop`. +pub(crate) enum TryPopResult<T> { + /// Successfully popped a value. + Ok(T), + /// The channel is empty. + Empty, + /// The channel is empty and closed. + Closed, + /// The channel is not empty, but the first value is being written. + Busy, +} + pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) { // Create the initial block shared between the tx and rx halves. let initial_block = Box::new(Block::new(0)); @@ -218,7 +230,7 @@ impl<T> fmt::Debug for Tx<T> { } impl<T> Rx<T> { - /// Pops the next value off the queue + /// Pops the next value off the queue. pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> { // Advance `head`, if needed if !self.try_advancing_head() { @@ -240,6 +252,26 @@ impl<T> Rx<T> { } } + /// Pops the next value off the queue, detecting whether the block + /// is busy or empty on failure. + /// + /// This function exists because `Rx::pop` can return `None` even if the + /// channel's queue contains a message that has been completely written. + /// This can happen if the fully delivered message is behind another message + /// that is in the middle of being written to the block, since the channel + /// can't return the messages out of order. + pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> { + let tail_position = tx.tail_position.load(Acquire); + let result = self.pop(tx); + + match result { + Some(block::Read::Value(t)) => TryPopResult::Ok(t), + Some(block::Read::Closed) => TryPopResult::Closed, + None if tail_position == self.index => TryPopResult::Empty, + None => TryPopResult::Busy, + } + } + /// Tries advancing the block pointer to the block referenced by `self.index`. /// /// Returns `true` if successful, `false` if there is no next block to load. diff --git a/src/sync/mpsc/unbounded.rs b/src/sync/mpsc/unbounded.rs index 23c80f6..8961930 100644 --- a/src/sync/mpsc/unbounded.rs +++ b/src/sync/mpsc/unbounded.rs @@ -1,6 +1,6 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::sync::mpsc::chan; -use crate::sync::mpsc::error::SendError; +use crate::sync::mpsc::error::{SendError, TryRecvError}; use std::fmt; use std::task::{Context, Poll}; @@ -129,6 +129,50 @@ impl<T> UnboundedReceiver<T> { poll_fn(|cx| self.poll_recv(cx)).await } + /// Try to receive the next value for this receiver. + /// + /// This method returns the [`Empty`] error if the channel is currently + /// empty, but there are still outstanding [senders] or [permits]. + /// + /// This method returns the [`Disconnected`] error if the channel is + /// currently empty, and there are no outstanding [senders] or [permits]. + /// + /// Unlike the [`poll_recv`] method, this method will never return an + /// [`Empty`] error spuriously. + /// + /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty + /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected + /// [`poll_recv`]: Self::poll_recv + /// [senders]: crate::sync::mpsc::Sender + /// [permits]: crate::sync::mpsc::Permit + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// use tokio::sync::mpsc::error::TryRecvError; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tx.send("hello").unwrap(); + /// + /// assert_eq!(Ok("hello"), rx.try_recv()); + /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + /// + /// tx.send("hello").unwrap(); + /// // Drop the last sender, closing the channel. + /// drop(tx); + /// + /// assert_eq!(Ok("hello"), rx.try_recv()); + /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); + /// } + /// ``` + pub fn try_recv(&mut self) -> Result<T, TryRecvError> { + self.chan.try_recv() + } + /// Blocking receive to call outside of asynchronous contexts. /// /// # Panics @@ -172,7 +216,7 @@ impl<T> UnboundedReceiver<T> { /// This method returns: /// /// * `Poll::Pending` if no messages are available but the channel is not - /// closed. + /// closed, or if a spurious failure happens. /// * `Poll::Ready(Some(message))` if a message is available. /// * `Poll::Ready(None)` if the channel has been closed and all messages /// sent before it was closed have been received. @@ -182,6 +226,12 @@ impl<T> UnboundedReceiver<T> { /// receiver, or when the channel is closed. Note that on multiple calls to /// `poll_recv`, only the `Waker` from the `Context` passed to the most /// recent call is scheduled to receive a wakeup. + /// + /// If this method returns `Poll::Pending` due to a spurious failure, then + /// the `Waker` will be notified when the situation causing the spurious + /// failure has been resolved. Note that receiving such a wakeup does not + /// guarantee that the next call will succeed — it could fail with another + /// spurious failure. pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } diff --git a/src/sync/notify.rs b/src/sync/notify.rs index 2ea6359..74b97cc 100644 --- a/src/sync/notify.rs +++ b/src/sync/notify.rs @@ -8,6 +8,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::linked_list::{self, LinkedList}; +use crate::util::WakeList; use std::cell::UnsafeCell; use std::future::Future; @@ -391,10 +392,7 @@ impl Notify { /// } /// ``` pub fn notify_waiters(&self) { - const NUM_WAKERS: usize = 32; - - let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); - let mut curr_waker = 0; + let mut wakers = WakeList::new(); // There are waiters, the lock must be acquired to notify. let mut waiters = self.waiters.lock(); @@ -414,7 +412,7 @@ impl Notify { // concurrently change, as holding the lock is required to // transition **out** of `WAITING`. 'outer: loop { - while curr_waker < NUM_WAKERS { + while wakers.can_push() { match waiters.pop_back() { Some(mut waiter) => { // Safety: `waiters` lock is still held. @@ -425,8 +423,7 @@ impl Notify { waiter.notified = Some(NotificationType::AllWaiters); if let Some(waker) = waiter.waker.take() { - wakers[curr_waker] = Some(waker); - curr_waker += 1; + wakers.push(waker); } } None => { @@ -437,11 +434,7 @@ impl Notify { drop(waiters); - for waker in wakers.iter_mut().take(curr_waker) { - waker.take().unwrap().wake(); - } - - curr_waker = 0; + wakers.wake_all(); // Acquire the lock again. waiters = self.waiters.lock(); @@ -456,9 +449,7 @@ impl Notify { // Release the lock before notifying drop(waiters); - for waker in wakers.iter_mut().take(curr_waker) { - waker.take().unwrap().wake(); - } + wakers.wake_all(); } } diff --git a/src/sync/tests/loom_mpsc.rs b/src/sync/tests/loom_mpsc.rs index c12313b..f165e70 100644 --- a/src/sync/tests/loom_mpsc.rs +++ b/src/sync/tests/loom_mpsc.rs @@ -132,3 +132,59 @@ fn dropping_unbounded_tx() { assert!(v.is_none()); }); } + +#[test] +fn try_recv() { + loom::model(|| { + use crate::sync::{mpsc, Semaphore}; + use loom::sync::{Arc, Mutex}; + + const PERMITS: usize = 2; + const TASKS: usize = 2; + const CYCLES: usize = 1; + + struct Context { + sem: Arc<Semaphore>, + tx: mpsc::Sender<()>, + rx: Mutex<mpsc::Receiver<()>>, + } + + fn run(ctx: &Context) { + block_on(async { + let permit = ctx.sem.acquire().await; + assert_ok!(ctx.rx.lock().unwrap().try_recv()); + crate::task::yield_now().await; + assert_ok!(ctx.tx.clone().try_send(())); + drop(permit); + }); + } + + let (tx, rx) = mpsc::channel(PERMITS); + let sem = Arc::new(Semaphore::new(PERMITS)); + let ctx = Arc::new(Context { + sem, + tx, + rx: Mutex::new(rx), + }); + + for _ in 0..PERMITS { + assert_ok!(ctx.tx.clone().try_send(())); + } + + let mut ths = Vec::new(); + + for _ in 0..TASKS { + let ctx = ctx.clone(); + + ths.push(thread::spawn(move || { + run(&ctx); + })); + } + + run(&ctx); + + for th in ths { + th.join().unwrap(); + } + }); +} diff --git a/src/sync/watch.rs b/src/sync/watch.rs index 96d1d16..b5da218 100644 --- a/src/sync/watch.rs +++ b/src/sync/watch.rs @@ -123,9 +123,7 @@ pub mod error { /// Error produced when sending a value fails. #[derive(Debug)] - pub struct SendError<T> { - pub(crate) inner: T, - } + pub struct SendError<T>(pub T); // ===== impl SendError ===== @@ -165,6 +163,9 @@ mod state { /// Snapshot of the state. The first bit is used as the CLOSED bit. /// The remaining bits are used as the version. + /// + /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all + /// receivers does not set it. #[derive(Copy, Clone, Debug)] pub(super) struct StateSnapshot(usize); @@ -427,8 +428,8 @@ impl<T> Sender<T> { /// every receiver has been dropped. pub fn send(&self, value: T) -> Result<(), error::SendError<T>> { // This is pretty much only useful as a hint anyway, so synchronization isn't critical. - if 0 == self.shared.ref_count_rx.load(Relaxed) { - return Err(error::SendError { inner: value }); + if 0 == self.receiver_count() { + return Err(error::SendError(value)); } { @@ -484,7 +485,7 @@ impl<T> Sender<T> { /// assert!(tx.is_closed()); /// ``` pub fn is_closed(&self) -> bool { - self.shared.ref_count_rx.load(Relaxed) == 0 + self.receiver_count() == 0 } /// Completes when all receivers have dropped. @@ -517,23 +518,81 @@ impl<T> Sender<T> { /// } /// ``` pub async fn closed(&self) { - let notified = self.shared.notify_tx.notified(); + while self.receiver_count() > 0 { + let notified = self.shared.notify_tx.notified(); - if self.shared.ref_count_rx.load(Relaxed) == 0 { - return; - } + if self.receiver_count() == 0 { + return; + } - notified.await; - debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed)); + notified.await; + // The channel could have been reopened in the meantime by calling + // `subscribe`, so we loop again. + } } - cfg_signal_internal! { - pub(crate) fn subscribe(&self) -> Receiver<T> { - let shared = self.shared.clone(); - let version = shared.state.load().version(); + /// Creates a new [`Receiver`] connected to this `Sender`. + /// + /// All messages sent before this call to `subscribe` are initially marked + /// as seen by the new `Receiver`. + /// + /// This method can be called even if there are no other receivers. In this + /// case, the channel is reopened. + /// + /// # Examples + /// + /// The new channel will receive messages sent on this `Sender`. + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, _rx) = watch::channel(0u64); + /// + /// tx.send(5).unwrap(); + /// + /// let rx = tx.subscribe(); + /// assert_eq!(5, *rx.borrow()); + /// + /// tx.send(10).unwrap(); + /// assert_eq!(10, *rx.borrow()); + /// } + /// ``` + /// + /// The most recent message is considered seen by the channel, so this test + /// is guaranteed to pass. + /// + /// ``` + /// use tokio::sync::watch; + /// use tokio::time::Duration; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, _rx) = watch::channel(0u64); + /// tx.send(5).unwrap(); + /// let mut rx = tx.subscribe(); + /// + /// tokio::spawn(async move { + /// // by spawning and sleeping, the message is sent after `main` + /// // hits the call to `changed`. + /// # if false { + /// tokio::time::sleep(Duration::from_millis(10)).await; + /// # } + /// tx.send(100).unwrap(); + /// }); + /// + /// rx.changed().await.unwrap(); + /// assert_eq!(100, *rx.borrow()); + /// } + /// ``` + pub fn subscribe(&self) -> Receiver<T> { + let shared = self.shared.clone(); + let version = shared.state.load().version(); - Receiver::from_shared(version, shared) - } + // The CLOSED bit in the state tracks only whether the sender is + // dropped, so we do not need to unset it if this reopens the channel. + Receiver::from_shared(version, shared) } /// Returns the number of receivers that currently exist diff --git a/src/task/yield_now.rs b/src/task/yield_now.rs index 251cb93..5eeb46a 100644 --- a/src/task/yield_now.rs +++ b/src/task/yield_now.rs @@ -2,37 +2,58 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_rt! { - /// Yields execution back to the Tokio runtime. - /// - /// A task yields by awaiting on `yield_now()`, and may resume when that - /// future completes (with no output.) The current task will be re-added as - /// a pending task at the _back_ of the pending queue. Any other pending - /// tasks will be scheduled. No other waking is required for the task to - /// continue. - /// - /// See also the usage example in the [task module](index.html#yield_now). - #[must_use = "yield_now does nothing unless polled/`await`-ed"] - pub async fn yield_now() { - /// Yield implementation - struct YieldNow { - yielded: bool, - } - - impl Future for YieldNow { - type Output = (); +/// Yields execution back to the Tokio runtime. +/// +/// A task yields by awaiting on `yield_now()`, and may resume when that future +/// completes (with no output.) The current task will be re-added as a pending +/// task at the _back_ of the pending queue. Any other pending tasks will be +/// scheduled. No other waking is required for the task to continue. +/// +/// See also the usage example in the [task module](index.html#yield_now). +/// +/// ## Non-guarantees +/// +/// This function may not yield all the way up to the executor if there are any +/// special combinators above it in the call stack. For example, if a +/// [`tokio::select!`] has another branch complete during the same poll as the +/// `yield_now()`, then the yield is not propagated all the way up to the +/// runtime. +/// +/// It is generally not guaranteed that the runtime behaves like you expect it +/// to when deciding which task to schedule next after a call to `yield_now()`. +/// In particular, the runtime may choose to poll the task that just ran +/// `yield_now()` again immediately without polling any other tasks first. For +/// example, the runtime will not drive the IO driver between every poll of a +/// task, and this could result in the runtime polling the current task again +/// immediately even if there is another task that could make progress if that +/// other task is waiting for a notification from the IO driver. +/// +/// In general, changes to the order in which the runtime polls tasks is not +/// considered a breaking change, and your program should be correct no matter +/// which order the runtime polls your tasks in. +/// +/// [`tokio::select!`]: macro@crate::select +#[must_use = "yield_now does nothing unless polled/`await`-ed"] +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] +pub async fn yield_now() { + /// Yield implementation + struct YieldNow { + yielded: bool, + } - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if self.yielded { - return Poll::Ready(()); - } + impl Future for YieldNow { + type Output = (); - self.yielded = true; - cx.waker().wake_by_ref(); - Poll::Pending + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.yielded { + return Poll::Ready(()); } - } - YieldNow { yielded: false }.await + self.yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } } + + YieldNow { yielded: false }.await } diff --git a/src/time/clock.rs b/src/time/clock.rs index a44d75f..fae5c76 100644 --- a/src/time/clock.rs +++ b/src/time/clock.rs @@ -64,15 +64,24 @@ cfg_test_util! { /// Pause time /// /// The current value of `Instant::now()` is saved and all subsequent calls - /// to `Instant::now()` until the timer wheel is checked again will return - /// the saved value. Once the timer wheel is checked, time will immediately - /// advance to the next registered `Sleep`. This is useful for running tests - /// that depend on time. + /// to `Instant::now()` will return the saved value. The saved value can be + /// changed by [`advance`] or by the time auto-advancing once the runtime + /// has no work to do. This only affects the `Instant` type in Tokio, and + /// the `Instant` in std continues to work as normal. /// /// Pausing time requires the `current_thread` Tokio runtime. This is the /// default runtime used by `#[tokio::test]`. The runtime can be initialized /// with time in a paused state using the `Builder::start_paused` method. /// + /// For cases where time is immediately paused, it is better to pause + /// the time using the `main` or `test` macro: + /// ``` + /// #[tokio::main(flavor = "current_thread", start_paused = true)] + /// async fn main() { + /// println!("Hello world"); + /// } + /// ``` + /// /// # Panics /// /// Panics if time is already frozen or if called from outside of a @@ -86,6 +95,7 @@ cfg_test_util! { /// current time when awaited. /// /// [`Sleep`]: crate::time::Sleep + /// [`advance`]: crate::time::advance pub fn pause() { let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); clock.pause(); @@ -111,11 +121,25 @@ cfg_test_util! { inner.unfrozen = Some(std::time::Instant::now()); } - /// Advance time + /// Advance time. /// /// Increments the saved `Instant::now()` value by `duration`. Subsequent /// calls to `Instant::now()` will return the result of the increment. /// + /// This function will make the current time jump forward by the given + /// duration in one jump. This means that all `sleep` calls with a deadline + /// before the new time will immediately complete "at the same time", and + /// the runtime is free to poll them in any order. Additionally, this + /// method will not wait for the `sleep` calls it advanced past to complete. + /// If you want to do that, you should instead call [`sleep`] and rely on + /// the runtime's auto-advance feature. + /// + /// Note that calls to `sleep` are not guaranteed to complete the first time + /// they are polled after a call to `advance`. For example, this can happen + /// if the runtime has not yet touched the timer driver after the call to + /// `advance`. However if they don't, the runtime will poll the task again + /// shortly. + /// /// # Panics /// /// Panics if time is not frozen or if called from outside of the Tokio @@ -126,6 +150,8 @@ cfg_test_util! { /// If the time is paused and there is no work to do, the runtime advances /// time to the next timer. See [`pause`](pause#auto-advance) for more /// details. + /// + /// [`sleep`]: fn@crate::time::sleep pub async fn advance(duration: Duration) { let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); clock.advance(duration); diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs index 37d2231..f611fbb 100644 --- a/src/time/driver/mod.rs +++ b/src/time/driver/mod.rs @@ -288,13 +288,21 @@ impl Handle { self.process_at_time(now) } - pub(self) fn process_at_time(&self, now: u64) { + pub(self) fn process_at_time(&self, mut now: u64) { let mut waker_list: [Option<Waker>; 32] = Default::default(); let mut waker_idx = 0; let mut lock = self.get().lock(); - assert!(now >= lock.elapsed); + if now < lock.elapsed { + // Time went backwards! This normally shouldn't happen as the Rust language + // guarantees that an Instant is monotonic, but can happen when running + // Linux in a VM on a Windows host due to std incorrectly trusting the + // hardware clock to be monotonic. + // + // See <https://github.com/tokio-rs/tokio/issues/3619> for more information. + now = lock.elapsed; + } while let Some(entry) = lock.wheel.poll(now) { debug_assert!(unsafe { entry.is_pending() }); diff --git a/src/time/driver/sleep.rs b/src/time/driver/sleep.rs index 40f745a..4e9ed65 100644 --- a/src/time/driver/sleep.rs +++ b/src/time/driver/sleep.rs @@ -12,10 +12,31 @@ use std::task::{self, Poll}; /// operates at millisecond granularity and should not be used for tasks that /// require high-resolution timers. /// +/// To run something regularly on a schedule, see [`interval`]. +/// /// # Cancellation /// /// Canceling a sleep instance is done by dropping the returned future. No additional /// cleanup work is required. +/// +/// # Examples +/// +/// Wait 100ms and print "100 ms have elapsed". +/// +/// ``` +/// use tokio::time::{sleep_until, Instant, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// sleep_until(Instant::now() + Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// } +/// ``` +/// +/// See the documentation for the [`Sleep`] type for more examples. +/// +/// [`Sleep`]: struct@crate::time::Sleep +/// [`interval`]: crate::time::interval() // Alias for old name in 0.x #[cfg_attr(docsrs, doc(alias = "delay_until"))] pub fn sleep_until(deadline: Instant) -> Sleep { @@ -54,6 +75,9 @@ pub fn sleep_until(deadline: Instant) -> Sleep { /// } /// ``` /// +/// See the documentation for the [`Sleep`] type for more examples. +/// +/// [`Sleep`]: struct@crate::time::Sleep /// [`interval`]: crate::time::interval() // Alias for old name in 0.x #[cfg_attr(docsrs, doc(alias = "delay_for"))] @@ -216,6 +240,8 @@ impl Sleep { /// # } /// ``` /// + /// See also the top-level examples. + /// /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut pub fn reset(self: Pin<&mut Self>, deadline: Instant) { let me = self.project(); diff --git a/src/util/mod.rs b/src/util/mod.rs index 9065f50..df30f2b 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -4,6 +4,29 @@ cfg_io_driver! { } #[cfg(any( + // io driver uses `WakeList` directly + feature = "net", + feature = "process", + // `sync` enables `Notify` and `batch_semaphore`, which require `WakeList`. + feature = "sync", + // `fs` uses `batch_semaphore`, which requires `WakeList`. + feature = "fs", + // rt and signal use `Notify`, which requires `WakeList`. + feature = "rt", + feature = "signal", +))] +mod wake_list; +#[cfg(any( + feature = "net", + feature = "process", + feature = "sync", + feature = "fs", + feature = "rt", + feature = "signal", +))] +pub(crate) use wake_list::WakeList; + +#[cfg(any( feature = "fs", feature = "net", feature = "process", diff --git a/src/util/trace.rs b/src/util/trace.rs index c51a5a7..61c155c 100644 --- a/src/util/trace.rs +++ b/src/util/trace.rs @@ -11,17 +11,17 @@ cfg_trace! { #[cfg(tokio_track_caller)] let span = tracing::trace_span!( target: "tokio::task", - "task", + "runtime.spawn", %kind, + task.name = %name.unwrap_or_default(), spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), - task.name = %name.unwrap_or_default() ); #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( target: "tokio::task", - "task", + "runtime.spawn", %kind, - task.name = %name.unwrap_or_default() + task.name = %name.unwrap_or_default(), ); task.instrument(span) } diff --git a/src/util/wake_list.rs b/src/util/wake_list.rs new file mode 100644 index 0000000..aa569dd --- /dev/null +++ b/src/util/wake_list.rs @@ -0,0 +1,53 @@ +use core::mem::MaybeUninit; +use core::ptr; +use std::task::Waker; + +const NUM_WAKERS: usize = 32; + +pub(crate) struct WakeList { + inner: [MaybeUninit<Waker>; NUM_WAKERS], + curr: usize, +} + +impl WakeList { + pub(crate) fn new() -> Self { + Self { + inner: unsafe { + // safety: Create an uninitialized array of `MaybeUninit`. The + // `assume_init` is safe because the type we are claiming to + // have initialized here is a bunch of `MaybeUninit`s, which do + // not require initialization. + MaybeUninit::uninit().assume_init() + }, + curr: 0, + } + } + + #[inline] + pub(crate) fn can_push(&self) -> bool { + self.curr < NUM_WAKERS + } + + pub(crate) fn push(&mut self, val: Waker) { + debug_assert!(self.can_push()); + + self.inner[self.curr] = MaybeUninit::new(val); + self.curr += 1; + } + + pub(crate) fn wake_all(&mut self) { + assert!(self.curr <= NUM_WAKERS); + while self.curr > 0 { + self.curr -= 1; + let waker = unsafe { ptr::read(self.inner[self.curr].as_mut_ptr()) }; + waker.wake(); + } + } +} + +impl Drop for WakeList { + fn drop(&mut self) { + let slice = ptr::slice_from_raw_parts_mut(self.inner.as_mut_ptr() as *mut Waker, self.curr); + unsafe { ptr::drop_in_place(slice) }; + } +} diff --git a/tests/fs_file.rs b/tests/fs_file.rs index bf2f1d7..f645e61 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -1,12 +1,11 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; -use tokio_test::task; - use std::io::prelude::*; use tempfile::NamedTempFile; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; +use tokio_test::task; const HELLO: &[u8] = b"hello world..."; @@ -51,6 +50,19 @@ async fn basic_write_and_shutdown() { } #[tokio::test] +async fn rewind_seek_position() { + let tempfile = tempfile(); + + let mut file = File::create(tempfile.path()).await.unwrap(); + + file.seek(SeekFrom::Current(10)).await.unwrap(); + + file.rewind().await.unwrap(); + + assert_eq!(file.stream_position().await.unwrap(), 0); +} + +#[tokio::test] async fn coop() { let mut tempfile = tempfile(); tempfile.write_all(HELLO).unwrap(); diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs index dc21e42..5a6875e 100644 --- a/tests/io_async_fd.rs +++ b/tests/io_async_fd.rs @@ -15,7 +15,7 @@ use std::{ use nix::unistd::{close, read, write}; -use futures::{poll, FutureExt}; +use futures::poll; use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard}; use tokio_test::{assert_err, assert_pending}; @@ -163,10 +163,11 @@ async fn initially_writable() { afd_a.writable().await.unwrap().clear_ready(); afd_b.writable().await.unwrap().clear_ready(); - futures::select_biased! { - _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {}, - _ = afd_a.readable().fuse() => panic!("Unexpected readable state"), - _ = afd_b.readable().fuse() => panic!("Unexpected readable state"), + tokio::select! { + biased; + _ = tokio::time::sleep(Duration::from_millis(10)) => {}, + _ = afd_a.readable() => panic!("Unexpected readable state"), + _ = afd_b.readable() => panic!("Unexpected readable state"), } } @@ -353,12 +354,13 @@ async fn multiple_waiters() { futures::future::pending::<()>().await; }; - futures::select_biased! { - guard = afd_a.readable().fuse() => { + tokio::select! { + biased; + guard = afd_a.readable() => { tokio::task::yield_now().await; guard.unwrap().clear_ready() }, - _ = notify_barrier.fuse() => unreachable!(), + _ = notify_barrier => unreachable!(), } std::mem::drop(afd_a); diff --git a/tests/io_fill_buf.rs b/tests/io_fill_buf.rs new file mode 100644 index 0000000..0b2ebd7 --- /dev/null +++ b/tests/io_fill_buf.rs @@ -0,0 +1,34 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tempfile::NamedTempFile; +use tokio::fs::File; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio_test::assert_ok; + +#[tokio::test] +async fn fill_buf_file() { + let file = NamedTempFile::new().unwrap(); + + assert_ok!(std::fs::write(file.path(), b"hello")); + + let file = assert_ok!(File::open(file.path()).await); + let mut file = BufReader::new(file); + + let mut contents = Vec::new(); + + loop { + let consumed = { + let buffer = assert_ok!(file.fill_buf().await); + if buffer.is_empty() { + break; + } + contents.extend_from_slice(buffer); + buffer.len() + }; + + file.consume(consumed); + } + + assert_eq!(contents, b"hello"); +} diff --git a/tests/io_poll_aio.rs b/tests/io_poll_aio.rs new file mode 100644 index 0000000..f044af5 --- /dev/null +++ b/tests/io_poll_aio.rs @@ -0,0 +1,375 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(target_os = "freebsd", feature = "net"))] + +use mio_aio::{AioCb, AioFsyncMode, LioCb}; +use std::{ + future::Future, + mem, + os::unix::io::{AsRawFd, RawFd}, + pin::Pin, + task::{Context, Poll}, +}; +use tempfile::tempfile; +use tokio::io::bsd::{Aio, AioSource}; +use tokio_test::assert_pending; + +mod aio { + use super::*; + + /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource + struct WrappedAioCb<'a>(AioCb<'a>); + impl<'a> AioSource for WrappedAioCb<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + self.0.register_raw(kq, token) + } + fn deregister(&mut self) { + self.0.deregister_raw() + } + } + + /// A very crude implementation of an AIO-based future + struct FsyncFut(Aio<WrappedAioCb<'static>>); + + impl Future for FsyncFut { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + // At this point, we could clear readiness. But there's no + // point, since we're about to drop the Aio. + let result = (*self.0).0.aio_return(); + match result { + Ok(_) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(e.into())), + } + } + } + } + } + + /// Low-level AIO Source + /// + /// An example bypassing mio_aio and Nix to demonstrate how the kevent + /// registration actually works, under the hood. + struct LlSource(Pin<Box<libc::aiocb>>); + + impl AioSource for LlSource { + fn register(&mut self, kq: RawFd, token: usize) { + let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + sev.sigev_notify = libc::SIGEV_KEVENT; + sev.sigev_signo = kq; + sev.sigev_value = libc::sigval { + sival_ptr: token as *mut libc::c_void, + }; + self.0.aio_sigevent = sev; + } + + fn deregister(&mut self) { + unsafe { + self.0.aio_sigevent = mem::zeroed(); + } + } + } + + struct LlFut(Aio<LlSource>); + + impl Future for LlFut { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; + assert_eq!(0, r); + Poll::Ready(Ok(())) + } + } + } + } + + /// A very simple object that can implement AioSource and can be reused. + /// + /// mio_aio normally assumes that each AioCb will be consumed on completion. + /// This somewhat contrived example shows how an Aio object can be reused + /// anyway. + struct ReusableFsyncSource { + aiocb: Pin<Box<AioCb<'static>>>, + fd: RawFd, + token: usize, + } + impl ReusableFsyncSource { + fn fsync(&mut self) { + self.aiocb.register_raw(self.fd, self.token); + self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap(); + } + fn new(aiocb: AioCb<'static>) -> Self { + ReusableFsyncSource { + aiocb: Box::pin(aiocb), + fd: 0, + token: 0, + } + } + fn reset(&mut self, aiocb: AioCb<'static>) { + self.aiocb = Box::pin(aiocb); + } + } + impl AioSource for ReusableFsyncSource { + fn register(&mut self, kq: RawFd, token: usize) { + self.fd = kq; + self.token = token; + } + fn deregister(&mut self) { + self.fd = 0; + } + } + + struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>); + impl<'a> Future for ReusableFsyncFut<'a> { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(ev)) => { + // Since this future uses a reusable Aio, we must clear + // its readiness here. That makes the future + // non-idempotent; the caller can't poll it repeatedly after + // it has already returned Ready. But that's ok; most + // futures behave this way. + self.0.clear_ready(ev); + let result = (*self.0).aiocb.aio_return(); + match result { + Ok(_) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(e.into())), + } + } + } + } + } + + #[tokio::test] + async fn fsync() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let aiocb = AioCb::from_fd(fd, 0); + let source = WrappedAioCb(aiocb); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap(); + let fut = FsyncFut(poll_aio); + fut.await.unwrap(); + } + + #[tokio::test] + async fn ll_fsync() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + aiocb.aio_fildes = fd; + let source = LlSource(Box::pin(aiocb)); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + let r = unsafe { + let p = (*poll_aio).0.as_mut().get_unchecked_mut(); + libc::aio_fsync(libc::O_SYNC, p) + }; + assert_eq!(0, r); + let fut = LlFut(poll_aio); + fut.await.unwrap(); + } + + /// A suitably crafted future type can reuse an Aio object + #[tokio::test] + async fn reuse() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let aiocb0 = AioCb::from_fd(fd, 0); + let source = ReusableFsyncSource::new(aiocb0); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + poll_aio.fsync(); + let fut0 = ReusableFsyncFut(&mut poll_aio); + fut0.await.unwrap(); + + let aiocb1 = AioCb::from_fd(fd, 0); + poll_aio.reset(aiocb1); + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + assert_pending!(poll_aio.poll_ready(&mut ctx)); + poll_aio.fsync(); + let fut1 = ReusableFsyncFut(&mut poll_aio); + fut1.await.unwrap(); + } +} + +mod lio { + use super::*; + + struct WrappedLioCb<'a>(LioCb<'a>); + impl<'a> AioSource for WrappedLioCb<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + self.0.register_raw(kq, token) + } + fn deregister(&mut self) { + self.0.deregister_raw() + } + } + + /// A very crude lio_listio-based Future + struct LioFut(Option<Aio<WrappedLioCb<'static>>>); + + impl Future for LioFut { + type Output = std::io::Result<Vec<isize>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.as_mut().unwrap().poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + // At this point, we could clear readiness. But there's no + // point, since we're about to drop the Aio. + let r = self.0.take().unwrap().into_inner().0.into_results(|iter| { + iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>() + }); + Poll::Ready(Ok(r)) + } + } + } + } + + /// Minimal example demonstrating reuse of an Aio object with lio + /// readiness. mio_aio::LioCb actually does something similar under the + /// hood. + struct ReusableLioSource { + liocb: Option<LioCb<'static>>, + fd: RawFd, + token: usize, + } + impl ReusableLioSource { + fn new(liocb: LioCb<'static>) -> Self { + ReusableLioSource { + liocb: Some(liocb), + fd: 0, + token: 0, + } + } + fn reset(&mut self, liocb: LioCb<'static>) { + self.liocb = Some(liocb); + } + fn submit(&mut self) { + self.liocb + .as_mut() + .unwrap() + .register_raw(self.fd, self.token); + self.liocb.as_mut().unwrap().submit().unwrap(); + } + } + impl AioSource for ReusableLioSource { + fn register(&mut self, kq: RawFd, token: usize) { + self.fd = kq; + self.token = token; + } + fn deregister(&mut self) { + self.fd = 0; + } + } + struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>); + impl<'a> Future for ReusableLioFut<'a> { + type Output = std::io::Result<Vec<isize>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(ev)) => { + // Since this future uses a reusable Aio, we must clear + // its readiness here. That makes the future + // non-idempotent; the caller can't poll it repeatedly after + // it has already returned Ready. But that's ok; most + // futures behave this way. + self.0.clear_ready(ev); + let r = (*self.0).liocb.take().unwrap().into_results(|iter| { + iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>() + }); + Poll::Ready(Ok(r)) + } + } + } + } + + /// An lio_listio operation with one write element + #[tokio::test] + async fn onewrite() { + const WBUF: &[u8] = b"abcdef"; + let f = tempfile().unwrap(); + + let mut builder = mio_aio::LioCbBuilder::with_capacity(1); + builder = builder.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb = builder.finish(); + let source = WrappedLioCb(liocb); + let mut poll_aio = Aio::new_for_lio(source).unwrap(); + + // Send the operation to the kernel + (*poll_aio).0.submit().unwrap(); + let fut = LioFut(Some(poll_aio)); + let v = fut.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + } + + /// A suitably crafted future type can reuse an Aio object + #[tokio::test] + async fn reuse() { + const WBUF: &[u8] = b"abcdef"; + let f = tempfile().unwrap(); + + let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1); + builder0 = builder0.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb0 = builder0.finish(); + let source = ReusableLioSource::new(liocb0); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + poll_aio.submit(); + let fut0 = ReusableLioFut(&mut poll_aio); + let v = fut0.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + + // Now reuse the same Aio + let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1); + builder1 = builder1.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb1 = builder1.finish(); + poll_aio.reset(liocb1); + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + assert_pending!(poll_aio.poll_ready(&mut ctx)); + poll_aio.submit(); + let fut1 = ReusableLioFut(&mut poll_aio); + let v = fut1.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + } +} diff --git a/tests/process_kill_on_drop.rs b/tests/process_kill_on_drop.rs index 00f5c6d..658e4ad 100644 --- a/tests/process_kill_on_drop.rs +++ b/tests/process_kill_on_drop.rs @@ -1,6 +1,7 @@ #![cfg(all(unix, feature = "process"))] #![warn(rust_2018_idioms)] +use std::io::ErrorKind; use std::process::Stdio; use std::time::Duration; use tokio::io::AsyncReadExt; @@ -24,11 +25,12 @@ async fn kill_on_drop() { ", ]); - let mut child = cmd - .kill_on_drop(true) - .stdout(Stdio::piped()) - .spawn() - .unwrap(); + let e = cmd.kill_on_drop(true).stdout(Stdio::piped()).spawn(); + if e.is_err() && e.as_ref().unwrap_err().kind() == ErrorKind::NotFound { + println!("bash not available; skipping test"); + return; + } + let mut child = e.unwrap(); sleep(Duration::from_secs(2)).await; diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs index cd43ad4..1947d26 100644 --- a/tests/sync_mpsc.rs +++ b/tests/sync_mpsc.rs @@ -5,7 +5,7 @@ use std::thread; use tokio::runtime::Runtime; use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -328,6 +328,27 @@ async fn try_send_fail() { } #[tokio::test] +async fn try_send_fail_with_try_recv() { + let (tx, mut rx) = mpsc::channel(1); + + tx.try_send("hello").unwrap(); + + // This should fail + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } + + assert_eq!(rx.try_recv(), Ok("hello")); + + assert_ok!(tx.try_send("goodbye")); + drop(tx); + + assert_eq!(rx.try_recv(), Ok("goodbye")); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[tokio::test] async fn try_reserve_fails() { let (tx, mut rx) = mpsc::channel(1); @@ -389,13 +410,15 @@ fn dropping_rx_closes_channel_for_try() { drop(rx); - { - let err = assert_err!(tx.try_send(msg.clone())); - match err { - TrySendError::Closed(..) => {} - _ => panic!(), - } - } + assert!(matches!( + tx.try_send(msg.clone()), + Err(TrySendError::Closed(_)) + )); + assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_)))); + assert!(matches!( + tx.try_reserve_owned(), + Err(TrySendError::Closed(_)) + )); assert_eq!(1, Arc::strong_count(&msg)); } @@ -494,3 +517,83 @@ async fn permit_available_not_acquired_close() { drop(permit2); assert!(rx.recv().await.is_none()); } + +#[test] +fn try_recv_bounded() { + let (tx, mut rx) = mpsc::channel(5); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert!(tx.try_send("hello").is_err()); + + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert_eq!(Ok("hello"), rx.try_recv()); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert!(tx.try_send("hello").is_err()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + drop(tx); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + +#[test] +fn try_recv_unbounded() { + for num in 0..100 { + let (tx, mut rx) = mpsc::unbounded_channel(); + + for i in 0..num { + tx.send(i).unwrap(); + } + + for i in 0..num { + assert_eq!(rx.try_recv(), Ok(i)); + } + + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } +} + +#[test] +fn try_recv_close_while_empty_bounded() { + let (tx, mut rx) = mpsc::channel::<()>(5); + + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + drop(tx); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + +#[test] +fn try_recv_close_while_empty_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + drop(tx); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs index a2a276d..b7bbaf7 100644 --- a/tests/sync_watch.rs +++ b/tests/sync_watch.rs @@ -186,3 +186,18 @@ fn borrow_and_update() { assert_eq!(*rx.borrow_and_update(), "three"); assert_ready!(spawn(rx.changed()).poll()).unwrap_err(); } + +#[test] +fn reopened_after_subscribe() { + let (tx, rx) = watch::channel("one"); + assert!(!tx.is_closed()); + + drop(rx); + assert!(tx.is_closed()); + + let rx = tx.subscribe(); + assert!(!tx.is_closed()); + + drop(rx); + assert!(tx.is_closed()); +} |