aboutsummaryrefslogtreecommitdiff
path: root/src/io
diff options
context:
space:
mode:
Diffstat (limited to 'src/io')
-rw-r--r--src/io/async_fd.rs34
-rw-r--r--src/io/blocking.rs3
-rw-r--r--src/io/bsd/poll_aio.rs6
-rw-r--r--src/io/driver/mod.rs354
-rw-r--r--src/io/driver/platform.rs44
-rw-r--r--src/io/driver/registration.rs262
-rw-r--r--src/io/driver/scheduled_io.rs533
-rw-r--r--src/io/interest.rs (renamed from src/io/driver/interest.rs)4
-rw-r--r--src/io/mod.rs16
-rw-r--r--src/io/poll_evented.rs88
-rw-r--r--src/io/read_buf.rs42
-rw-r--r--src/io/ready.rs (renamed from src/io/driver/ready.rs)2
-rw-r--r--src/io/split.rs1
-rw-r--r--src/io/stdio_common.rs6
-rw-r--r--src/io/util/async_seek_ext.rs2
-rw-r--r--src/io/util/async_write_ext.rs10
-rw-r--r--src/io/util/buf_reader.rs3
-rw-r--r--src/io/util/copy.rs62
-rw-r--r--src/io/util/empty.rs20
-rw-r--r--src/io/util/fill_buf.rs6
-rw-r--r--src/io/util/mem.rs64
-rw-r--r--src/io/util/read_exact.rs4
-rw-r--r--src/io/util/take.rs4
-rw-r--r--src/io/util/vec_with_initialized.rs25
-rw-r--r--src/io/util/write_all.rs2
25 files changed, 298 insertions, 1299 deletions
diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs
index 9ec5b7f..92fc6b3 100644
--- a/src/io/async_fd.rs
+++ b/src/io/async_fd.rs
@@ -1,4 +1,6 @@
-use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
+use crate::io::Interest;
+use crate::runtime::io::{ReadyEvent, Registration};
+use crate::runtime::scheduler;
use mio::unix::SourceFd;
use std::io;
@@ -81,6 +83,7 @@ use std::{task::Context, task::Poll};
///
/// impl AsyncTcpStream {
/// pub fn new(tcp: TcpStream) -> io::Result<Self> {
+/// tcp.set_nonblocking(true)?;
/// Ok(Self {
/// inner: AsyncFd::new(tcp)?,
/// })
@@ -166,12 +169,18 @@ pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> {
const ALL_INTEREST: Interest = Interest::READABLE.add(Interest::WRITABLE);
impl<T: AsRawFd> AsyncFd<T> {
- #[inline]
/// Creates an AsyncFd backed by (and taking ownership of) an object
/// implementing [`AsRawFd`]. The backing file descriptor is cached at the
/// time of creation.
///
/// This method must be called in the context of a tokio runtime.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current reactor set, or if the `rt`
+ /// feature flag is not enabled.
+ #[inline]
+ #[track_caller]
pub fn new(inner: T) -> io::Result<Self>
where
T: AsRawFd,
@@ -179,19 +188,26 @@ impl<T: AsRawFd> AsyncFd<T> {
Self::with_interest(inner, ALL_INTEREST)
}
- #[inline]
/// Creates new instance as `new` with additional ability to customize interest,
/// allowing to specify whether file descriptor will be polled for read, write or both.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current reactor set, or if the `rt`
+ /// feature flag is not enabled.
+ #[inline]
+ #[track_caller]
pub fn with_interest(inner: T, interest: Interest) -> io::Result<Self>
where
T: AsRawFd,
{
- Self::new_with_handle_and_interest(inner, Handle::current(), interest)
+ Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
}
+ #[track_caller]
pub(crate) fn new_with_handle_and_interest(
inner: T,
- handle: Handle,
+ handle: scheduler::Handle,
interest: Interest,
) -> io::Result<Self> {
let fd = inner.as_raw_fd();
@@ -525,7 +541,7 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
#[cfg_attr(docsrs, doc(alias = "with_io"))]
pub fn try_io<R>(
&mut self,
- f: impl FnOnce(&AsyncFd<Inner>) -> io::Result<R>,
+ f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>,
) -> Result<io::Result<R>, TryIoError> {
let result = f(self.async_fd);
@@ -542,12 +558,12 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
}
/// Returns a shared reference to the inner [`AsyncFd`].
- pub fn get_ref(&self) -> &AsyncFd<Inner> {
+ pub fn get_ref(&self) -> &'a AsyncFd<Inner> {
self.async_fd
}
/// Returns a shared reference to the backing object of the inner [`AsyncFd`].
- pub fn get_inner(&self) -> &Inner {
+ pub fn get_inner(&self) -> &'a Inner {
self.get_ref().get_ref()
}
}
@@ -598,7 +614,7 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
&mut self,
f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>,
) -> Result<io::Result<R>, TryIoError> {
- let result = f(&mut self.async_fd);
+ let result = f(self.async_fd);
if let Err(e) = result.as_ref() {
if e.kind() == io::ErrorKind::WouldBlock {
diff --git a/src/io/blocking.rs b/src/io/blocking.rs
index 1d79ee7..f6db450 100644
--- a/src/io/blocking.rs
+++ b/src/io/blocking.rs
@@ -34,8 +34,9 @@ enum State<T> {
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
}
-cfg_io_std! {
+cfg_io_blocking! {
impl<T> Blocking<T> {
+ #[cfg_attr(feature = "fs", allow(dead_code))]
pub(crate) fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),
diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs
index f1ac4b2..6ac9e28 100644
--- a/src/io/bsd/poll_aio.rs
+++ b/src/io/bsd/poll_aio.rs
@@ -1,6 +1,8 @@
//! Use POSIX AIO futures with Tokio.
-use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
+use crate::io::interest::Interest;
+use crate::runtime::io::{ReadyEvent, Registration};
+use crate::runtime::scheduler;
use mio::event::Source;
use mio::Registry;
use mio::Token;
@@ -117,7 +119,7 @@ impl<E: AioSource> Aio<E> {
fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
let mut io = MioSource(io);
- let handle = Handle::current();
+ let handle = scheduler::Handle::current();
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self { io, registration })
}
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
deleted file mode 100644
index 19f67a2..0000000
--- a/src/io/driver/mod.rs
+++ /dev/null
@@ -1,354 +0,0 @@
-#![cfg_attr(not(feature = "rt"), allow(dead_code))]
-
-mod interest;
-#[allow(unreachable_pub)]
-pub use interest::Interest;
-
-mod ready;
-#[allow(unreachable_pub)]
-pub use ready::Ready;
-
-mod registration;
-pub(crate) use registration::Registration;
-
-mod scheduled_io;
-use scheduled_io::ScheduledIo;
-
-use crate::park::{Park, Unpark};
-use crate::util::slab::{self, Slab};
-use crate::{loom::sync::Mutex, util::bit};
-
-use std::fmt;
-use std::io;
-use std::sync::{Arc, Weak};
-use std::time::Duration;
-
-/// I/O driver, backed by Mio.
-pub(crate) struct Driver {
- /// Tracks the number of times `turn` is called. It is safe for this to wrap
- /// as it is mostly used to determine when to call `compact()`.
- tick: u8,
-
- /// Reuse the `mio::Events` value across calls to poll.
- events: Option<mio::Events>,
-
- /// Primary slab handle containing the state for each resource registered
- /// with this driver. During Drop this is moved into the Inner structure, so
- /// this is an Option to allow it to be vacated (until Drop this is always
- /// Some).
- resources: Option<Slab<ScheduledIo>>,
-
- /// The system event queue.
- poll: mio::Poll,
-
- /// State shared between the reactor and the handles.
- inner: Arc<Inner>,
-}
-
-/// A reference to an I/O driver.
-#[derive(Clone)]
-pub(crate) struct Handle {
- inner: Weak<Inner>,
-}
-
-#[derive(Debug)]
-pub(crate) struct ReadyEvent {
- tick: u8,
- pub(crate) ready: Ready,
-}
-
-pub(super) struct Inner {
- /// Primary slab handle containing the state for each resource registered
- /// with this driver.
- ///
- /// The ownership of this slab is moved into this structure during
- /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
- /// without risking new ones being registered in the meantime.
- resources: Mutex<Option<Slab<ScheduledIo>>>,
-
- /// Registers I/O resources.
- registry: mio::Registry,
-
- /// Allocates `ScheduledIo` handles when creating new resources.
- pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
-
- /// Used to wake up the reactor from a call to `turn`.
- waker: mio::Waker,
-}
-
-#[derive(Debug, Eq, PartialEq, Clone, Copy)]
-enum Direction {
- Read,
- Write,
-}
-
-enum Tick {
- Set(u8),
- Clear(u8),
-}
-
-// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
-// token.
-const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
-
-const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
-
-// Packs the generation value in the `readiness` field.
-//
-// The generation prevents a race condition where a slab slot is reused for a
-// new socket while the I/O driver is about to apply a readiness event. The
-// generation value is checked when setting new readiness. If the generation do
-// not match, then the readiness event is discarded.
-const GENERATION: bit::Pack = ADDRESS.then(7);
-
-fn _assert_kinds() {
- fn _assert<T: Send + Sync>() {}
-
- _assert::<Handle>();
-}
-
-// ===== impl Driver =====
-
-impl Driver {
- /// Creates a new event loop, returning any error that happened during the
- /// creation.
- pub(crate) fn new() -> io::Result<Driver> {
- let poll = mio::Poll::new()?;
- let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
- let registry = poll.registry().try_clone()?;
-
- let slab = Slab::new();
- let allocator = slab.allocator();
-
- Ok(Driver {
- tick: 0,
- events: Some(mio::Events::with_capacity(1024)),
- poll,
- resources: Some(slab),
- inner: Arc::new(Inner {
- resources: Mutex::new(None),
- registry,
- io_dispatch: allocator,
- waker,
- }),
- })
- }
-
- /// Returns a handle to this event loop which can be sent across threads
- /// and can be used as a proxy to the event loop itself.
- ///
- /// Handles are cloneable and clones always refer to the same event loop.
- /// This handle is typically passed into functions that create I/O objects
- /// to bind them to this event loop.
- pub(crate) fn handle(&self) -> Handle {
- Handle {
- inner: Arc::downgrade(&self.inner),
- }
- }
-
- fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
- // How often to call `compact()` on the resource slab
- const COMPACT_INTERVAL: u8 = 255;
-
- self.tick = self.tick.wrapping_add(1);
-
- if self.tick == COMPACT_INTERVAL {
- self.resources.as_mut().unwrap().compact()
- }
-
- let mut events = self.events.take().expect("i/o driver event store missing");
-
- // Block waiting for an event to happen, peeling out how many events
- // happened.
- match self.poll.poll(&mut events, max_wait) {
- Ok(_) => {}
- Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
- Err(e) => return Err(e),
- }
-
- // Process all the events that came in, dispatching appropriately
- for event in events.iter() {
- let token = event.token();
-
- if token != TOKEN_WAKEUP {
- self.dispatch(token, Ready::from_mio(event));
- }
- }
-
- self.events = Some(events);
-
- Ok(())
- }
-
- fn dispatch(&mut self, token: mio::Token, ready: Ready) {
- let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
-
- let resources = self.resources.as_mut().unwrap();
-
- let io = match resources.get(addr) {
- Some(io) => io,
- None => return,
- };
-
- let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready);
-
- if res.is_err() {
- // token no longer valid!
- return;
- }
-
- io.wake(ready);
- }
-}
-
-impl Drop for Driver {
- fn drop(&mut self) {
- (*self.inner.resources.lock()) = self.resources.take();
- }
-}
-
-impl Drop for Inner {
- fn drop(&mut self) {
- let resources = self.resources.lock().take();
-
- if let Some(mut slab) = resources {
- slab.for_each(|io| {
- // If a task is waiting on the I/O resource, notify it. The task
- // will then attempt to use the I/O resource and fail due to the
- // driver being shutdown.
- io.shutdown();
- });
- }
- }
-}
-
-impl Park for Driver {
- type Unpark = Handle;
- type Error = io::Error;
-
- fn unpark(&self) -> Self::Unpark {
- self.handle()
- }
-
- fn park(&mut self) -> io::Result<()> {
- self.turn(None)?;
- Ok(())
- }
-
- fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
- self.turn(Some(duration))?;
- Ok(())
- }
-
- fn shutdown(&mut self) {}
-}
-
-impl fmt::Debug for Driver {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Driver")
- }
-}
-
-// ===== impl Handle =====
-
-cfg_rt! {
- impl Handle {
- /// Returns a handle to the current reactor.
- ///
- /// # Panics
- ///
- /// This function panics if there is no current reactor set and `rt` feature
- /// flag is not enabled.
- pub(super) fn current() -> Self {
- crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
- }
- }
-}
-
-cfg_not_rt! {
- impl Handle {
- /// Returns a handle to the current reactor.
- ///
- /// # Panics
- ///
- /// This function panics if there is no current reactor set, or if the `rt`
- /// feature flag is not enabled.
- pub(super) fn current() -> Self {
- panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
- }
- }
-}
-
-impl Handle {
- /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
- /// makes the next call to `turn` return immediately.
- ///
- /// This method is intended to be used in situations where a notification
- /// needs to otherwise be sent to the main reactor. If the reactor is
- /// currently blocked inside of `turn` then it will wake up and soon return
- /// after this method has been called. If the reactor is not currently
- /// blocked in `turn`, then the next call to `turn` will not block and
- /// return immediately.
- fn wakeup(&self) {
- if let Some(inner) = self.inner() {
- inner.waker.wake().expect("failed to wake I/O driver");
- }
- }
-
- pub(super) fn inner(&self) -> Option<Arc<Inner>> {
- self.inner.upgrade()
- }
-}
-
-impl Unpark for Handle {
- fn unpark(&self) {
- self.wakeup();
- }
-}
-
-impl fmt::Debug for Handle {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Handle")
- }
-}
-
-// ===== impl Inner =====
-
-impl Inner {
- /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
- ///
- /// The registration token is returned.
- pub(super) fn add_source(
- &self,
- source: &mut impl mio::event::Source,
- interest: Interest,
- ) -> io::Result<slab::Ref<ScheduledIo>> {
- let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::Other,
- "reactor at max registered I/O resources",
- )
- })?;
-
- let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
-
- self.registry
- .register(source, mio::Token(token), interest.to_mio())?;
-
- Ok(shared)
- }
-
- /// Deregisters an I/O resource from the reactor.
- pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
- self.registry.deregister(source)
- }
-}
-
-impl Direction {
- pub(super) fn mask(self) -> Ready {
- match self {
- Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
- Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
- }
- }
-}
diff --git a/src/io/driver/platform.rs b/src/io/driver/platform.rs
deleted file mode 100644
index 6b27988..0000000
--- a/src/io/driver/platform.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-pub(crate) use self::sys::*;
-
-#[cfg(unix)]
-mod sys {
- use mio::unix::UnixReady;
- use mio::Ready;
-
- pub(crate) fn hup() -> Ready {
- UnixReady::hup().into()
- }
-
- pub(crate) fn is_hup(ready: Ready) -> bool {
- UnixReady::from(ready).is_hup()
- }
-
- pub(crate) fn error() -> Ready {
- UnixReady::error().into()
- }
-
- pub(crate) fn is_error(ready: Ready) -> bool {
- UnixReady::from(ready).is_error()
- }
-}
-
-#[cfg(windows)]
-mod sys {
- use mio::Ready;
-
- pub(crate) fn hup() -> Ready {
- Ready::empty()
- }
-
- pub(crate) fn is_hup(_: Ready) -> bool {
- false
- }
-
- pub(crate) fn error() -> Ready {
- Ready::empty()
- }
-
- pub(crate) fn is_error(_: Ready) -> bool {
- false
- }
-}
diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs
deleted file mode 100644
index 7350be6..0000000
--- a/src/io/driver/registration.rs
+++ /dev/null
@@ -1,262 +0,0 @@
-#![cfg_attr(not(feature = "net"), allow(dead_code))]
-
-use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo};
-use crate::util::slab;
-
-use mio::event::Source;
-use std::io;
-use std::task::{Context, Poll};
-
-cfg_io_driver! {
- /// Associates an I/O resource with the reactor instance that drives it.
- ///
- /// A registration represents an I/O resource registered with a Reactor such
- /// that it will receive task notifications on readiness. This is the lowest
- /// level API for integrating with a reactor.
- ///
- /// The association between an I/O resource is made by calling
- /// [`new_with_interest_and_handle`].
- /// Once the association is established, it remains established until the
- /// registration instance is dropped.
- ///
- /// A registration instance represents two separate readiness streams. One
- /// for the read readiness and one for write readiness. These streams are
- /// independent and can be consumed from separate tasks.
- ///
- /// **Note**: while `Registration` is `Sync`, the caller must ensure that
- /// there are at most two tasks that use a registration instance
- /// concurrently. One task for [`poll_read_ready`] and one task for
- /// [`poll_write_ready`]. While violating this requirement is "safe" from a
- /// Rust memory safety point of view, it will result in unexpected behavior
- /// in the form of lost notifications and tasks hanging.
- ///
- /// ## Platform-specific events
- ///
- /// `Registration` also allows receiving platform-specific `mio::Ready`
- /// events. These events are included as part of the read readiness event
- /// stream. The write readiness event stream is only for `Ready::writable()`
- /// events.
- ///
- /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
- /// [`poll_read_ready`]: method@Self::poll_read_ready`
- /// [`poll_write_ready`]: method@Self::poll_write_ready`
- #[derive(Debug)]
- pub(crate) struct Registration {
- /// Handle to the associated driver.
- handle: Handle,
-
- /// Reference to state stored by the driver.
- shared: slab::Ref<ScheduledIo>,
- }
-}
-
-unsafe impl Send for Registration {}
-unsafe impl Sync for Registration {}
-
-// ===== impl Registration =====
-
-impl Registration {
- /// Registers the I/O resource with the default reactor, for a specific
- /// `Interest`. `new_with_interest` should be used over `new` when you need
- /// control over the readiness state, such as when a file descriptor only
- /// allows reads. This does not add `hup` or `error` so if you are
- /// interested in those states, you will need to add them to the readiness
- /// state passed to this function.
- ///
- /// # Return
- ///
- /// - `Ok` if the registration happened successfully
- /// - `Err` if an error was encountered during registration
- pub(crate) fn new_with_interest_and_handle(
- io: &mut impl Source,
- interest: Interest,
- handle: Handle,
- ) -> io::Result<Registration> {
- let shared = if let Some(inner) = handle.inner() {
- inner.add_source(io, interest)?
- } else {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to find event loop",
- ));
- };
-
- Ok(Registration { handle, shared })
- }
-
- /// Deregisters the I/O resource from the reactor it is associated with.
- ///
- /// This function must be called before the I/O resource associated with the
- /// registration is dropped.
- ///
- /// Note that deregistering does not guarantee that the I/O resource can be
- /// registered with a different reactor. Some I/O resource types can only be
- /// associated with a single reactor instance for their lifetime.
- ///
- /// # Return
- ///
- /// If the deregistration was successful, `Ok` is returned. Any calls to
- /// `Reactor::turn` that happen after a successful call to `deregister` will
- /// no longer result in notifications getting sent for this registration.
- ///
- /// `Err` is returned if an error is encountered.
- pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
- let inner = match self.handle.inner() {
- Some(inner) => inner,
- None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
- };
- inner.deregister_source(io)
- }
-
- pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
- self.shared.clear_readiness(event);
- }
-
- // Uses the poll path, requiring the caller to ensure mutual exclusion for
- // correctness. Only the last task to call this function is notified.
- pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
- self.poll_ready(cx, Direction::Read)
- }
-
- // Uses the poll path, requiring the caller to ensure mutual exclusion for
- // correctness. Only the last task to call this function is notified.
- pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
- self.poll_ready(cx, Direction::Write)
- }
-
- // Uses the poll path, requiring the caller to ensure mutual exclusion for
- // correctness. Only the last task to call this function is notified.
- pub(crate) fn poll_read_io<R>(
- &self,
- cx: &mut Context<'_>,
- f: impl FnMut() -> io::Result<R>,
- ) -> Poll<io::Result<R>> {
- self.poll_io(cx, Direction::Read, f)
- }
-
- // Uses the poll path, requiring the caller to ensure mutual exclusion for
- // correctness. Only the last task to call this function is notified.
- pub(crate) fn poll_write_io<R>(
- &self,
- cx: &mut Context<'_>,
- f: impl FnMut() -> io::Result<R>,
- ) -> Poll<io::Result<R>> {
- self.poll_io(cx, Direction::Write, f)
- }
-
- /// Polls for events on the I/O resource's `direction` readiness stream.
- ///
- /// If called with a task context, notify the task when a new event is
- /// received.
- fn poll_ready(
- &self,
- cx: &mut Context<'_>,
- direction: Direction,
- ) -> Poll<io::Result<ReadyEvent>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
- let ev = ready!(self.shared.poll_readiness(cx, direction));
-
- if self.handle.inner().is_none() {
- return Poll::Ready(Err(gone()));
- }
-
- coop.made_progress();
- Poll::Ready(Ok(ev))
- }
-
- fn poll_io<R>(
- &self,
- cx: &mut Context<'_>,
- direction: Direction,
- mut f: impl FnMut() -> io::Result<R>,
- ) -> Poll<io::Result<R>> {
- loop {
- let ev = ready!(self.poll_ready(cx, direction))?;
-
- match f() {
- Ok(ret) => {
- return Poll::Ready(Ok(ret));
- }
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.clear_readiness(ev);
- }
- Err(e) => return Poll::Ready(Err(e)),
- }
- }
- }
-
- pub(crate) fn try_io<R>(
- &self,
- interest: Interest,
- f: impl FnOnce() -> io::Result<R>,
- ) -> io::Result<R> {
- let ev = self.shared.ready_event(interest);
-
- // Don't attempt the operation if the resource is not ready.
- if ev.ready.is_empty() {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- match f() {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.clear_readiness(ev);
- Err(io::ErrorKind::WouldBlock.into())
- }
- res => res,
- }
- }
-}
-
-impl Drop for Registration {
- fn drop(&mut self) {
- // It is possible for a cycle to be created between wakers stored in
- // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
- // cycle, wakers are cleared. This is an imperfect solution as it is
- // possible to store a `Registration` in a waker. In this case, the
- // cycle would remain.
- //
- // See tokio-rs/tokio#3481 for more details.
- self.shared.clear_wakers();
- }
-}
-
-fn gone() -> io::Error {
- io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
-}
-
-cfg_io_readiness! {
- impl Registration {
- pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
- use std::future::Future;
- use std::pin::Pin;
-
- let fut = self.shared.readiness(interest);
- pin!(fut);
-
- crate::future::poll_fn(|cx| {
- if self.handle.inner().is_none() {
- return Poll::Ready(Err(io::Error::new(
- io::ErrorKind::Other,
- crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
- )));
- }
-
- Pin::new(&mut fut).poll(cx).map(Ok)
- }).await
- }
-
- pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
- loop {
- let event = self.readiness(interest).await?;
-
- match f() {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.clear_readiness(event);
- }
- x => return x,
- }
- }
- }
- }
-}
diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs
deleted file mode 100644
index 76f9343..0000000
--- a/src/io/driver/scheduled_io.rs
+++ /dev/null
@@ -1,533 +0,0 @@
-use super::{Interest, Ready, ReadyEvent, Tick};
-use crate::loom::sync::atomic::AtomicUsize;
-use crate::loom::sync::Mutex;
-use crate::util::bit;
-use crate::util::slab::Entry;
-use crate::util::WakeList;
-
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
-use std::task::{Context, Poll, Waker};
-
-use super::Direction;
-
-cfg_io_readiness! {
- use crate::util::linked_list::{self, LinkedList};
-
- use std::cell::UnsafeCell;
- use std::future::Future;
- use std::marker::PhantomPinned;
- use std::pin::Pin;
- use std::ptr::NonNull;
-}
-
-/// Stored in the I/O driver resource slab.
-#[derive(Debug)]
-pub(crate) struct ScheduledIo {
- /// Packs the resource's readiness with the resource's generation.
- readiness: AtomicUsize,
-
- waiters: Mutex<Waiters>,
-}
-
-cfg_io_readiness! {
- type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
-}
-
-#[derive(Debug, Default)]
-struct Waiters {
- #[cfg(feature = "net")]
- /// List of all current waiters.
- list: WaitList,
-
- /// Waker used for AsyncRead.
- reader: Option<Waker>,
-
- /// Waker used for AsyncWrite.
- writer: Option<Waker>,
-
- /// True if this ScheduledIo has been killed due to IO driver shutdown.
- is_shutdown: bool,
-}
-
-cfg_io_readiness! {
- #[derive(Debug)]
- struct Waiter {
- pointers: linked_list::Pointers<Waiter>,
-
- /// The waker for this task.
- waker: Option<Waker>,
-
- /// The interest this waiter is waiting on.
- interest: Interest,
-
- is_ready: bool,
-
- /// Should never be `!Unpin`.
- _p: PhantomPinned,
- }
-
- /// Future returned by `readiness()`.
- struct Readiness<'a> {
- scheduled_io: &'a ScheduledIo,
-
- state: State,
-
- /// Entry in the waiter `LinkedList`.
- waiter: UnsafeCell<Waiter>,
- }
-
- enum State {
- Init,
- Waiting,
- Done,
- }
-}
-
-// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
-//
-// | reserved | generation | driver tick | readiness |
-// |----------+------------+--------------+-----------|
-// | 1 bit | 7 bits + 8 bits + 16 bits |
-
-const READINESS: bit::Pack = bit::Pack::least_significant(16);
-
-const TICK: bit::Pack = READINESS.then(8);
-
-const GENERATION: bit::Pack = TICK.then(7);
-
-#[test]
-fn test_generations_assert_same() {
- assert_eq!(super::GENERATION, GENERATION);
-}
-
-// ===== impl ScheduledIo =====
-
-impl Entry for ScheduledIo {
- fn reset(&self) {
- let state = self.readiness.load(Acquire);
-
- let generation = GENERATION.unpack(state);
- let next = GENERATION.pack_lossy(generation + 1, 0);
-
- self.readiness.store(next, Release);
- }
-}
-
-impl Default for ScheduledIo {
- fn default() -> ScheduledIo {
- ScheduledIo {
- readiness: AtomicUsize::new(0),
- waiters: Mutex::new(Default::default()),
- }
- }
-}
-
-impl ScheduledIo {
- pub(crate) fn generation(&self) -> usize {
- GENERATION.unpack(self.readiness.load(Acquire))
- }
-
- /// Invoked when the IO driver is shut down; forces this ScheduledIo into a
- /// permanently ready state.
- pub(super) fn shutdown(&self) {
- self.wake0(Ready::ALL, true)
- }
-
- /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
- /// the current value, returning the previous readiness value.
- ///
- /// # Arguments
- /// - `token`: the token for this `ScheduledIo`.
- /// - `tick`: whether setting the tick or trying to clear readiness for a
- /// specific tick.
- /// - `f`: a closure returning a new readiness value given the previous
- /// readiness.
- ///
- /// # Returns
- ///
- /// If the given token's generation no longer matches the `ScheduledIo`'s
- /// generation, then the corresponding IO resource has been removed and
- /// replaced with a new resource. In that case, this method returns `Err`.
- /// Otherwise, this returns the previous readiness.
- pub(super) fn set_readiness(
- &self,
- token: Option<usize>,
- tick: Tick,
- f: impl Fn(Ready) -> Ready,
- ) -> Result<(), ()> {
- let mut current = self.readiness.load(Acquire);
-
- loop {
- let current_generation = GENERATION.unpack(current);
-
- if let Some(token) = token {
- // Check that the generation for this access is still the
- // current one.
- if GENERATION.unpack(token) != current_generation {
- return Err(());
- }
- }
-
- // Mask out the tick/generation bits so that the modifying
- // function doesn't see them.
- let current_readiness = Ready::from_usize(current);
- let new = f(current_readiness);
-
- let packed = match tick {
- Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
- Tick::Clear(t) => {
- if TICK.unpack(current) as u8 != t {
- // Trying to clear readiness with an old event!
- return Err(());
- }
-
- TICK.pack(t as usize, new.as_usize())
- }
- };
-
- let next = GENERATION.pack(current_generation, packed);
-
- match self
- .readiness
- .compare_exchange(current, next, AcqRel, Acquire)
- {
- Ok(_) => return Ok(()),
- // we lost the race, retry!
- Err(actual) => current = actual,
- }
- }
- }
-
- /// Notifies all pending waiters that have registered interest in `ready`.
- ///
- /// There may be many waiters to notify. Waking the pending task **must** be
- /// done from outside of the lock otherwise there is a potential for a
- /// deadlock.
- ///
- /// A stack array of wakers is created and filled with wakers to notify, the
- /// lock is released, and the wakers are notified. Because there may be more
- /// than 32 wakers to notify, if the stack array fills up, the lock is
- /// released, the array is cleared, and the iteration continues.
- pub(super) fn wake(&self, ready: Ready) {
- self.wake0(ready, false);
- }
-
- fn wake0(&self, ready: Ready, shutdown: bool) {
- let mut wakers = WakeList::new();
-
- let mut waiters = self.waiters.lock();
-
- waiters.is_shutdown |= shutdown;
-
- // check for AsyncRead slot
- if ready.is_readable() {
- if let Some(waker) = waiters.reader.take() {
- wakers.push(waker);
- }
- }
-
- // check for AsyncWrite slot
- if ready.is_writable() {
- if let Some(waker) = waiters.writer.take() {
- wakers.push(waker);
- }
- }
-
- #[cfg(feature = "net")]
- 'outer: loop {
- let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
-
- while wakers.can_push() {
- match iter.next() {
- Some(waiter) => {
- let waiter = unsafe { &mut *waiter.as_ptr() };
-
- if let Some(waker) = waiter.waker.take() {
- waiter.is_ready = true;
- wakers.push(waker);
- }
- }
- None => {
- break 'outer;
- }
- }
- }
-
- drop(waiters);
-
- wakers.wake_all();
-
- // Acquire the lock again.
- waiters = self.waiters.lock();
- }
-
- // Release the lock before notifying
- drop(waiters);
-
- wakers.wake_all();
- }
-
- pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
- let curr = self.readiness.load(Acquire);
-
- ReadyEvent {
- tick: TICK.unpack(curr) as u8,
- ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
- }
- }
-
- /// Polls for readiness events in a given direction.
- ///
- /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
- /// which cannot use the `async fn` version. This uses reserved reader
- /// and writer slots.
- pub(super) fn poll_readiness(
- &self,
- cx: &mut Context<'_>,
- direction: Direction,
- ) -> Poll<ReadyEvent> {
- let curr = self.readiness.load(Acquire);
-
- let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
-
- if ready.is_empty() {
- // Update the task info
- let mut waiters = self.waiters.lock();
- let slot = match direction {
- Direction::Read => &mut waiters.reader,
- Direction::Write => &mut waiters.writer,
- };
-
- // Avoid cloning the waker if one is already stored that matches the
- // current task.
- match slot {
- Some(existing) => {
- if !existing.will_wake(cx.waker()) {
- *existing = cx.waker().clone();
- }
- }
- None => {
- *slot = Some(cx.waker().clone());
- }
- }
-
- // Try again, in case the readiness was changed while we were
- // taking the waiters lock
- let curr = self.readiness.load(Acquire);
- let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
- if waiters.is_shutdown {
- Poll::Ready(ReadyEvent {
- tick: TICK.unpack(curr) as u8,
- ready: direction.mask(),
- })
- } else if ready.is_empty() {
- Poll::Pending
- } else {
- Poll::Ready(ReadyEvent {
- tick: TICK.unpack(curr) as u8,
- ready,
- })
- }
- } else {
- Poll::Ready(ReadyEvent {
- tick: TICK.unpack(curr) as u8,
- ready,
- })
- }
- }
-
- pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
- // This consumes the current readiness state **except** for closed
- // states. Closed states are excluded because they are final states.
- let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
-
- // result isn't important
- let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
- }
-
- pub(crate) fn clear_wakers(&self) {
- let mut waiters = self.waiters.lock();
- waiters.reader.take();
- waiters.writer.take();
- }
-}
-
-impl Drop for ScheduledIo {
- fn drop(&mut self) {
- self.wake(Ready::ALL);
- }
-}
-
-unsafe impl Send for ScheduledIo {}
-unsafe impl Sync for ScheduledIo {}
-
-cfg_io_readiness! {
- impl ScheduledIo {
- /// An async version of `poll_readiness` which uses a linked list of wakers.
- pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
- self.readiness_fut(interest).await
- }
-
- // This is in a separate function so that the borrow checker doesn't think
- // we are borrowing the `UnsafeCell` possibly over await boundaries.
- //
- // Go figure.
- fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
- Readiness {
- scheduled_io: self,
- state: State::Init,
- waiter: UnsafeCell::new(Waiter {
- pointers: linked_list::Pointers::new(),
- waker: None,
- is_ready: false,
- interest,
- _p: PhantomPinned,
- }),
- }
- }
- }
-
- unsafe impl linked_list::Link for Waiter {
- type Handle = NonNull<Waiter>;
- type Target = Waiter;
-
- fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
- *handle
- }
-
- unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
- ptr
- }
-
- unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
- NonNull::from(&mut target.as_mut().pointers)
- }
- }
-
- // ===== impl Readiness =====
-
- impl Future for Readiness<'_> {
- type Output = ReadyEvent;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- use std::sync::atomic::Ordering::SeqCst;
-
- let (scheduled_io, state, waiter) = unsafe {
- let me = self.get_unchecked_mut();
- (&me.scheduled_io, &mut me.state, &me.waiter)
- };
-
- loop {
- match *state {
- State::Init => {
- // Optimistically check existing readiness
- let curr = scheduled_io.readiness.load(SeqCst);
- let ready = Ready::from_usize(READINESS.unpack(curr));
-
- // Safety: `waiter.interest` never changes
- let interest = unsafe { (*waiter.get()).interest };
- let ready = ready.intersection(interest);
-
- if !ready.is_empty() {
- // Currently ready!
- let tick = TICK.unpack(curr) as u8;
- *state = State::Done;
- return Poll::Ready(ReadyEvent { tick, ready });
- }
-
- // Wasn't ready, take the lock (and check again while locked).
- let mut waiters = scheduled_io.waiters.lock();
-
- let curr = scheduled_io.readiness.load(SeqCst);
- let mut ready = Ready::from_usize(READINESS.unpack(curr));
-
- if waiters.is_shutdown {
- ready = Ready::ALL;
- }
-
- let ready = ready.intersection(interest);
-
- if !ready.is_empty() {
- // Currently ready!
- let tick = TICK.unpack(curr) as u8;
- *state = State::Done;
- return Poll::Ready(ReadyEvent { tick, ready });
- }
-
- // Not ready even after locked, insert into list...
-
- // Safety: called while locked
- unsafe {
- (*waiter.get()).waker = Some(cx.waker().clone());
- }
-
- // Insert the waiter into the linked list
- //
- // safety: pointers from `UnsafeCell` are never null.
- waiters
- .list
- .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
- *state = State::Waiting;
- }
- State::Waiting => {
- // Currently in the "Waiting" state, implying the caller has
- // a waiter stored in the waiter list (guarded by
- // `notify.waiters`). In order to access the waker fields,
- // we must hold the lock.
-
- let waiters = scheduled_io.waiters.lock();
-
- // Safety: called while locked
- let w = unsafe { &mut *waiter.get() };
-
- if w.is_ready {
- // Our waker has been notified.
- *state = State::Done;
- } else {
- // Update the waker, if necessary.
- if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
- w.waker = Some(cx.waker().clone());
- }
-
- return Poll::Pending;
- }
-
- // Explicit drop of the lock to indicate the scope that the
- // lock is held. Because holding the lock is required to
- // ensure safe access to fields not held within the lock, it
- // is helpful to visualize the scope of the critical
- // section.
- drop(waiters);
- }
- State::Done => {
- let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;
-
- // Safety: State::Done means it is no longer shared
- let w = unsafe { &mut *waiter.get() };
-
- return Poll::Ready(ReadyEvent {
- tick,
- ready: Ready::from_interest(w.interest),
- });
- }
- }
- }
- }
- }
-
- impl Drop for Readiness<'_> {
- fn drop(&mut self) {
- let mut waiters = self.scheduled_io.waiters.lock();
-
- // Safety: `waiter` is only ever stored in `waiters`
- unsafe {
- waiters
- .list
- .remove(NonNull::new_unchecked(self.waiter.get()))
- };
- }
- }
-
- unsafe impl Send for Readiness<'_> {}
- unsafe impl Sync for Readiness<'_> {}
-}
diff --git a/src/io/driver/interest.rs b/src/io/interest.rs
index d6b46df..013c114 100644
--- a/src/io/driver/interest.rs
+++ b/src/io/interest.rs
@@ -1,6 +1,6 @@
#![cfg_attr(not(feature = "net"), allow(dead_code, unreachable_pub))]
-use crate::io::driver::Ready;
+use crate::io::ready::Ready;
use std::fmt;
use std::ops;
@@ -100,7 +100,7 @@ impl Interest {
self.0
}
- pub(super) fn mask(self) -> Ready {
+ pub(crate) fn mask(self) -> Ready {
match self {
Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED,
Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
diff --git a/src/io/mod.rs b/src/io/mod.rs
index cfdda61..f48035a 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -1,5 +1,3 @@
-#![cfg_attr(loom, allow(dead_code, unreachable_pub))]
-
//! Traits, helpers, and type definitions for asynchronous I/O functionality.
//!
//! This module is the asynchronous version of `std::io`. Primarily, it
@@ -180,6 +178,12 @@
//! [`Sink`]: https://docs.rs/futures/0.3/futures/sink/trait.Sink.html
//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
//! [`Write`]: std::io::Write
+
+#![cfg_attr(
+ not(all(feature = "rt", feature = "net")),
+ allow(dead_code, unused_imports)
+)]
+
cfg_io_blocking! {
pub(crate) mod blocking;
}
@@ -205,15 +209,19 @@ pub use self::read_buf::ReadBuf;
pub use std::io::{Error, ErrorKind, Result, SeekFrom};
cfg_io_driver_impl! {
- pub(crate) mod driver;
+ pub(crate) mod interest;
+ pub(crate) mod ready;
cfg_net! {
- pub use driver::{Interest, Ready};
+ pub use interest::Interest;
+ pub use ready::Ready;
}
+ #[cfg_attr(tokio_wasi, allow(unused_imports))]
mod poll_evented;
#[cfg(not(loom))]
+ #[cfg_attr(tokio_wasi, allow(unused_imports))]
pub(crate) use poll_evented::PollEvented;
}
diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs
index 44e68a2..dfe9ae3 100644
--- a/src/io/poll_evented.rs
+++ b/src/io/poll_evented.rs
@@ -1,16 +1,19 @@
-use crate::io::driver::{Handle, Interest, Registration};
+use crate::io::interest::Interest;
+use crate::runtime::io::Registration;
+use crate::runtime::scheduler;
use mio::event::Source;
use std::fmt;
use std::io;
use std::ops::Deref;
+use std::panic::{RefUnwindSafe, UnwindSafe};
cfg_io_driver! {
/// Associates an I/O resource that implements the [`std::io::Read`] and/or
/// [`std::io::Write`] traits with the reactor that drives it.
///
/// `PollEvented` uses [`Registration`] internally to take a type that
- /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or
+ /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or
/// [`std::io::Write`] and associate it with a reactor that will drive it.
///
/// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
@@ -40,12 +43,12 @@ cfg_io_driver! {
/// [`poll_read_ready`] again will also indicate read readiness.
///
/// When the operation is attempted and is unable to succeed due to the I/O
- /// resource not being ready, the caller must call `clear_readiness`.
+ /// resource not being ready, the caller must call [`clear_readiness`].
/// This clears the readiness state until a new readiness event is received.
///
/// This allows the caller to implement additional functions. For example,
/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
- /// `clear_read_ready`.
+ /// [`clear_readiness`].
///
/// ## Platform-specific events
///
@@ -56,6 +59,7 @@ cfg_io_driver! {
/// [`AsyncRead`]: crate::io::AsyncRead
/// [`AsyncWrite`]: crate::io::AsyncWrite
/// [`TcpListener`]: crate::net::TcpListener
+ /// [`clear_readiness`]: Registration::clear_readiness
/// [`poll_read_ready`]: Registration::poll_read_ready
/// [`poll_write_ready`]: Registration::poll_write_ready
pub(crate) struct PollEvented<E: Source> {
@@ -76,6 +80,7 @@ impl<E: Source> PollEvented<E> {
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ #[track_caller]
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
@@ -96,15 +101,17 @@ impl<E: Source> PollEvented<E> {
/// a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
/// function.
+ #[track_caller]
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
- Self::new_with_interest_and_handle(io, interest, Handle::current())
+ Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current())
}
+ #[track_caller]
pub(crate) fn new_with_interest_and_handle(
mut io: E,
interest: Interest,
- handle: Handle,
+ handle: scheduler::Handle,
) -> io::Result<Self> {
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self {
@@ -114,11 +121,7 @@ impl<E: Source> PollEvented<E> {
}
/// Returns a reference to the registration.
- #[cfg(any(
- feature = "net",
- all(unix, feature = "process"),
- all(unix, feature = "signal"),
- ))]
+ #[cfg(any(feature = "net"))]
pub(crate) fn registration(&self) -> &Registration {
&self.registration
}
@@ -133,7 +136,7 @@ impl<E: Source> PollEvented<E> {
}
feature! {
- #![any(feature = "net", feature = "process")]
+ #![any(feature = "net", all(unix, feature = "process"))]
use crate::io::ReadBuf;
use std::task::{Context, Poll};
@@ -150,16 +153,32 @@ feature! {
{
use std::io::Read;
- let n = ready!(self.registration.poll_read_io(cx, || {
+ loop {
+ let evt = ready!(self.registration.poll_read_ready(cx))?;
+
let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
- self.io.as_ref().unwrap().read(b)
- }))?;
-
- // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
- // buffer.
- buf.assume_init(n);
- buf.advance(n);
- Poll::Ready(Ok(()))
+ let len = b.len();
+
+ match self.io.as_ref().unwrap().read(b) {
+ Ok(n) => {
+ // if we read a partially full buffer, this is sufficient on unix to show
+ // that the socket buffer has been drained
+ if n > 0 && (!cfg!(windows) && n < len) {
+ self.registration.clear_readiness(evt);
+ }
+
+ // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
+ // buffer.
+ buf.assume_init(n);
+ buf.advance(n);
+ return Poll::Ready(Ok(()));
+ },
+ Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.registration.clear_readiness(evt);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ }
+ }
}
pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
@@ -167,10 +186,29 @@ feature! {
&'a E: io::Write + 'a,
{
use std::io::Write;
- self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf))
+
+ loop {
+ let evt = ready!(self.registration.poll_write_ready(cx))?;
+
+ match self.io.as_ref().unwrap().write(buf) {
+ Ok(n) => {
+ // if we write only part of our buffer, this is sufficient on unix to show
+ // that the socket buffer is full
+ if n > 0 && (!cfg!(windows) && n < buf.len()) {
+ self.registration.clear_readiness(evt);
+ }
+
+ return Poll::Ready(Ok(n));
+ },
+ Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.registration.clear_readiness(evt);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ }
+ }
}
- #[cfg(feature = "net")]
+ #[cfg(any(feature = "net", feature = "process"))]
pub(crate) fn poll_write_vectored<'a>(
&'a self,
cx: &mut Context<'_>,
@@ -185,6 +223,10 @@ feature! {
}
}
+impl<E: Source> UnwindSafe for PollEvented<E> {}
+
+impl<E: Source> RefUnwindSafe for PollEvented<E> {}
+
impl<E: Source> Deref for PollEvented<E> {
type Target = E;
diff --git a/src/io/read_buf.rs b/src/io/read_buf.rs
index ad58cbe..0dc595a 100644
--- a/src/io/read_buf.rs
+++ b/src/io/read_buf.rs
@@ -1,9 +1,5 @@
-// This lint claims ugly casting is somehow safer than transmute, but there's
-// no evidence that is the case. Shush.
-#![allow(clippy::transmute_ptr_to_ptr)]
-
use std::fmt;
-use std::mem::{self, MaybeUninit};
+use std::mem::MaybeUninit;
/// A wrapper around a byte buffer that is incrementally filled and initialized.
///
@@ -35,7 +31,7 @@ impl<'a> ReadBuf<'a> {
#[inline]
pub fn new(buf: &'a mut [u8]) -> ReadBuf<'a> {
let initialized = buf.len();
- let buf = unsafe { mem::transmute::<&mut [u8], &mut [MaybeUninit<u8>]>(buf) };
+ let buf = unsafe { slice_to_uninit_mut(buf) };
ReadBuf {
buf,
filled: 0,
@@ -67,8 +63,7 @@ impl<'a> ReadBuf<'a> {
let slice = &self.buf[..self.filled];
// safety: filled describes how far into the buffer that the
// user has filled with bytes, so it's been initialized.
- // TODO: This could use `MaybeUninit::slice_get_ref` when it is stable.
- unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) }
+ unsafe { slice_assume_init(slice) }
}
/// Returns a mutable reference to the filled portion of the buffer.
@@ -77,8 +72,7 @@ impl<'a> ReadBuf<'a> {
let slice = &mut self.buf[..self.filled];
// safety: filled describes how far into the buffer that the
// user has filled with bytes, so it's been initialized.
- // TODO: This could use `MaybeUninit::slice_get_mut` when it is stable.
- unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) }
+ unsafe { slice_assume_init_mut(slice) }
}
/// Returns a new `ReadBuf` comprised of the unfilled section up to `n`.
@@ -97,8 +91,7 @@ impl<'a> ReadBuf<'a> {
let slice = &self.buf[..self.initialized];
// safety: initialized describes how far into the buffer that the
// user has at some point initialized with bytes.
- // TODO: This could use `MaybeUninit::slice_get_ref` when it is stable.
- unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) }
+ unsafe { slice_assume_init(slice) }
}
/// Returns a mutable reference to the initialized portion of the buffer.
@@ -109,15 +102,14 @@ impl<'a> ReadBuf<'a> {
let slice = &mut self.buf[..self.initialized];
// safety: initialized describes how far into the buffer that the
// user has at some point initialized with bytes.
- // TODO: This could use `MaybeUninit::slice_get_mut` when it is stable.
- unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) }
+ unsafe { slice_assume_init_mut(slice) }
}
/// Returns a mutable reference to the entire buffer, without ensuring that it has been fully
/// initialized.
///
/// The elements between 0 and `self.filled().len()` are filled, and those between 0 and
- /// `self.initialized().len()` are initialized (and so can be transmuted to a `&mut [u8]`).
+ /// `self.initialized().len()` are initialized (and so can be converted to a `&mut [u8]`).
///
/// The caller of this method must ensure that these invariants are upheld. For example, if the
/// caller initializes some of the uninitialized section of the buffer, it must call
@@ -160,6 +152,7 @@ impl<'a> ReadBuf<'a> {
///
/// Panics if `self.remaining()` is less than `n`.
#[inline]
+ #[track_caller]
pub fn initialize_unfilled_to(&mut self, n: usize) -> &mut [u8] {
assert!(self.remaining() >= n, "n overflows remaining");
@@ -178,7 +171,7 @@ impl<'a> ReadBuf<'a> {
let slice = &mut self.buf[self.filled..end];
// safety: just above, we checked that the end of the buf has
// been initialized to some value.
- unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) }
+ unsafe { slice_assume_init_mut(slice) }
}
/// Returns the number of bytes at the end of the slice that have not yet been filled.
@@ -203,6 +196,7 @@ impl<'a> ReadBuf<'a> {
///
/// Panics if the filled region of the buffer would become larger than the initialized region.
#[inline]
+ #[track_caller]
pub fn advance(&mut self, n: usize) {
let new = self.filled.checked_add(n).expect("filled overflow");
self.set_filled(new);
@@ -219,6 +213,7 @@ impl<'a> ReadBuf<'a> {
///
/// Panics if the filled region of the buffer would become larger than the initialized region.
#[inline]
+ #[track_caller]
pub fn set_filled(&mut self, n: usize) {
assert!(
n <= self.initialized,
@@ -249,6 +244,7 @@ impl<'a> ReadBuf<'a> {
///
/// Panics if `self.remaining()` is less than `buf.len()`.
#[inline]
+ #[track_caller]
pub fn put_slice(&mut self, buf: &[u8]) {
assert!(
self.remaining() >= buf.len(),
@@ -283,3 +279,17 @@ impl fmt::Debug for ReadBuf<'_> {
.finish()
}
}
+
+unsafe fn slice_to_uninit_mut(slice: &mut [u8]) -> &mut [MaybeUninit<u8>] {
+ &mut *(slice as *mut [u8] as *mut [MaybeUninit<u8>])
+}
+
+// TODO: This could use `MaybeUninit::slice_assume_init` when it is stable.
+unsafe fn slice_assume_init(slice: &[MaybeUninit<u8>]) -> &[u8] {
+ &*(slice as *const [MaybeUninit<u8>] as *const [u8])
+}
+
+// TODO: This could use `MaybeUninit::slice_assume_init_mut` when it is stable.
+unsafe fn slice_assume_init_mut(slice: &mut [MaybeUninit<u8>]) -> &mut [u8] {
+ &mut *(slice as *mut [MaybeUninit<u8>] as *mut [u8])
+}
diff --git a/src/io/driver/ready.rs b/src/io/ready.rs
index 2430d30..ef135c4 100644
--- a/src/io/driver/ready.rs
+++ b/src/io/ready.rs
@@ -12,7 +12,7 @@ const WRITE_CLOSED: usize = 0b0_1000;
///
/// `Ready` tracks which operation an I/O resource is ready to perform.
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
-#[derive(Clone, Copy, PartialEq, PartialOrd)]
+#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Ready(usize);
impl Ready {
diff --git a/src/io/split.rs b/src/io/split.rs
index 8258a0f..2e0da95 100644
--- a/src/io/split.rs
+++ b/src/io/split.rs
@@ -74,6 +74,7 @@ impl<T> ReadHalf<T> {
/// same `split` operation this method will panic.
/// This can be checked ahead of time by comparing the stream ID
/// of the two halves.
+ #[track_caller]
pub fn unsplit(self, wr: WriteHalf<T>) -> T {
if self.is_pair_of(&wr) {
drop(wr);
diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs
index 7e4a198..2715ba7 100644
--- a/src/io/stdio_common.rs
+++ b/src/io/stdio_common.rs
@@ -42,7 +42,7 @@ where
// for further code. Since `AsyncWrite` can always shrink
// buffer at its discretion, excessive (i.e. in tests) shrinking
// does not break correctness.
- // 2. If buffer is small, it will not be shrinked.
+ // 2. If buffer is small, it will not be shrunk.
// That's why, it's "textness" will not change, so we don't have
// to fixup it.
if cfg!(not(any(target_os = "windows", test))) || buf.len() <= crate::io::blocking::MAX_BUF
@@ -193,7 +193,7 @@ mod tests {
fn test_pseudo_text() {
// In this test we write a piece of binary data, whose beginning is
// text though. We then validate that even in this corner case buffer
- // was not shrinked too much.
+ // was not shrunk too much.
let checked_count = super::MAGIC_CONST * super::MAX_BYTES_PER_CHAR;
let mut data: Vec<u8> = str::repeat("a", checked_count).into();
data.extend(std::iter::repeat(0b1010_1010).take(MAX_BUF - checked_count + 1));
@@ -212,7 +212,7 @@ mod tests {
writer.write_history.iter().copied().sum::<usize>(),
data.len()
);
- // Check that at most MAX_BYTES_PER_CHAR + 1 (i.e. 5) bytes were shrinked
+ // Check that at most MAX_BYTES_PER_CHAR + 1 (i.e. 5) bytes were shrunk
// from the buffer: one because it was outside of MAX_BUF boundary, and
// up to one "utf8 code point".
assert!(data.len() - writer.write_history[0] <= super::MAX_BYTES_PER_CHAR + 1);
diff --git a/src/io/util/async_seek_ext.rs b/src/io/util/async_seek_ext.rs
index 46b3e6c..aadf3a7 100644
--- a/src/io/util/async_seek_ext.rs
+++ b/src/io/util/async_seek_ext.rs
@@ -69,7 +69,7 @@ cfg_io_util! {
/// Creates a future which will rewind to the beginning of the stream.
///
- /// This is convenience method, equivalent to to `self.seek(SeekFrom::Start(0))`.
+ /// This is convenience method, equivalent to `self.seek(SeekFrom::Start(0))`.
fn rewind(&mut self) -> Seek<'_, Self>
where
Self: Unpin,
diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs
index 93a3183..dfdde82 100644
--- a/src/io/util/async_write_ext.rs
+++ b/src/io/util/async_write_ext.rs
@@ -406,7 +406,7 @@ cfg_io_util! {
/// ```
fn write_u8(&mut self, n: u8) -> WriteU8;
- /// Writes an unsigned 8-bit integer to the underlying writer.
+ /// Writes a signed 8-bit integer to the underlying writer.
///
/// Equivalent to:
///
@@ -425,7 +425,7 @@ cfg_io_util! {
///
/// # Examples
///
- /// Write unsigned 8 bit integers to a `AsyncWrite`:
+ /// Write signed 8 bit integers to a `AsyncWrite`:
///
/// ```rust
/// use tokio::io::{self, AsyncWriteExt};
@@ -434,10 +434,10 @@ cfg_io_util! {
/// async fn main() -> io::Result<()> {
/// let mut writer = Vec::new();
///
- /// writer.write_u8(2).await?;
- /// writer.write_u8(5).await?;
+ /// writer.write_i8(-2).await?;
+ /// writer.write_i8(126).await?;
///
- /// assert_eq!(writer, b"\x02\x05");
+ /// assert_eq!(writer, b"\xFE\x7E");
/// Ok(())
/// }
/// ```
diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs
index 7df610b..60879c0 100644
--- a/src/io/util/buf_reader.rs
+++ b/src/io/util/buf_reader.rs
@@ -204,7 +204,6 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
self.as_mut()
.get_pin_mut()
.start_seek(SeekFrom::Current(offset))?;
- self.as_mut().get_pin_mut().poll_complete(cx)?
} else {
// seek backwards by our remainder, and then by the offset
self.as_mut()
@@ -221,8 +220,8 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
self.as_mut()
.get_pin_mut()
.start_seek(SeekFrom::Current(n))?;
- self.as_mut().get_pin_mut().poll_complete(cx)?
}
+ self.as_mut().get_pin_mut().poll_complete(cx)?
}
SeekState::PendingOverflowed(n) => {
if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
diff --git a/src/io/util/copy.rs b/src/io/util/copy.rs
index d0ab7cb..47dad89 100644
--- a/src/io/util/copy.rs
+++ b/src/io/util/copy.rs
@@ -27,6 +27,51 @@ impl CopyBuffer {
}
}
+ fn poll_fill_buf<R>(
+ &mut self,
+ cx: &mut Context<'_>,
+ reader: Pin<&mut R>,
+ ) -> Poll<io::Result<()>>
+ where
+ R: AsyncRead + ?Sized,
+ {
+ let me = &mut *self;
+ let mut buf = ReadBuf::new(&mut me.buf);
+ buf.set_filled(me.cap);
+
+ let res = reader.poll_read(cx, &mut buf);
+ if let Poll::Ready(Ok(_)) = res {
+ let filled_len = buf.filled().len();
+ me.read_done = me.cap == filled_len;
+ me.cap = filled_len;
+ }
+ res
+ }
+
+ fn poll_write_buf<R, W>(
+ &mut self,
+ cx: &mut Context<'_>,
+ mut reader: Pin<&mut R>,
+ mut writer: Pin<&mut W>,
+ ) -> Poll<io::Result<usize>>
+ where
+ R: AsyncRead + ?Sized,
+ W: AsyncWrite + ?Sized,
+ {
+ let me = &mut *self;
+ match writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]) {
+ Poll::Pending => {
+ // Top up the buffer towards full if we can read a bit more
+ // data - this should improve the chances of a large write
+ if !me.read_done && me.cap < me.buf.len() {
+ ready!(me.poll_fill_buf(cx, reader.as_mut()))?;
+ }
+ Poll::Pending
+ }
+ res => res,
+ }
+ }
+
pub(super) fn poll_copy<R, W>(
&mut self,
cx: &mut Context<'_>,
@@ -41,10 +86,10 @@ impl CopyBuffer {
// If our buffer is empty, then we need to read some data to
// continue.
if self.pos == self.cap && !self.read_done {
- let me = &mut *self;
- let mut buf = ReadBuf::new(&mut me.buf);
+ self.pos = 0;
+ self.cap = 0;
- match reader.as_mut().poll_read(cx, &mut buf) {
+ match self.poll_fill_buf(cx, reader.as_mut()) {
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => {
@@ -58,20 +103,11 @@ impl CopyBuffer {
return Poll::Pending;
}
}
-
- let n = buf.filled().len();
- if n == 0 {
- self.read_done = true;
- } else {
- self.pos = 0;
- self.cap = n;
- }
}
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
- let me = &mut *self;
- let i = ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]))?;
+ let i = ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?;
if i == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
diff --git a/src/io/util/empty.rs b/src/io/util/empty.rs
index f964d18..9e648f8 100644
--- a/src/io/util/empty.rs
+++ b/src/io/util/empty.rs
@@ -50,16 +50,18 @@ impl AsyncRead for Empty {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
- _: &mut Context<'_>,
+ cx: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
+ ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(()))
}
}
impl AsyncBufRead for Empty {
#[inline]
- fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(&[]))
}
@@ -73,6 +75,20 @@ impl fmt::Debug for Empty {
}
}
+cfg_coop! {
+ fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> {
+ let coop = ready!(crate::runtime::coop::poll_proceed(cx));
+ coop.made_progress();
+ Poll::Ready(())
+ }
+}
+
+cfg_not_coop! {
+ fn poll_proceed_and_make_progress(_: &mut Context<'_>) -> Poll<()> {
+ Poll::Ready(())
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/io/util/fill_buf.rs b/src/io/util/fill_buf.rs
index 3655c01..bb07c76 100644
--- a/src/io/util/fill_buf.rs
+++ b/src/io/util/fill_buf.rs
@@ -40,6 +40,12 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> {
// Safety: This is necessary only due to a limitation in the
// borrow checker. Once Rust starts using the polonius borrow
// checker, this can be simplified.
+ //
+ // The safety of this transmute relies on the fact that the
+ // value of `reader` is `None` when we return in this branch.
+ // Otherwise the caller could poll us again after
+ // completion, and access the mutable reference while the
+ // returned immutable reference still exists.
let slice = std::mem::transmute::<&[u8], &'a [u8]>(slice);
Poll::Ready(Ok(slice))
},
diff --git a/src/io/util/mem.rs b/src/io/util/mem.rs
index 4eefe7b..31884b3 100644
--- a/src/io/util/mem.rs
+++ b/src/io/util/mem.rs
@@ -177,10 +177,8 @@ impl Pipe {
waker.wake();
}
}
-}
-impl AsyncRead for Pipe {
- fn poll_read(
+ fn poll_read_internal(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
@@ -204,10 +202,8 @@ impl AsyncRead for Pipe {
Poll::Pending
}
}
-}
-impl AsyncWrite for Pipe {
- fn poll_write(
+ fn poll_write_internal(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &[u8],
@@ -228,6 +224,62 @@ impl AsyncWrite for Pipe {
}
Poll::Ready(Ok(len))
}
+}
+
+impl AsyncRead for Pipe {
+ cfg_coop! {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<std::io::Result<()>> {
+ let coop = ready!(crate::runtime::coop::poll_proceed(cx));
+
+ let ret = self.poll_read_internal(cx, buf);
+ if ret.is_ready() {
+ coop.made_progress();
+ }
+ ret
+ }
+ }
+
+ cfg_not_coop! {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<std::io::Result<()>> {
+ self.poll_read_internal(cx, buf)
+ }
+ }
+}
+
+impl AsyncWrite for Pipe {
+ cfg_coop! {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::io::Result<usize>> {
+ let coop = ready!(crate::runtime::coop::poll_proceed(cx));
+
+ let ret = self.poll_write_internal(cx, buf);
+ if ret.is_ready() {
+ coop.made_progress();
+ }
+ ret
+ }
+ }
+
+ cfg_not_coop! {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::io::Result<usize>> {
+ self.poll_write_internal(cx, buf)
+ }
+ }
fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
diff --git a/src/io/util/read_exact.rs b/src/io/util/read_exact.rs
index 1e8150e..dbdd58b 100644
--- a/src/io/util/read_exact.rs
+++ b/src/io/util/read_exact.rs
@@ -51,13 +51,13 @@ where
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
- let mut me = self.project();
+ let me = self.project();
loop {
// if our buffer is empty, then we need to read some data to continue.
let rem = me.buf.remaining();
if rem != 0 {
- ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?;
+ ready!(Pin::new(&mut *me.reader).poll_read(cx, me.buf))?;
if me.buf.remaining() == rem {
return Err(eof()).into();
}
diff --git a/src/io/util/take.rs b/src/io/util/take.rs
index b5e90c9..df2f61b 100644
--- a/src/io/util/take.rs
+++ b/src/io/util/take.rs
@@ -86,7 +86,11 @@ impl<R: AsyncRead> AsyncRead for Take<R> {
let me = self.project();
let mut b = buf.take(*me.limit_ as usize);
+
+ let buf_ptr = b.filled().as_ptr();
ready!(me.inner.poll_read(cx, &mut b))?;
+ assert_eq!(b.filled().as_ptr(), buf_ptr);
+
let n = b.filled().len();
// We need to update the original ReadBuf
diff --git a/src/io/util/vec_with_initialized.rs b/src/io/util/vec_with_initialized.rs
index 208cc93..a9b94e3 100644
--- a/src/io/util/vec_with_initialized.rs
+++ b/src/io/util/vec_with_initialized.rs
@@ -1,19 +1,18 @@
use crate::io::ReadBuf;
use std::mem::MaybeUninit;
-mod private {
- pub trait Sealed {}
-
- impl Sealed for Vec<u8> {}
- impl Sealed for &mut Vec<u8> {}
-}
+/// Something that looks like a `Vec<u8>`.
+///
+/// # Safety
+///
+/// The implementor must guarantee that the vector returned by the
+/// `as_mut` and `as_mut` methods do not change from one call to
+/// another.
+pub(crate) unsafe trait VecU8: AsRef<Vec<u8>> + AsMut<Vec<u8>> {}
-/// A sealed trait that constrains the generic type parameter in `VecWithInitialized<V>`. That struct's safety relies
-/// on certain invariants upheld by `Vec<u8>`.
-pub(crate) trait VecU8: AsMut<Vec<u8>> + private::Sealed {}
+unsafe impl VecU8 for Vec<u8> {}
+unsafe impl VecU8 for &mut Vec<u8> {}
-impl VecU8 for Vec<u8> {}
-impl VecU8 for &mut Vec<u8> {}
/// This struct wraps a `Vec<u8>` or `&mut Vec<u8>`, combining it with a
/// `num_initialized`, which keeps track of the number of initialized bytes
/// in the unused capacity.
@@ -64,8 +63,8 @@ where
}
#[cfg(feature = "io-util")]
- pub(crate) fn is_empty(&mut self) -> bool {
- self.vec.as_mut().is_empty()
+ pub(crate) fn is_empty(&self) -> bool {
+ self.vec.as_ref().is_empty()
}
pub(crate) fn get_read_buf<'a>(&'a mut self) -> ReadBuf<'a> {
diff --git a/src/io/util/write_all.rs b/src/io/util/write_all.rs
index e59d41e..abd3e39 100644
--- a/src/io/util/write_all.rs
+++ b/src/io/util/write_all.rs
@@ -42,7 +42,7 @@ where
while !me.buf.is_empty() {
let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf))?;
{
- let (_, rest) = mem::replace(&mut *me.buf, &[]).split_at(n);
+ let (_, rest) = mem::take(&mut *me.buf).split_at(n);
*me.buf = rest;
}
if n == 0 {