aboutsummaryrefslogtreecommitdiff
path: root/src/io
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2021-11-25 20:05:29 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2021-11-25 20:05:29 +0000
commit2f0421fc344db1fc129b5ece09f6c9b662648c2e (patch)
tree7c474e5234417d4e078c4a115ce27112bf34c301 /src/io
parent6c9a00a78b690ca3f86b75d07c5270e21facad62 (diff)
parentb8563af5d8e399995ad8a5d01bf40ee260885b32 (diff)
downloadtokio-android12-mainline-mediaprovider-release.tar.gz
Change-Id: I1319ffb9800fb9ba5c4f6e30ce14f5fe2ae7153b
Diffstat (limited to 'src/io')
-rw-r--r--src/io/async_fd.rs34
-rw-r--r--src/io/async_write.rs6
-rw-r--r--src/io/blocking.rs4
-rw-r--r--src/io/bsd/poll_aio.rs195
-rw-r--r--src/io/driver/interest.rs26
-rw-r--r--src/io/driver/mod.rs23
-rw-r--r--src/io/driver/ready.rs23
-rw-r--r--src/io/driver/registration.rs7
-rw-r--r--src/io/driver/scheduled_io.rs53
-rw-r--r--src/io/mod.rs9
-rw-r--r--src/io/poll_evented.rs31
-rw-r--r--src/io/read_buf.rs6
-rw-r--r--src/io/split.rs4
-rw-r--r--src/io/stdio_common.rs6
-rw-r--r--src/io/util/async_buf_read_ext.rs96
-rw-r--r--src/io/util/async_read_ext.rs197
-rw-r--r--src/io/util/async_seek_ext.rs10
-rw-r--r--src/io/util/async_write_ext.rs268
-rw-r--r--src/io/util/buf_reader.rs136
-rw-r--r--src/io/util/buf_stream.rs48
-rw-r--r--src/io/util/buf_writer.rs128
-rw-r--r--src/io/util/copy.rs30
-rw-r--r--src/io/util/copy_bidirectional.rs1
-rw-r--r--src/io/util/fill_buf.rs53
-rw-r--r--src/io/util/lines.rs20
-rw-r--r--src/io/util/mem.rs26
-rw-r--r--src/io/util/mod.rs2
-rw-r--r--src/io/util/read_int.rs6
-rw-r--r--src/io/util/read_line.rs4
-rw-r--r--src/io/util/read_to_string.rs2
-rw-r--r--src/io/util/read_until.rs14
-rw-r--r--src/io/util/split.rs4
-rw-r--r--src/io/util/write_all_buf.rs56
-rw-r--r--src/io/util/write_int.rs6
34 files changed, 1392 insertions, 142 deletions
diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs
index 5a68d30..9ec5b7f 100644
--- a/src/io/async_fd.rs
+++ b/src/io/async_fd.rs
@@ -205,13 +205,13 @@ impl<T: AsRawFd> AsyncFd<T> {
})
}
- /// Returns a shared reference to the backing object of this [`AsyncFd`]
+ /// Returns a shared reference to the backing object of this [`AsyncFd`].
#[inline]
pub fn get_ref(&self) -> &T {
self.inner.as_ref().unwrap()
}
- /// Returns a mutable reference to the backing object of this [`AsyncFd`]
+ /// Returns a mutable reference to the backing object of this [`AsyncFd`].
#[inline]
pub fn get_mut(&mut self) -> &mut T {
self.inner.as_mut().unwrap()
@@ -540,6 +540,16 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
result => Ok(result),
}
}
+
+ /// Returns a shared reference to the inner [`AsyncFd`].
+ pub fn get_ref(&self) -> &AsyncFd<Inner> {
+ self.async_fd
+ }
+
+ /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
+ pub fn get_inner(&self) -> &Inner {
+ self.get_ref().get_ref()
+ }
}
impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
@@ -601,6 +611,26 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
result => Ok(result),
}
}
+
+ /// Returns a shared reference to the inner [`AsyncFd`].
+ pub fn get_ref(&self) -> &AsyncFd<Inner> {
+ self.async_fd
+ }
+
+ /// Returns a mutable reference to the inner [`AsyncFd`].
+ pub fn get_mut(&mut self) -> &mut AsyncFd<Inner> {
+ self.async_fd
+ }
+
+ /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
+ pub fn get_inner(&self) -> &Inner {
+ self.get_ref().get_ref()
+ }
+
+ /// Returns a mutable reference to the backing object of the inner [`AsyncFd`].
+ pub fn get_inner_mut(&mut self) -> &mut Inner {
+ self.get_mut().get_mut()
+ }
}
impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
diff --git a/src/io/async_write.rs b/src/io/async_write.rs
index 569fb9c..7ec1a30 100644
--- a/src/io/async_write.rs
+++ b/src/io/async_write.rs
@@ -45,7 +45,11 @@ use std::task::{Context, Poll};
pub trait AsyncWrite {
/// Attempt to write bytes from `buf` into the object.
///
- /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
+ /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. If successful,
+ /// then it must be guaranteed that `n <= buf.len()`. A return value of `0`
+ /// typically means that the underlying object is no longer able to accept
+ /// bytes and will likely not be able to in the future as well, or that the
+ /// buffer provided is empty.
///
/// If the object is not ready for writing, the method returns
/// `Poll::Pending` and arranges for the current task (via
diff --git a/src/io/blocking.rs b/src/io/blocking.rs
index 94a3484..1d79ee7 100644
--- a/src/io/blocking.rs
+++ b/src/io/blocking.rs
@@ -16,7 +16,7 @@ use self::State::*;
pub(crate) struct Blocking<T> {
inner: Option<T>,
state: State<T>,
- /// `true` if the lower IO layer needs flushing
+ /// `true` if the lower IO layer needs flushing.
need_flush: bool,
}
@@ -175,7 +175,7 @@ where
}
}
-/// Repeats operations that are interrupted
+/// Repeats operations that are interrupted.
macro_rules! uninterruptibly {
($e:expr) => {{
loop {
diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs
new file mode 100644
index 0000000..f1ac4b2
--- /dev/null
+++ b/src/io/bsd/poll_aio.rs
@@ -0,0 +1,195 @@
+//! Use POSIX AIO futures with Tokio.
+
+use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
+use mio::event::Source;
+use mio::Registry;
+use mio::Token;
+use std::fmt;
+use std::io;
+use std::ops::{Deref, DerefMut};
+use std::os::unix::io::AsRawFd;
+use std::os::unix::prelude::RawFd;
+use std::task::{Context, Poll};
+
+/// Like [`mio::event::Source`], but for POSIX AIO only.
+///
+/// Tokio's consumer must pass an implementor of this trait to create a
+/// [`Aio`] object.
+pub trait AioSource {
+ /// Registers this AIO event source with Tokio's reactor.
+ fn register(&mut self, kq: RawFd, token: usize);
+
+ /// Deregisters this AIO event source with Tokio's reactor.
+ fn deregister(&mut self);
+}
+
+/// Wraps the user's AioSource in order to implement mio::event::Source, which
+/// is what the rest of the crate wants.
+struct MioSource<T>(T);
+
+impl<T: AioSource> Source for MioSource<T> {
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: mio::Interest,
+ ) -> io::Result<()> {
+ assert!(interests.is_aio() || interests.is_lio());
+ self.0.register(registry.as_raw_fd(), usize::from(token));
+ Ok(())
+ }
+
+ fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
+ self.0.deregister();
+ Ok(())
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: mio::Interest,
+ ) -> io::Result<()> {
+ assert!(interests.is_aio() || interests.is_lio());
+ self.0.register(registry.as_raw_fd(), usize::from(token));
+ Ok(())
+ }
+}
+
+/// Associates a POSIX AIO control block with the reactor that drives it.
+///
+/// `Aio`'s wrapped type must implement [`AioSource`] to be driven
+/// by the reactor.
+///
+/// The wrapped source may be accessed through the `Aio` via the `Deref` and
+/// `DerefMut` traits.
+///
+/// ## Clearing readiness
+///
+/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the
+/// Source is not completely ready and must return to the Pending state,
+/// [`Aio::clear_ready`] may be used. This can be useful with
+/// [`lio_listio`], which may generate a kevent when only a portion of the
+/// operations have completed.
+///
+/// ## Platforms
+///
+/// Only FreeBSD implements POSIX AIO with kqueue notification, so
+/// `Aio` is only available for that operating system.
+///
+/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+// Note: Unlike every other kqueue event source, POSIX AIO registers events not
+// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
+// aio_write, etc. It needs the kqueue's file descriptor to do that. So
+// AsyncFd can't be used for POSIX AIO.
+//
+// Note that Aio doesn't implement Drop. There's no need. Unlike other
+// kqueue sources, simply dropping the object effectively deregisters it.
+pub struct Aio<E> {
+ io: MioSource<E>,
+ registration: Registration,
+}
+
+// ===== impl Aio =====
+
+impl<E: AioSource> Aio<E> {
+ /// Creates a new `Aio` suitable for use with POSIX AIO functions.
+ ///
+ /// It will be associated with the default reactor. The runtime is usually
+ /// set implicitly when this function is called from a future driven by a
+ /// Tokio runtime, otherwise runtime can be set explicitly with
+ /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn new_for_aio(io: E) -> io::Result<Self> {
+ Self::new_with_interest(io, Interest::AIO)
+ }
+
+ /// Creates a new `Aio` suitable for use with [`lio_listio`].
+ ///
+ /// It will be associated with the default reactor. The runtime is usually
+ /// set implicitly when this function is called from a future driven by a
+ /// Tokio runtime, otherwise runtime can be set explicitly with
+ /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ ///
+ /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+ pub fn new_for_lio(io: E) -> io::Result<Self> {
+ Self::new_with_interest(io, Interest::LIO)
+ }
+
+ fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
+ let mut io = MioSource(io);
+ let handle = Handle::current();
+ let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
+ Ok(Self { io, registration })
+ }
+
+ /// Indicates to Tokio that the source is no longer ready. The internal
+ /// readiness flag will be cleared, and tokio will wait for the next
+ /// edge-triggered readiness notification from the OS.
+ ///
+ /// It is critical that this method not be called unless your code
+ /// _actually observes_ that the source is _not_ ready. The OS must
+ /// deliver a subsequent notification, or this source will block
+ /// forever. It is equally critical that you `do` call this method if you
+ /// resubmit the same structure to the kernel and poll it again.
+ ///
+ /// This method is not very useful with AIO readiness, since each `aiocb`
+ /// structure is typically only used once. It's main use with
+ /// [`lio_listio`], which will sometimes send notification when only a
+ /// portion of its elements are complete. In that case, the caller must
+ /// call `clear_ready` before resubmitting it.
+ ///
+ /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+ pub fn clear_ready(&self, ev: AioEvent) {
+ self.registration.clear_readiness(ev.0)
+ }
+
+ /// Destroy the [`Aio`] and return its inner source.
+ pub fn into_inner(self) -> E {
+ self.io.0
+ }
+
+ /// Polls for readiness. Either AIO or LIO counts.
+ ///
+ /// This method returns:
+ /// * `Poll::Pending` if the underlying operation is not complete, whether
+ /// or not it completed successfully. This will be true if the OS is
+ /// still processing it, or if it has not yet been submitted to the OS.
+ /// * `Poll::Ready(Ok(_))` if the underlying operation is complete.
+ /// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does
+ /// _not_ indicate that the underlying operation encountered an error.
+ ///
+ /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context`
+ /// is scheduled to receive a wakeup when the underlying operation
+ /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the
+ /// `Context` passed to the most recent call is scheduled to receive a wakeup.
+ pub fn poll_ready<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
+ let ev = ready!(self.registration.poll_read_ready(cx))?;
+ Poll::Ready(Ok(AioEvent(ev)))
+ }
+}
+
+impl<E: AioSource> Deref for Aio<E> {
+ type Target = E;
+
+ fn deref(&self) -> &E {
+ &self.io.0
+ }
+}
+
+impl<E: AioSource> DerefMut for Aio<E> {
+ fn deref_mut(&mut self) -> &mut E {
+ &mut self.io.0
+ }
+}
+
+impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Aio").field("io", &self.io.0).finish()
+ }
+}
+
+/// Opaque data returned by [`Aio::poll_ready`].
+///
+/// It can be fed back to [`Aio::clear_ready`].
+#[derive(Debug)]
+pub struct AioEvent(ReadyEvent);
diff --git a/src/io/driver/interest.rs b/src/io/driver/interest.rs
index 9eead08..d6b46df 100644
--- a/src/io/driver/interest.rs
+++ b/src/io/driver/interest.rs
@@ -5,7 +5,7 @@ use crate::io::driver::Ready;
use std::fmt;
use std::ops;
-/// Readiness event interest
+/// Readiness event interest.
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
@@ -14,12 +14,32 @@ use std::ops;
pub struct Interest(mio::Interest);
impl Interest {
+ // The non-FreeBSD definitions in this block are active only when
+ // building documentation.
+ cfg_aio! {
+ /// Interest for POSIX AIO.
+ #[cfg(target_os = "freebsd")]
+ pub const AIO: Interest = Interest(mio::Interest::AIO);
+
+ /// Interest for POSIX AIO.
+ #[cfg(not(target_os = "freebsd"))]
+ pub const AIO: Interest = Interest(mio::Interest::READABLE);
+
+ /// Interest for POSIX AIO lio_listio events.
+ #[cfg(target_os = "freebsd")]
+ pub const LIO: Interest = Interest(mio::Interest::LIO);
+
+ /// Interest for POSIX AIO lio_listio events.
+ #[cfg(not(target_os = "freebsd"))]
+ pub const LIO: Interest = Interest(mio::Interest::READABLE);
+ }
+
/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
pub const READABLE: Interest = Interest(mio::Interest::READABLE);
- /// Interest in all writable events
+ /// Interest in all writable events.
///
/// Writable interest includes write-closed events.
pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
@@ -58,7 +78,7 @@ impl Interest {
self.0.is_writable()
}
- /// Add together two `Interst` values.
+ /// Add together two `Interest` values.
///
/// This function works from a `const` context.
///
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
index fa2d420..19f67a2 100644
--- a/src/io/driver/mod.rs
+++ b/src/io/driver/mod.rs
@@ -23,10 +23,10 @@ use std::io;
use std::sync::{Arc, Weak};
use std::time::Duration;
-/// I/O driver, backed by Mio
+/// I/O driver, backed by Mio.
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
- /// as it is mostly used to determine when to call `compact()`
+ /// as it is mostly used to determine when to call `compact()`.
tick: u8,
/// Reuse the `mio::Events` value across calls to poll.
@@ -35,22 +35,23 @@ pub(crate) struct Driver {
/// Primary slab handle containing the state for each resource registered
/// with this driver. During Drop this is moved into the Inner structure, so
/// this is an Option to allow it to be vacated (until Drop this is always
- /// Some)
+ /// Some).
resources: Option<Slab<ScheduledIo>>,
- /// The system event queue
+ /// The system event queue.
poll: mio::Poll,
/// State shared between the reactor and the handles.
inner: Arc<Inner>,
}
-/// A reference to an I/O driver
+/// A reference to an I/O driver.
#[derive(Clone)]
pub(crate) struct Handle {
inner: Weak<Inner>,
}
+#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
@@ -65,13 +66,13 @@ pub(super) struct Inner {
/// without risking new ones being registered in the meantime.
resources: Mutex<Option<Slab<ScheduledIo>>>,
- /// Registers I/O resources
+ /// Registers I/O resources.
registry: mio::Registry,
/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
- /// Used to wake up the reactor from a call to `turn`
+ /// Used to wake up the reactor from a call to `turn`.
waker: mio::Waker,
}
@@ -96,7 +97,7 @@ const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
//
// The generation prevents a race condition where a slab slot is reused for a
// new socket while the I/O driver is about to apply a readiness event. The
-// generaton value is checked when setting new readiness. If the generation do
+// generation value is checked when setting new readiness. If the generation do
// not match, then the readiness event is discarded.
const GENERATION: bit::Pack = ADDRESS.then(7);
@@ -252,7 +253,7 @@ impl fmt::Debug for Driver {
cfg_rt! {
impl Handle {
- /// Returns a handle to the current reactor
+ /// Returns a handle to the current reactor.
///
/// # Panics
///
@@ -266,14 +267,14 @@ cfg_rt! {
cfg_not_rt! {
impl Handle {
- /// Returns a handle to the current reactor
+ /// Returns a handle to the current reactor.
///
/// # Panics
///
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
pub(super) fn current() -> Self {
- panic!(crate::util::error::CONTEXT_MISSING_ERROR)
+ panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
diff --git a/src/io/driver/ready.rs b/src/io/driver/ready.rs
index 2ac01bd..2430d30 100644
--- a/src/io/driver/ready.rs
+++ b/src/io/driver/ready.rs
@@ -38,6 +38,17 @@ impl Ready {
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;
+ #[cfg(all(target_os = "freebsd", feature = "net"))]
+ {
+ if event.is_aio() {
+ ready |= Ready::READABLE;
+ }
+
+ if event.is_lio() {
+ ready |= Ready::READABLE;
+ }
+ }
+
if event.is_readable() {
ready |= Ready::READABLE;
}
@@ -57,7 +68,7 @@ impl Ready {
ready
}
- /// Returns true if `Ready` is the empty set
+ /// Returns true if `Ready` is the empty set.
///
/// # Examples
///
@@ -71,7 +82,7 @@ impl Ready {
self == Ready::EMPTY
}
- /// Returns `true` if the value includes `readable`
+ /// Returns `true` if the value includes `readable`.
///
/// # Examples
///
@@ -87,7 +98,7 @@ impl Ready {
self.contains(Ready::READABLE) || self.is_read_closed()
}
- /// Returns `true` if the value includes writable `readiness`
+ /// Returns `true` if the value includes writable `readiness`.
///
/// # Examples
///
@@ -103,7 +114,7 @@ impl Ready {
self.contains(Ready::WRITABLE) || self.is_write_closed()
}
- /// Returns `true` if the value includes read-closed `readiness`
+ /// Returns `true` if the value includes read-closed `readiness`.
///
/// # Examples
///
@@ -118,7 +129,7 @@ impl Ready {
self.contains(Ready::READ_CLOSED)
}
- /// Returns `true` if the value includes write-closed `readiness`
+ /// Returns `true` if the value includes write-closed `readiness`.
///
/// # Examples
///
@@ -143,7 +154,7 @@ impl Ready {
(self & other) == other
}
- /// Create a `Ready` instance using the given `usize` representation.
+ /// Creates a `Ready` instance using the given `usize` representation.
///
/// The `usize` representation must have been obtained from a call to
/// `Readiness::as_usize`.
diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs
index 8251fe6..7350be6 100644
--- a/src/io/driver/registration.rs
+++ b/src/io/driver/registration.rs
@@ -14,8 +14,9 @@ cfg_io_driver! {
/// that it will receive task notifications on readiness. This is the lowest
/// level API for integrating with a reactor.
///
- /// The association between an I/O resource is made by calling [`new`]. Once
- /// the association is established, it remains established until the
+ /// The association between an I/O resource is made by calling
+ /// [`new_with_interest_and_handle`].
+ /// Once the association is established, it remains established until the
/// registration instance is dropped.
///
/// A registration instance represents two separate readiness streams. One
@@ -36,7 +37,7 @@ cfg_io_driver! {
/// stream. The write readiness event stream is only for `Ready::writable()`
/// events.
///
- /// [`new`]: method@Self::new
+ /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
/// [`poll_read_ready`]: method@Self::poll_read_ready`
/// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)]
diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs
index 2626b40..76f9343 100644
--- a/src/io/driver/scheduled_io.rs
+++ b/src/io/driver/scheduled_io.rs
@@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry;
+use crate::util::WakeList;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
@@ -35,16 +36,16 @@ cfg_io_readiness! {
#[derive(Debug, Default)]
struct Waiters {
#[cfg(feature = "net")]
- /// List of all current waiters
+ /// List of all current waiters.
list: WaitList,
- /// Waker used for AsyncRead
+ /// Waker used for AsyncRead.
reader: Option<Waker>,
- /// Waker used for AsyncWrite
+ /// Waker used for AsyncWrite.
writer: Option<Waker>,
- /// True if this ScheduledIo has been killed due to IO driver shutdown
+ /// True if this ScheduledIo has been killed due to IO driver shutdown.
is_shutdown: bool,
}
@@ -53,19 +54,19 @@ cfg_io_readiness! {
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
- /// The waker for this task
+ /// The waker for this task.
waker: Option<Waker>,
- /// The interest this waiter is waiting on
+ /// The interest this waiter is waiting on.
interest: Interest,
is_ready: bool,
- /// Should never be `!Unpin`
+ /// Should never be `!Unpin`.
_p: PhantomPinned,
}
- /// Future returned by `readiness()`
+ /// Future returned by `readiness()`.
struct Readiness<'a> {
scheduled_io: &'a ScheduledIo,
@@ -84,9 +85,9 @@ cfg_io_readiness! {
// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
-// | reserved | generation | driver tick | readinesss |
-// |----------+------------+--------------+------------|
-// | 1 bit | 7 bits + 8 bits + 16 bits |
+// | reserved | generation | driver tick | readiness |
+// |----------+------------+--------------+-----------|
+// | 1 bit | 7 bits + 8 bits + 16 bits |
const READINESS: bit::Pack = bit::Pack::least_significant(16);
@@ -212,10 +213,7 @@ impl ScheduledIo {
}
fn wake0(&self, ready: Ready, shutdown: bool) {
- const NUM_WAKERS: usize = 32;
-
- let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
- let mut curr = 0;
+ let mut wakers = WakeList::new();
let mut waiters = self.waiters.lock();
@@ -224,16 +222,14 @@ impl ScheduledIo {
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
- wakers[curr] = Some(waker);
- curr += 1;
+ wakers.push(waker);
}
}
// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
- wakers[curr] = Some(waker);
- curr += 1;
+ wakers.push(waker);
}
}
@@ -241,15 +237,14 @@ impl ScheduledIo {
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
- while curr < NUM_WAKERS {
+ while wakers.can_push() {
match iter.next() {
Some(waiter) => {
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
- wakers[curr] = Some(waker);
- curr += 1;
+ wakers.push(waker);
}
}
None => {
@@ -260,11 +255,7 @@ impl ScheduledIo {
drop(waiters);
- for waker in wakers.iter_mut().take(curr) {
- waker.take().unwrap().wake();
- }
-
- curr = 0;
+ wakers.wake_all();
// Acquire the lock again.
waiters = self.waiters.lock();
@@ -273,9 +264,7 @@ impl ScheduledIo {
// Release the lock before notifying
drop(waiters);
- for waker in wakers.iter_mut().take(curr) {
- waker.take().unwrap().wake();
- }
+ wakers.wake_all();
}
pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
@@ -287,7 +276,7 @@ impl ScheduledIo {
}
}
- /// Poll version of checking readiness for a certain direction.
+ /// Polls for readiness events in a given direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
/// which cannot use the `async fn` version. This uses reserved reader
@@ -374,7 +363,7 @@ unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
- /// An async version of `poll_readiness` which uses a linked list of wakers
+ /// An async version of `poll_readiness` which uses a linked list of wakers.
pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
self.readiness_fut(interest).await
}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 14a4a63..cfdda61 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -217,6 +217,15 @@ cfg_io_driver_impl! {
pub(crate) use poll_evented::PollEvented;
}
+cfg_aio! {
+ /// BSD-specific I/O types.
+ pub mod bsd {
+ mod poll_aio;
+
+ pub use poll_aio::{Aio, AioEvent, AioSource};
+ }
+}
+
cfg_net_unix! {
mod async_fd;
diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs
index 47ae558..44e68a2 100644
--- a/src/io/poll_evented.rs
+++ b/src/io/poll_evented.rs
@@ -10,10 +10,10 @@ cfg_io_driver! {
/// [`std::io::Write`] traits with the reactor that drives it.
///
/// `PollEvented` uses [`Registration`] internally to take a type that
- /// implements [`mio::Evented`] as well as [`std::io::Read`] and or
+ /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or
/// [`std::io::Write`] and associate it with a reactor that will drive it.
///
- /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be
+ /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
/// used from within the future's execution model. As such, the
/// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
/// implementations using the underlying I/O resource as well as readiness
@@ -40,13 +40,12 @@ cfg_io_driver! {
/// [`poll_read_ready`] again will also indicate read readiness.
///
/// When the operation is attempted and is unable to succeed due to the I/O
- /// resource not being ready, the caller must call [`clear_read_ready`] or
- /// [`clear_write_ready`]. This clears the readiness state until a new
- /// readiness event is received.
+ /// resource not being ready, the caller must call `clear_readiness`.
+ /// This clears the readiness state until a new readiness event is received.
///
/// This allows the caller to implement additional functions. For example,
/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
- /// [`clear_read_ready`].
+ /// `clear_read_ready`.
///
/// ## Platform-specific events
///
@@ -54,17 +53,11 @@ cfg_io_driver! {
/// These events are included as part of the read readiness event stream. The
/// write readiness event stream is only for `Ready::writable()` events.
///
- /// [`std::io::Read`]: trait@std::io::Read
- /// [`std::io::Write`]: trait@std::io::Write
- /// [`AsyncRead`]: trait@AsyncRead
- /// [`AsyncWrite`]: trait@AsyncWrite
- /// [`mio::Evented`]: trait@mio::Evented
- /// [`Registration`]: struct@Registration
- /// [`TcpListener`]: struct@crate::net::TcpListener
- /// [`clear_read_ready`]: method@Self::clear_read_ready
- /// [`clear_write_ready`]: method@Self::clear_write_ready
- /// [`poll_read_ready`]: method@Self::poll_read_ready
- /// [`poll_write_ready`]: method@Self::poll_write_ready
+ /// [`AsyncRead`]: crate::io::AsyncRead
+ /// [`AsyncWrite`]: crate::io::AsyncWrite
+ /// [`TcpListener`]: crate::net::TcpListener
+ /// [`poll_read_ready`]: Registration::poll_read_ready
+ /// [`poll_write_ready`]: Registration::poll_write_ready
pub(crate) struct PollEvented<E: Source> {
io: Option<E>,
registration: Registration,
@@ -120,7 +113,7 @@ impl<E: Source> PollEvented<E> {
})
}
- /// Returns a reference to the registration
+ /// Returns a reference to the registration.
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
@@ -130,7 +123,7 @@ impl<E: Source> PollEvented<E> {
&self.registration
}
- /// Deregister the inner io from the registration and returns a Result containing the inner io
+ /// Deregisters the inner io from the registration and returns a Result containing the inner io.
#[cfg(any(feature = "net", feature = "process"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
diff --git a/src/io/read_buf.rs b/src/io/read_buf.rs
index 38e857d..ad58cbe 100644
--- a/src/io/read_buf.rs
+++ b/src/io/read_buf.rs
@@ -45,7 +45,7 @@ impl<'a> ReadBuf<'a> {
/// Creates a new `ReadBuf` from a fully uninitialized buffer.
///
- /// Use `assume_init` if part of the buffer is known to be already inintialized.
+ /// Use `assume_init` if part of the buffer is known to be already initialized.
#[inline]
pub fn uninit(buf: &'a mut [MaybeUninit<u8>]) -> ReadBuf<'a> {
ReadBuf {
@@ -85,7 +85,7 @@ impl<'a> ReadBuf<'a> {
#[inline]
pub fn take(&mut self, n: usize) -> ReadBuf<'_> {
let max = std::cmp::min(self.remaining(), n);
- // Saftey: We don't set any of the `unfilled_mut` with `MaybeUninit::uninit`.
+ // Safety: We don't set any of the `unfilled_mut` with `MaybeUninit::uninit`.
unsafe { ReadBuf::uninit(&mut self.unfilled_mut()[..max]) }
}
@@ -217,7 +217,7 @@ impl<'a> ReadBuf<'a> {
///
/// # Panics
///
- /// Panics if the filled region of the buffer would become larger than the intialized region.
+ /// Panics if the filled region of the buffer would become larger than the initialized region.
#[inline]
pub fn set_filled(&mut self, n: usize) {
assert!(
diff --git a/src/io/split.rs b/src/io/split.rs
index 732eb3b..8258a0f 100644
--- a/src/io/split.rs
+++ b/src/io/split.rs
@@ -63,7 +63,7 @@ impl<T> ReadHalf<T> {
/// Checks if this `ReadHalf` and some `WriteHalf` were split from the same
/// stream.
pub fn is_pair_of(&self, other: &WriteHalf<T>) -> bool {
- other.is_pair_of(&self)
+ other.is_pair_of(self)
}
/// Reunites with a previously split `WriteHalf`.
@@ -90,7 +90,7 @@ impl<T> ReadHalf<T> {
}
impl<T> WriteHalf<T> {
- /// Check if this `WriteHalf` and some `ReadHalf` were split from the same
+ /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same
/// stream.
pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs
index d21c842..7e4a198 100644
--- a/src/io/stdio_common.rs
+++ b/src/io/stdio_common.rs
@@ -7,7 +7,7 @@ use std::task::{Context, Poll};
/// if buffer contents seems to be utf8. Otherwise it only trims buffer down to MAX_BUF.
/// That's why, wrapped writer will always receive well-formed utf-8 bytes.
/// # Other platforms
-/// passes data to `inner` as is
+/// Passes data to `inner` as is.
#[derive(Debug)]
pub(crate) struct SplitByUtf8BoundaryIfWindows<W> {
inner: W,
@@ -52,10 +52,10 @@ where
buf = &buf[..crate::io::blocking::MAX_BUF];
- // Now there are two possibilites.
+ // Now there are two possibilities.
// If caller gave is binary buffer, we **should not** shrink it
// anymore, because excessive shrinking hits performance.
- // If caller gave as binary buffer, we **must** additionaly
+ // If caller gave as binary buffer, we **must** additionally
// shrink it to strip incomplete char at the end of buffer.
// that's why check we will perform now is allowed to have
// false-positive.
diff --git a/src/io/util/async_buf_read_ext.rs b/src/io/util/async_buf_read_ext.rs
index 233ac31..b241e35 100644
--- a/src/io/util/async_buf_read_ext.rs
+++ b/src/io/util/async_buf_read_ext.rs
@@ -1,3 +1,4 @@
+use crate::io::util::fill_buf::{fill_buf, FillBuf};
use crate::io::util::lines::{lines, Lines};
use crate::io::util::read_line::{read_line, ReadLine};
use crate::io::util::read_until::{read_until, ReadUntil};
@@ -36,6 +37,18 @@ cfg_io_util! {
/// [`fill_buf`]: AsyncBufRead::poll_fill_buf
/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
///
+ /// # Cancel safety
+ ///
+ /// If the method is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, then some data may have been partially read. Any
+ /// partially read bytes are appended to `buf`, and the method can be
+ /// called again to continue reading until `byte`.
+ ///
+ /// This method returns the total number of bytes read. If you cancel
+ /// the call to `read_until` and then call it again to continue reading,
+ /// the counter is reset.
+ ///
/// # Examples
///
/// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
@@ -114,6 +127,30 @@ cfg_io_util! {
///
/// [`read_until`]: AsyncBufReadExt::read_until
///
+ /// # Cancel safety
+ ///
+ /// This method is not cancellation safe. If the method is used as the
+ /// event in a [`tokio::select!`](crate::select) statement and some
+ /// other branch completes first, then some data may have been partially
+ /// read, and this data is lost. There are no guarantees regarding the
+ /// contents of `buf` when the call is cancelled. The current
+ /// implementation replaces `buf` with the empty string, but this may
+ /// change in the future.
+ ///
+ /// This function does not behave like [`read_until`] because of the
+ /// requirement that a string contains only valid utf-8. If you need a
+ /// cancellation safe `read_line`, there are three options:
+ ///
+ /// * Call [`read_until`] with a newline character and manually perform the utf-8 check.
+ /// * The stream returned by [`lines`] has a cancellation safe
+ /// [`next_line`] method.
+ /// * Use [`tokio_util::codec::LinesCodec`][LinesCodec].
+ ///
+ /// [LinesCodec]: https://docs.rs/tokio-util/0.6/tokio_util/codec/struct.LinesCodec.html
+ /// [`read_until`]: Self::read_until
+ /// [`lines`]: Self::lines
+ /// [`next_line`]: crate::io::Lines::next_line
+ ///
/// # Examples
///
/// [`std::io::Cursor`][`Cursor`] is a type that implements
@@ -173,10 +210,11 @@ cfg_io_util! {
/// [`BufRead::split`](std::io::BufRead::split).
///
/// The stream returned from this function will yield instances of
- /// [`io::Result`]`<`[`Vec<u8>`]`>`. Each vector returned will *not* have
+ /// [`io::Result`]`<`[`Option`]`<`[`Vec<u8>`]`>>`. Each vector returned will *not* have
/// the delimiter byte at the end.
///
/// [`io::Result`]: std::io::Result
+ /// [`Option`]: core::option::Option
/// [`Vec<u8>`]: std::vec::Vec
///
/// # Errors
@@ -206,14 +244,68 @@ cfg_io_util! {
split(self, byte)
}
+ /// Returns the contents of the internal buffer, filling it with more
+ /// data from the inner reader if it is empty.
+ ///
+ /// This function is a lower-level call. It needs to be paired with the
+ /// [`consume`] method to function properly. When calling this method,
+ /// none of the contents will be "read" in the sense that later calling
+ /// `read` may return the same contents. As such, [`consume`] must be
+ /// called with the number of bytes that are consumed from this buffer
+ /// to ensure that the bytes are never returned twice.
+ ///
+ /// An empty buffer returned indicates that the stream has reached EOF.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn fill_buf(&mut self) -> io::Result<&[u8]>;
+ /// ```
+ ///
+ /// # Errors
+ ///
+ /// This function will return an I/O error if the underlying reader was
+ /// read, but returned an error.
+ ///
+ /// [`consume`]: crate::io::AsyncBufReadExt::consume
+ fn fill_buf(&mut self) -> FillBuf<'_, Self>
+ where
+ Self: Unpin,
+ {
+ fill_buf(self)
+ }
+
+ /// Tells this buffer that `amt` bytes have been consumed from the
+ /// buffer, so they should no longer be returned in calls to [`read`].
+ ///
+ /// This function is a lower-level call. It needs to be paired with the
+ /// [`fill_buf`] method to function properly. This function does not
+ /// perform any I/O, it simply informs this object that some amount of
+ /// its buffer, returned from [`fill_buf`], has been consumed and should
+ /// no longer be returned. As such, this function may do odd things if
+ /// [`fill_buf`] isn't called before calling it.
+ ///
+ /// The `amt` must be less than the number of bytes in the buffer
+ /// returned by [`fill_buf`].
+ ///
+ /// [`read`]: crate::io::AsyncReadExt::read
+ /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf
+ fn consume(&mut self, amt: usize)
+ where
+ Self: Unpin,
+ {
+ std::pin::Pin::new(self).consume(amt)
+ }
+
/// Returns a stream over the lines of this reader.
/// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
///
/// The stream returned from this function will yield instances of
- /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
+ /// [`io::Result`]`<`[`Option`]`<`[`String`]`>>`. Each string returned will *not* have a newline
/// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
///
/// [`io::Result`]: std::io::Result
+ /// [`Option`]: core::option::Option
/// [`String`]: String
///
/// # Errors
diff --git a/src/io/util/async_read_ext.rs b/src/io/util/async_read_ext.rs
index e715f9d..df5445c 100644
--- a/src/io/util/async_read_ext.rs
+++ b/src/io/util/async_read_ext.rs
@@ -2,6 +2,7 @@ use crate::io::util::chain::{chain, Chain};
use crate::io::util::read::{read, Read};
use crate::io::util::read_buf::{read_buf, ReadBuf};
use crate::io::util::read_exact::{read_exact, ReadExact};
+use crate::io::util::read_int::{ReadF32, ReadF32Le, ReadF64, ReadF64Le};
use crate::io::util::read_int::{
ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8,
};
@@ -105,8 +106,10 @@ cfg_io_util! {
/// async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
/// ```
///
- /// This function does not provide any guarantees about whether it
- /// completes immediately or asynchronously
+ /// This method does not provide any guarantees about whether it
+ /// completes immediately or asynchronously.
+ ///
+ /// # Return
///
/// If the return value of this method is `Ok(n)`, then it must be
/// guaranteed that `0 <= n <= buf.len()`. A nonzero `n` value indicates
@@ -136,6 +139,12 @@ cfg_io_util! {
/// variant will be returned. If an error is returned then it must be
/// guaranteed that no bytes were read.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If you use it as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, then it is guaranteed that no data was read.
+ ///
/// # Examples
///
/// [`File`][crate::fs::File]s implement `Read`:
@@ -175,14 +184,19 @@ cfg_io_util! {
/// Usually, only a single `read` syscall is issued, even if there is
/// more space in the supplied buffer.
///
- /// This function does not provide any guarantees about whether it
- /// completes immediately or asynchronously
+ /// This method does not provide any guarantees about whether it
+ /// completes immediately or asynchronously.
///
/// # Return
///
- /// On a successful read, the number of read bytes is returned. If the
- /// supplied buffer is not empty and the function returns `Ok(0)` then
- /// the source has reached an "end-of-file" event.
+ /// A nonzero `n` value indicates that the buffer `buf` has been filled
+ /// in with `n` bytes of data from this source. If `n` is `0`, then it
+ /// can indicate one of two scenarios:
+ ///
+ /// 1. This reader has reached its "end of file" and will likely no longer
+ /// be able to produce bytes. Note that this does not mean that the
+ /// reader will *always* no longer be able to produce bytes.
+ /// 2. The buffer specified had a remaining capacity of zero.
///
/// # Errors
///
@@ -190,6 +204,12 @@ cfg_io_util! {
/// variant will be returned. If an error is returned then it must be
/// guaranteed that no bytes were read.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If you use it as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, then it is guaranteed that no data was read.
+ ///
/// # Examples
///
/// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]:
@@ -254,6 +274,13 @@ cfg_io_util! {
/// it has read, but it will never read more than would be necessary to
/// completely fill the buffer.
///
+ /// # Cancel safety
+ ///
+ /// This method is not cancellation safe. If the method is used as the
+ /// event in a [`tokio::select!`](crate::select) statement and some
+ /// other branch completes first, then some data may already have been
+ /// read into `buf`.
+ ///
/// # Examples
///
/// [`File`][crate::fs::File]s implement `Read`:
@@ -579,7 +606,7 @@ cfg_io_util! {
/// async fn main() -> io::Result<()> {
/// let mut reader = Cursor::new(vec![0x80, 0, 0, 0, 0, 0, 0, 0]);
///
- /// assert_eq!(i64::min_value(), reader.read_i64().await?);
+ /// assert_eq!(i64::MIN, reader.read_i64().await?);
/// Ok(())
/// }
/// ```
@@ -659,12 +686,88 @@ cfg_io_util! {
/// 0, 0, 0, 0, 0, 0, 0, 0
/// ]);
///
- /// assert_eq!(i128::min_value(), reader.read_i128().await?);
+ /// assert_eq!(i128::MIN, reader.read_i128().await?);
/// Ok(())
/// }
/// ```
fn read_i128(&mut self) -> ReadI128;
+ /// Reads an 32-bit floating point type in big-endian order from the
+ /// underlying reader.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn read_f32(&mut self) -> io::Result<f32>;
+ /// ```
+ ///
+ /// It is recommended to use a buffered reader to avoid excessive
+ /// syscalls.
+ ///
+ /// # Errors
+ ///
+ /// This method returns the same errors as [`AsyncReadExt::read_exact`].
+ ///
+ /// [`AsyncReadExt::read_exact`]: AsyncReadExt::read_exact
+ ///
+ /// # Examples
+ ///
+ /// Read 32-bit floating point type from a `AsyncRead`:
+ ///
+ /// ```rust
+ /// use tokio::io::{self, AsyncReadExt};
+ ///
+ /// use std::io::Cursor;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut reader = Cursor::new(vec![0xff, 0x7f, 0xff, 0xff]);
+ ///
+ /// assert_eq!(f32::MIN, reader.read_f32().await?);
+ /// Ok(())
+ /// }
+ /// ```
+ fn read_f32(&mut self) -> ReadF32;
+
+ /// Reads an 64-bit floating point type in big-endian order from the
+ /// underlying reader.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn read_f64(&mut self) -> io::Result<f64>;
+ /// ```
+ ///
+ /// It is recommended to use a buffered reader to avoid excessive
+ /// syscalls.
+ ///
+ /// # Errors
+ ///
+ /// This method returns the same errors as [`AsyncReadExt::read_exact`].
+ ///
+ /// [`AsyncReadExt::read_exact`]: AsyncReadExt::read_exact
+ ///
+ /// # Examples
+ ///
+ /// Read 64-bit floating point type from a `AsyncRead`:
+ ///
+ /// ```rust
+ /// use tokio::io::{self, AsyncReadExt};
+ ///
+ /// use std::io::Cursor;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut reader = Cursor::new(vec![
+ /// 0xff, 0xef, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
+ /// ]);
+ ///
+ /// assert_eq!(f64::MIN, reader.read_f64().await?);
+ /// Ok(())
+ /// }
+ /// ```
+ fn read_f64(&mut self) -> ReadF64;
+
/// Reads an unsigned 16-bit integer in little-endian order from the
/// underlying reader.
///
@@ -971,6 +1074,82 @@ cfg_io_util! {
/// }
/// ```
fn read_i128_le(&mut self) -> ReadI128Le;
+
+ /// Reads an 32-bit floating point type in little-endian order from the
+ /// underlying reader.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn read_f32_le(&mut self) -> io::Result<f32>;
+ /// ```
+ ///
+ /// It is recommended to use a buffered reader to avoid excessive
+ /// syscalls.
+ ///
+ /// # Errors
+ ///
+ /// This method returns the same errors as [`AsyncReadExt::read_exact`].
+ ///
+ /// [`AsyncReadExt::read_exact`]: AsyncReadExt::read_exact
+ ///
+ /// # Examples
+ ///
+ /// Read 32-bit floating point type from a `AsyncRead`:
+ ///
+ /// ```rust
+ /// use tokio::io::{self, AsyncReadExt};
+ ///
+ /// use std::io::Cursor;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut reader = Cursor::new(vec![0xff, 0xff, 0x7f, 0xff]);
+ ///
+ /// assert_eq!(f32::MIN, reader.read_f32_le().await?);
+ /// Ok(())
+ /// }
+ /// ```
+ fn read_f32_le(&mut self) -> ReadF32Le;
+
+ /// Reads an 64-bit floating point type in little-endian order from the
+ /// underlying reader.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn read_f64_le(&mut self) -> io::Result<f64>;
+ /// ```
+ ///
+ /// It is recommended to use a buffered reader to avoid excessive
+ /// syscalls.
+ ///
+ /// # Errors
+ ///
+ /// This method returns the same errors as [`AsyncReadExt::read_exact`].
+ ///
+ /// [`AsyncReadExt::read_exact`]: AsyncReadExt::read_exact
+ ///
+ /// # Examples
+ ///
+ /// Read 64-bit floating point type from a `AsyncRead`:
+ ///
+ /// ```rust
+ /// use tokio::io::{self, AsyncReadExt};
+ ///
+ /// use std::io::Cursor;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut reader = Cursor::new(vec![
+ /// 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xef, 0xff
+ /// ]);
+ ///
+ /// assert_eq!(f64::MIN, reader.read_f64_le().await?);
+ /// Ok(())
+ /// }
+ /// ```
+ fn read_f64_le(&mut self) -> ReadF64Le;
}
/// Reads all bytes until EOF in this source, placing them into `buf`.
diff --git a/src/io/util/async_seek_ext.rs b/src/io/util/async_seek_ext.rs
index 297a4a6..46b3e6c 100644
--- a/src/io/util/async_seek_ext.rs
+++ b/src/io/util/async_seek_ext.rs
@@ -67,6 +67,16 @@ cfg_io_util! {
seek(self, pos)
}
+ /// Creates a future which will rewind to the beginning of the stream.
+ ///
+ /// This is convenience method, equivalent to to `self.seek(SeekFrom::Start(0))`.
+ fn rewind(&mut self) -> Seek<'_, Self>
+ where
+ Self: Unpin,
+ {
+ self.seek(SeekFrom::Start(0))
+ }
+
/// Creates a future which will return the current seek position from the
/// start of the stream.
///
diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs
index d011d82..93a3183 100644
--- a/src/io/util/async_write_ext.rs
+++ b/src/io/util/async_write_ext.rs
@@ -2,7 +2,9 @@ use crate::io::util::flush::{flush, Flush};
use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
+use crate::io::util::write_all_buf::{write_all_buf, WriteAllBuf};
use crate::io::util::write_buf::{write_buf, WriteBuf};
+use crate::io::util::write_int::{WriteF32, WriteF32Le, WriteF64, WriteF64Le};
use crate::io::util::write_int::{
WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le,
WriteI8,
@@ -18,7 +20,7 @@ use std::io::IoSlice;
use bytes::Buf;
cfg_io_util! {
- /// Defines numeric writer
+ /// Defines numeric writer.
macro_rules! write_impl {
(
$(
@@ -96,6 +98,13 @@ cfg_io_util! {
/// It is **not** considered an error if the entire buffer could not be
/// written to this writer.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancellation safe in the sense that if it is used as
+ /// the event in a [`tokio::select!`](crate::select) statement and some
+ /// other branch completes first, then it is guaranteed that no data was
+ /// written to this `AsyncWrite`.
+ ///
/// # Examples
///
/// ```no_run
@@ -128,6 +137,13 @@ cfg_io_util! {
///
/// See [`AsyncWrite::poll_write_vectored`] for more details.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancellation safe in the sense that if it is used as
+ /// the event in a [`tokio::select!`](crate::select) statement and some
+ /// other branch completes first, then it is guaranteed that no data was
+ /// written to this `AsyncWrite`.
+ ///
/// # Examples
///
/// ```no_run
@@ -159,7 +175,6 @@ cfg_io_util! {
write_vectored(self, bufs)
}
-
/// Writes a buffer into this writer, advancing the buffer's internal
/// cursor.
///
@@ -195,12 +210,20 @@ cfg_io_util! {
/// It is **not** considered an error if the entire buffer could not be
/// written to this writer.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancellation safe in the sense that if it is used as
+ /// the event in a [`tokio::select!`](crate::select) statement and some
+ /// other branch completes first, then it is guaranteed that no data was
+ /// written to this `AsyncWrite`.
+ ///
/// # Examples
///
- /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]:
+ /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]:
///
/// [`File`]: crate::fs::File
/// [`Buf`]: bytes::Buf
+ /// [`Cursor`]: std::io::Cursor
///
/// ```no_run
/// use tokio::io::{self, AsyncWriteExt};
@@ -238,6 +261,70 @@ cfg_io_util! {
/// Equivalent to:
///
/// ```ignore
+ /// async fn write_all_buf(&mut self, buf: impl Buf) -> Result<(), io::Error> {
+ /// while buf.has_remaining() {
+ /// self.write_buf(&mut buf).await?;
+ /// }
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// This method will continuously call [`write`] until
+ /// [`buf.has_remaining()`](bytes::Buf::has_remaining) returns false. This method will not
+ /// return until the entire buffer has been successfully written or an error occurs. The
+ /// first error generated will be returned.
+ ///
+ /// The buffer is advanced after each chunk is successfully written. After failure,
+ /// `src.chunk()` will return the chunk that failed to write.
+ ///
+ /// # Cancel safety
+ ///
+ /// If `write_all_buf` is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, then the data in the provided buffer may have been
+ /// partially written. However, it is guaranteed that the provided
+ /// buffer has been [advanced] by the amount of bytes that have been
+ /// partially written.
+ ///
+ /// # Examples
+ ///
+ /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]:
+ ///
+ /// [`File`]: crate::fs::File
+ /// [`Buf`]: bytes::Buf
+ /// [`Cursor`]: std::io::Cursor
+ /// [advanced]: bytes::Buf::advance
+ ///
+ /// ```no_run
+ /// use tokio::io::{self, AsyncWriteExt};
+ /// use tokio::fs::File;
+ ///
+ /// use std::io::Cursor;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// let mut buffer = Cursor::new(b"data to write");
+ ///
+ /// file.write_all_buf(&mut buffer).await?;
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// [`write`]: AsyncWriteExt::write
+ fn write_all_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteAllBuf<'a, Self, B>
+ where
+ Self: Sized + Unpin,
+ B: Buf,
+ {
+ write_all_buf(self, src)
+ }
+
+ /// Attempts to write an entire buffer into this writer.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
/// async fn write_all(&mut self, buf: &[u8]) -> io::Result<()>;
/// ```
///
@@ -246,6 +333,14 @@ cfg_io_util! {
/// has been successfully written or such an error occurs. The first
/// error generated from this method will be returned.
///
+ /// # Cancel safety
+ ///
+ /// This method is not cancellation safe. If it is used as the event
+ /// in a [`tokio::select!`](crate::select) statement and some other
+ /// branch completes first, then the provided buffer may have been
+ /// partially written, but future calls to `write_all` will start over
+ /// from the beginning of the buffer.
+ ///
/// # Errors
///
/// This function will return the first error that [`write`] returns.
@@ -258,9 +353,9 @@ cfg_io_util! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut buffer = File::create("foo.txt").await?;
+ /// let mut file = File::create("foo.txt").await?;
///
- /// buffer.write_all(b"some bytes").await?;
+ /// file.write_all(b"some bytes").await?;
/// Ok(())
/// }
/// ```
@@ -567,8 +662,8 @@ cfg_io_util! {
/// async fn main() -> io::Result<()> {
/// let mut writer = Vec::new();
///
- /// writer.write_i64(i64::min_value()).await?;
- /// writer.write_i64(i64::max_value()).await?;
+ /// writer.write_i64(i64::MIN).await?;
+ /// writer.write_i64(i64::MAX).await?;
///
/// assert_eq!(writer, b"\x80\x00\x00\x00\x00\x00\x00\x00\x7f\xff\xff\xff\xff\xff\xff\xff");
/// Ok(())
@@ -645,7 +740,7 @@ cfg_io_util! {
/// async fn main() -> io::Result<()> {
/// let mut writer = Vec::new();
///
- /// writer.write_i128(i128::min_value()).await?;
+ /// writer.write_i128(i128::MIN).await?;
///
/// assert_eq!(writer, vec![
/// 0x80, 0, 0, 0, 0, 0, 0, 0,
@@ -656,6 +751,81 @@ cfg_io_util! {
/// ```
fn write_i128(&mut self, n: i128) -> WriteI128;
+ /// Writes an 32-bit floating point type in big-endian order to the
+ /// underlying writer.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn write_f32(&mut self, n: f32) -> io::Result<()>;
+ /// ```
+ ///
+ /// It is recommended to use a buffered writer to avoid excessive
+ /// syscalls.
+ ///
+ /// # Errors
+ ///
+ /// This method returns the same errors as [`AsyncWriteExt::write_all`].
+ ///
+ /// [`AsyncWriteExt::write_all`]: AsyncWriteExt::write_all
+ ///
+ /// # Examples
+ ///
+ /// Write 32-bit floating point type to a `AsyncWrite`:
+ ///
+ /// ```rust
+ /// use tokio::io::{self, AsyncWriteExt};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut writer = Vec::new();
+ ///
+ /// writer.write_f32(f32::MIN).await?;
+ ///
+ /// assert_eq!(writer, vec![0xff, 0x7f, 0xff, 0xff]);
+ /// Ok(())
+ /// }
+ /// ```
+ fn write_f32(&mut self, n: f32) -> WriteF32;
+
+ /// Writes an 64-bit floating point type in big-endian order to the
+ /// underlying writer.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn write_f64(&mut self, n: f64) -> io::Result<()>;
+ /// ```
+ ///
+ /// It is recommended to use a buffered writer to avoid excessive
+ /// syscalls.
+ ///
+ /// # Errors
+ ///
+ /// This method returns the same errors as [`AsyncWriteExt::write_all`].
+ ///
+ /// [`AsyncWriteExt::write_all`]: AsyncWriteExt::write_all
+ ///
+ /// # Examples
+ ///
+ /// Write 64-bit floating point type to a `AsyncWrite`:
+ ///
+ /// ```rust
+ /// use tokio::io::{self, AsyncWriteExt};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut writer = Vec::new();
+ ///
+ /// writer.write_f64(f64::MIN).await?;
+ ///
+ /// assert_eq!(writer, vec![
+ /// 0xff, 0xef, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
+ /// ]);
+ /// Ok(())
+ /// }
+ /// ```
+ fn write_f64(&mut self, n: f64) -> WriteF64;
/// Writes an unsigned 16-bit integer in little-endian order to the
/// underlying writer.
@@ -876,8 +1046,8 @@ cfg_io_util! {
/// async fn main() -> io::Result<()> {
/// let mut writer = Vec::new();
///
- /// writer.write_i64_le(i64::min_value()).await?;
- /// writer.write_i64_le(i64::max_value()).await?;
+ /// writer.write_i64_le(i64::MIN).await?;
+ /// writer.write_i64_le(i64::MAX).await?;
///
/// assert_eq!(writer, b"\x00\x00\x00\x00\x00\x00\x00\x80\xff\xff\xff\xff\xff\xff\xff\x7f");
/// Ok(())
@@ -954,7 +1124,7 @@ cfg_io_util! {
/// async fn main() -> io::Result<()> {
/// let mut writer = Vec::new();
///
- /// writer.write_i128_le(i128::min_value()).await?;
+ /// writer.write_i128_le(i128::MIN).await?;
///
/// assert_eq!(writer, vec![
/// 0, 0, 0, 0, 0, 0, 0,
@@ -964,6 +1134,82 @@ cfg_io_util! {
/// }
/// ```
fn write_i128_le(&mut self, n: i128) -> WriteI128Le;
+
+ /// Writes an 32-bit floating point type in little-endian order to the
+ /// underlying writer.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn write_f32_le(&mut self, n: f32) -> io::Result<()>;
+ /// ```
+ ///
+ /// It is recommended to use a buffered writer to avoid excessive
+ /// syscalls.
+ ///
+ /// # Errors
+ ///
+ /// This method returns the same errors as [`AsyncWriteExt::write_all`].
+ ///
+ /// [`AsyncWriteExt::write_all`]: AsyncWriteExt::write_all
+ ///
+ /// # Examples
+ ///
+ /// Write 32-bit floating point type to a `AsyncWrite`:
+ ///
+ /// ```rust
+ /// use tokio::io::{self, AsyncWriteExt};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut writer = Vec::new();
+ ///
+ /// writer.write_f32_le(f32::MIN).await?;
+ ///
+ /// assert_eq!(writer, vec![0xff, 0xff, 0x7f, 0xff]);
+ /// Ok(())
+ /// }
+ /// ```
+ fn write_f32_le(&mut self, n: f32) -> WriteF32Le;
+
+ /// Writes an 64-bit floating point type in little-endian order to the
+ /// underlying writer.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn write_f64_le(&mut self, n: f64) -> io::Result<()>;
+ /// ```
+ ///
+ /// It is recommended to use a buffered writer to avoid excessive
+ /// syscalls.
+ ///
+ /// # Errors
+ ///
+ /// This method returns the same errors as [`AsyncWriteExt::write_all`].
+ ///
+ /// [`AsyncWriteExt::write_all`]: AsyncWriteExt::write_all
+ ///
+ /// # Examples
+ ///
+ /// Write 64-bit floating point type to a `AsyncWrite`:
+ ///
+ /// ```rust
+ /// use tokio::io::{self, AsyncWriteExt};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut writer = Vec::new();
+ ///
+ /// writer.write_f64_le(f64::MIN).await?;
+ ///
+ /// assert_eq!(writer, vec![
+ /// 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xef, 0xff
+ /// ]);
+ /// Ok(())
+ /// }
+ /// ```
+ fn write_f64_le(&mut self, n: f64) -> WriteF64Le;
}
/// Flushes this output stream, ensuring that all intermediately buffered
diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs
index 271f61b..7df610b 100644
--- a/src/io/util/buf_reader.rs
+++ b/src/io/util/buf_reader.rs
@@ -1,11 +1,11 @@
use crate::io::util::DEFAULT_BUF_SIZE;
-use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use pin_project_lite::pin_project;
-use std::io;
+use std::io::{self, IoSlice, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
-use std::{cmp, fmt};
+use std::{cmp, fmt, mem};
pin_project! {
/// The `BufReader` struct adds buffering to any reader.
@@ -30,6 +30,7 @@ pin_project! {
pub(super) buf: Box<[u8]>,
pub(super) pos: usize,
pub(super) cap: usize,
+ pub(super) seek_state: SeekState,
}
}
@@ -48,6 +49,7 @@ impl<R: AsyncRead> BufReader<R> {
buf: buffer.into_boxed_slice(),
pos: 0,
cap: 0,
+ seek_state: SeekState::Init,
}
}
@@ -141,6 +143,122 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
}
}
+#[derive(Debug, Clone, Copy)]
+pub(super) enum SeekState {
+ /// start_seek has not been called.
+ Init,
+ /// start_seek has been called, but poll_complete has not yet been called.
+ Start(SeekFrom),
+ /// Waiting for completion of the first poll_complete in the `n.checked_sub(remainder).is_none()` branch.
+ PendingOverflowed(i64),
+ /// Waiting for completion of poll_complete.
+ Pending,
+}
+
+/// Seeks to an offset, in bytes, in the underlying reader.
+///
+/// The position used for seeking with `SeekFrom::Current(_)` is the
+/// position the underlying reader would be at if the `BufReader` had no
+/// internal buffer.
+///
+/// Seeking always discards the internal buffer, even if the seek position
+/// would otherwise fall within it. This guarantees that calling
+/// `.into_inner()` immediately after a seek yields the underlying reader
+/// at the same position.
+///
+/// See [`AsyncSeek`] for more details.
+///
+/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
+/// where `n` minus the internal buffer length overflows an `i64`, two
+/// seeks will be performed instead of one. If the second seek returns
+/// `Err`, the underlying reader will be left at the same position it would
+/// have if you called `seek` with `SeekFrom::Current(0)`.
+impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ // We needs to call seek operation multiple times.
+ // And we should always call both start_seek and poll_complete,
+ // as start_seek alone cannot guarantee that the operation will be completed.
+ // poll_complete receives a Context and returns a Poll, so it cannot be called
+ // inside start_seek.
+ *self.project().seek_state = SeekState::Start(pos);
+ Ok(())
+ }
+
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ let res = match mem::replace(self.as_mut().project().seek_state, SeekState::Init) {
+ SeekState::Init => {
+ // 1.x AsyncSeek recommends calling poll_complete before start_seek.
+ // We don't have to guarantee that the value returned by
+ // poll_complete called without start_seek is correct,
+ // so we'll return 0.
+ return Poll::Ready(Ok(0));
+ }
+ SeekState::Start(SeekFrom::Current(n)) => {
+ let remainder = (self.cap - self.pos) as i64;
+ // it should be safe to assume that remainder fits within an i64 as the alternative
+ // means we managed to allocate 8 exbibytes and that's absurd.
+ // But it's not out of the realm of possibility for some weird underlying reader to
+ // support seeking by i64::MIN so we need to handle underflow when subtracting
+ // remainder.
+ if let Some(offset) = n.checked_sub(remainder) {
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(offset))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ } else {
+ // seek backwards by our remainder, and then by the offset
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(-remainder))?;
+ if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
+ *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n);
+ return Poll::Pending;
+ }
+
+ // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676
+ self.as_mut().discard_buffer();
+
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(n))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ }
+ SeekState::PendingOverflowed(n) => {
+ if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
+ *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n);
+ return Poll::Pending;
+ }
+
+ // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676
+ self.as_mut().discard_buffer();
+
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(n))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ SeekState::Start(pos) => {
+ // Seeking with Start/End doesn't care about our buffer length.
+ self.as_mut().get_pin_mut().start_seek(pos)?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ SeekState::Pending => self.as_mut().get_pin_mut().poll_complete(cx)?,
+ };
+
+ match res {
+ Poll::Ready(res) => {
+ self.discard_buffer();
+ Poll::Ready(Ok(res))
+ }
+ Poll::Pending => {
+ *self.as_mut().project().seek_state = SeekState::Pending;
+ Poll::Pending
+ }
+ }
+ }
+}
+
impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
fn poll_write(
self: Pin<&mut Self>,
@@ -150,6 +268,18 @@ impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
self.get_pin_mut().poll_write(cx, buf)
}
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.get_pin_mut().poll_write_vectored(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.get_ref().is_write_vectored()
+ }
+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}
diff --git a/src/io/util/buf_stream.rs b/src/io/util/buf_stream.rs
index cc857e2..595c142 100644
--- a/src/io/util/buf_stream.rs
+++ b/src/io/util/buf_stream.rs
@@ -1,8 +1,8 @@
use crate::io::util::{BufReader, BufWriter};
-use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use pin_project_lite::pin_project;
-use std::io;
+use std::io::{self, IoSlice, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -94,9 +94,11 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
buf: rbuf,
pos,
cap,
+ seek_state: rseek_state,
},
buf: wbuf,
written,
+ seek_state: wseek_state,
} = b;
BufStream {
@@ -105,10 +107,12 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
inner,
buf: wbuf,
written,
+ seek_state: wseek_state,
},
buf: rbuf,
pos,
cap,
+ seek_state: rseek_state,
},
}
}
@@ -123,6 +127,18 @@ impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
self.project().inner.poll_write(cx, buf)
}
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write_vectored(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.inner.is_write_vectored()
+ }
+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}
@@ -142,6 +158,34 @@ impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
}
}
+/// Seek to an offset, in bytes, in the underlying stream.
+///
+/// The position used for seeking with `SeekFrom::Current(_)` is the
+/// position the underlying stream would be at if the `BufStream` had no
+/// internal buffer.
+///
+/// Seeking always discards the internal buffer, even if the seek position
+/// would otherwise fall within it. This guarantees that calling
+/// `.into_inner()` immediately after a seek yields the underlying reader
+/// at the same position.
+///
+/// See [`AsyncSeek`] for more details.
+///
+/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
+/// where `n` minus the internal buffer length overflows an `i64`, two
+/// seeks will be performed instead of one. If the second seek returns
+/// `Err`, the underlying reader will be left at the same position it would
+/// have if you called `seek` with `SeekFrom::Current(0)`.
+impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> {
+ fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
+ self.project().inner.start_seek(position)
+ }
+
+ fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ self.project().inner.poll_complete(cx)
+ }
+}
+
impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project().inner.poll_fill_buf(cx)
diff --git a/src/io/util/buf_writer.rs b/src/io/util/buf_writer.rs
index 5e3d4b7..8dd1bba 100644
--- a/src/io/util/buf_writer.rs
+++ b/src/io/util/buf_writer.rs
@@ -1,9 +1,9 @@
use crate::io::util::DEFAULT_BUF_SIZE;
-use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use pin_project_lite::pin_project;
use std::fmt;
-use std::io::{self, Write};
+use std::io::{self, IoSlice, SeekFrom, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -34,6 +34,7 @@ pin_project! {
pub(super) inner: W,
pub(super) buf: Vec<u8>,
pub(super) written: usize,
+ pub(super) seek_state: SeekState,
}
}
@@ -50,6 +51,7 @@ impl<W: AsyncWrite> BufWriter<W> {
inner,
buf: Vec::with_capacity(cap),
written: 0,
+ seek_state: SeekState::Init,
}
}
@@ -131,6 +133,72 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
}
}
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ mut bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ if self.inner.is_write_vectored() {
+ let total_len = bufs
+ .iter()
+ .fold(0usize, |acc, b| acc.saturating_add(b.len()));
+ if total_len > self.buf.capacity() - self.buf.len() {
+ ready!(self.as_mut().flush_buf(cx))?;
+ }
+ let me = self.as_mut().project();
+ if total_len >= me.buf.capacity() {
+ // It's more efficient to pass the slices directly to the
+ // underlying writer than to buffer them.
+ // The case when the total_len calculation saturates at
+ // usize::MAX is also handled here.
+ me.inner.poll_write_vectored(cx, bufs)
+ } else {
+ bufs.iter().for_each(|b| me.buf.extend_from_slice(b));
+ Poll::Ready(Ok(total_len))
+ }
+ } else {
+ // Remove empty buffers at the beginning of bufs.
+ while bufs.first().map(|buf| buf.len()) == Some(0) {
+ bufs = &bufs[1..];
+ }
+ if bufs.is_empty() {
+ return Poll::Ready(Ok(0));
+ }
+ // Flush if the first buffer doesn't fit.
+ let first_len = bufs[0].len();
+ if first_len > self.buf.capacity() - self.buf.len() {
+ ready!(self.as_mut().flush_buf(cx))?;
+ debug_assert!(self.buf.is_empty());
+ }
+ let me = self.as_mut().project();
+ if first_len >= me.buf.capacity() {
+ // The slice is at least as large as the buffering capacity,
+ // so it's better to write it directly, bypassing the buffer.
+ debug_assert!(me.buf.is_empty());
+ return me.inner.poll_write(cx, &bufs[0]);
+ } else {
+ me.buf.extend_from_slice(&bufs[0]);
+ bufs = &bufs[1..];
+ }
+ let mut total_written = first_len;
+ debug_assert!(total_written != 0);
+ // Append the buffers that fit in the internal buffer.
+ for buf in bufs {
+ if buf.len() > me.buf.capacity() - me.buf.len() {
+ break;
+ } else {
+ me.buf.extend_from_slice(buf);
+ total_written += buf.len();
+ }
+ }
+ Poll::Ready(Ok(total_written))
+ }
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ true
+ }
+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(self.as_mut().flush_buf(cx))?;
self.get_pin_mut().poll_flush(cx)
@@ -142,6 +210,62 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
}
}
+#[derive(Debug, Clone, Copy)]
+pub(super) enum SeekState {
+ /// start_seek has not been called.
+ Init,
+ /// start_seek has been called, but poll_complete has not yet been called.
+ Start(SeekFrom),
+ /// Waiting for completion of poll_complete.
+ Pending,
+}
+
+/// Seek to the offset, in bytes, in the underlying writer.
+///
+/// Seeking always writes out the internal buffer before seeking.
+impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ // We need to flush the internal buffer before seeking.
+ // It receives a `Context` and returns a `Poll`, so it cannot be called
+ // inside `start_seek`.
+ *self.project().seek_state = SeekState::Start(pos);
+ Ok(())
+ }
+
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ let pos = match self.seek_state {
+ SeekState::Init => {
+ return self.project().inner.poll_complete(cx);
+ }
+ SeekState::Start(pos) => Some(pos),
+ SeekState::Pending => None,
+ };
+
+ // Flush the internal buffer before seeking.
+ ready!(self.as_mut().flush_buf(cx))?;
+
+ let mut me = self.project();
+ if let Some(pos) = pos {
+ // Ensure previous seeks have finished before starting a new one
+ ready!(me.inner.as_mut().poll_complete(cx))?;
+ if let Err(e) = me.inner.as_mut().start_seek(pos) {
+ *me.seek_state = SeekState::Init;
+ return Poll::Ready(Err(e));
+ }
+ }
+ match me.inner.poll_complete(cx) {
+ Poll::Ready(res) => {
+ *me.seek_state = SeekState::Init;
+ Poll::Ready(res)
+ }
+ Poll::Pending => {
+ *me.seek_state = SeekState::Pending;
+ Poll::Pending
+ }
+ }
+ }
+}
+
impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> {
fn poll_read(
self: Pin<&mut Self>,
diff --git a/src/io/util/copy.rs b/src/io/util/copy.rs
index 3cd425b..d0ab7cb 100644
--- a/src/io/util/copy.rs
+++ b/src/io/util/copy.rs
@@ -8,6 +8,7 @@ use std::task::{Context, Poll};
#[derive(Debug)]
pub(super) struct CopyBuffer {
read_done: bool,
+ need_flush: bool,
pos: usize,
cap: usize,
amt: u64,
@@ -18,10 +19,11 @@ impl CopyBuffer {
pub(super) fn new() -> Self {
Self {
read_done: false,
+ need_flush: false,
pos: 0,
cap: 0,
amt: 0,
- buf: vec![0; 2048].into_boxed_slice(),
+ buf: vec![0; super::DEFAULT_BUF_SIZE].into_boxed_slice(),
}
}
@@ -41,7 +43,22 @@ impl CopyBuffer {
if self.pos == self.cap && !self.read_done {
let me = &mut *self;
let mut buf = ReadBuf::new(&mut me.buf);
- ready!(reader.as_mut().poll_read(cx, &mut buf))?;
+
+ match reader.as_mut().poll_read(cx, &mut buf) {
+ Poll::Ready(Ok(_)) => (),
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => {
+ // Try flushing when the reader has no progress to avoid deadlock
+ // when the reader depends on buffered writer.
+ if self.need_flush {
+ ready!(writer.as_mut().poll_flush(cx))?;
+ self.need_flush = false;
+ }
+
+ return Poll::Pending;
+ }
+ }
+
let n = buf.filled().len();
if n == 0 {
self.read_done = true;
@@ -63,9 +80,18 @@ impl CopyBuffer {
} else {
self.pos += i;
self.amt += i as u64;
+ self.need_flush = true;
}
}
+ // If pos larger than cap, this loop will never stop.
+ // In particular, user's wrong poll_write implementation returning
+ // incorrect written length may lead to thread blocking.
+ debug_assert!(
+ self.pos <= self.cap,
+ "writer returned length larger than input slice"
+ );
+
// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.pos == self.cap && self.read_done {
diff --git a/src/io/util/copy_bidirectional.rs b/src/io/util/copy_bidirectional.rs
index cc43f0f..c93060b 100644
--- a/src/io/util/copy_bidirectional.rs
+++ b/src/io/util/copy_bidirectional.rs
@@ -104,6 +104,7 @@ where
/// # Return value
///
/// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error>
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
diff --git a/src/io/util/fill_buf.rs b/src/io/util/fill_buf.rs
new file mode 100644
index 0000000..3655c01
--- /dev/null
+++ b/src/io/util/fill_buf.rs
@@ -0,0 +1,53 @@
+use crate::io::AsyncBufRead;
+
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::io;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// Future for the [`fill_buf`](crate::io::AsyncBufReadExt::fill_buf) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct FillBuf<'a, R: ?Sized> {
+ reader: Option<&'a mut R>,
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+pub(crate) fn fill_buf<R>(reader: &mut R) -> FillBuf<'_, R>
+where
+ R: AsyncBufRead + ?Sized + Unpin,
+{
+ FillBuf {
+ reader: Some(reader),
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> {
+ type Output = io::Result<&'a [u8]>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+
+ let reader = me.reader.take().expect("Polled after completion.");
+ match Pin::new(&mut *reader).poll_fill_buf(cx) {
+ Poll::Ready(Ok(slice)) => unsafe {
+ // Safety: This is necessary only due to a limitation in the
+ // borrow checker. Once Rust starts using the polonius borrow
+ // checker, this can be simplified.
+ let slice = std::mem::transmute::<&[u8], &'a [u8]>(slice);
+ Poll::Ready(Ok(slice))
+ },
+ Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
+ Poll::Pending => {
+ *me.reader = Some(reader);
+ Poll::Pending
+ }
+ }
+ }
+}
diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs
index ed6a944..717f633 100644
--- a/src/io/util/lines.rs
+++ b/src/io/util/lines.rs
@@ -8,7 +8,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
- /// Read lines from an [`AsyncBufRead`].
+ /// Reads lines from an [`AsyncBufRead`].
///
/// A `Lines` can be turned into a `Stream` with [`LinesStream`].
///
@@ -47,6 +47,10 @@ where
{
/// Returns the next line in the stream.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancellation safe.
+ ///
/// # Examples
///
/// ```
@@ -68,12 +72,12 @@ where
poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
}
- /// Obtain a mutable reference to the underlying reader
+ /// Obtains a mutable reference to the underlying reader.
pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}
- /// Obtain a reference to the underlying reader
+ /// Obtains a reference to the underlying reader.
pub fn get_ref(&mut self) -> &R {
&self.reader
}
@@ -102,11 +106,9 @@ where
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when more bytes become
- /// available on the underlying IO resource.
- ///
- /// Note that on multiple calls to `poll_next_line`, only the `Waker` from
- /// the `Context` passed to the most recent call is scheduled to receive a
- /// wakeup.
+ /// available on the underlying IO resource. Note that on multiple calls to
+ /// `poll_next_line`, only the `Waker` from the `Context` passed to the most
+ /// recent call is scheduled to receive a wakeup.
pub fn poll_next_line(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -128,7 +130,7 @@ where
}
}
- Poll::Ready(Ok(Some(mem::replace(me.buf, String::new()))))
+ Poll::Ready(Ok(Some(mem::take(me.buf))))
}
}
diff --git a/src/io/util/mem.rs b/src/io/util/mem.rs
index e91a932..4eefe7b 100644
--- a/src/io/util/mem.rs
+++ b/src/io/util/mem.rs
@@ -16,6 +16,14 @@ use std::{
/// that can be used as in-memory IO types. Writing to one of the pairs will
/// allow that data to be read from the other, and vice versa.
///
+/// # Closing a `DuplexStream`
+///
+/// If one end of the `DuplexStream` channel is dropped, any pending reads on
+/// the other side will continue to read data until the buffer is drained, then
+/// they will signal EOF by returning 0 bytes. Any writes to the other side,
+/// including pending ones (that are waiting for free space in the buffer) will
+/// return `Err(BrokenPipe)` immediately.
+///
/// # Example
///
/// ```
@@ -37,6 +45,7 @@ use std::{
/// # }
/// ```
#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct DuplexStream {
read: Arc<Mutex<Pipe>>,
write: Arc<Mutex<Pipe>>,
@@ -72,6 +81,7 @@ struct Pipe {
///
/// The `max_buf_size` argument is the maximum amount of bytes that can be
/// written to a side before the write returns `Poll::Pending`.
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
@@ -134,7 +144,8 @@ impl AsyncWrite for DuplexStream {
impl Drop for DuplexStream {
fn drop(&mut self) {
// notify the other side of the closure
- self.write.lock().close();
+ self.write.lock().close_write();
+ self.read.lock().close_read();
}
}
@@ -151,12 +162,21 @@ impl Pipe {
}
}
- fn close(&mut self) {
+ fn close_write(&mut self) {
self.is_closed = true;
+ // needs to notify any readers that no more data will come
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
}
+
+ fn close_read(&mut self) {
+ self.is_closed = true;
+ // needs to notify any writers that they have to abort
+ if let Some(waker) = self.write_waker.take() {
+ waker.wake();
+ }
+ }
}
impl AsyncRead for Pipe {
@@ -217,7 +237,7 @@ impl AsyncWrite for Pipe {
mut self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
- self.close();
+ self.close_write();
Poll::Ready(Ok(()))
}
}
diff --git a/src/io/util/mod.rs b/src/io/util/mod.rs
index ab38664..21199d0 100644
--- a/src/io/util/mod.rs
+++ b/src/io/util/mod.rs
@@ -49,6 +49,7 @@ cfg_io_util! {
mod read_exact;
mod read_int;
mod read_line;
+ mod fill_buf;
mod read_to_end;
mod vec_with_initialized;
@@ -77,6 +78,7 @@ cfg_io_util! {
mod write_vectored;
mod write_all;
mod write_buf;
+ mod write_all_buf;
mod write_int;
diff --git a/src/io/util/read_int.rs b/src/io/util/read_int.rs
index 5b9fb7b..164dcf5 100644
--- a/src/io/util/read_int.rs
+++ b/src/io/util/read_int.rs
@@ -142,6 +142,9 @@ reader!(ReadI32, i32, get_i32);
reader!(ReadI64, i64, get_i64);
reader!(ReadI128, i128, get_i128);
+reader!(ReadF32, f32, get_f32);
+reader!(ReadF64, f64, get_f64);
+
reader!(ReadU16Le, u16, get_u16_le);
reader!(ReadU32Le, u32, get_u32_le);
reader!(ReadU64Le, u64, get_u64_le);
@@ -151,3 +154,6 @@ reader!(ReadI16Le, i16, get_i16_le);
reader!(ReadI32Le, i32, get_i32_le);
reader!(ReadI64Le, i64, get_i64_le);
reader!(ReadI128Le, i128, get_i128_le);
+
+reader!(ReadF32Le, f32, get_f32_le);
+reader!(ReadF64Le, f64, get_f64_le);
diff --git a/src/io/util/read_line.rs b/src/io/util/read_line.rs
index d38ffaf..e641f51 100644
--- a/src/io/util/read_line.rs
+++ b/src/io/util/read_line.rs
@@ -36,7 +36,7 @@ where
{
ReadLine {
reader,
- buf: mem::replace(string, String::new()).into_bytes(),
+ buf: mem::take(string).into_bytes(),
output: string,
read: 0,
_pin: PhantomPinned,
@@ -99,7 +99,7 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
read: &mut usize,
) -> Poll<io::Result<usize>> {
let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
- let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));
+ let utf8_res = String::from_utf8(mem::take(buf));
// At this point both buf and output are empty. The allocation is in utf8_res.
diff --git a/src/io/util/read_to_string.rs b/src/io/util/read_to_string.rs
index 2c17383..b3d82a2 100644
--- a/src/io/util/read_to_string.rs
+++ b/src/io/util/read_to_string.rs
@@ -37,7 +37,7 @@ pub(crate) fn read_to_string<'a, R>(
where
R: AsyncRead + ?Sized + Unpin,
{
- let buf = mem::replace(string, String::new()).into_bytes();
+ let buf = mem::take(string).into_bytes();
ReadToString {
reader,
buf: VecWithInitialized::new(buf),
diff --git a/src/io/util/read_until.rs b/src/io/util/read_until.rs
index 3599cff..90a0e8a 100644
--- a/src/io/util/read_until.rs
+++ b/src/io/util/read_until.rs
@@ -10,12 +10,12 @@ use std::task::{Context, Poll};
pin_project! {
/// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
- /// The delimeter is included in the resulting vector.
+ /// The delimiter is included in the resulting vector.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadUntil<'a, R: ?Sized> {
reader: &'a mut R,
- delimeter: u8,
+ delimiter: u8,
buf: &'a mut Vec<u8>,
// The number of bytes appended to buf. This can be less than buf.len() if
// the buffer was not empty when the operation was started.
@@ -28,7 +28,7 @@ pin_project! {
pub(crate) fn read_until<'a, R>(
reader: &'a mut R,
- delimeter: u8,
+ delimiter: u8,
buf: &'a mut Vec<u8>,
) -> ReadUntil<'a, R>
where
@@ -36,7 +36,7 @@ where
{
ReadUntil {
reader,
- delimeter,
+ delimiter,
buf,
read: 0,
_pin: PhantomPinned,
@@ -46,14 +46,14 @@ where
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
- delimeter: u8,
+ delimiter: u8,
buf: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
loop {
let (done, used) = {
let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
- if let Some(i) = memchr::memchr(delimeter, available) {
+ if let Some(i) = memchr::memchr(delimiter, available) {
buf.extend_from_slice(&available[..=i]);
(true, i + 1)
} else {
@@ -74,6 +74,6 @@ impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
- read_until_internal(Pin::new(*me.reader), cx, *me.delimeter, me.buf, me.read)
+ read_until_internal(Pin::new(*me.reader), cx, *me.delimiter, me.buf, me.read)
}
}
diff --git a/src/io/util/split.rs b/src/io/util/split.rs
index 4f3ce4e..7489c24 100644
--- a/src/io/util/split.rs
+++ b/src/io/util/split.rs
@@ -95,7 +95,7 @@ where
let n = ready!(read_until_internal(
me.reader, cx, *me.delim, me.buf, me.read,
))?;
- // read_until_internal resets me.read to zero once it finds the delimeter
+ // read_until_internal resets me.read to zero once it finds the delimiter
debug_assert_eq!(*me.read, 0);
if n == 0 && me.buf.is_empty() {
@@ -106,7 +106,7 @@ where
me.buf.pop();
}
- Poll::Ready(Ok(Some(mem::replace(me.buf, Vec::new()))))
+ Poll::Ready(Ok(Some(mem::take(me.buf))))
}
}
diff --git a/src/io/util/write_all_buf.rs b/src/io/util/write_all_buf.rs
new file mode 100644
index 0000000..05af7fe
--- /dev/null
+++ b/src/io/util/write_all_buf.rs
@@ -0,0 +1,56 @@
+use crate::io::AsyncWrite;
+
+use bytes::Buf;
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::io;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// A future to write some of the buffer to an `AsyncWrite`.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct WriteAllBuf<'a, W, B> {
+ writer: &'a mut W,
+ buf: &'a mut B,
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+/// Tries to write some bytes from the given `buf` to the writer in an
+/// asynchronous manner, returning a future.
+pub(crate) fn write_all_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteAllBuf<'a, W, B>
+where
+ W: AsyncWrite + Unpin,
+ B: Buf,
+{
+ WriteAllBuf {
+ writer,
+ buf,
+ _pin: PhantomPinned,
+ }
+}
+
+impl<W, B> Future for WriteAllBuf<'_, W, B>
+where
+ W: AsyncWrite + Unpin,
+ B: Buf,
+{
+ type Output = io::Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let me = self.project();
+ while me.buf.has_remaining() {
+ let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf.chunk())?);
+ me.buf.advance(n);
+ if n == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/src/io/util/write_int.rs b/src/io/util/write_int.rs
index 13bc191..63cd491 100644
--- a/src/io/util/write_int.rs
+++ b/src/io/util/write_int.rs
@@ -135,6 +135,9 @@ writer!(WriteI32, i32, put_i32);
writer!(WriteI64, i64, put_i64);
writer!(WriteI128, i128, put_i128);
+writer!(WriteF32, f32, put_f32);
+writer!(WriteF64, f64, put_f64);
+
writer!(WriteU16Le, u16, put_u16_le);
writer!(WriteU32Le, u32, put_u32_le);
writer!(WriteU64Le, u64, put_u64_le);
@@ -144,3 +147,6 @@ writer!(WriteI16Le, i16, put_i16_le);
writer!(WriteI32Le, i32, put_i32_le);
writer!(WriteI64Le, i64, put_i64_le);
writer!(WriteI128Le, i128, put_i128_le);
+
+writer!(WriteF32Le, f32, put_f32_le);
+writer!(WriteF64Le, f64, put_f64_le);