diff options
Diffstat (limited to 'src/io')
-rw-r--r-- | src/io/async_fd.rs | 34 | ||||
-rw-r--r-- | src/io/blocking.rs | 3 | ||||
-rw-r--r-- | src/io/bsd/poll_aio.rs | 6 | ||||
-rw-r--r-- | src/io/driver/mod.rs | 354 | ||||
-rw-r--r-- | src/io/driver/platform.rs | 44 | ||||
-rw-r--r-- | src/io/driver/registration.rs | 262 | ||||
-rw-r--r-- | src/io/driver/scheduled_io.rs | 533 | ||||
-rw-r--r-- | src/io/interest.rs (renamed from src/io/driver/interest.rs) | 4 | ||||
-rw-r--r-- | src/io/mod.rs | 16 | ||||
-rw-r--r-- | src/io/poll_evented.rs | 88 | ||||
-rw-r--r-- | src/io/read_buf.rs | 42 | ||||
-rw-r--r-- | src/io/ready.rs (renamed from src/io/driver/ready.rs) | 2 | ||||
-rw-r--r-- | src/io/split.rs | 1 | ||||
-rw-r--r-- | src/io/stdio_common.rs | 6 | ||||
-rw-r--r-- | src/io/util/async_seek_ext.rs | 2 | ||||
-rw-r--r-- | src/io/util/async_write_ext.rs | 10 | ||||
-rw-r--r-- | src/io/util/buf_reader.rs | 3 | ||||
-rw-r--r-- | src/io/util/copy.rs | 62 | ||||
-rw-r--r-- | src/io/util/empty.rs | 20 | ||||
-rw-r--r-- | src/io/util/fill_buf.rs | 6 | ||||
-rw-r--r-- | src/io/util/mem.rs | 64 | ||||
-rw-r--r-- | src/io/util/read_exact.rs | 4 | ||||
-rw-r--r-- | src/io/util/take.rs | 4 | ||||
-rw-r--r-- | src/io/util/vec_with_initialized.rs | 25 | ||||
-rw-r--r-- | src/io/util/write_all.rs | 2 |
25 files changed, 298 insertions, 1299 deletions
diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs index 9ec5b7f..92fc6b3 100644 --- a/src/io/async_fd.rs +++ b/src/io/async_fd.rs @@ -1,4 +1,6 @@ -use crate::io::driver::{Handle, Interest, ReadyEvent, Registration}; +use crate::io::Interest; +use crate::runtime::io::{ReadyEvent, Registration}; +use crate::runtime::scheduler; use mio::unix::SourceFd; use std::io; @@ -81,6 +83,7 @@ use std::{task::Context, task::Poll}; /// /// impl AsyncTcpStream { /// pub fn new(tcp: TcpStream) -> io::Result<Self> { +/// tcp.set_nonblocking(true)?; /// Ok(Self { /// inner: AsyncFd::new(tcp)?, /// }) @@ -166,12 +169,18 @@ pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> { const ALL_INTEREST: Interest = Interest::READABLE.add(Interest::WRITABLE); impl<T: AsRawFd> AsyncFd<T> { - #[inline] /// Creates an AsyncFd backed by (and taking ownership of) an object /// implementing [`AsRawFd`]. The backing file descriptor is cached at the /// time of creation. /// /// This method must be called in the context of a tokio runtime. + /// + /// # Panics + /// + /// This function panics if there is no current reactor set, or if the `rt` + /// feature flag is not enabled. + #[inline] + #[track_caller] pub fn new(inner: T) -> io::Result<Self> where T: AsRawFd, @@ -179,19 +188,26 @@ impl<T: AsRawFd> AsyncFd<T> { Self::with_interest(inner, ALL_INTEREST) } - #[inline] /// Creates new instance as `new` with additional ability to customize interest, /// allowing to specify whether file descriptor will be polled for read, write or both. + /// + /// # Panics + /// + /// This function panics if there is no current reactor set, or if the `rt` + /// feature flag is not enabled. + #[inline] + #[track_caller] pub fn with_interest(inner: T, interest: Interest) -> io::Result<Self> where T: AsRawFd, { - Self::new_with_handle_and_interest(inner, Handle::current(), interest) + Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest) } + #[track_caller] pub(crate) fn new_with_handle_and_interest( inner: T, - handle: Handle, + handle: scheduler::Handle, interest: Interest, ) -> io::Result<Self> { let fd = inner.as_raw_fd(); @@ -525,7 +541,7 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> { #[cfg_attr(docsrs, doc(alias = "with_io"))] pub fn try_io<R>( &mut self, - f: impl FnOnce(&AsyncFd<Inner>) -> io::Result<R>, + f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>, ) -> Result<io::Result<R>, TryIoError> { let result = f(self.async_fd); @@ -542,12 +558,12 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> { } /// Returns a shared reference to the inner [`AsyncFd`]. - pub fn get_ref(&self) -> &AsyncFd<Inner> { + pub fn get_ref(&self) -> &'a AsyncFd<Inner> { self.async_fd } /// Returns a shared reference to the backing object of the inner [`AsyncFd`]. - pub fn get_inner(&self) -> &Inner { + pub fn get_inner(&self) -> &'a Inner { self.get_ref().get_ref() } } @@ -598,7 +614,7 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> { &mut self, f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>, ) -> Result<io::Result<R>, TryIoError> { - let result = f(&mut self.async_fd); + let result = f(self.async_fd); if let Err(e) = result.as_ref() { if e.kind() == io::ErrorKind::WouldBlock { diff --git a/src/io/blocking.rs b/src/io/blocking.rs index 1d79ee7..f6db450 100644 --- a/src/io/blocking.rs +++ b/src/io/blocking.rs @@ -34,8 +34,9 @@ enum State<T> { Busy(sys::Blocking<(io::Result<usize>, Buf, T)>), } -cfg_io_std! { +cfg_io_blocking! { impl<T> Blocking<T> { + #[cfg_attr(feature = "fs", allow(dead_code))] pub(crate) fn new(inner: T) -> Blocking<T> { Blocking { inner: Some(inner), diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs index f1ac4b2..6ac9e28 100644 --- a/src/io/bsd/poll_aio.rs +++ b/src/io/bsd/poll_aio.rs @@ -1,6 +1,8 @@ //! Use POSIX AIO futures with Tokio. -use crate::io::driver::{Handle, Interest, ReadyEvent, Registration}; +use crate::io::interest::Interest; +use crate::runtime::io::{ReadyEvent, Registration}; +use crate::runtime::scheduler; use mio::event::Source; use mio::Registry; use mio::Token; @@ -117,7 +119,7 @@ impl<E: AioSource> Aio<E> { fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { let mut io = MioSource(io); - let handle = Handle::current(); + let handle = scheduler::Handle::current(); let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; Ok(Self { io, registration }) } diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs deleted file mode 100644 index 19f67a2..0000000 --- a/src/io/driver/mod.rs +++ /dev/null @@ -1,354 +0,0 @@ -#![cfg_attr(not(feature = "rt"), allow(dead_code))] - -mod interest; -#[allow(unreachable_pub)] -pub use interest::Interest; - -mod ready; -#[allow(unreachable_pub)] -pub use ready::Ready; - -mod registration; -pub(crate) use registration::Registration; - -mod scheduled_io; -use scheduled_io::ScheduledIo; - -use crate::park::{Park, Unpark}; -use crate::util::slab::{self, Slab}; -use crate::{loom::sync::Mutex, util::bit}; - -use std::fmt; -use std::io; -use std::sync::{Arc, Weak}; -use std::time::Duration; - -/// I/O driver, backed by Mio. -pub(crate) struct Driver { - /// Tracks the number of times `turn` is called. It is safe for this to wrap - /// as it is mostly used to determine when to call `compact()`. - tick: u8, - - /// Reuse the `mio::Events` value across calls to poll. - events: Option<mio::Events>, - - /// Primary slab handle containing the state for each resource registered - /// with this driver. During Drop this is moved into the Inner structure, so - /// this is an Option to allow it to be vacated (until Drop this is always - /// Some). - resources: Option<Slab<ScheduledIo>>, - - /// The system event queue. - poll: mio::Poll, - - /// State shared between the reactor and the handles. - inner: Arc<Inner>, -} - -/// A reference to an I/O driver. -#[derive(Clone)] -pub(crate) struct Handle { - inner: Weak<Inner>, -} - -#[derive(Debug)] -pub(crate) struct ReadyEvent { - tick: u8, - pub(crate) ready: Ready, -} - -pub(super) struct Inner { - /// Primary slab handle containing the state for each resource registered - /// with this driver. - /// - /// The ownership of this slab is moved into this structure during - /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles - /// without risking new ones being registered in the meantime. - resources: Mutex<Option<Slab<ScheduledIo>>>, - - /// Registers I/O resources. - registry: mio::Registry, - - /// Allocates `ScheduledIo` handles when creating new resources. - pub(super) io_dispatch: slab::Allocator<ScheduledIo>, - - /// Used to wake up the reactor from a call to `turn`. - waker: mio::Waker, -} - -#[derive(Debug, Eq, PartialEq, Clone, Copy)] -enum Direction { - Read, - Write, -} - -enum Tick { - Set(u8), - Clear(u8), -} - -// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup -// token. -const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); - -const ADDRESS: bit::Pack = bit::Pack::least_significant(24); - -// Packs the generation value in the `readiness` field. -// -// The generation prevents a race condition where a slab slot is reused for a -// new socket while the I/O driver is about to apply a readiness event. The -// generation value is checked when setting new readiness. If the generation do -// not match, then the readiness event is discarded. -const GENERATION: bit::Pack = ADDRESS.then(7); - -fn _assert_kinds() { - fn _assert<T: Send + Sync>() {} - - _assert::<Handle>(); -} - -// ===== impl Driver ===== - -impl Driver { - /// Creates a new event loop, returning any error that happened during the - /// creation. - pub(crate) fn new() -> io::Result<Driver> { - let poll = mio::Poll::new()?; - let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; - let registry = poll.registry().try_clone()?; - - let slab = Slab::new(); - let allocator = slab.allocator(); - - Ok(Driver { - tick: 0, - events: Some(mio::Events::with_capacity(1024)), - poll, - resources: Some(slab), - inner: Arc::new(Inner { - resources: Mutex::new(None), - registry, - io_dispatch: allocator, - waker, - }), - }) - } - - /// Returns a handle to this event loop which can be sent across threads - /// and can be used as a proxy to the event loop itself. - /// - /// Handles are cloneable and clones always refer to the same event loop. - /// This handle is typically passed into functions that create I/O objects - /// to bind them to this event loop. - pub(crate) fn handle(&self) -> Handle { - Handle { - inner: Arc::downgrade(&self.inner), - } - } - - fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> { - // How often to call `compact()` on the resource slab - const COMPACT_INTERVAL: u8 = 255; - - self.tick = self.tick.wrapping_add(1); - - if self.tick == COMPACT_INTERVAL { - self.resources.as_mut().unwrap().compact() - } - - let mut events = self.events.take().expect("i/o driver event store missing"); - - // Block waiting for an event to happen, peeling out how many events - // happened. - match self.poll.poll(&mut events, max_wait) { - Ok(_) => {} - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - - // Process all the events that came in, dispatching appropriately - for event in events.iter() { - let token = event.token(); - - if token != TOKEN_WAKEUP { - self.dispatch(token, Ready::from_mio(event)); - } - } - - self.events = Some(events); - - Ok(()) - } - - fn dispatch(&mut self, token: mio::Token, ready: Ready) { - let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); - - let resources = self.resources.as_mut().unwrap(); - - let io = match resources.get(addr) { - Some(io) => io, - None => return, - }; - - let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready); - - if res.is_err() { - // token no longer valid! - return; - } - - io.wake(ready); - } -} - -impl Drop for Driver { - fn drop(&mut self) { - (*self.inner.resources.lock()) = self.resources.take(); - } -} - -impl Drop for Inner { - fn drop(&mut self) { - let resources = self.resources.lock().take(); - - if let Some(mut slab) = resources { - slab.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. - io.shutdown(); - }); - } - } -} - -impl Park for Driver { - type Unpark = Handle; - type Error = io::Error; - - fn unpark(&self) -> Self::Unpark { - self.handle() - } - - fn park(&mut self) -> io::Result<()> { - self.turn(None)?; - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { - self.turn(Some(duration))?; - Ok(()) - } - - fn shutdown(&mut self) {} -} - -impl fmt::Debug for Driver { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Driver") - } -} - -// ===== impl Handle ===== - -cfg_rt! { - impl Handle { - /// Returns a handle to the current reactor. - /// - /// # Panics - /// - /// This function panics if there is no current reactor set and `rt` feature - /// flag is not enabled. - pub(super) fn current() -> Self { - crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.") - } - } -} - -cfg_not_rt! { - impl Handle { - /// Returns a handle to the current reactor. - /// - /// # Panics - /// - /// This function panics if there is no current reactor set, or if the `rt` - /// feature flag is not enabled. - pub(super) fn current() -> Self { - panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) - } - } -} - -impl Handle { - /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise - /// makes the next call to `turn` return immediately. - /// - /// This method is intended to be used in situations where a notification - /// needs to otherwise be sent to the main reactor. If the reactor is - /// currently blocked inside of `turn` then it will wake up and soon return - /// after this method has been called. If the reactor is not currently - /// blocked in `turn`, then the next call to `turn` will not block and - /// return immediately. - fn wakeup(&self) { - if let Some(inner) = self.inner() { - inner.waker.wake().expect("failed to wake I/O driver"); - } - } - - pub(super) fn inner(&self) -> Option<Arc<Inner>> { - self.inner.upgrade() - } -} - -impl Unpark for Handle { - fn unpark(&self) { - self.wakeup(); - } -} - -impl fmt::Debug for Handle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Handle") - } -} - -// ===== impl Inner ===== - -impl Inner { - /// Registers an I/O resource with the reactor for a given `mio::Ready` state. - /// - /// The registration token is returned. - pub(super) fn add_source( - &self, - source: &mut impl mio::event::Source, - interest: Interest, - ) -> io::Result<slab::Ref<ScheduledIo>> { - let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "reactor at max registered I/O resources", - ) - })?; - - let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); - - self.registry - .register(source, mio::Token(token), interest.to_mio())?; - - Ok(shared) - } - - /// Deregisters an I/O resource from the reactor. - pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { - self.registry.deregister(source) - } -} - -impl Direction { - pub(super) fn mask(self) -> Ready { - match self { - Direction::Read => Ready::READABLE | Ready::READ_CLOSED, - Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, - } - } -} diff --git a/src/io/driver/platform.rs b/src/io/driver/platform.rs deleted file mode 100644 index 6b27988..0000000 --- a/src/io/driver/platform.rs +++ /dev/null @@ -1,44 +0,0 @@ -pub(crate) use self::sys::*; - -#[cfg(unix)] -mod sys { - use mio::unix::UnixReady; - use mio::Ready; - - pub(crate) fn hup() -> Ready { - UnixReady::hup().into() - } - - pub(crate) fn is_hup(ready: Ready) -> bool { - UnixReady::from(ready).is_hup() - } - - pub(crate) fn error() -> Ready { - UnixReady::error().into() - } - - pub(crate) fn is_error(ready: Ready) -> bool { - UnixReady::from(ready).is_error() - } -} - -#[cfg(windows)] -mod sys { - use mio::Ready; - - pub(crate) fn hup() -> Ready { - Ready::empty() - } - - pub(crate) fn is_hup(_: Ready) -> bool { - false - } - - pub(crate) fn error() -> Ready { - Ready::empty() - } - - pub(crate) fn is_error(_: Ready) -> bool { - false - } -} diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs deleted file mode 100644 index 7350be6..0000000 --- a/src/io/driver/registration.rs +++ /dev/null @@ -1,262 +0,0 @@ -#![cfg_attr(not(feature = "net"), allow(dead_code))] - -use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo}; -use crate::util::slab; - -use mio::event::Source; -use std::io; -use std::task::{Context, Poll}; - -cfg_io_driver! { - /// Associates an I/O resource with the reactor instance that drives it. - /// - /// A registration represents an I/O resource registered with a Reactor such - /// that it will receive task notifications on readiness. This is the lowest - /// level API for integrating with a reactor. - /// - /// The association between an I/O resource is made by calling - /// [`new_with_interest_and_handle`]. - /// Once the association is established, it remains established until the - /// registration instance is dropped. - /// - /// A registration instance represents two separate readiness streams. One - /// for the read readiness and one for write readiness. These streams are - /// independent and can be consumed from separate tasks. - /// - /// **Note**: while `Registration` is `Sync`, the caller must ensure that - /// there are at most two tasks that use a registration instance - /// concurrently. One task for [`poll_read_ready`] and one task for - /// [`poll_write_ready`]. While violating this requirement is "safe" from a - /// Rust memory safety point of view, it will result in unexpected behavior - /// in the form of lost notifications and tasks hanging. - /// - /// ## Platform-specific events - /// - /// `Registration` also allows receiving platform-specific `mio::Ready` - /// events. These events are included as part of the read readiness event - /// stream. The write readiness event stream is only for `Ready::writable()` - /// events. - /// - /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle - /// [`poll_read_ready`]: method@Self::poll_read_ready` - /// [`poll_write_ready`]: method@Self::poll_write_ready` - #[derive(Debug)] - pub(crate) struct Registration { - /// Handle to the associated driver. - handle: Handle, - - /// Reference to state stored by the driver. - shared: slab::Ref<ScheduledIo>, - } -} - -unsafe impl Send for Registration {} -unsafe impl Sync for Registration {} - -// ===== impl Registration ===== - -impl Registration { - /// Registers the I/O resource with the default reactor, for a specific - /// `Interest`. `new_with_interest` should be used over `new` when you need - /// control over the readiness state, such as when a file descriptor only - /// allows reads. This does not add `hup` or `error` so if you are - /// interested in those states, you will need to add them to the readiness - /// state passed to this function. - /// - /// # Return - /// - /// - `Ok` if the registration happened successfully - /// - `Err` if an error was encountered during registration - pub(crate) fn new_with_interest_and_handle( - io: &mut impl Source, - interest: Interest, - handle: Handle, - ) -> io::Result<Registration> { - let shared = if let Some(inner) = handle.inner() { - inner.add_source(io, interest)? - } else { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to find event loop", - )); - }; - - Ok(Registration { handle, shared }) - } - - /// Deregisters the I/O resource from the reactor it is associated with. - /// - /// This function must be called before the I/O resource associated with the - /// registration is dropped. - /// - /// Note that deregistering does not guarantee that the I/O resource can be - /// registered with a different reactor. Some I/O resource types can only be - /// associated with a single reactor instance for their lifetime. - /// - /// # Return - /// - /// If the deregistration was successful, `Ok` is returned. Any calls to - /// `Reactor::turn` that happen after a successful call to `deregister` will - /// no longer result in notifications getting sent for this registration. - /// - /// `Err` is returned if an error is encountered. - pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { - let inner = match self.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - inner.deregister_source(io) - } - - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { - self.shared.clear_readiness(event); - } - - // Uses the poll path, requiring the caller to ensure mutual exclusion for - // correctness. Only the last task to call this function is notified. - pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { - self.poll_ready(cx, Direction::Read) - } - - // Uses the poll path, requiring the caller to ensure mutual exclusion for - // correctness. Only the last task to call this function is notified. - pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { - self.poll_ready(cx, Direction::Write) - } - - // Uses the poll path, requiring the caller to ensure mutual exclusion for - // correctness. Only the last task to call this function is notified. - pub(crate) fn poll_read_io<R>( - &self, - cx: &mut Context<'_>, - f: impl FnMut() -> io::Result<R>, - ) -> Poll<io::Result<R>> { - self.poll_io(cx, Direction::Read, f) - } - - // Uses the poll path, requiring the caller to ensure mutual exclusion for - // correctness. Only the last task to call this function is notified. - pub(crate) fn poll_write_io<R>( - &self, - cx: &mut Context<'_>, - f: impl FnMut() -> io::Result<R>, - ) -> Poll<io::Result<R>> { - self.poll_io(cx, Direction::Write, f) - } - - /// Polls for events on the I/O resource's `direction` readiness stream. - /// - /// If called with a task context, notify the task when a new event is - /// received. - fn poll_ready( - &self, - cx: &mut Context<'_>, - direction: Direction, - ) -> Poll<io::Result<ReadyEvent>> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - let ev = ready!(self.shared.poll_readiness(cx, direction)); - - if self.handle.inner().is_none() { - return Poll::Ready(Err(gone())); - } - - coop.made_progress(); - Poll::Ready(Ok(ev)) - } - - fn poll_io<R>( - &self, - cx: &mut Context<'_>, - direction: Direction, - mut f: impl FnMut() -> io::Result<R>, - ) -> Poll<io::Result<R>> { - loop { - let ev = ready!(self.poll_ready(cx, direction))?; - - match f() { - Ok(ret) => { - return Poll::Ready(Ok(ret)); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - } - } - } - - pub(crate) fn try_io<R>( - &self, - interest: Interest, - f: impl FnOnce() -> io::Result<R>, - ) -> io::Result<R> { - let ev = self.shared.ready_event(interest); - - // Don't attempt the operation if the resource is not ready. - if ev.ready.is_empty() { - return Err(io::ErrorKind::WouldBlock.into()); - } - - match f() { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(ev); - Err(io::ErrorKind::WouldBlock.into()) - } - res => res, - } - } -} - -impl Drop for Registration { - fn drop(&mut self) { - // It is possible for a cycle to be created between wakers stored in - // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this - // cycle, wakers are cleared. This is an imperfect solution as it is - // possible to store a `Registration` in a waker. In this case, the - // cycle would remain. - // - // See tokio-rs/tokio#3481 for more details. - self.shared.clear_wakers(); - } -} - -fn gone() -> io::Error { - io::Error::new(io::ErrorKind::Other, "IO driver has terminated") -} - -cfg_io_readiness! { - impl Registration { - pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> { - use std::future::Future; - use std::pin::Pin; - - let fut = self.shared.readiness(interest); - pin!(fut); - - crate::future::poll_fn(|cx| { - if self.handle.inner().is_none() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR - ))); - } - - Pin::new(&mut fut).poll(cx).map(Ok) - }).await - } - - pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> { - loop { - let event = self.readiness(interest).await?; - - match f() { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(event); - } - x => return x, - } - } - } - } -} diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs deleted file mode 100644 index 76f9343..0000000 --- a/src/io/driver/scheduled_io.rs +++ /dev/null @@ -1,533 +0,0 @@ -use super::{Interest, Ready, ReadyEvent, Tick}; -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::Mutex; -use crate::util::bit; -use crate::util::slab::Entry; -use crate::util::WakeList; - -use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; -use std::task::{Context, Poll, Waker}; - -use super::Direction; - -cfg_io_readiness! { - use crate::util::linked_list::{self, LinkedList}; - - use std::cell::UnsafeCell; - use std::future::Future; - use std::marker::PhantomPinned; - use std::pin::Pin; - use std::ptr::NonNull; -} - -/// Stored in the I/O driver resource slab. -#[derive(Debug)] -pub(crate) struct ScheduledIo { - /// Packs the resource's readiness with the resource's generation. - readiness: AtomicUsize, - - waiters: Mutex<Waiters>, -} - -cfg_io_readiness! { - type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; -} - -#[derive(Debug, Default)] -struct Waiters { - #[cfg(feature = "net")] - /// List of all current waiters. - list: WaitList, - - /// Waker used for AsyncRead. - reader: Option<Waker>, - - /// Waker used for AsyncWrite. - writer: Option<Waker>, - - /// True if this ScheduledIo has been killed due to IO driver shutdown. - is_shutdown: bool, -} - -cfg_io_readiness! { - #[derive(Debug)] - struct Waiter { - pointers: linked_list::Pointers<Waiter>, - - /// The waker for this task. - waker: Option<Waker>, - - /// The interest this waiter is waiting on. - interest: Interest, - - is_ready: bool, - - /// Should never be `!Unpin`. - _p: PhantomPinned, - } - - /// Future returned by `readiness()`. - struct Readiness<'a> { - scheduled_io: &'a ScheduledIo, - - state: State, - - /// Entry in the waiter `LinkedList`. - waiter: UnsafeCell<Waiter>, - } - - enum State { - Init, - Waiting, - Done, - } -} - -// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. -// -// | reserved | generation | driver tick | readiness | -// |----------+------------+--------------+-----------| -// | 1 bit | 7 bits + 8 bits + 16 bits | - -const READINESS: bit::Pack = bit::Pack::least_significant(16); - -const TICK: bit::Pack = READINESS.then(8); - -const GENERATION: bit::Pack = TICK.then(7); - -#[test] -fn test_generations_assert_same() { - assert_eq!(super::GENERATION, GENERATION); -} - -// ===== impl ScheduledIo ===== - -impl Entry for ScheduledIo { - fn reset(&self) { - let state = self.readiness.load(Acquire); - - let generation = GENERATION.unpack(state); - let next = GENERATION.pack_lossy(generation + 1, 0); - - self.readiness.store(next, Release); - } -} - -impl Default for ScheduledIo { - fn default() -> ScheduledIo { - ScheduledIo { - readiness: AtomicUsize::new(0), - waiters: Mutex::new(Default::default()), - } - } -} - -impl ScheduledIo { - pub(crate) fn generation(&self) -> usize { - GENERATION.unpack(self.readiness.load(Acquire)) - } - - /// Invoked when the IO driver is shut down; forces this ScheduledIo into a - /// permanently ready state. - pub(super) fn shutdown(&self) { - self.wake0(Ready::ALL, true) - } - - /// Sets the readiness on this `ScheduledIo` by invoking the given closure on - /// the current value, returning the previous readiness value. - /// - /// # Arguments - /// - `token`: the token for this `ScheduledIo`. - /// - `tick`: whether setting the tick or trying to clear readiness for a - /// specific tick. - /// - `f`: a closure returning a new readiness value given the previous - /// readiness. - /// - /// # Returns - /// - /// If the given token's generation no longer matches the `ScheduledIo`'s - /// generation, then the corresponding IO resource has been removed and - /// replaced with a new resource. In that case, this method returns `Err`. - /// Otherwise, this returns the previous readiness. - pub(super) fn set_readiness( - &self, - token: Option<usize>, - tick: Tick, - f: impl Fn(Ready) -> Ready, - ) -> Result<(), ()> { - let mut current = self.readiness.load(Acquire); - - loop { - let current_generation = GENERATION.unpack(current); - - if let Some(token) = token { - // Check that the generation for this access is still the - // current one. - if GENERATION.unpack(token) != current_generation { - return Err(()); - } - } - - // Mask out the tick/generation bits so that the modifying - // function doesn't see them. - let current_readiness = Ready::from_usize(current); - let new = f(current_readiness); - - let packed = match tick { - Tick::Set(t) => TICK.pack(t as usize, new.as_usize()), - Tick::Clear(t) => { - if TICK.unpack(current) as u8 != t { - // Trying to clear readiness with an old event! - return Err(()); - } - - TICK.pack(t as usize, new.as_usize()) - } - }; - - let next = GENERATION.pack(current_generation, packed); - - match self - .readiness - .compare_exchange(current, next, AcqRel, Acquire) - { - Ok(_) => return Ok(()), - // we lost the race, retry! - Err(actual) => current = actual, - } - } - } - - /// Notifies all pending waiters that have registered interest in `ready`. - /// - /// There may be many waiters to notify. Waking the pending task **must** be - /// done from outside of the lock otherwise there is a potential for a - /// deadlock. - /// - /// A stack array of wakers is created and filled with wakers to notify, the - /// lock is released, and the wakers are notified. Because there may be more - /// than 32 wakers to notify, if the stack array fills up, the lock is - /// released, the array is cleared, and the iteration continues. - pub(super) fn wake(&self, ready: Ready) { - self.wake0(ready, false); - } - - fn wake0(&self, ready: Ready, shutdown: bool) { - let mut wakers = WakeList::new(); - - let mut waiters = self.waiters.lock(); - - waiters.is_shutdown |= shutdown; - - // check for AsyncRead slot - if ready.is_readable() { - if let Some(waker) = waiters.reader.take() { - wakers.push(waker); - } - } - - // check for AsyncWrite slot - if ready.is_writable() { - if let Some(waker) = waiters.writer.take() { - wakers.push(waker); - } - } - - #[cfg(feature = "net")] - 'outer: loop { - let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); - - while wakers.can_push() { - match iter.next() { - Some(waiter) => { - let waiter = unsafe { &mut *waiter.as_ptr() }; - - if let Some(waker) = waiter.waker.take() { - waiter.is_ready = true; - wakers.push(waker); - } - } - None => { - break 'outer; - } - } - } - - drop(waiters); - - wakers.wake_all(); - - // Acquire the lock again. - waiters = self.waiters.lock(); - } - - // Release the lock before notifying - drop(waiters); - - wakers.wake_all(); - } - - pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { - let curr = self.readiness.load(Acquire); - - ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), - } - } - - /// Polls for readiness events in a given direction. - /// - /// These are to support `AsyncRead` and `AsyncWrite` polling methods, - /// which cannot use the `async fn` version. This uses reserved reader - /// and writer slots. - pub(super) fn poll_readiness( - &self, - cx: &mut Context<'_>, - direction: Direction, - ) -> Poll<ReadyEvent> { - let curr = self.readiness.load(Acquire); - - let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - - if ready.is_empty() { - // Update the task info - let mut waiters = self.waiters.lock(); - let slot = match direction { - Direction::Read => &mut waiters.reader, - Direction::Write => &mut waiters.writer, - }; - - // Avoid cloning the waker if one is already stored that matches the - // current task. - match slot { - Some(existing) => { - if !existing.will_wake(cx.waker()) { - *existing = cx.waker().clone(); - } - } - None => { - *slot = Some(cx.waker().clone()); - } - } - - // Try again, in case the readiness was changed while we were - // taking the waiters lock - let curr = self.readiness.load(Acquire); - let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - if waiters.is_shutdown { - Poll::Ready(ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready: direction.mask(), - }) - } else if ready.is_empty() { - Poll::Pending - } else { - Poll::Ready(ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready, - }) - } - } else { - Poll::Ready(ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready, - }) - } - } - - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { - // This consumes the current readiness state **except** for closed - // states. Closed states are excluded because they are final states. - let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; - - // result isn't important - let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed); - } - - pub(crate) fn clear_wakers(&self) { - let mut waiters = self.waiters.lock(); - waiters.reader.take(); - waiters.writer.take(); - } -} - -impl Drop for ScheduledIo { - fn drop(&mut self) { - self.wake(Ready::ALL); - } -} - -unsafe impl Send for ScheduledIo {} -unsafe impl Sync for ScheduledIo {} - -cfg_io_readiness! { - impl ScheduledIo { - /// An async version of `poll_readiness` which uses a linked list of wakers. - pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { - self.readiness_fut(interest).await - } - - // This is in a separate function so that the borrow checker doesn't think - // we are borrowing the `UnsafeCell` possibly over await boundaries. - // - // Go figure. - fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { - Readiness { - scheduled_io: self, - state: State::Init, - waiter: UnsafeCell::new(Waiter { - pointers: linked_list::Pointers::new(), - waker: None, - is_ready: false, - interest, - _p: PhantomPinned, - }), - } - } - } - - unsafe impl linked_list::Link for Waiter { - type Handle = NonNull<Waiter>; - type Target = Waiter; - - fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { - *handle - } - - unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { - ptr - } - - unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { - NonNull::from(&mut target.as_mut().pointers) - } - } - - // ===== impl Readiness ===== - - impl Future for Readiness<'_> { - type Output = ReadyEvent; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - use std::sync::atomic::Ordering::SeqCst; - - let (scheduled_io, state, waiter) = unsafe { - let me = self.get_unchecked_mut(); - (&me.scheduled_io, &mut me.state, &me.waiter) - }; - - loop { - match *state { - State::Init => { - // Optimistically check existing readiness - let curr = scheduled_io.readiness.load(SeqCst); - let ready = Ready::from_usize(READINESS.unpack(curr)); - - // Safety: `waiter.interest` never changes - let interest = unsafe { (*waiter.get()).interest }; - let ready = ready.intersection(interest); - - if !ready.is_empty() { - // Currently ready! - let tick = TICK.unpack(curr) as u8; - *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); - } - - // Wasn't ready, take the lock (and check again while locked). - let mut waiters = scheduled_io.waiters.lock(); - - let curr = scheduled_io.readiness.load(SeqCst); - let mut ready = Ready::from_usize(READINESS.unpack(curr)); - - if waiters.is_shutdown { - ready = Ready::ALL; - } - - let ready = ready.intersection(interest); - - if !ready.is_empty() { - // Currently ready! - let tick = TICK.unpack(curr) as u8; - *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); - } - - // Not ready even after locked, insert into list... - - // Safety: called while locked - unsafe { - (*waiter.get()).waker = Some(cx.waker().clone()); - } - - // Insert the waiter into the linked list - // - // safety: pointers from `UnsafeCell` are never null. - waiters - .list - .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); - *state = State::Waiting; - } - State::Waiting => { - // Currently in the "Waiting" state, implying the caller has - // a waiter stored in the waiter list (guarded by - // `notify.waiters`). In order to access the waker fields, - // we must hold the lock. - - let waiters = scheduled_io.waiters.lock(); - - // Safety: called while locked - let w = unsafe { &mut *waiter.get() }; - - if w.is_ready { - // Our waker has been notified. - *state = State::Done; - } else { - // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); - } - - return Poll::Pending; - } - - // Explicit drop of the lock to indicate the scope that the - // lock is held. Because holding the lock is required to - // ensure safe access to fields not held within the lock, it - // is helpful to visualize the scope of the critical - // section. - drop(waiters); - } - State::Done => { - let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; - - // Safety: State::Done means it is no longer shared - let w = unsafe { &mut *waiter.get() }; - - return Poll::Ready(ReadyEvent { - tick, - ready: Ready::from_interest(w.interest), - }); - } - } - } - } - } - - impl Drop for Readiness<'_> { - fn drop(&mut self) { - let mut waiters = self.scheduled_io.waiters.lock(); - - // Safety: `waiter` is only ever stored in `waiters` - unsafe { - waiters - .list - .remove(NonNull::new_unchecked(self.waiter.get())) - }; - } - } - - unsafe impl Send for Readiness<'_> {} - unsafe impl Sync for Readiness<'_> {} -} diff --git a/src/io/driver/interest.rs b/src/io/interest.rs index d6b46df..013c114 100644 --- a/src/io/driver/interest.rs +++ b/src/io/interest.rs @@ -1,6 +1,6 @@ #![cfg_attr(not(feature = "net"), allow(dead_code, unreachable_pub))] -use crate::io::driver::Ready; +use crate::io::ready::Ready; use std::fmt; use std::ops; @@ -100,7 +100,7 @@ impl Interest { self.0 } - pub(super) fn mask(self) -> Ready { + pub(crate) fn mask(self) -> Ready { match self { Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED, Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED, diff --git a/src/io/mod.rs b/src/io/mod.rs index cfdda61..f48035a 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,5 +1,3 @@ -#![cfg_attr(loom, allow(dead_code, unreachable_pub))] - //! Traits, helpers, and type definitions for asynchronous I/O functionality. //! //! This module is the asynchronous version of `std::io`. Primarily, it @@ -180,6 +178,12 @@ //! [`Sink`]: https://docs.rs/futures/0.3/futures/sink/trait.Sink.html //! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html //! [`Write`]: std::io::Write + +#![cfg_attr( + not(all(feature = "rt", feature = "net")), + allow(dead_code, unused_imports) +)] + cfg_io_blocking! { pub(crate) mod blocking; } @@ -205,15 +209,19 @@ pub use self::read_buf::ReadBuf; pub use std::io::{Error, ErrorKind, Result, SeekFrom}; cfg_io_driver_impl! { - pub(crate) mod driver; + pub(crate) mod interest; + pub(crate) mod ready; cfg_net! { - pub use driver::{Interest, Ready}; + pub use interest::Interest; + pub use ready::Ready; } + #[cfg_attr(tokio_wasi, allow(unused_imports))] mod poll_evented; #[cfg(not(loom))] + #[cfg_attr(tokio_wasi, allow(unused_imports))] pub(crate) use poll_evented::PollEvented; } diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs index 44e68a2..dfe9ae3 100644 --- a/src/io/poll_evented.rs +++ b/src/io/poll_evented.rs @@ -1,16 +1,19 @@ -use crate::io::driver::{Handle, Interest, Registration}; +use crate::io::interest::Interest; +use crate::runtime::io::Registration; +use crate::runtime::scheduler; use mio::event::Source; use std::fmt; use std::io; use std::ops::Deref; +use std::panic::{RefUnwindSafe, UnwindSafe}; cfg_io_driver! { /// Associates an I/O resource that implements the [`std::io::Read`] and/or /// [`std::io::Write`] traits with the reactor that drives it. /// /// `PollEvented` uses [`Registration`] internally to take a type that - /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or + /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or /// [`std::io::Write`] and associate it with a reactor that will drive it. /// /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be @@ -40,12 +43,12 @@ cfg_io_driver! { /// [`poll_read_ready`] again will also indicate read readiness. /// /// When the operation is attempted and is unable to succeed due to the I/O - /// resource not being ready, the caller must call `clear_readiness`. + /// resource not being ready, the caller must call [`clear_readiness`]. /// This clears the readiness state until a new readiness event is received. /// /// This allows the caller to implement additional functions. For example, /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and - /// `clear_read_ready`. + /// [`clear_readiness`]. /// /// ## Platform-specific events /// @@ -56,6 +59,7 @@ cfg_io_driver! { /// [`AsyncRead`]: crate::io::AsyncRead /// [`AsyncWrite`]: crate::io::AsyncWrite /// [`TcpListener`]: crate::net::TcpListener + /// [`clear_readiness`]: Registration::clear_readiness /// [`poll_read_ready`]: Registration::poll_read_ready /// [`poll_write_ready`]: Registration::poll_write_ready pub(crate) struct PollEvented<E: Source> { @@ -76,6 +80,7 @@ impl<E: Source> PollEvented<E> { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[track_caller] #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new(io: E) -> io::Result<Self> { PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) @@ -96,15 +101,17 @@ impl<E: Source> PollEvented<E> { /// a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) /// function. + #[track_caller] #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { - Self::new_with_interest_and_handle(io, interest, Handle::current()) + Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current()) } + #[track_caller] pub(crate) fn new_with_interest_and_handle( mut io: E, interest: Interest, - handle: Handle, + handle: scheduler::Handle, ) -> io::Result<Self> { let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; Ok(Self { @@ -114,11 +121,7 @@ impl<E: Source> PollEvented<E> { } /// Returns a reference to the registration. - #[cfg(any( - feature = "net", - all(unix, feature = "process"), - all(unix, feature = "signal"), - ))] + #[cfg(any(feature = "net"))] pub(crate) fn registration(&self) -> &Registration { &self.registration } @@ -133,7 +136,7 @@ impl<E: Source> PollEvented<E> { } feature! { - #![any(feature = "net", feature = "process")] + #![any(feature = "net", all(unix, feature = "process"))] use crate::io::ReadBuf; use std::task::{Context, Poll}; @@ -150,16 +153,32 @@ feature! { { use std::io::Read; - let n = ready!(self.registration.poll_read_io(cx, || { + loop { + let evt = ready!(self.registration.poll_read_ready(cx))?; + let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); - self.io.as_ref().unwrap().read(b) - }))?; - - // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the - // buffer. - buf.assume_init(n); - buf.advance(n); - Poll::Ready(Ok(())) + let len = b.len(); + + match self.io.as_ref().unwrap().read(b) { + Ok(n) => { + // if we read a partially full buffer, this is sufficient on unix to show + // that the socket buffer has been drained + if n > 0 && (!cfg!(windows) && n < len) { + self.registration.clear_readiness(evt); + } + + // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the + // buffer. + buf.assume_init(n); + buf.advance(n); + return Poll::Ready(Ok(())); + }, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + self.registration.clear_readiness(evt); + } + Err(e) => return Poll::Ready(Err(e)), + } + } } pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> @@ -167,10 +186,29 @@ feature! { &'a E: io::Write + 'a, { use std::io::Write; - self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf)) + + loop { + let evt = ready!(self.registration.poll_write_ready(cx))?; + + match self.io.as_ref().unwrap().write(buf) { + Ok(n) => { + // if we write only part of our buffer, this is sufficient on unix to show + // that the socket buffer is full + if n > 0 && (!cfg!(windows) && n < buf.len()) { + self.registration.clear_readiness(evt); + } + + return Poll::Ready(Ok(n)); + }, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + self.registration.clear_readiness(evt); + } + Err(e) => return Poll::Ready(Err(e)), + } + } } - #[cfg(feature = "net")] + #[cfg(any(feature = "net", feature = "process"))] pub(crate) fn poll_write_vectored<'a>( &'a self, cx: &mut Context<'_>, @@ -185,6 +223,10 @@ feature! { } } +impl<E: Source> UnwindSafe for PollEvented<E> {} + +impl<E: Source> RefUnwindSafe for PollEvented<E> {} + impl<E: Source> Deref for PollEvented<E> { type Target = E; diff --git a/src/io/read_buf.rs b/src/io/read_buf.rs index ad58cbe..0dc595a 100644 --- a/src/io/read_buf.rs +++ b/src/io/read_buf.rs @@ -1,9 +1,5 @@ -// This lint claims ugly casting is somehow safer than transmute, but there's -// no evidence that is the case. Shush. -#![allow(clippy::transmute_ptr_to_ptr)] - use std::fmt; -use std::mem::{self, MaybeUninit}; +use std::mem::MaybeUninit; /// A wrapper around a byte buffer that is incrementally filled and initialized. /// @@ -35,7 +31,7 @@ impl<'a> ReadBuf<'a> { #[inline] pub fn new(buf: &'a mut [u8]) -> ReadBuf<'a> { let initialized = buf.len(); - let buf = unsafe { mem::transmute::<&mut [u8], &mut [MaybeUninit<u8>]>(buf) }; + let buf = unsafe { slice_to_uninit_mut(buf) }; ReadBuf { buf, filled: 0, @@ -67,8 +63,7 @@ impl<'a> ReadBuf<'a> { let slice = &self.buf[..self.filled]; // safety: filled describes how far into the buffer that the // user has filled with bytes, so it's been initialized. - // TODO: This could use `MaybeUninit::slice_get_ref` when it is stable. - unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) } + unsafe { slice_assume_init(slice) } } /// Returns a mutable reference to the filled portion of the buffer. @@ -77,8 +72,7 @@ impl<'a> ReadBuf<'a> { let slice = &mut self.buf[..self.filled]; // safety: filled describes how far into the buffer that the // user has filled with bytes, so it's been initialized. - // TODO: This could use `MaybeUninit::slice_get_mut` when it is stable. - unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) } + unsafe { slice_assume_init_mut(slice) } } /// Returns a new `ReadBuf` comprised of the unfilled section up to `n`. @@ -97,8 +91,7 @@ impl<'a> ReadBuf<'a> { let slice = &self.buf[..self.initialized]; // safety: initialized describes how far into the buffer that the // user has at some point initialized with bytes. - // TODO: This could use `MaybeUninit::slice_get_ref` when it is stable. - unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) } + unsafe { slice_assume_init(slice) } } /// Returns a mutable reference to the initialized portion of the buffer. @@ -109,15 +102,14 @@ impl<'a> ReadBuf<'a> { let slice = &mut self.buf[..self.initialized]; // safety: initialized describes how far into the buffer that the // user has at some point initialized with bytes. - // TODO: This could use `MaybeUninit::slice_get_mut` when it is stable. - unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) } + unsafe { slice_assume_init_mut(slice) } } /// Returns a mutable reference to the entire buffer, without ensuring that it has been fully /// initialized. /// /// The elements between 0 and `self.filled().len()` are filled, and those between 0 and - /// `self.initialized().len()` are initialized (and so can be transmuted to a `&mut [u8]`). + /// `self.initialized().len()` are initialized (and so can be converted to a `&mut [u8]`). /// /// The caller of this method must ensure that these invariants are upheld. For example, if the /// caller initializes some of the uninitialized section of the buffer, it must call @@ -160,6 +152,7 @@ impl<'a> ReadBuf<'a> { /// /// Panics if `self.remaining()` is less than `n`. #[inline] + #[track_caller] pub fn initialize_unfilled_to(&mut self, n: usize) -> &mut [u8] { assert!(self.remaining() >= n, "n overflows remaining"); @@ -178,7 +171,7 @@ impl<'a> ReadBuf<'a> { let slice = &mut self.buf[self.filled..end]; // safety: just above, we checked that the end of the buf has // been initialized to some value. - unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) } + unsafe { slice_assume_init_mut(slice) } } /// Returns the number of bytes at the end of the slice that have not yet been filled. @@ -203,6 +196,7 @@ impl<'a> ReadBuf<'a> { /// /// Panics if the filled region of the buffer would become larger than the initialized region. #[inline] + #[track_caller] pub fn advance(&mut self, n: usize) { let new = self.filled.checked_add(n).expect("filled overflow"); self.set_filled(new); @@ -219,6 +213,7 @@ impl<'a> ReadBuf<'a> { /// /// Panics if the filled region of the buffer would become larger than the initialized region. #[inline] + #[track_caller] pub fn set_filled(&mut self, n: usize) { assert!( n <= self.initialized, @@ -249,6 +244,7 @@ impl<'a> ReadBuf<'a> { /// /// Panics if `self.remaining()` is less than `buf.len()`. #[inline] + #[track_caller] pub fn put_slice(&mut self, buf: &[u8]) { assert!( self.remaining() >= buf.len(), @@ -283,3 +279,17 @@ impl fmt::Debug for ReadBuf<'_> { .finish() } } + +unsafe fn slice_to_uninit_mut(slice: &mut [u8]) -> &mut [MaybeUninit<u8>] { + &mut *(slice as *mut [u8] as *mut [MaybeUninit<u8>]) +} + +// TODO: This could use `MaybeUninit::slice_assume_init` when it is stable. +unsafe fn slice_assume_init(slice: &[MaybeUninit<u8>]) -> &[u8] { + &*(slice as *const [MaybeUninit<u8>] as *const [u8]) +} + +// TODO: This could use `MaybeUninit::slice_assume_init_mut` when it is stable. +unsafe fn slice_assume_init_mut(slice: &mut [MaybeUninit<u8>]) -> &mut [u8] { + &mut *(slice as *mut [MaybeUninit<u8>] as *mut [u8]) +} diff --git a/src/io/driver/ready.rs b/src/io/ready.rs index 2430d30..ef135c4 100644 --- a/src/io/driver/ready.rs +++ b/src/io/ready.rs @@ -12,7 +12,7 @@ const WRITE_CLOSED: usize = 0b0_1000; /// /// `Ready` tracks which operation an I/O resource is ready to perform. #[cfg_attr(docsrs, doc(cfg(feature = "net")))] -#[derive(Clone, Copy, PartialEq, PartialOrd)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct Ready(usize); impl Ready { diff --git a/src/io/split.rs b/src/io/split.rs index 8258a0f..2e0da95 100644 --- a/src/io/split.rs +++ b/src/io/split.rs @@ -74,6 +74,7 @@ impl<T> ReadHalf<T> { /// same `split` operation this method will panic. /// This can be checked ahead of time by comparing the stream ID /// of the two halves. + #[track_caller] pub fn unsplit(self, wr: WriteHalf<T>) -> T { if self.is_pair_of(&wr) { drop(wr); diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs index 7e4a198..2715ba7 100644 --- a/src/io/stdio_common.rs +++ b/src/io/stdio_common.rs @@ -42,7 +42,7 @@ where // for further code. Since `AsyncWrite` can always shrink // buffer at its discretion, excessive (i.e. in tests) shrinking // does not break correctness. - // 2. If buffer is small, it will not be shrinked. + // 2. If buffer is small, it will not be shrunk. // That's why, it's "textness" will not change, so we don't have // to fixup it. if cfg!(not(any(target_os = "windows", test))) || buf.len() <= crate::io::blocking::MAX_BUF @@ -193,7 +193,7 @@ mod tests { fn test_pseudo_text() { // In this test we write a piece of binary data, whose beginning is // text though. We then validate that even in this corner case buffer - // was not shrinked too much. + // was not shrunk too much. let checked_count = super::MAGIC_CONST * super::MAX_BYTES_PER_CHAR; let mut data: Vec<u8> = str::repeat("a", checked_count).into(); data.extend(std::iter::repeat(0b1010_1010).take(MAX_BUF - checked_count + 1)); @@ -212,7 +212,7 @@ mod tests { writer.write_history.iter().copied().sum::<usize>(), data.len() ); - // Check that at most MAX_BYTES_PER_CHAR + 1 (i.e. 5) bytes were shrinked + // Check that at most MAX_BYTES_PER_CHAR + 1 (i.e. 5) bytes were shrunk // from the buffer: one because it was outside of MAX_BUF boundary, and // up to one "utf8 code point". assert!(data.len() - writer.write_history[0] <= super::MAX_BYTES_PER_CHAR + 1); diff --git a/src/io/util/async_seek_ext.rs b/src/io/util/async_seek_ext.rs index 46b3e6c..aadf3a7 100644 --- a/src/io/util/async_seek_ext.rs +++ b/src/io/util/async_seek_ext.rs @@ -69,7 +69,7 @@ cfg_io_util! { /// Creates a future which will rewind to the beginning of the stream. /// - /// This is convenience method, equivalent to to `self.seek(SeekFrom::Start(0))`. + /// This is convenience method, equivalent to `self.seek(SeekFrom::Start(0))`. fn rewind(&mut self) -> Seek<'_, Self> where Self: Unpin, diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs index 93a3183..dfdde82 100644 --- a/src/io/util/async_write_ext.rs +++ b/src/io/util/async_write_ext.rs @@ -406,7 +406,7 @@ cfg_io_util! { /// ``` fn write_u8(&mut self, n: u8) -> WriteU8; - /// Writes an unsigned 8-bit integer to the underlying writer. + /// Writes a signed 8-bit integer to the underlying writer. /// /// Equivalent to: /// @@ -425,7 +425,7 @@ cfg_io_util! { /// /// # Examples /// - /// Write unsigned 8 bit integers to a `AsyncWrite`: + /// Write signed 8 bit integers to a `AsyncWrite`: /// /// ```rust /// use tokio::io::{self, AsyncWriteExt}; @@ -434,10 +434,10 @@ cfg_io_util! { /// async fn main() -> io::Result<()> { /// let mut writer = Vec::new(); /// - /// writer.write_u8(2).await?; - /// writer.write_u8(5).await?; + /// writer.write_i8(-2).await?; + /// writer.write_i8(126).await?; /// - /// assert_eq!(writer, b"\x02\x05"); + /// assert_eq!(writer, b"\xFE\x7E"); /// Ok(()) /// } /// ``` diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs index 7df610b..60879c0 100644 --- a/src/io/util/buf_reader.rs +++ b/src/io/util/buf_reader.rs @@ -204,7 +204,6 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { self.as_mut() .get_pin_mut() .start_seek(SeekFrom::Current(offset))?; - self.as_mut().get_pin_mut().poll_complete(cx)? } else { // seek backwards by our remainder, and then by the offset self.as_mut() @@ -221,8 +220,8 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { self.as_mut() .get_pin_mut() .start_seek(SeekFrom::Current(n))?; - self.as_mut().get_pin_mut().poll_complete(cx)? } + self.as_mut().get_pin_mut().poll_complete(cx)? } SeekState::PendingOverflowed(n) => { if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() { diff --git a/src/io/util/copy.rs b/src/io/util/copy.rs index d0ab7cb..47dad89 100644 --- a/src/io/util/copy.rs +++ b/src/io/util/copy.rs @@ -27,6 +27,51 @@ impl CopyBuffer { } } + fn poll_fill_buf<R>( + &mut self, + cx: &mut Context<'_>, + reader: Pin<&mut R>, + ) -> Poll<io::Result<()>> + where + R: AsyncRead + ?Sized, + { + let me = &mut *self; + let mut buf = ReadBuf::new(&mut me.buf); + buf.set_filled(me.cap); + + let res = reader.poll_read(cx, &mut buf); + if let Poll::Ready(Ok(_)) = res { + let filled_len = buf.filled().len(); + me.read_done = me.cap == filled_len; + me.cap = filled_len; + } + res + } + + fn poll_write_buf<R, W>( + &mut self, + cx: &mut Context<'_>, + mut reader: Pin<&mut R>, + mut writer: Pin<&mut W>, + ) -> Poll<io::Result<usize>> + where + R: AsyncRead + ?Sized, + W: AsyncWrite + ?Sized, + { + let me = &mut *self; + match writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]) { + Poll::Pending => { + // Top up the buffer towards full if we can read a bit more + // data - this should improve the chances of a large write + if !me.read_done && me.cap < me.buf.len() { + ready!(me.poll_fill_buf(cx, reader.as_mut()))?; + } + Poll::Pending + } + res => res, + } + } + pub(super) fn poll_copy<R, W>( &mut self, cx: &mut Context<'_>, @@ -41,10 +86,10 @@ impl CopyBuffer { // If our buffer is empty, then we need to read some data to // continue. if self.pos == self.cap && !self.read_done { - let me = &mut *self; - let mut buf = ReadBuf::new(&mut me.buf); + self.pos = 0; + self.cap = 0; - match reader.as_mut().poll_read(cx, &mut buf) { + match self.poll_fill_buf(cx, reader.as_mut()) { Poll::Ready(Ok(_)) => (), Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => { @@ -58,20 +103,11 @@ impl CopyBuffer { return Poll::Pending; } } - - let n = buf.filled().len(); - if n == 0 { - self.read_done = true; - } else { - self.pos = 0; - self.cap = n; - } } // If our buffer has some data, let's write it out! while self.pos < self.cap { - let me = &mut *self; - let i = ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]))?; + let i = ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?; if i == 0 { return Poll::Ready(Err(io::Error::new( io::ErrorKind::WriteZero, diff --git a/src/io/util/empty.rs b/src/io/util/empty.rs index f964d18..9e648f8 100644 --- a/src/io/util/empty.rs +++ b/src/io/util/empty.rs @@ -50,16 +50,18 @@ impl AsyncRead for Empty { #[inline] fn poll_read( self: Pin<&mut Self>, - _: &mut Context<'_>, + cx: &mut Context<'_>, _: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { + ready!(poll_proceed_and_make_progress(cx)); Poll::Ready(Ok(())) } } impl AsyncBufRead for Empty { #[inline] - fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + ready!(poll_proceed_and_make_progress(cx)); Poll::Ready(Ok(&[])) } @@ -73,6 +75,20 @@ impl fmt::Debug for Empty { } } +cfg_coop! { + fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> { + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + coop.made_progress(); + Poll::Ready(()) + } +} + +cfg_not_coop! { + fn poll_proceed_and_make_progress(_: &mut Context<'_>) -> Poll<()> { + Poll::Ready(()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/io/util/fill_buf.rs b/src/io/util/fill_buf.rs index 3655c01..bb07c76 100644 --- a/src/io/util/fill_buf.rs +++ b/src/io/util/fill_buf.rs @@ -40,6 +40,12 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> { // Safety: This is necessary only due to a limitation in the // borrow checker. Once Rust starts using the polonius borrow // checker, this can be simplified. + // + // The safety of this transmute relies on the fact that the + // value of `reader` is `None` when we return in this branch. + // Otherwise the caller could poll us again after + // completion, and access the mutable reference while the + // returned immutable reference still exists. let slice = std::mem::transmute::<&[u8], &'a [u8]>(slice); Poll::Ready(Ok(slice)) }, diff --git a/src/io/util/mem.rs b/src/io/util/mem.rs index 4eefe7b..31884b3 100644 --- a/src/io/util/mem.rs +++ b/src/io/util/mem.rs @@ -177,10 +177,8 @@ impl Pipe { waker.wake(); } } -} -impl AsyncRead for Pipe { - fn poll_read( + fn poll_read_internal( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, @@ -204,10 +202,8 @@ impl AsyncRead for Pipe { Poll::Pending } } -} -impl AsyncWrite for Pipe { - fn poll_write( + fn poll_write_internal( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], @@ -228,6 +224,62 @@ impl AsyncWrite for Pipe { } Poll::Ready(Ok(len)) } +} + +impl AsyncRead for Pipe { + cfg_coop! { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<std::io::Result<()>> { + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + + let ret = self.poll_read_internal(cx, buf); + if ret.is_ready() { + coop.made_progress(); + } + ret + } + } + + cfg_not_coop! { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<std::io::Result<()>> { + self.poll_read_internal(cx, buf) + } + } +} + +impl AsyncWrite for Pipe { + cfg_coop! { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll<std::io::Result<usize>> { + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + + let ret = self.poll_write_internal(cx, buf); + if ret.is_ready() { + coop.made_progress(); + } + ret + } + } + + cfg_not_coop! { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll<std::io::Result<usize>> { + self.poll_write_internal(cx, buf) + } + } fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<std::io::Result<()>> { Poll::Ready(Ok(())) diff --git a/src/io/util/read_exact.rs b/src/io/util/read_exact.rs index 1e8150e..dbdd58b 100644 --- a/src/io/util/read_exact.rs +++ b/src/io/util/read_exact.rs @@ -51,13 +51,13 @@ where type Output = io::Result<usize>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> { - let mut me = self.project(); + let me = self.project(); loop { // if our buffer is empty, then we need to read some data to continue. let rem = me.buf.remaining(); if rem != 0 { - ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?; + ready!(Pin::new(&mut *me.reader).poll_read(cx, me.buf))?; if me.buf.remaining() == rem { return Err(eof()).into(); } diff --git a/src/io/util/take.rs b/src/io/util/take.rs index b5e90c9..df2f61b 100644 --- a/src/io/util/take.rs +++ b/src/io/util/take.rs @@ -86,7 +86,11 @@ impl<R: AsyncRead> AsyncRead for Take<R> { let me = self.project(); let mut b = buf.take(*me.limit_ as usize); + + let buf_ptr = b.filled().as_ptr(); ready!(me.inner.poll_read(cx, &mut b))?; + assert_eq!(b.filled().as_ptr(), buf_ptr); + let n = b.filled().len(); // We need to update the original ReadBuf diff --git a/src/io/util/vec_with_initialized.rs b/src/io/util/vec_with_initialized.rs index 208cc93..a9b94e3 100644 --- a/src/io/util/vec_with_initialized.rs +++ b/src/io/util/vec_with_initialized.rs @@ -1,19 +1,18 @@ use crate::io::ReadBuf; use std::mem::MaybeUninit; -mod private { - pub trait Sealed {} - - impl Sealed for Vec<u8> {} - impl Sealed for &mut Vec<u8> {} -} +/// Something that looks like a `Vec<u8>`. +/// +/// # Safety +/// +/// The implementor must guarantee that the vector returned by the +/// `as_mut` and `as_mut` methods do not change from one call to +/// another. +pub(crate) unsafe trait VecU8: AsRef<Vec<u8>> + AsMut<Vec<u8>> {} -/// A sealed trait that constrains the generic type parameter in `VecWithInitialized<V>`. That struct's safety relies -/// on certain invariants upheld by `Vec<u8>`. -pub(crate) trait VecU8: AsMut<Vec<u8>> + private::Sealed {} +unsafe impl VecU8 for Vec<u8> {} +unsafe impl VecU8 for &mut Vec<u8> {} -impl VecU8 for Vec<u8> {} -impl VecU8 for &mut Vec<u8> {} /// This struct wraps a `Vec<u8>` or `&mut Vec<u8>`, combining it with a /// `num_initialized`, which keeps track of the number of initialized bytes /// in the unused capacity. @@ -64,8 +63,8 @@ where } #[cfg(feature = "io-util")] - pub(crate) fn is_empty(&mut self) -> bool { - self.vec.as_mut().is_empty() + pub(crate) fn is_empty(&self) -> bool { + self.vec.as_ref().is_empty() } pub(crate) fn get_read_buf<'a>(&'a mut self) -> ReadBuf<'a> { diff --git a/src/io/util/write_all.rs b/src/io/util/write_all.rs index e59d41e..abd3e39 100644 --- a/src/io/util/write_all.rs +++ b/src/io/util/write_all.rs @@ -42,7 +42,7 @@ where while !me.buf.is_empty() { let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf))?; { - let (_, rest) = mem::replace(&mut *me.buf, &[]).split_at(n); + let (_, rest) = mem::take(&mut *me.buf).split_at(n); *me.buf = rest; } if n == 0 { |