diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2021-11-25 20:05:29 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2021-11-25 20:05:29 +0000 |
commit | 2f0421fc344db1fc129b5ece09f6c9b662648c2e (patch) | |
tree | 7c474e5234417d4e078c4a115ce27112bf34c301 /src/io | |
parent | 6c9a00a78b690ca3f86b75d07c5270e21facad62 (diff) | |
parent | b8563af5d8e399995ad8a5d01bf40ee260885b32 (diff) | |
download | tokio-android12-mainline-mediaprovider-release.tar.gz |
Snap for 7947225 from b8563af5d8e399995ad8a5d01bf40ee260885b32 to mainline-mediaprovider-releaseandroid-mainline-12.0.0_r90android-mainline-12.0.0_r76android-mainline-12.0.0_r121android-mainline-12.0.0_r106aml_mpr_311911090android12-mainline-mediaprovider-release
Change-Id: I1319ffb9800fb9ba5c4f6e30ce14f5fe2ae7153b
Diffstat (limited to 'src/io')
34 files changed, 1392 insertions, 142 deletions
diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs index 5a68d30..9ec5b7f 100644 --- a/src/io/async_fd.rs +++ b/src/io/async_fd.rs @@ -205,13 +205,13 @@ impl<T: AsRawFd> AsyncFd<T> { }) } - /// Returns a shared reference to the backing object of this [`AsyncFd`] + /// Returns a shared reference to the backing object of this [`AsyncFd`]. #[inline] pub fn get_ref(&self) -> &T { self.inner.as_ref().unwrap() } - /// Returns a mutable reference to the backing object of this [`AsyncFd`] + /// Returns a mutable reference to the backing object of this [`AsyncFd`]. #[inline] pub fn get_mut(&mut self) -> &mut T { self.inner.as_mut().unwrap() @@ -540,6 +540,16 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> { result => Ok(result), } } + + /// Returns a shared reference to the inner [`AsyncFd`]. + pub fn get_ref(&self) -> &AsyncFd<Inner> { + self.async_fd + } + + /// Returns a shared reference to the backing object of the inner [`AsyncFd`]. + pub fn get_inner(&self) -> &Inner { + self.get_ref().get_ref() + } } impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> { @@ -601,6 +611,26 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> { result => Ok(result), } } + + /// Returns a shared reference to the inner [`AsyncFd`]. + pub fn get_ref(&self) -> &AsyncFd<Inner> { + self.async_fd + } + + /// Returns a mutable reference to the inner [`AsyncFd`]. + pub fn get_mut(&mut self) -> &mut AsyncFd<Inner> { + self.async_fd + } + + /// Returns a shared reference to the backing object of the inner [`AsyncFd`]. + pub fn get_inner(&self) -> &Inner { + self.get_ref().get_ref() + } + + /// Returns a mutable reference to the backing object of the inner [`AsyncFd`]. + pub fn get_inner_mut(&mut self) -> &mut Inner { + self.get_mut().get_mut() + } } impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> { diff --git a/src/io/async_write.rs b/src/io/async_write.rs index 569fb9c..7ec1a30 100644 --- a/src/io/async_write.rs +++ b/src/io/async_write.rs @@ -45,7 +45,11 @@ use std::task::{Context, Poll}; pub trait AsyncWrite { /// Attempt to write bytes from `buf` into the object. /// - /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. + /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. If successful, + /// then it must be guaranteed that `n <= buf.len()`. A return value of `0` + /// typically means that the underlying object is no longer able to accept + /// bytes and will likely not be able to in the future as well, or that the + /// buffer provided is empty. /// /// If the object is not ready for writing, the method returns /// `Poll::Pending` and arranges for the current task (via diff --git a/src/io/blocking.rs b/src/io/blocking.rs index 94a3484..1d79ee7 100644 --- a/src/io/blocking.rs +++ b/src/io/blocking.rs @@ -16,7 +16,7 @@ use self::State::*; pub(crate) struct Blocking<T> { inner: Option<T>, state: State<T>, - /// `true` if the lower IO layer needs flushing + /// `true` if the lower IO layer needs flushing. need_flush: bool, } @@ -175,7 +175,7 @@ where } } -/// Repeats operations that are interrupted +/// Repeats operations that are interrupted. macro_rules! uninterruptibly { ($e:expr) => {{ loop { diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs new file mode 100644 index 0000000..f1ac4b2 --- /dev/null +++ b/src/io/bsd/poll_aio.rs @@ -0,0 +1,195 @@ +//! Use POSIX AIO futures with Tokio. + +use crate::io::driver::{Handle, Interest, ReadyEvent, Registration}; +use mio::event::Source; +use mio::Registry; +use mio::Token; +use std::fmt; +use std::io; +use std::ops::{Deref, DerefMut}; +use std::os::unix::io::AsRawFd; +use std::os::unix::prelude::RawFd; +use std::task::{Context, Poll}; + +/// Like [`mio::event::Source`], but for POSIX AIO only. +/// +/// Tokio's consumer must pass an implementor of this trait to create a +/// [`Aio`] object. +pub trait AioSource { + /// Registers this AIO event source with Tokio's reactor. + fn register(&mut self, kq: RawFd, token: usize); + + /// Deregisters this AIO event source with Tokio's reactor. + fn deregister(&mut self); +} + +/// Wraps the user's AioSource in order to implement mio::event::Source, which +/// is what the rest of the crate wants. +struct MioSource<T>(T); + +impl<T: AioSource> Source for MioSource<T> { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: mio::Interest, + ) -> io::Result<()> { + assert!(interests.is_aio() || interests.is_lio()); + self.0.register(registry.as_raw_fd(), usize::from(token)); + Ok(()) + } + + fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { + self.0.deregister(); + Ok(()) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: mio::Interest, + ) -> io::Result<()> { + assert!(interests.is_aio() || interests.is_lio()); + self.0.register(registry.as_raw_fd(), usize::from(token)); + Ok(()) + } +} + +/// Associates a POSIX AIO control block with the reactor that drives it. +/// +/// `Aio`'s wrapped type must implement [`AioSource`] to be driven +/// by the reactor. +/// +/// The wrapped source may be accessed through the `Aio` via the `Deref` and +/// `DerefMut` traits. +/// +/// ## Clearing readiness +/// +/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the +/// Source is not completely ready and must return to the Pending state, +/// [`Aio::clear_ready`] may be used. This can be useful with +/// [`lio_listio`], which may generate a kevent when only a portion of the +/// operations have completed. +/// +/// ## Platforms +/// +/// Only FreeBSD implements POSIX AIO with kqueue notification, so +/// `Aio` is only available for that operating system. +/// +/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html +// Note: Unlike every other kqueue event source, POSIX AIO registers events not +// via kevent(2) but when the aiocb is submitted to the kernel via aio_read, +// aio_write, etc. It needs the kqueue's file descriptor to do that. So +// AsyncFd can't be used for POSIX AIO. +// +// Note that Aio doesn't implement Drop. There's no need. Unlike other +// kqueue sources, simply dropping the object effectively deregisters it. +pub struct Aio<E> { + io: MioSource<E>, + registration: Registration, +} + +// ===== impl Aio ===== + +impl<E: AioSource> Aio<E> { + /// Creates a new `Aio` suitable for use with POSIX AIO functions. + /// + /// It will be associated with the default reactor. The runtime is usually + /// set implicitly when this function is called from a future driven by a + /// Tokio runtime, otherwise runtime can be set explicitly with + /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn new_for_aio(io: E) -> io::Result<Self> { + Self::new_with_interest(io, Interest::AIO) + } + + /// Creates a new `Aio` suitable for use with [`lio_listio`]. + /// + /// It will be associated with the default reactor. The runtime is usually + /// set implicitly when this function is called from a future driven by a + /// Tokio runtime, otherwise runtime can be set explicitly with + /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. + /// + /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + pub fn new_for_lio(io: E) -> io::Result<Self> { + Self::new_with_interest(io, Interest::LIO) + } + + fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { + let mut io = MioSource(io); + let handle = Handle::current(); + let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; + Ok(Self { io, registration }) + } + + /// Indicates to Tokio that the source is no longer ready. The internal + /// readiness flag will be cleared, and tokio will wait for the next + /// edge-triggered readiness notification from the OS. + /// + /// It is critical that this method not be called unless your code + /// _actually observes_ that the source is _not_ ready. The OS must + /// deliver a subsequent notification, or this source will block + /// forever. It is equally critical that you `do` call this method if you + /// resubmit the same structure to the kernel and poll it again. + /// + /// This method is not very useful with AIO readiness, since each `aiocb` + /// structure is typically only used once. It's main use with + /// [`lio_listio`], which will sometimes send notification when only a + /// portion of its elements are complete. In that case, the caller must + /// call `clear_ready` before resubmitting it. + /// + /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + pub fn clear_ready(&self, ev: AioEvent) { + self.registration.clear_readiness(ev.0) + } + + /// Destroy the [`Aio`] and return its inner source. + pub fn into_inner(self) -> E { + self.io.0 + } + + /// Polls for readiness. Either AIO or LIO counts. + /// + /// This method returns: + /// * `Poll::Pending` if the underlying operation is not complete, whether + /// or not it completed successfully. This will be true if the OS is + /// still processing it, or if it has not yet been submitted to the OS. + /// * `Poll::Ready(Ok(_))` if the underlying operation is complete. + /// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does + /// _not_ indicate that the underlying operation encountered an error. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` + /// is scheduled to receive a wakeup when the underlying operation + /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the + /// `Context` passed to the most recent call is scheduled to receive a wakeup. + pub fn poll_ready<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> { + let ev = ready!(self.registration.poll_read_ready(cx))?; + Poll::Ready(Ok(AioEvent(ev))) + } +} + +impl<E: AioSource> Deref for Aio<E> { + type Target = E; + + fn deref(&self) -> &E { + &self.io.0 + } +} + +impl<E: AioSource> DerefMut for Aio<E> { + fn deref_mut(&mut self) -> &mut E { + &mut self.io.0 + } +} + +impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Aio").field("io", &self.io.0).finish() + } +} + +/// Opaque data returned by [`Aio::poll_ready`]. +/// +/// It can be fed back to [`Aio::clear_ready`]. +#[derive(Debug)] +pub struct AioEvent(ReadyEvent); diff --git a/src/io/driver/interest.rs b/src/io/driver/interest.rs index 9eead08..d6b46df 100644 --- a/src/io/driver/interest.rs +++ b/src/io/driver/interest.rs @@ -5,7 +5,7 @@ use crate::io::driver::Ready; use std::fmt; use std::ops; -/// Readiness event interest +/// Readiness event interest. /// /// Specifies the readiness events the caller is interested in when awaiting on /// I/O resource readiness states. @@ -14,12 +14,32 @@ use std::ops; pub struct Interest(mio::Interest); impl Interest { + // The non-FreeBSD definitions in this block are active only when + // building documentation. + cfg_aio! { + /// Interest for POSIX AIO. + #[cfg(target_os = "freebsd")] + pub const AIO: Interest = Interest(mio::Interest::AIO); + + /// Interest for POSIX AIO. + #[cfg(not(target_os = "freebsd"))] + pub const AIO: Interest = Interest(mio::Interest::READABLE); + + /// Interest for POSIX AIO lio_listio events. + #[cfg(target_os = "freebsd")] + pub const LIO: Interest = Interest(mio::Interest::LIO); + + /// Interest for POSIX AIO lio_listio events. + #[cfg(not(target_os = "freebsd"))] + pub const LIO: Interest = Interest(mio::Interest::READABLE); + } + /// Interest in all readable events. /// /// Readable interest includes read-closed events. pub const READABLE: Interest = Interest(mio::Interest::READABLE); - /// Interest in all writable events + /// Interest in all writable events. /// /// Writable interest includes write-closed events. pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); @@ -58,7 +78,7 @@ impl Interest { self.0.is_writable() } - /// Add together two `Interst` values. + /// Add together two `Interest` values. /// /// This function works from a `const` context. /// diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs index fa2d420..19f67a2 100644 --- a/src/io/driver/mod.rs +++ b/src/io/driver/mod.rs @@ -23,10 +23,10 @@ use std::io; use std::sync::{Arc, Weak}; use std::time::Duration; -/// I/O driver, backed by Mio +/// I/O driver, backed by Mio. pub(crate) struct Driver { /// Tracks the number of times `turn` is called. It is safe for this to wrap - /// as it is mostly used to determine when to call `compact()` + /// as it is mostly used to determine when to call `compact()`. tick: u8, /// Reuse the `mio::Events` value across calls to poll. @@ -35,22 +35,23 @@ pub(crate) struct Driver { /// Primary slab handle containing the state for each resource registered /// with this driver. During Drop this is moved into the Inner structure, so /// this is an Option to allow it to be vacated (until Drop this is always - /// Some) + /// Some). resources: Option<Slab<ScheduledIo>>, - /// The system event queue + /// The system event queue. poll: mio::Poll, /// State shared between the reactor and the handles. inner: Arc<Inner>, } -/// A reference to an I/O driver +/// A reference to an I/O driver. #[derive(Clone)] pub(crate) struct Handle { inner: Weak<Inner>, } +#[derive(Debug)] pub(crate) struct ReadyEvent { tick: u8, pub(crate) ready: Ready, @@ -65,13 +66,13 @@ pub(super) struct Inner { /// without risking new ones being registered in the meantime. resources: Mutex<Option<Slab<ScheduledIo>>>, - /// Registers I/O resources + /// Registers I/O resources. registry: mio::Registry, /// Allocates `ScheduledIo` handles when creating new resources. pub(super) io_dispatch: slab::Allocator<ScheduledIo>, - /// Used to wake up the reactor from a call to `turn` + /// Used to wake up the reactor from a call to `turn`. waker: mio::Waker, } @@ -96,7 +97,7 @@ const ADDRESS: bit::Pack = bit::Pack::least_significant(24); // // The generation prevents a race condition where a slab slot is reused for a // new socket while the I/O driver is about to apply a readiness event. The -// generaton value is checked when setting new readiness. If the generation do +// generation value is checked when setting new readiness. If the generation do // not match, then the readiness event is discarded. const GENERATION: bit::Pack = ADDRESS.then(7); @@ -252,7 +253,7 @@ impl fmt::Debug for Driver { cfg_rt! { impl Handle { - /// Returns a handle to the current reactor + /// Returns a handle to the current reactor. /// /// # Panics /// @@ -266,14 +267,14 @@ cfg_rt! { cfg_not_rt! { impl Handle { - /// Returns a handle to the current reactor + /// Returns a handle to the current reactor. /// /// # Panics /// /// This function panics if there is no current reactor set, or if the `rt` /// feature flag is not enabled. pub(super) fn current() -> Self { - panic!(crate::util::error::CONTEXT_MISSING_ERROR) + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) } } } diff --git a/src/io/driver/ready.rs b/src/io/driver/ready.rs index 2ac01bd..2430d30 100644 --- a/src/io/driver/ready.rs +++ b/src/io/driver/ready.rs @@ -38,6 +38,17 @@ impl Ready { pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { let mut ready = Ready::EMPTY; + #[cfg(all(target_os = "freebsd", feature = "net"))] + { + if event.is_aio() { + ready |= Ready::READABLE; + } + + if event.is_lio() { + ready |= Ready::READABLE; + } + } + if event.is_readable() { ready |= Ready::READABLE; } @@ -57,7 +68,7 @@ impl Ready { ready } - /// Returns true if `Ready` is the empty set + /// Returns true if `Ready` is the empty set. /// /// # Examples /// @@ -71,7 +82,7 @@ impl Ready { self == Ready::EMPTY } - /// Returns `true` if the value includes `readable` + /// Returns `true` if the value includes `readable`. /// /// # Examples /// @@ -87,7 +98,7 @@ impl Ready { self.contains(Ready::READABLE) || self.is_read_closed() } - /// Returns `true` if the value includes writable `readiness` + /// Returns `true` if the value includes writable `readiness`. /// /// # Examples /// @@ -103,7 +114,7 @@ impl Ready { self.contains(Ready::WRITABLE) || self.is_write_closed() } - /// Returns `true` if the value includes read-closed `readiness` + /// Returns `true` if the value includes read-closed `readiness`. /// /// # Examples /// @@ -118,7 +129,7 @@ impl Ready { self.contains(Ready::READ_CLOSED) } - /// Returns `true` if the value includes write-closed `readiness` + /// Returns `true` if the value includes write-closed `readiness`. /// /// # Examples /// @@ -143,7 +154,7 @@ impl Ready { (self & other) == other } - /// Create a `Ready` instance using the given `usize` representation. + /// Creates a `Ready` instance using the given `usize` representation. /// /// The `usize` representation must have been obtained from a call to /// `Readiness::as_usize`. diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs index 8251fe6..7350be6 100644 --- a/src/io/driver/registration.rs +++ b/src/io/driver/registration.rs @@ -14,8 +14,9 @@ cfg_io_driver! { /// that it will receive task notifications on readiness. This is the lowest /// level API for integrating with a reactor. /// - /// The association between an I/O resource is made by calling [`new`]. Once - /// the association is established, it remains established until the + /// The association between an I/O resource is made by calling + /// [`new_with_interest_and_handle`]. + /// Once the association is established, it remains established until the /// registration instance is dropped. /// /// A registration instance represents two separate readiness streams. One @@ -36,7 +37,7 @@ cfg_io_driver! { /// stream. The write readiness event stream is only for `Ready::writable()` /// events. /// - /// [`new`]: method@Self::new + /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle /// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs index 2626b40..76f9343 100644 --- a/src/io/driver/scheduled_io.rs +++ b/src/io/driver/scheduled_io.rs @@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; use crate::util::slab::Entry; +use crate::util::WakeList; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; @@ -35,16 +36,16 @@ cfg_io_readiness! { #[derive(Debug, Default)] struct Waiters { #[cfg(feature = "net")] - /// List of all current waiters + /// List of all current waiters. list: WaitList, - /// Waker used for AsyncRead + /// Waker used for AsyncRead. reader: Option<Waker>, - /// Waker used for AsyncWrite + /// Waker used for AsyncWrite. writer: Option<Waker>, - /// True if this ScheduledIo has been killed due to IO driver shutdown + /// True if this ScheduledIo has been killed due to IO driver shutdown. is_shutdown: bool, } @@ -53,19 +54,19 @@ cfg_io_readiness! { struct Waiter { pointers: linked_list::Pointers<Waiter>, - /// The waker for this task + /// The waker for this task. waker: Option<Waker>, - /// The interest this waiter is waiting on + /// The interest this waiter is waiting on. interest: Interest, is_ready: bool, - /// Should never be `!Unpin` + /// Should never be `!Unpin`. _p: PhantomPinned, } - /// Future returned by `readiness()` + /// Future returned by `readiness()`. struct Readiness<'a> { scheduled_io: &'a ScheduledIo, @@ -84,9 +85,9 @@ cfg_io_readiness! { // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. // -// | reserved | generation | driver tick | readinesss | -// |----------+------------+--------------+------------| -// | 1 bit | 7 bits + 8 bits + 16 bits | +// | reserved | generation | driver tick | readiness | +// |----------+------------+--------------+-----------| +// | 1 bit | 7 bits + 8 bits + 16 bits | const READINESS: bit::Pack = bit::Pack::least_significant(16); @@ -212,10 +213,7 @@ impl ScheduledIo { } fn wake0(&self, ready: Ready, shutdown: bool) { - const NUM_WAKERS: usize = 32; - - let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); - let mut curr = 0; + let mut wakers = WakeList::new(); let mut waiters = self.waiters.lock(); @@ -224,16 +222,14 @@ impl ScheduledIo { // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } // check for AsyncWrite slot if ready.is_writable() { if let Some(waker) = waiters.writer.take() { - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } @@ -241,15 +237,14 @@ impl ScheduledIo { 'outer: loop { let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); - while curr < NUM_WAKERS { + while wakers.can_push() { match iter.next() { Some(waiter) => { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { waiter.is_ready = true; - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } None => { @@ -260,11 +255,7 @@ impl ScheduledIo { drop(waiters); - for waker in wakers.iter_mut().take(curr) { - waker.take().unwrap().wake(); - } - - curr = 0; + wakers.wake_all(); // Acquire the lock again. waiters = self.waiters.lock(); @@ -273,9 +264,7 @@ impl ScheduledIo { // Release the lock before notifying drop(waiters); - for waker in wakers.iter_mut().take(curr) { - waker.take().unwrap().wake(); - } + wakers.wake_all(); } pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { @@ -287,7 +276,7 @@ impl ScheduledIo { } } - /// Poll version of checking readiness for a certain direction. + /// Polls for readiness events in a given direction. /// /// These are to support `AsyncRead` and `AsyncWrite` polling methods, /// which cannot use the `async fn` version. This uses reserved reader @@ -374,7 +363,7 @@ unsafe impl Sync for ScheduledIo {} cfg_io_readiness! { impl ScheduledIo { - /// An async version of `poll_readiness` which uses a linked list of wakers + /// An async version of `poll_readiness` which uses a linked list of wakers. pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { self.readiness_fut(interest).await } diff --git a/src/io/mod.rs b/src/io/mod.rs index 14a4a63..cfdda61 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -217,6 +217,15 @@ cfg_io_driver_impl! { pub(crate) use poll_evented::PollEvented; } +cfg_aio! { + /// BSD-specific I/O types. + pub mod bsd { + mod poll_aio; + + pub use poll_aio::{Aio, AioEvent, AioSource}; + } +} + cfg_net_unix! { mod async_fd; diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs index 47ae558..44e68a2 100644 --- a/src/io/poll_evented.rs +++ b/src/io/poll_evented.rs @@ -10,10 +10,10 @@ cfg_io_driver! { /// [`std::io::Write`] traits with the reactor that drives it. /// /// `PollEvented` uses [`Registration`] internally to take a type that - /// implements [`mio::Evented`] as well as [`std::io::Read`] and or + /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or /// [`std::io::Write`] and associate it with a reactor that will drive it. /// - /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be + /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be /// used from within the future's execution model. As such, the /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] /// implementations using the underlying I/O resource as well as readiness @@ -40,13 +40,12 @@ cfg_io_driver! { /// [`poll_read_ready`] again will also indicate read readiness. /// /// When the operation is attempted and is unable to succeed due to the I/O - /// resource not being ready, the caller must call [`clear_read_ready`] or - /// [`clear_write_ready`]. This clears the readiness state until a new - /// readiness event is received. + /// resource not being ready, the caller must call `clear_readiness`. + /// This clears the readiness state until a new readiness event is received. /// /// This allows the caller to implement additional functions. For example, /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and - /// [`clear_read_ready`]. + /// `clear_read_ready`. /// /// ## Platform-specific events /// @@ -54,17 +53,11 @@ cfg_io_driver! { /// These events are included as part of the read readiness event stream. The /// write readiness event stream is only for `Ready::writable()` events. /// - /// [`std::io::Read`]: trait@std::io::Read - /// [`std::io::Write`]: trait@std::io::Write - /// [`AsyncRead`]: trait@AsyncRead - /// [`AsyncWrite`]: trait@AsyncWrite - /// [`mio::Evented`]: trait@mio::Evented - /// [`Registration`]: struct@Registration - /// [`TcpListener`]: struct@crate::net::TcpListener - /// [`clear_read_ready`]: method@Self::clear_read_ready - /// [`clear_write_ready`]: method@Self::clear_write_ready - /// [`poll_read_ready`]: method@Self::poll_read_ready - /// [`poll_write_ready`]: method@Self::poll_write_ready + /// [`AsyncRead`]: crate::io::AsyncRead + /// [`AsyncWrite`]: crate::io::AsyncWrite + /// [`TcpListener`]: crate::net::TcpListener + /// [`poll_read_ready`]: Registration::poll_read_ready + /// [`poll_write_ready`]: Registration::poll_write_ready pub(crate) struct PollEvented<E: Source> { io: Option<E>, registration: Registration, @@ -120,7 +113,7 @@ impl<E: Source> PollEvented<E> { }) } - /// Returns a reference to the registration + /// Returns a reference to the registration. #[cfg(any( feature = "net", all(unix, feature = "process"), @@ -130,7 +123,7 @@ impl<E: Source> PollEvented<E> { &self.registration } - /// Deregister the inner io from the registration and returns a Result containing the inner io + /// Deregisters the inner io from the registration and returns a Result containing the inner io. #[cfg(any(feature = "net", feature = "process"))] pub(crate) fn into_inner(mut self) -> io::Result<E> { let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. diff --git a/src/io/read_buf.rs b/src/io/read_buf.rs index 38e857d..ad58cbe 100644 --- a/src/io/read_buf.rs +++ b/src/io/read_buf.rs @@ -45,7 +45,7 @@ impl<'a> ReadBuf<'a> { /// Creates a new `ReadBuf` from a fully uninitialized buffer. /// - /// Use `assume_init` if part of the buffer is known to be already inintialized. + /// Use `assume_init` if part of the buffer is known to be already initialized. #[inline] pub fn uninit(buf: &'a mut [MaybeUninit<u8>]) -> ReadBuf<'a> { ReadBuf { @@ -85,7 +85,7 @@ impl<'a> ReadBuf<'a> { #[inline] pub fn take(&mut self, n: usize) -> ReadBuf<'_> { let max = std::cmp::min(self.remaining(), n); - // Saftey: We don't set any of the `unfilled_mut` with `MaybeUninit::uninit`. + // Safety: We don't set any of the `unfilled_mut` with `MaybeUninit::uninit`. unsafe { ReadBuf::uninit(&mut self.unfilled_mut()[..max]) } } @@ -217,7 +217,7 @@ impl<'a> ReadBuf<'a> { /// /// # Panics /// - /// Panics if the filled region of the buffer would become larger than the intialized region. + /// Panics if the filled region of the buffer would become larger than the initialized region. #[inline] pub fn set_filled(&mut self, n: usize) { assert!( diff --git a/src/io/split.rs b/src/io/split.rs index 732eb3b..8258a0f 100644 --- a/src/io/split.rs +++ b/src/io/split.rs @@ -63,7 +63,7 @@ impl<T> ReadHalf<T> { /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same /// stream. pub fn is_pair_of(&self, other: &WriteHalf<T>) -> bool { - other.is_pair_of(&self) + other.is_pair_of(self) } /// Reunites with a previously split `WriteHalf`. @@ -90,7 +90,7 @@ impl<T> ReadHalf<T> { } impl<T> WriteHalf<T> { - /// Check if this `WriteHalf` and some `ReadHalf` were split from the same + /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same /// stream. pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool { Arc::ptr_eq(&self.inner, &other.inner) diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs index d21c842..7e4a198 100644 --- a/src/io/stdio_common.rs +++ b/src/io/stdio_common.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; /// if buffer contents seems to be utf8. Otherwise it only trims buffer down to MAX_BUF. /// That's why, wrapped writer will always receive well-formed utf-8 bytes. /// # Other platforms -/// passes data to `inner` as is +/// Passes data to `inner` as is. #[derive(Debug)] pub(crate) struct SplitByUtf8BoundaryIfWindows<W> { inner: W, @@ -52,10 +52,10 @@ where buf = &buf[..crate::io::blocking::MAX_BUF]; - // Now there are two possibilites. + // Now there are two possibilities. // If caller gave is binary buffer, we **should not** shrink it // anymore, because excessive shrinking hits performance. - // If caller gave as binary buffer, we **must** additionaly + // If caller gave as binary buffer, we **must** additionally // shrink it to strip incomplete char at the end of buffer. // that's why check we will perform now is allowed to have // false-positive. diff --git a/src/io/util/async_buf_read_ext.rs b/src/io/util/async_buf_read_ext.rs index 233ac31..b241e35 100644 --- a/src/io/util/async_buf_read_ext.rs +++ b/src/io/util/async_buf_read_ext.rs @@ -1,3 +1,4 @@ +use crate::io::util::fill_buf::{fill_buf, FillBuf}; use crate::io::util::lines::{lines, Lines}; use crate::io::util::read_line::{read_line, ReadLine}; use crate::io::util::read_until::{read_until, ReadUntil}; @@ -36,6 +37,18 @@ cfg_io_util! { /// [`fill_buf`]: AsyncBufRead::poll_fill_buf /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted /// + /// # Cancel safety + /// + /// If the method is used as the event in a + /// [`tokio::select!`](crate::select) statement and some other branch + /// completes first, then some data may have been partially read. Any + /// partially read bytes are appended to `buf`, and the method can be + /// called again to continue reading until `byte`. + /// + /// This method returns the total number of bytes read. If you cancel + /// the call to `read_until` and then call it again to continue reading, + /// the counter is reset. + /// /// # Examples /// /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In @@ -114,6 +127,30 @@ cfg_io_util! { /// /// [`read_until`]: AsyncBufReadExt::read_until /// + /// # Cancel safety + /// + /// This method is not cancellation safe. If the method is used as the + /// event in a [`tokio::select!`](crate::select) statement and some + /// other branch completes first, then some data may have been partially + /// read, and this data is lost. There are no guarantees regarding the + /// contents of `buf` when the call is cancelled. The current + /// implementation replaces `buf` with the empty string, but this may + /// change in the future. + /// + /// This function does not behave like [`read_until`] because of the + /// requirement that a string contains only valid utf-8. If you need a + /// cancellation safe `read_line`, there are three options: + /// + /// * Call [`read_until`] with a newline character and manually perform the utf-8 check. + /// * The stream returned by [`lines`] has a cancellation safe + /// [`next_line`] method. + /// * Use [`tokio_util::codec::LinesCodec`][LinesCodec]. + /// + /// [LinesCodec]: https://docs.rs/tokio-util/0.6/tokio_util/codec/struct.LinesCodec.html + /// [`read_until`]: Self::read_until + /// [`lines`]: Self::lines + /// [`next_line`]: crate::io::Lines::next_line + /// /// # Examples /// /// [`std::io::Cursor`][`Cursor`] is a type that implements @@ -173,10 +210,11 @@ cfg_io_util! { /// [`BufRead::split`](std::io::BufRead::split). /// /// The stream returned from this function will yield instances of - /// [`io::Result`]`<`[`Vec<u8>`]`>`. Each vector returned will *not* have + /// [`io::Result`]`<`[`Option`]`<`[`Vec<u8>`]`>>`. Each vector returned will *not* have /// the delimiter byte at the end. /// /// [`io::Result`]: std::io::Result + /// [`Option`]: core::option::Option /// [`Vec<u8>`]: std::vec::Vec /// /// # Errors @@ -206,14 +244,68 @@ cfg_io_util! { split(self, byte) } + /// Returns the contents of the internal buffer, filling it with more + /// data from the inner reader if it is empty. + /// + /// This function is a lower-level call. It needs to be paired with the + /// [`consume`] method to function properly. When calling this method, + /// none of the contents will be "read" in the sense that later calling + /// `read` may return the same contents. As such, [`consume`] must be + /// called with the number of bytes that are consumed from this buffer + /// to ensure that the bytes are never returned twice. + /// + /// An empty buffer returned indicates that the stream has reached EOF. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn fill_buf(&mut self) -> io::Result<&[u8]>; + /// ``` + /// + /// # Errors + /// + /// This function will return an I/O error if the underlying reader was + /// read, but returned an error. + /// + /// [`consume`]: crate::io::AsyncBufReadExt::consume + fn fill_buf(&mut self) -> FillBuf<'_, Self> + where + Self: Unpin, + { + fill_buf(self) + } + + /// Tells this buffer that `amt` bytes have been consumed from the + /// buffer, so they should no longer be returned in calls to [`read`]. + /// + /// This function is a lower-level call. It needs to be paired with the + /// [`fill_buf`] method to function properly. This function does not + /// perform any I/O, it simply informs this object that some amount of + /// its buffer, returned from [`fill_buf`], has been consumed and should + /// no longer be returned. As such, this function may do odd things if + /// [`fill_buf`] isn't called before calling it. + /// + /// The `amt` must be less than the number of bytes in the buffer + /// returned by [`fill_buf`]. + /// + /// [`read`]: crate::io::AsyncReadExt::read + /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf + fn consume(&mut self, amt: usize) + where + Self: Unpin, + { + std::pin::Pin::new(self).consume(amt) + } + /// Returns a stream over the lines of this reader. /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). /// /// The stream returned from this function will yield instances of - /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline + /// [`io::Result`]`<`[`Option`]`<`[`String`]`>>`. Each string returned will *not* have a newline /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end. /// /// [`io::Result`]: std::io::Result + /// [`Option`]: core::option::Option /// [`String`]: String /// /// # Errors diff --git a/src/io/util/async_read_ext.rs b/src/io/util/async_read_ext.rs index e715f9d..df5445c 100644 --- a/src/io/util/async_read_ext.rs +++ b/src/io/util/async_read_ext.rs @@ -2,6 +2,7 @@ use crate::io::util::chain::{chain, Chain}; use crate::io::util::read::{read, Read}; use crate::io::util::read_buf::{read_buf, ReadBuf}; use crate::io::util::read_exact::{read_exact, ReadExact}; +use crate::io::util::read_int::{ReadF32, ReadF32Le, ReadF64, ReadF64Le}; use crate::io::util::read_int::{ ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8, }; @@ -105,8 +106,10 @@ cfg_io_util! { /// async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>; /// ``` /// - /// This function does not provide any guarantees about whether it - /// completes immediately or asynchronously + /// This method does not provide any guarantees about whether it + /// completes immediately or asynchronously. + /// + /// # Return /// /// If the return value of this method is `Ok(n)`, then it must be /// guaranteed that `0 <= n <= buf.len()`. A nonzero `n` value indicates @@ -136,6 +139,12 @@ cfg_io_util! { /// variant will be returned. If an error is returned then it must be /// guaranteed that no bytes were read. /// + /// # Cancel safety + /// + /// This method is cancel safe. If you use it as the event in a + /// [`tokio::select!`](crate::select) statement and some other branch + /// completes first, then it is guaranteed that no data was read. + /// /// # Examples /// /// [`File`][crate::fs::File]s implement `Read`: @@ -175,14 +184,19 @@ cfg_io_util! { /// Usually, only a single `read` syscall is issued, even if there is /// more space in the supplied buffer. /// - /// This function does not provide any guarantees about whether it - /// completes immediately or asynchronously + /// This method does not provide any guarantees about whether it + /// completes immediately or asynchronously. /// /// # Return /// - /// On a successful read, the number of read bytes is returned. If the - /// supplied buffer is not empty and the function returns `Ok(0)` then - /// the source has reached an "end-of-file" event. + /// A nonzero `n` value indicates that the buffer `buf` has been filled + /// in with `n` bytes of data from this source. If `n` is `0`, then it + /// can indicate one of two scenarios: + /// + /// 1. This reader has reached its "end of file" and will likely no longer + /// be able to produce bytes. Note that this does not mean that the + /// reader will *always* no longer be able to produce bytes. + /// 2. The buffer specified had a remaining capacity of zero. /// /// # Errors /// @@ -190,6 +204,12 @@ cfg_io_util! { /// variant will be returned. If an error is returned then it must be /// guaranteed that no bytes were read. /// + /// # Cancel safety + /// + /// This method is cancel safe. If you use it as the event in a + /// [`tokio::select!`](crate::select) statement and some other branch + /// completes first, then it is guaranteed that no data was read. + /// /// # Examples /// /// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]: @@ -254,6 +274,13 @@ cfg_io_util! { /// it has read, but it will never read more than would be necessary to /// completely fill the buffer. /// + /// # Cancel safety + /// + /// This method is not cancellation safe. If the method is used as the + /// event in a [`tokio::select!`](crate::select) statement and some + /// other branch completes first, then some data may already have been + /// read into `buf`. + /// /// # Examples /// /// [`File`][crate::fs::File]s implement `Read`: @@ -579,7 +606,7 @@ cfg_io_util! { /// async fn main() -> io::Result<()> { /// let mut reader = Cursor::new(vec![0x80, 0, 0, 0, 0, 0, 0, 0]); /// - /// assert_eq!(i64::min_value(), reader.read_i64().await?); + /// assert_eq!(i64::MIN, reader.read_i64().await?); /// Ok(()) /// } /// ``` @@ -659,12 +686,88 @@ cfg_io_util! { /// 0, 0, 0, 0, 0, 0, 0, 0 /// ]); /// - /// assert_eq!(i128::min_value(), reader.read_i128().await?); + /// assert_eq!(i128::MIN, reader.read_i128().await?); /// Ok(()) /// } /// ``` fn read_i128(&mut self) -> ReadI128; + /// Reads an 32-bit floating point type in big-endian order from the + /// underlying reader. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn read_f32(&mut self) -> io::Result<f32>; + /// ``` + /// + /// It is recommended to use a buffered reader to avoid excessive + /// syscalls. + /// + /// # Errors + /// + /// This method returns the same errors as [`AsyncReadExt::read_exact`]. + /// + /// [`AsyncReadExt::read_exact`]: AsyncReadExt::read_exact + /// + /// # Examples + /// + /// Read 32-bit floating point type from a `AsyncRead`: + /// + /// ```rust + /// use tokio::io::{self, AsyncReadExt}; + /// + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut reader = Cursor::new(vec![0xff, 0x7f, 0xff, 0xff]); + /// + /// assert_eq!(f32::MIN, reader.read_f32().await?); + /// Ok(()) + /// } + /// ``` + fn read_f32(&mut self) -> ReadF32; + + /// Reads an 64-bit floating point type in big-endian order from the + /// underlying reader. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn read_f64(&mut self) -> io::Result<f64>; + /// ``` + /// + /// It is recommended to use a buffered reader to avoid excessive + /// syscalls. + /// + /// # Errors + /// + /// This method returns the same errors as [`AsyncReadExt::read_exact`]. + /// + /// [`AsyncReadExt::read_exact`]: AsyncReadExt::read_exact + /// + /// # Examples + /// + /// Read 64-bit floating point type from a `AsyncRead`: + /// + /// ```rust + /// use tokio::io::{self, AsyncReadExt}; + /// + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut reader = Cursor::new(vec![ + /// 0xff, 0xef, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff + /// ]); + /// + /// assert_eq!(f64::MIN, reader.read_f64().await?); + /// Ok(()) + /// } + /// ``` + fn read_f64(&mut self) -> ReadF64; + /// Reads an unsigned 16-bit integer in little-endian order from the /// underlying reader. /// @@ -971,6 +1074,82 @@ cfg_io_util! { /// } /// ``` fn read_i128_le(&mut self) -> ReadI128Le; + + /// Reads an 32-bit floating point type in little-endian order from the + /// underlying reader. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn read_f32_le(&mut self) -> io::Result<f32>; + /// ``` + /// + /// It is recommended to use a buffered reader to avoid excessive + /// syscalls. + /// + /// # Errors + /// + /// This method returns the same errors as [`AsyncReadExt::read_exact`]. + /// + /// [`AsyncReadExt::read_exact`]: AsyncReadExt::read_exact + /// + /// # Examples + /// + /// Read 32-bit floating point type from a `AsyncRead`: + /// + /// ```rust + /// use tokio::io::{self, AsyncReadExt}; + /// + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut reader = Cursor::new(vec![0xff, 0xff, 0x7f, 0xff]); + /// + /// assert_eq!(f32::MIN, reader.read_f32_le().await?); + /// Ok(()) + /// } + /// ``` + fn read_f32_le(&mut self) -> ReadF32Le; + + /// Reads an 64-bit floating point type in little-endian order from the + /// underlying reader. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn read_f64_le(&mut self) -> io::Result<f64>; + /// ``` + /// + /// It is recommended to use a buffered reader to avoid excessive + /// syscalls. + /// + /// # Errors + /// + /// This method returns the same errors as [`AsyncReadExt::read_exact`]. + /// + /// [`AsyncReadExt::read_exact`]: AsyncReadExt::read_exact + /// + /// # Examples + /// + /// Read 64-bit floating point type from a `AsyncRead`: + /// + /// ```rust + /// use tokio::io::{self, AsyncReadExt}; + /// + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut reader = Cursor::new(vec![ + /// 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xef, 0xff + /// ]); + /// + /// assert_eq!(f64::MIN, reader.read_f64_le().await?); + /// Ok(()) + /// } + /// ``` + fn read_f64_le(&mut self) -> ReadF64Le; } /// Reads all bytes until EOF in this source, placing them into `buf`. diff --git a/src/io/util/async_seek_ext.rs b/src/io/util/async_seek_ext.rs index 297a4a6..46b3e6c 100644 --- a/src/io/util/async_seek_ext.rs +++ b/src/io/util/async_seek_ext.rs @@ -67,6 +67,16 @@ cfg_io_util! { seek(self, pos) } + /// Creates a future which will rewind to the beginning of the stream. + /// + /// This is convenience method, equivalent to to `self.seek(SeekFrom::Start(0))`. + fn rewind(&mut self) -> Seek<'_, Self> + where + Self: Unpin, + { + self.seek(SeekFrom::Start(0)) + } + /// Creates a future which will return the current seek position from the /// start of the stream. /// diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs index d011d82..93a3183 100644 --- a/src/io/util/async_write_ext.rs +++ b/src/io/util/async_write_ext.rs @@ -2,7 +2,9 @@ use crate::io::util::flush::{flush, Flush}; use crate::io::util::shutdown::{shutdown, Shutdown}; use crate::io::util::write::{write, Write}; use crate::io::util::write_all::{write_all, WriteAll}; +use crate::io::util::write_all_buf::{write_all_buf, WriteAllBuf}; use crate::io::util::write_buf::{write_buf, WriteBuf}; +use crate::io::util::write_int::{WriteF32, WriteF32Le, WriteF64, WriteF64Le}; use crate::io::util::write_int::{ WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le, WriteI8, @@ -18,7 +20,7 @@ use std::io::IoSlice; use bytes::Buf; cfg_io_util! { - /// Defines numeric writer + /// Defines numeric writer. macro_rules! write_impl { ( $( @@ -96,6 +98,13 @@ cfg_io_util! { /// It is **not** considered an error if the entire buffer could not be /// written to this writer. /// + /// # Cancel safety + /// + /// This method is cancellation safe in the sense that if it is used as + /// the event in a [`tokio::select!`](crate::select) statement and some + /// other branch completes first, then it is guaranteed that no data was + /// written to this `AsyncWrite`. + /// /// # Examples /// /// ```no_run @@ -128,6 +137,13 @@ cfg_io_util! { /// /// See [`AsyncWrite::poll_write_vectored`] for more details. /// + /// # Cancel safety + /// + /// This method is cancellation safe in the sense that if it is used as + /// the event in a [`tokio::select!`](crate::select) statement and some + /// other branch completes first, then it is guaranteed that no data was + /// written to this `AsyncWrite`. + /// /// # Examples /// /// ```no_run @@ -159,7 +175,6 @@ cfg_io_util! { write_vectored(self, bufs) } - /// Writes a buffer into this writer, advancing the buffer's internal /// cursor. /// @@ -195,12 +210,20 @@ cfg_io_util! { /// It is **not** considered an error if the entire buffer could not be /// written to this writer. /// + /// # Cancel safety + /// + /// This method is cancellation safe in the sense that if it is used as + /// the event in a [`tokio::select!`](crate::select) statement and some + /// other branch completes first, then it is guaranteed that no data was + /// written to this `AsyncWrite`. + /// /// # Examples /// - /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]: + /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]: /// /// [`File`]: crate::fs::File /// [`Buf`]: bytes::Buf + /// [`Cursor`]: std::io::Cursor /// /// ```no_run /// use tokio::io::{self, AsyncWriteExt}; @@ -238,6 +261,70 @@ cfg_io_util! { /// Equivalent to: /// /// ```ignore + /// async fn write_all_buf(&mut self, buf: impl Buf) -> Result<(), io::Error> { + /// while buf.has_remaining() { + /// self.write_buf(&mut buf).await?; + /// } + /// Ok(()) + /// } + /// ``` + /// + /// This method will continuously call [`write`] until + /// [`buf.has_remaining()`](bytes::Buf::has_remaining) returns false. This method will not + /// return until the entire buffer has been successfully written or an error occurs. The + /// first error generated will be returned. + /// + /// The buffer is advanced after each chunk is successfully written. After failure, + /// `src.chunk()` will return the chunk that failed to write. + /// + /// # Cancel safety + /// + /// If `write_all_buf` is used as the event in a + /// [`tokio::select!`](crate::select) statement and some other branch + /// completes first, then the data in the provided buffer may have been + /// partially written. However, it is guaranteed that the provided + /// buffer has been [advanced] by the amount of bytes that have been + /// partially written. + /// + /// # Examples + /// + /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]: + /// + /// [`File`]: crate::fs::File + /// [`Buf`]: bytes::Buf + /// [`Cursor`]: std::io::Cursor + /// [advanced]: bytes::Buf::advance + /// + /// ```no_run + /// use tokio::io::{self, AsyncWriteExt}; + /// use tokio::fs::File; + /// + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// let mut buffer = Cursor::new(b"data to write"); + /// + /// file.write_all_buf(&mut buffer).await?; + /// Ok(()) + /// } + /// ``` + /// + /// [`write`]: AsyncWriteExt::write + fn write_all_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteAllBuf<'a, Self, B> + where + Self: Sized + Unpin, + B: Buf, + { + write_all_buf(self, src) + } + + /// Attempts to write an entire buffer into this writer. + /// + /// Equivalent to: + /// + /// ```ignore /// async fn write_all(&mut self, buf: &[u8]) -> io::Result<()>; /// ``` /// @@ -246,6 +333,14 @@ cfg_io_util! { /// has been successfully written or such an error occurs. The first /// error generated from this method will be returned. /// + /// # Cancel safety + /// + /// This method is not cancellation safe. If it is used as the event + /// in a [`tokio::select!`](crate::select) statement and some other + /// branch completes first, then the provided buffer may have been + /// partially written, but future calls to `write_all` will start over + /// from the beginning of the buffer. + /// /// # Errors /// /// This function will return the first error that [`write`] returns. @@ -258,9 +353,9 @@ cfg_io_util! { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut buffer = File::create("foo.txt").await?; + /// let mut file = File::create("foo.txt").await?; /// - /// buffer.write_all(b"some bytes").await?; + /// file.write_all(b"some bytes").await?; /// Ok(()) /// } /// ``` @@ -567,8 +662,8 @@ cfg_io_util! { /// async fn main() -> io::Result<()> { /// let mut writer = Vec::new(); /// - /// writer.write_i64(i64::min_value()).await?; - /// writer.write_i64(i64::max_value()).await?; + /// writer.write_i64(i64::MIN).await?; + /// writer.write_i64(i64::MAX).await?; /// /// assert_eq!(writer, b"\x80\x00\x00\x00\x00\x00\x00\x00\x7f\xff\xff\xff\xff\xff\xff\xff"); /// Ok(()) @@ -645,7 +740,7 @@ cfg_io_util! { /// async fn main() -> io::Result<()> { /// let mut writer = Vec::new(); /// - /// writer.write_i128(i128::min_value()).await?; + /// writer.write_i128(i128::MIN).await?; /// /// assert_eq!(writer, vec![ /// 0x80, 0, 0, 0, 0, 0, 0, 0, @@ -656,6 +751,81 @@ cfg_io_util! { /// ``` fn write_i128(&mut self, n: i128) -> WriteI128; + /// Writes an 32-bit floating point type in big-endian order to the + /// underlying writer. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn write_f32(&mut self, n: f32) -> io::Result<()>; + /// ``` + /// + /// It is recommended to use a buffered writer to avoid excessive + /// syscalls. + /// + /// # Errors + /// + /// This method returns the same errors as [`AsyncWriteExt::write_all`]. + /// + /// [`AsyncWriteExt::write_all`]: AsyncWriteExt::write_all + /// + /// # Examples + /// + /// Write 32-bit floating point type to a `AsyncWrite`: + /// + /// ```rust + /// use tokio::io::{self, AsyncWriteExt}; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut writer = Vec::new(); + /// + /// writer.write_f32(f32::MIN).await?; + /// + /// assert_eq!(writer, vec![0xff, 0x7f, 0xff, 0xff]); + /// Ok(()) + /// } + /// ``` + fn write_f32(&mut self, n: f32) -> WriteF32; + + /// Writes an 64-bit floating point type in big-endian order to the + /// underlying writer. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn write_f64(&mut self, n: f64) -> io::Result<()>; + /// ``` + /// + /// It is recommended to use a buffered writer to avoid excessive + /// syscalls. + /// + /// # Errors + /// + /// This method returns the same errors as [`AsyncWriteExt::write_all`]. + /// + /// [`AsyncWriteExt::write_all`]: AsyncWriteExt::write_all + /// + /// # Examples + /// + /// Write 64-bit floating point type to a `AsyncWrite`: + /// + /// ```rust + /// use tokio::io::{self, AsyncWriteExt}; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut writer = Vec::new(); + /// + /// writer.write_f64(f64::MIN).await?; + /// + /// assert_eq!(writer, vec![ + /// 0xff, 0xef, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff + /// ]); + /// Ok(()) + /// } + /// ``` + fn write_f64(&mut self, n: f64) -> WriteF64; /// Writes an unsigned 16-bit integer in little-endian order to the /// underlying writer. @@ -876,8 +1046,8 @@ cfg_io_util! { /// async fn main() -> io::Result<()> { /// let mut writer = Vec::new(); /// - /// writer.write_i64_le(i64::min_value()).await?; - /// writer.write_i64_le(i64::max_value()).await?; + /// writer.write_i64_le(i64::MIN).await?; + /// writer.write_i64_le(i64::MAX).await?; /// /// assert_eq!(writer, b"\x00\x00\x00\x00\x00\x00\x00\x80\xff\xff\xff\xff\xff\xff\xff\x7f"); /// Ok(()) @@ -954,7 +1124,7 @@ cfg_io_util! { /// async fn main() -> io::Result<()> { /// let mut writer = Vec::new(); /// - /// writer.write_i128_le(i128::min_value()).await?; + /// writer.write_i128_le(i128::MIN).await?; /// /// assert_eq!(writer, vec![ /// 0, 0, 0, 0, 0, 0, 0, @@ -964,6 +1134,82 @@ cfg_io_util! { /// } /// ``` fn write_i128_le(&mut self, n: i128) -> WriteI128Le; + + /// Writes an 32-bit floating point type in little-endian order to the + /// underlying writer. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn write_f32_le(&mut self, n: f32) -> io::Result<()>; + /// ``` + /// + /// It is recommended to use a buffered writer to avoid excessive + /// syscalls. + /// + /// # Errors + /// + /// This method returns the same errors as [`AsyncWriteExt::write_all`]. + /// + /// [`AsyncWriteExt::write_all`]: AsyncWriteExt::write_all + /// + /// # Examples + /// + /// Write 32-bit floating point type to a `AsyncWrite`: + /// + /// ```rust + /// use tokio::io::{self, AsyncWriteExt}; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut writer = Vec::new(); + /// + /// writer.write_f32_le(f32::MIN).await?; + /// + /// assert_eq!(writer, vec![0xff, 0xff, 0x7f, 0xff]); + /// Ok(()) + /// } + /// ``` + fn write_f32_le(&mut self, n: f32) -> WriteF32Le; + + /// Writes an 64-bit floating point type in little-endian order to the + /// underlying writer. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn write_f64_le(&mut self, n: f64) -> io::Result<()>; + /// ``` + /// + /// It is recommended to use a buffered writer to avoid excessive + /// syscalls. + /// + /// # Errors + /// + /// This method returns the same errors as [`AsyncWriteExt::write_all`]. + /// + /// [`AsyncWriteExt::write_all`]: AsyncWriteExt::write_all + /// + /// # Examples + /// + /// Write 64-bit floating point type to a `AsyncWrite`: + /// + /// ```rust + /// use tokio::io::{self, AsyncWriteExt}; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut writer = Vec::new(); + /// + /// writer.write_f64_le(f64::MIN).await?; + /// + /// assert_eq!(writer, vec![ + /// 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xef, 0xff + /// ]); + /// Ok(()) + /// } + /// ``` + fn write_f64_le(&mut self, n: f64) -> WriteF64Le; } /// Flushes this output stream, ensuring that all intermediately buffered diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs index 271f61b..7df610b 100644 --- a/src/io/util/buf_reader.rs +++ b/src/io/util/buf_reader.rs @@ -1,11 +1,11 @@ use crate::io::util::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; -use std::io; +use std::io::{self, IoSlice, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{cmp, fmt}; +use std::{cmp, fmt, mem}; pin_project! { /// The `BufReader` struct adds buffering to any reader. @@ -30,6 +30,7 @@ pin_project! { pub(super) buf: Box<[u8]>, pub(super) pos: usize, pub(super) cap: usize, + pub(super) seek_state: SeekState, } } @@ -48,6 +49,7 @@ impl<R: AsyncRead> BufReader<R> { buf: buffer.into_boxed_slice(), pos: 0, cap: 0, + seek_state: SeekState::Init, } } @@ -141,6 +143,122 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> { } } +#[derive(Debug, Clone, Copy)] +pub(super) enum SeekState { + /// start_seek has not been called. + Init, + /// start_seek has been called, but poll_complete has not yet been called. + Start(SeekFrom), + /// Waiting for completion of the first poll_complete in the `n.checked_sub(remainder).is_none()` branch. + PendingOverflowed(i64), + /// Waiting for completion of poll_complete. + Pending, +} + +/// Seeks to an offset, in bytes, in the underlying reader. +/// +/// The position used for seeking with `SeekFrom::Current(_)` is the +/// position the underlying reader would be at if the `BufReader` had no +/// internal buffer. +/// +/// Seeking always discards the internal buffer, even if the seek position +/// would otherwise fall within it. This guarantees that calling +/// `.into_inner()` immediately after a seek yields the underlying reader +/// at the same position. +/// +/// See [`AsyncSeek`] for more details. +/// +/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` +/// where `n` minus the internal buffer length overflows an `i64`, two +/// seeks will be performed instead of one. If the second seek returns +/// `Err`, the underlying reader will be left at the same position it would +/// have if you called `seek` with `SeekFrom::Current(0)`. +impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + // We needs to call seek operation multiple times. + // And we should always call both start_seek and poll_complete, + // as start_seek alone cannot guarantee that the operation will be completed. + // poll_complete receives a Context and returns a Poll, so it cannot be called + // inside start_seek. + *self.project().seek_state = SeekState::Start(pos); + Ok(()) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + let res = match mem::replace(self.as_mut().project().seek_state, SeekState::Init) { + SeekState::Init => { + // 1.x AsyncSeek recommends calling poll_complete before start_seek. + // We don't have to guarantee that the value returned by + // poll_complete called without start_seek is correct, + // so we'll return 0. + return Poll::Ready(Ok(0)); + } + SeekState::Start(SeekFrom::Current(n)) => { + let remainder = (self.cap - self.pos) as i64; + // it should be safe to assume that remainder fits within an i64 as the alternative + // means we managed to allocate 8 exbibytes and that's absurd. + // But it's not out of the realm of possibility for some weird underlying reader to + // support seeking by i64::MIN so we need to handle underflow when subtracting + // remainder. + if let Some(offset) = n.checked_sub(remainder) { + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(offset))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } else { + // seek backwards by our remainder, and then by the offset + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(-remainder))?; + if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() { + *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n); + return Poll::Pending; + } + + // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676 + self.as_mut().discard_buffer(); + + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(n))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + } + SeekState::PendingOverflowed(n) => { + if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() { + *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n); + return Poll::Pending; + } + + // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676 + self.as_mut().discard_buffer(); + + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(n))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + SeekState::Start(pos) => { + // Seeking with Start/End doesn't care about our buffer length. + self.as_mut().get_pin_mut().start_seek(pos)?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + SeekState::Pending => self.as_mut().get_pin_mut().poll_complete(cx)?, + }; + + match res { + Poll::Ready(res) => { + self.discard_buffer(); + Poll::Ready(Ok(res)) + } + Poll::Pending => { + *self.as_mut().project().seek_state = SeekState::Pending; + Poll::Pending + } + } + } +} + impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> { fn poll_write( self: Pin<&mut Self>, @@ -150,6 +268,18 @@ impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> { self.get_pin_mut().poll_write(cx, buf) } + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + self.get_pin_mut().poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.get_ref().is_write_vectored() + } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { self.get_pin_mut().poll_flush(cx) } diff --git a/src/io/util/buf_stream.rs b/src/io/util/buf_stream.rs index cc857e2..595c142 100644 --- a/src/io/util/buf_stream.rs +++ b/src/io/util/buf_stream.rs @@ -1,8 +1,8 @@ use crate::io::util::{BufReader, BufWriter}; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; -use std::io; +use std::io::{self, IoSlice, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -94,9 +94,11 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { buf: rbuf, pos, cap, + seek_state: rseek_state, }, buf: wbuf, written, + seek_state: wseek_state, } = b; BufStream { @@ -105,10 +107,12 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { inner, buf: wbuf, written, + seek_state: wseek_state, }, buf: rbuf, pos, cap, + seek_state: rseek_state, }, } } @@ -123,6 +127,18 @@ impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> { self.project().inner.poll_write(cx, buf) } + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { self.project().inner.poll_flush(cx) } @@ -142,6 +158,34 @@ impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> { } } +/// Seek to an offset, in bytes, in the underlying stream. +/// +/// The position used for seeking with `SeekFrom::Current(_)` is the +/// position the underlying stream would be at if the `BufStream` had no +/// internal buffer. +/// +/// Seeking always discards the internal buffer, even if the seek position +/// would otherwise fall within it. This guarantees that calling +/// `.into_inner()` immediately after a seek yields the underlying reader +/// at the same position. +/// +/// See [`AsyncSeek`] for more details. +/// +/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` +/// where `n` minus the internal buffer length overflows an `i64`, two +/// seeks will be performed instead of one. If the second seek returns +/// `Err`, the underlying reader will be left at the same position it would +/// have if you called `seek` with `SeekFrom::Current(0)`. +impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> { + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { + self.project().inner.start_seek(position) + } + + fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + self.project().inner.poll_complete(cx) + } +} + impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { self.project().inner.poll_fill_buf(cx) diff --git a/src/io/util/buf_writer.rs b/src/io/util/buf_writer.rs index 5e3d4b7..8dd1bba 100644 --- a/src/io/util/buf_writer.rs +++ b/src/io/util/buf_writer.rs @@ -1,9 +1,9 @@ use crate::io::util::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, Write}; +use std::io::{self, IoSlice, SeekFrom, Write}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -34,6 +34,7 @@ pin_project! { pub(super) inner: W, pub(super) buf: Vec<u8>, pub(super) written: usize, + pub(super) seek_state: SeekState, } } @@ -50,6 +51,7 @@ impl<W: AsyncWrite> BufWriter<W> { inner, buf: Vec::with_capacity(cap), written: 0, + seek_state: SeekState::Init, } } @@ -131,6 +133,72 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { } } + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + if self.inner.is_write_vectored() { + let total_len = bufs + .iter() + .fold(0usize, |acc, b| acc.saturating_add(b.len())); + if total_len > self.buf.capacity() - self.buf.len() { + ready!(self.as_mut().flush_buf(cx))?; + } + let me = self.as_mut().project(); + if total_len >= me.buf.capacity() { + // It's more efficient to pass the slices directly to the + // underlying writer than to buffer them. + // The case when the total_len calculation saturates at + // usize::MAX is also handled here. + me.inner.poll_write_vectored(cx, bufs) + } else { + bufs.iter().for_each(|b| me.buf.extend_from_slice(b)); + Poll::Ready(Ok(total_len)) + } + } else { + // Remove empty buffers at the beginning of bufs. + while bufs.first().map(|buf| buf.len()) == Some(0) { + bufs = &bufs[1..]; + } + if bufs.is_empty() { + return Poll::Ready(Ok(0)); + } + // Flush if the first buffer doesn't fit. + let first_len = bufs[0].len(); + if first_len > self.buf.capacity() - self.buf.len() { + ready!(self.as_mut().flush_buf(cx))?; + debug_assert!(self.buf.is_empty()); + } + let me = self.as_mut().project(); + if first_len >= me.buf.capacity() { + // The slice is at least as large as the buffering capacity, + // so it's better to write it directly, bypassing the buffer. + debug_assert!(me.buf.is_empty()); + return me.inner.poll_write(cx, &bufs[0]); + } else { + me.buf.extend_from_slice(&bufs[0]); + bufs = &bufs[1..]; + } + let mut total_written = first_len; + debug_assert!(total_written != 0); + // Append the buffers that fit in the internal buffer. + for buf in bufs { + if buf.len() > me.buf.capacity() - me.buf.len() { + break; + } else { + me.buf.extend_from_slice(buf); + total_written += buf.len(); + } + } + Poll::Ready(Ok(total_written)) + } + } + + fn is_write_vectored(&self) -> bool { + true + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { ready!(self.as_mut().flush_buf(cx))?; self.get_pin_mut().poll_flush(cx) @@ -142,6 +210,62 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { } } +#[derive(Debug, Clone, Copy)] +pub(super) enum SeekState { + /// start_seek has not been called. + Init, + /// start_seek has been called, but poll_complete has not yet been called. + Start(SeekFrom), + /// Waiting for completion of poll_complete. + Pending, +} + +/// Seek to the offset, in bytes, in the underlying writer. +/// +/// Seeking always writes out the internal buffer before seeking. +impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> { + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + // We need to flush the internal buffer before seeking. + // It receives a `Context` and returns a `Poll`, so it cannot be called + // inside `start_seek`. + *self.project().seek_state = SeekState::Start(pos); + Ok(()) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + let pos = match self.seek_state { + SeekState::Init => { + return self.project().inner.poll_complete(cx); + } + SeekState::Start(pos) => Some(pos), + SeekState::Pending => None, + }; + + // Flush the internal buffer before seeking. + ready!(self.as_mut().flush_buf(cx))?; + + let mut me = self.project(); + if let Some(pos) = pos { + // Ensure previous seeks have finished before starting a new one + ready!(me.inner.as_mut().poll_complete(cx))?; + if let Err(e) = me.inner.as_mut().start_seek(pos) { + *me.seek_state = SeekState::Init; + return Poll::Ready(Err(e)); + } + } + match me.inner.poll_complete(cx) { + Poll::Ready(res) => { + *me.seek_state = SeekState::Init; + Poll::Ready(res) + } + Poll::Pending => { + *me.seek_state = SeekState::Pending; + Poll::Pending + } + } + } +} + impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> { fn poll_read( self: Pin<&mut Self>, diff --git a/src/io/util/copy.rs b/src/io/util/copy.rs index 3cd425b..d0ab7cb 100644 --- a/src/io/util/copy.rs +++ b/src/io/util/copy.rs @@ -8,6 +8,7 @@ use std::task::{Context, Poll}; #[derive(Debug)] pub(super) struct CopyBuffer { read_done: bool, + need_flush: bool, pos: usize, cap: usize, amt: u64, @@ -18,10 +19,11 @@ impl CopyBuffer { pub(super) fn new() -> Self { Self { read_done: false, + need_flush: false, pos: 0, cap: 0, amt: 0, - buf: vec![0; 2048].into_boxed_slice(), + buf: vec![0; super::DEFAULT_BUF_SIZE].into_boxed_slice(), } } @@ -41,7 +43,22 @@ impl CopyBuffer { if self.pos == self.cap && !self.read_done { let me = &mut *self; let mut buf = ReadBuf::new(&mut me.buf); - ready!(reader.as_mut().poll_read(cx, &mut buf))?; + + match reader.as_mut().poll_read(cx, &mut buf) { + Poll::Ready(Ok(_)) => (), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => { + // Try flushing when the reader has no progress to avoid deadlock + // when the reader depends on buffered writer. + if self.need_flush { + ready!(writer.as_mut().poll_flush(cx))?; + self.need_flush = false; + } + + return Poll::Pending; + } + } + let n = buf.filled().len(); if n == 0 { self.read_done = true; @@ -63,9 +80,18 @@ impl CopyBuffer { } else { self.pos += i; self.amt += i as u64; + self.need_flush = true; } } + // If pos larger than cap, this loop will never stop. + // In particular, user's wrong poll_write implementation returning + // incorrect written length may lead to thread blocking. + debug_assert!( + self.pos <= self.cap, + "writer returned length larger than input slice" + ); + // If we've written all the data and we've seen EOF, flush out the // data and finish the transfer. if self.pos == self.cap && self.read_done { diff --git a/src/io/util/copy_bidirectional.rs b/src/io/util/copy_bidirectional.rs index cc43f0f..c93060b 100644 --- a/src/io/util/copy_bidirectional.rs +++ b/src/io/util/copy_bidirectional.rs @@ -104,6 +104,7 @@ where /// # Return value /// /// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`. +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error> where A: AsyncRead + AsyncWrite + Unpin + ?Sized, diff --git a/src/io/util/fill_buf.rs b/src/io/util/fill_buf.rs new file mode 100644 index 0000000..3655c01 --- /dev/null +++ b/src/io/util/fill_buf.rs @@ -0,0 +1,53 @@ +use crate::io::AsyncBufRead; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Future for the [`fill_buf`](crate::io::AsyncBufReadExt::fill_buf) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct FillBuf<'a, R: ?Sized> { + reader: Option<&'a mut R>, + #[pin] + _pin: PhantomPinned, + } +} + +pub(crate) fn fill_buf<R>(reader: &mut R) -> FillBuf<'_, R> +where + R: AsyncBufRead + ?Sized + Unpin, +{ + FillBuf { + reader: Some(reader), + _pin: PhantomPinned, + } +} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> { + type Output = io::Result<&'a [u8]>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + + let reader = me.reader.take().expect("Polled after completion."); + match Pin::new(&mut *reader).poll_fill_buf(cx) { + Poll::Ready(Ok(slice)) => unsafe { + // Safety: This is necessary only due to a limitation in the + // borrow checker. Once Rust starts using the polonius borrow + // checker, this can be simplified. + let slice = std::mem::transmute::<&[u8], &'a [u8]>(slice); + Poll::Ready(Ok(slice)) + }, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => { + *me.reader = Some(reader); + Poll::Pending + } + } + } +} diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs index ed6a944..717f633 100644 --- a/src/io/util/lines.rs +++ b/src/io/util/lines.rs @@ -8,7 +8,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Read lines from an [`AsyncBufRead`]. + /// Reads lines from an [`AsyncBufRead`]. /// /// A `Lines` can be turned into a `Stream` with [`LinesStream`]. /// @@ -47,6 +47,10 @@ where { /// Returns the next line in the stream. /// + /// # Cancel safety + /// + /// This method is cancellation safe. + /// /// # Examples /// /// ``` @@ -68,12 +72,12 @@ where poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await } - /// Obtain a mutable reference to the underlying reader + /// Obtains a mutable reference to the underlying reader. pub fn get_mut(&mut self) -> &mut R { &mut self.reader } - /// Obtain a reference to the underlying reader + /// Obtains a reference to the underlying reader. pub fn get_ref(&mut self) -> &R { &self.reader } @@ -102,11 +106,9 @@ where /// /// When the method returns `Poll::Pending`, the `Waker` in the provided /// `Context` is scheduled to receive a wakeup when more bytes become - /// available on the underlying IO resource. - /// - /// Note that on multiple calls to `poll_next_line`, only the `Waker` from - /// the `Context` passed to the most recent call is scheduled to receive a - /// wakeup. + /// available on the underlying IO resource. Note that on multiple calls to + /// `poll_next_line`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. pub fn poll_next_line( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -128,7 +130,7 @@ where } } - Poll::Ready(Ok(Some(mem::replace(me.buf, String::new())))) + Poll::Ready(Ok(Some(mem::take(me.buf)))) } } diff --git a/src/io/util/mem.rs b/src/io/util/mem.rs index e91a932..4eefe7b 100644 --- a/src/io/util/mem.rs +++ b/src/io/util/mem.rs @@ -16,6 +16,14 @@ use std::{ /// that can be used as in-memory IO types. Writing to one of the pairs will /// allow that data to be read from the other, and vice versa. /// +/// # Closing a `DuplexStream` +/// +/// If one end of the `DuplexStream` channel is dropped, any pending reads on +/// the other side will continue to read data until the buffer is drained, then +/// they will signal EOF by returning 0 bytes. Any writes to the other side, +/// including pending ones (that are waiting for free space in the buffer) will +/// return `Err(BrokenPipe)` immediately. +/// /// # Example /// /// ``` @@ -37,6 +45,7 @@ use std::{ /// # } /// ``` #[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct DuplexStream { read: Arc<Mutex<Pipe>>, write: Arc<Mutex<Pipe>>, @@ -72,6 +81,7 @@ struct Pipe { /// /// The `max_buf_size` argument is the maximum amount of bytes that can be /// written to a side before the write returns `Poll::Pending`. +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) { let one = Arc::new(Mutex::new(Pipe::new(max_buf_size))); let two = Arc::new(Mutex::new(Pipe::new(max_buf_size))); @@ -134,7 +144,8 @@ impl AsyncWrite for DuplexStream { impl Drop for DuplexStream { fn drop(&mut self) { // notify the other side of the closure - self.write.lock().close(); + self.write.lock().close_write(); + self.read.lock().close_read(); } } @@ -151,12 +162,21 @@ impl Pipe { } } - fn close(&mut self) { + fn close_write(&mut self) { self.is_closed = true; + // needs to notify any readers that no more data will come if let Some(waker) = self.read_waker.take() { waker.wake(); } } + + fn close_read(&mut self) { + self.is_closed = true; + // needs to notify any writers that they have to abort + if let Some(waker) = self.write_waker.take() { + waker.wake(); + } + } } impl AsyncRead for Pipe { @@ -217,7 +237,7 @@ impl AsyncWrite for Pipe { mut self: Pin<&mut Self>, _: &mut task::Context<'_>, ) -> Poll<std::io::Result<()>> { - self.close(); + self.close_write(); Poll::Ready(Ok(())) } } diff --git a/src/io/util/mod.rs b/src/io/util/mod.rs index ab38664..21199d0 100644 --- a/src/io/util/mod.rs +++ b/src/io/util/mod.rs @@ -49,6 +49,7 @@ cfg_io_util! { mod read_exact; mod read_int; mod read_line; + mod fill_buf; mod read_to_end; mod vec_with_initialized; @@ -77,6 +78,7 @@ cfg_io_util! { mod write_vectored; mod write_all; mod write_buf; + mod write_all_buf; mod write_int; diff --git a/src/io/util/read_int.rs b/src/io/util/read_int.rs index 5b9fb7b..164dcf5 100644 --- a/src/io/util/read_int.rs +++ b/src/io/util/read_int.rs @@ -142,6 +142,9 @@ reader!(ReadI32, i32, get_i32); reader!(ReadI64, i64, get_i64); reader!(ReadI128, i128, get_i128); +reader!(ReadF32, f32, get_f32); +reader!(ReadF64, f64, get_f64); + reader!(ReadU16Le, u16, get_u16_le); reader!(ReadU32Le, u32, get_u32_le); reader!(ReadU64Le, u64, get_u64_le); @@ -151,3 +154,6 @@ reader!(ReadI16Le, i16, get_i16_le); reader!(ReadI32Le, i32, get_i32_le); reader!(ReadI64Le, i64, get_i64_le); reader!(ReadI128Le, i128, get_i128_le); + +reader!(ReadF32Le, f32, get_f32_le); +reader!(ReadF64Le, f64, get_f64_le); diff --git a/src/io/util/read_line.rs b/src/io/util/read_line.rs index d38ffaf..e641f51 100644 --- a/src/io/util/read_line.rs +++ b/src/io/util/read_line.rs @@ -36,7 +36,7 @@ where { ReadLine { reader, - buf: mem::replace(string, String::new()).into_bytes(), + buf: mem::take(string).into_bytes(), output: string, read: 0, _pin: PhantomPinned, @@ -99,7 +99,7 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>( read: &mut usize, ) -> Poll<io::Result<usize>> { let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read)); - let utf8_res = String::from_utf8(mem::replace(buf, Vec::new())); + let utf8_res = String::from_utf8(mem::take(buf)); // At this point both buf and output are empty. The allocation is in utf8_res. diff --git a/src/io/util/read_to_string.rs b/src/io/util/read_to_string.rs index 2c17383..b3d82a2 100644 --- a/src/io/util/read_to_string.rs +++ b/src/io/util/read_to_string.rs @@ -37,7 +37,7 @@ pub(crate) fn read_to_string<'a, R>( where R: AsyncRead + ?Sized + Unpin, { - let buf = mem::replace(string, String::new()).into_bytes(); + let buf = mem::take(string).into_bytes(); ReadToString { reader, buf: VecWithInitialized::new(buf), diff --git a/src/io/util/read_until.rs b/src/io/util/read_until.rs index 3599cff..90a0e8a 100644 --- a/src/io/util/read_until.rs +++ b/src/io/util/read_until.rs @@ -10,12 +10,12 @@ use std::task::{Context, Poll}; pin_project! { /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method. - /// The delimeter is included in the resulting vector. + /// The delimiter is included in the resulting vector. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadUntil<'a, R: ?Sized> { reader: &'a mut R, - delimeter: u8, + delimiter: u8, buf: &'a mut Vec<u8>, // The number of bytes appended to buf. This can be less than buf.len() if // the buffer was not empty when the operation was started. @@ -28,7 +28,7 @@ pin_project! { pub(crate) fn read_until<'a, R>( reader: &'a mut R, - delimeter: u8, + delimiter: u8, buf: &'a mut Vec<u8>, ) -> ReadUntil<'a, R> where @@ -36,7 +36,7 @@ where { ReadUntil { reader, - delimeter, + delimiter, buf, read: 0, _pin: PhantomPinned, @@ -46,14 +46,14 @@ where pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>( mut reader: Pin<&mut R>, cx: &mut Context<'_>, - delimeter: u8, + delimiter: u8, buf: &mut Vec<u8>, read: &mut usize, ) -> Poll<io::Result<usize>> { loop { let (done, used) = { let available = ready!(reader.as_mut().poll_fill_buf(cx))?; - if let Some(i) = memchr::memchr(delimeter, available) { + if let Some(i) = memchr::memchr(delimiter, available) { buf.extend_from_slice(&available[..=i]); (true, i + 1) } else { @@ -74,6 +74,6 @@ impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); - read_until_internal(Pin::new(*me.reader), cx, *me.delimeter, me.buf, me.read) + read_until_internal(Pin::new(*me.reader), cx, *me.delimiter, me.buf, me.read) } } diff --git a/src/io/util/split.rs b/src/io/util/split.rs index 4f3ce4e..7489c24 100644 --- a/src/io/util/split.rs +++ b/src/io/util/split.rs @@ -95,7 +95,7 @@ where let n = ready!(read_until_internal( me.reader, cx, *me.delim, me.buf, me.read, ))?; - // read_until_internal resets me.read to zero once it finds the delimeter + // read_until_internal resets me.read to zero once it finds the delimiter debug_assert_eq!(*me.read, 0); if n == 0 && me.buf.is_empty() { @@ -106,7 +106,7 @@ where me.buf.pop(); } - Poll::Ready(Ok(Some(mem::replace(me.buf, Vec::new())))) + Poll::Ready(Ok(Some(mem::take(me.buf)))) } } diff --git a/src/io/util/write_all_buf.rs b/src/io/util/write_all_buf.rs new file mode 100644 index 0000000..05af7fe --- /dev/null +++ b/src/io/util/write_all_buf.rs @@ -0,0 +1,56 @@ +use crate::io::AsyncWrite; + +use bytes::Buf; +use pin_project_lite::pin_project; +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// A future to write some of the buffer to an `AsyncWrite`. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct WriteAllBuf<'a, W, B> { + writer: &'a mut W, + buf: &'a mut B, + #[pin] + _pin: PhantomPinned, + } +} + +/// Tries to write some bytes from the given `buf` to the writer in an +/// asynchronous manner, returning a future. +pub(crate) fn write_all_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteAllBuf<'a, W, B> +where + W: AsyncWrite + Unpin, + B: Buf, +{ + WriteAllBuf { + writer, + buf, + _pin: PhantomPinned, + } +} + +impl<W, B> Future for WriteAllBuf<'_, W, B> +where + W: AsyncWrite + Unpin, + B: Buf, +{ + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + let me = self.project(); + while me.buf.has_remaining() { + let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf.chunk())?); + me.buf.advance(n); + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + } + + Poll::Ready(Ok(())) + } +} diff --git a/src/io/util/write_int.rs b/src/io/util/write_int.rs index 13bc191..63cd491 100644 --- a/src/io/util/write_int.rs +++ b/src/io/util/write_int.rs @@ -135,6 +135,9 @@ writer!(WriteI32, i32, put_i32); writer!(WriteI64, i64, put_i64); writer!(WriteI128, i128, put_i128); +writer!(WriteF32, f32, put_f32); +writer!(WriteF64, f64, put_f64); + writer!(WriteU16Le, u16, put_u16_le); writer!(WriteU32Le, u32, put_u32_le); writer!(WriteU64Le, u64, put_u64_le); @@ -144,3 +147,6 @@ writer!(WriteI16Le, i16, put_i16_le); writer!(WriteI32Le, i32, put_i32_le); writer!(WriteI64Le, i64, put_i64_le); writer!(WriteI128Le, i128, put_i128_le); + +writer!(WriteF32Le, f32, put_f32_le); +writer!(WriteF64Le, f64, put_f64_le); |