aboutsummaryrefslogtreecommitdiff
path: root/src/io
diff options
context:
space:
mode:
Diffstat (limited to 'src/io')
-rw-r--r--src/io/async_fd.rs4
-rw-r--r--src/io/blocking.rs4
-rw-r--r--src/io/bsd/poll_aio.rs8
-rw-r--r--src/io/driver/interest.rs12
-rw-r--r--src/io/driver/mod.rs18
-rw-r--r--src/io/driver/ready.rs12
-rw-r--r--src/io/driver/scheduled_io.rs20
-rw-r--r--src/io/mod.rs2
-rw-r--r--src/io/poll_evented.rs4
-rw-r--r--src/io/split.rs2
-rw-r--r--src/io/stdio_common.rs2
-rw-r--r--src/io/util/async_write_ext.rs8
-rw-r--r--src/io/util/buf_reader.rs2
-rw-r--r--src/io/util/copy.rs10
-rw-r--r--src/io/util/lines.rs6
15 files changed, 61 insertions, 53 deletions
diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs
index fa5bec5..9ec5b7f 100644
--- a/src/io/async_fd.rs
+++ b/src/io/async_fd.rs
@@ -205,13 +205,13 @@ impl<T: AsRawFd> AsyncFd<T> {
})
}
- /// Returns a shared reference to the backing object of this [`AsyncFd`]
+ /// Returns a shared reference to the backing object of this [`AsyncFd`].
#[inline]
pub fn get_ref(&self) -> &T {
self.inner.as_ref().unwrap()
}
- /// Returns a mutable reference to the backing object of this [`AsyncFd`]
+ /// Returns a mutable reference to the backing object of this [`AsyncFd`].
#[inline]
pub fn get_mut(&mut self) -> &mut T {
self.inner.as_mut().unwrap()
diff --git a/src/io/blocking.rs b/src/io/blocking.rs
index 94a3484..1d79ee7 100644
--- a/src/io/blocking.rs
+++ b/src/io/blocking.rs
@@ -16,7 +16,7 @@ use self::State::*;
pub(crate) struct Blocking<T> {
inner: Option<T>,
state: State<T>,
- /// `true` if the lower IO layer needs flushing
+ /// `true` if the lower IO layer needs flushing.
need_flush: bool,
}
@@ -175,7 +175,7 @@ where
}
}
-/// Repeats operations that are interrupted
+/// Repeats operations that are interrupted.
macro_rules! uninterruptibly {
($e:expr) => {{
loop {
diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs
index a765d76..f1ac4b2 100644
--- a/src/io/bsd/poll_aio.rs
+++ b/src/io/bsd/poll_aio.rs
@@ -1,4 +1,4 @@
-//! Use POSIX AIO futures with Tokio
+//! Use POSIX AIO futures with Tokio.
use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
use mio::event::Source;
@@ -16,14 +16,14 @@ use std::task::{Context, Poll};
/// Tokio's consumer must pass an implementor of this trait to create a
/// [`Aio`] object.
pub trait AioSource {
- /// Register this AIO event source with Tokio's reactor
+ /// Registers this AIO event source with Tokio's reactor.
fn register(&mut self, kq: RawFd, token: usize);
- /// Deregister this AIO event source with Tokio's reactor
+ /// Deregisters this AIO event source with Tokio's reactor.
fn deregister(&mut self);
}
-/// Wrap the user's AioSource in order to implement mio::event::Source, which
+/// Wraps the user's AioSource in order to implement mio::event::Source, which
/// is what the rest of the crate wants.
struct MioSource<T>(T);
diff --git a/src/io/driver/interest.rs b/src/io/driver/interest.rs
index c5b18ed..d6b46df 100644
--- a/src/io/driver/interest.rs
+++ b/src/io/driver/interest.rs
@@ -5,7 +5,7 @@ use crate::io::driver::Ready;
use std::fmt;
use std::ops;
-/// Readiness event interest
+/// Readiness event interest.
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
@@ -17,19 +17,19 @@ impl Interest {
// The non-FreeBSD definitions in this block are active only when
// building documentation.
cfg_aio! {
- /// Interest for POSIX AIO
+ /// Interest for POSIX AIO.
#[cfg(target_os = "freebsd")]
pub const AIO: Interest = Interest(mio::Interest::AIO);
- /// Interest for POSIX AIO
+ /// Interest for POSIX AIO.
#[cfg(not(target_os = "freebsd"))]
pub const AIO: Interest = Interest(mio::Interest::READABLE);
- /// Interest for POSIX AIO lio_listio events
+ /// Interest for POSIX AIO lio_listio events.
#[cfg(target_os = "freebsd")]
pub const LIO: Interest = Interest(mio::Interest::LIO);
- /// Interest for POSIX AIO lio_listio events
+ /// Interest for POSIX AIO lio_listio events.
#[cfg(not(target_os = "freebsd"))]
pub const LIO: Interest = Interest(mio::Interest::READABLE);
}
@@ -39,7 +39,7 @@ impl Interest {
/// Readable interest includes read-closed events.
pub const READABLE: Interest = Interest(mio::Interest::READABLE);
- /// Interest in all writable events
+ /// Interest in all writable events.
///
/// Writable interest includes write-closed events.
pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
index 1511884..19f67a2 100644
--- a/src/io/driver/mod.rs
+++ b/src/io/driver/mod.rs
@@ -23,10 +23,10 @@ use std::io;
use std::sync::{Arc, Weak};
use std::time::Duration;
-/// I/O driver, backed by Mio
+/// I/O driver, backed by Mio.
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
- /// as it is mostly used to determine when to call `compact()`
+ /// as it is mostly used to determine when to call `compact()`.
tick: u8,
/// Reuse the `mio::Events` value across calls to poll.
@@ -35,17 +35,17 @@ pub(crate) struct Driver {
/// Primary slab handle containing the state for each resource registered
/// with this driver. During Drop this is moved into the Inner structure, so
/// this is an Option to allow it to be vacated (until Drop this is always
- /// Some)
+ /// Some).
resources: Option<Slab<ScheduledIo>>,
- /// The system event queue
+ /// The system event queue.
poll: mio::Poll,
/// State shared between the reactor and the handles.
inner: Arc<Inner>,
}
-/// A reference to an I/O driver
+/// A reference to an I/O driver.
#[derive(Clone)]
pub(crate) struct Handle {
inner: Weak<Inner>,
@@ -66,13 +66,13 @@ pub(super) struct Inner {
/// without risking new ones being registered in the meantime.
resources: Mutex<Option<Slab<ScheduledIo>>>,
- /// Registers I/O resources
+ /// Registers I/O resources.
registry: mio::Registry,
/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
- /// Used to wake up the reactor from a call to `turn`
+ /// Used to wake up the reactor from a call to `turn`.
waker: mio::Waker,
}
@@ -253,7 +253,7 @@ impl fmt::Debug for Driver {
cfg_rt! {
impl Handle {
- /// Returns a handle to the current reactor
+ /// Returns a handle to the current reactor.
///
/// # Panics
///
@@ -267,7 +267,7 @@ cfg_rt! {
cfg_not_rt! {
impl Handle {
- /// Returns a handle to the current reactor
+ /// Returns a handle to the current reactor.
///
/// # Panics
///
diff --git a/src/io/driver/ready.rs b/src/io/driver/ready.rs
index 305dc91..2430d30 100644
--- a/src/io/driver/ready.rs
+++ b/src/io/driver/ready.rs
@@ -68,7 +68,7 @@ impl Ready {
ready
}
- /// Returns true if `Ready` is the empty set
+ /// Returns true if `Ready` is the empty set.
///
/// # Examples
///
@@ -82,7 +82,7 @@ impl Ready {
self == Ready::EMPTY
}
- /// Returns `true` if the value includes `readable`
+ /// Returns `true` if the value includes `readable`.
///
/// # Examples
///
@@ -98,7 +98,7 @@ impl Ready {
self.contains(Ready::READABLE) || self.is_read_closed()
}
- /// Returns `true` if the value includes writable `readiness`
+ /// Returns `true` if the value includes writable `readiness`.
///
/// # Examples
///
@@ -114,7 +114,7 @@ impl Ready {
self.contains(Ready::WRITABLE) || self.is_write_closed()
}
- /// Returns `true` if the value includes read-closed `readiness`
+ /// Returns `true` if the value includes read-closed `readiness`.
///
/// # Examples
///
@@ -129,7 +129,7 @@ impl Ready {
self.contains(Ready::READ_CLOSED)
}
- /// Returns `true` if the value includes write-closed `readiness`
+ /// Returns `true` if the value includes write-closed `readiness`.
///
/// # Examples
///
@@ -154,7 +154,7 @@ impl Ready {
(self & other) == other
}
- /// Create a `Ready` instance using the given `usize` representation.
+ /// Creates a `Ready` instance using the given `usize` representation.
///
/// The `usize` representation must have been obtained from a call to
/// `Readiness::as_usize`.
diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs
index a265720..76f9343 100644
--- a/src/io/driver/scheduled_io.rs
+++ b/src/io/driver/scheduled_io.rs
@@ -36,16 +36,16 @@ cfg_io_readiness! {
#[derive(Debug, Default)]
struct Waiters {
#[cfg(feature = "net")]
- /// List of all current waiters
+ /// List of all current waiters.
list: WaitList,
- /// Waker used for AsyncRead
+ /// Waker used for AsyncRead.
reader: Option<Waker>,
- /// Waker used for AsyncWrite
+ /// Waker used for AsyncWrite.
writer: Option<Waker>,
- /// True if this ScheduledIo has been killed due to IO driver shutdown
+ /// True if this ScheduledIo has been killed due to IO driver shutdown.
is_shutdown: bool,
}
@@ -54,19 +54,19 @@ cfg_io_readiness! {
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
- /// The waker for this task
+ /// The waker for this task.
waker: Option<Waker>,
- /// The interest this waiter is waiting on
+ /// The interest this waiter is waiting on.
interest: Interest,
is_ready: bool,
- /// Should never be `!Unpin`
+ /// Should never be `!Unpin`.
_p: PhantomPinned,
}
- /// Future returned by `readiness()`
+ /// Future returned by `readiness()`.
struct Readiness<'a> {
scheduled_io: &'a ScheduledIo,
@@ -276,7 +276,7 @@ impl ScheduledIo {
}
}
- /// Poll version of checking readiness for a certain direction.
+ /// Polls for readiness events in a given direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
/// which cannot use the `async fn` version. This uses reserved reader
@@ -363,7 +363,7 @@ unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
- /// An async version of `poll_readiness` which uses a linked list of wakers
+ /// An async version of `poll_readiness` which uses a linked list of wakers.
pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
self.readiness_fut(interest).await
}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index a5ee108..cfdda61 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -218,7 +218,7 @@ cfg_io_driver_impl! {
}
cfg_aio! {
- /// BSD-specific I/O types
+ /// BSD-specific I/O types.
pub mod bsd {
mod poll_aio;
diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs
index 9872574..44e68a2 100644
--- a/src/io/poll_evented.rs
+++ b/src/io/poll_evented.rs
@@ -113,7 +113,7 @@ impl<E: Source> PollEvented<E> {
})
}
- /// Returns a reference to the registration
+ /// Returns a reference to the registration.
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
@@ -123,7 +123,7 @@ impl<E: Source> PollEvented<E> {
&self.registration
}
- /// Deregister the inner io from the registration and returns a Result containing the inner io
+ /// Deregisters the inner io from the registration and returns a Result containing the inner io.
#[cfg(any(feature = "net", feature = "process"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
diff --git a/src/io/split.rs b/src/io/split.rs
index f35273f..8258a0f 100644
--- a/src/io/split.rs
+++ b/src/io/split.rs
@@ -90,7 +90,7 @@ impl<T> ReadHalf<T> {
}
impl<T> WriteHalf<T> {
- /// Check if this `WriteHalf` and some `ReadHalf` were split from the same
+ /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same
/// stream.
pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs
index 56c4520..7e4a198 100644
--- a/src/io/stdio_common.rs
+++ b/src/io/stdio_common.rs
@@ -7,7 +7,7 @@ use std::task::{Context, Poll};
/// if buffer contents seems to be utf8. Otherwise it only trims buffer down to MAX_BUF.
/// That's why, wrapped writer will always receive well-formed utf-8 bytes.
/// # Other platforms
-/// passes data to `inner` as is
+/// Passes data to `inner` as is.
#[derive(Debug)]
pub(crate) struct SplitByUtf8BoundaryIfWindows<W> {
inner: W,
diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs
index a1f77f8..93a3183 100644
--- a/src/io/util/async_write_ext.rs
+++ b/src/io/util/async_write_ext.rs
@@ -20,7 +20,7 @@ use std::io::IoSlice;
use bytes::Buf;
cfg_io_util! {
- /// Defines numeric writer
+ /// Defines numeric writer.
macro_rules! write_impl {
(
$(
@@ -256,7 +256,7 @@ cfg_io_util! {
write_buf(self, src)
}
- /// Attempts to write an entire buffer into this writer
+ /// Attempts to write an entire buffer into this writer.
///
/// Equivalent to:
///
@@ -353,9 +353,9 @@ cfg_io_util! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut buffer = File::create("foo.txt").await?;
+ /// let mut file = File::create("foo.txt").await?;
///
- /// buffer.write_all(b"some bytes").await?;
+ /// file.write_all(b"some bytes").await?;
/// Ok(())
/// }
/// ```
diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs
index 7cfd46c..7df610b 100644
--- a/src/io/util/buf_reader.rs
+++ b/src/io/util/buf_reader.rs
@@ -155,7 +155,7 @@ pub(super) enum SeekState {
Pending,
}
-/// Seek to an offset, in bytes, in the underlying reader.
+/// Seeks to an offset, in bytes, in the underlying reader.
///
/// The position used for seeking with `SeekFrom::Current(_)` is the
/// position the underlying reader would be at if the `BufReader` had no
diff --git a/src/io/util/copy.rs b/src/io/util/copy.rs
index fbd77b5..d0ab7cb 100644
--- a/src/io/util/copy.rs
+++ b/src/io/util/copy.rs
@@ -23,7 +23,7 @@ impl CopyBuffer {
pos: 0,
cap: 0,
amt: 0,
- buf: vec![0; 2048].into_boxed_slice(),
+ buf: vec![0; super::DEFAULT_BUF_SIZE].into_boxed_slice(),
}
}
@@ -84,6 +84,14 @@ impl CopyBuffer {
}
}
+ // If pos larger than cap, this loop will never stop.
+ // In particular, user's wrong poll_write implementation returning
+ // incorrect written length may lead to thread blocking.
+ debug_assert!(
+ self.pos <= self.cap,
+ "writer returned length larger than input slice"
+ );
+
// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.pos == self.cap && self.read_done {
diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs
index 3fbf5e3..717f633 100644
--- a/src/io/util/lines.rs
+++ b/src/io/util/lines.rs
@@ -8,7 +8,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
- /// Read lines from an [`AsyncBufRead`].
+ /// Reads lines from an [`AsyncBufRead`].
///
/// A `Lines` can be turned into a `Stream` with [`LinesStream`].
///
@@ -72,12 +72,12 @@ where
poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
}
- /// Obtain a mutable reference to the underlying reader
+ /// Obtains a mutable reference to the underlying reader.
pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}
- /// Obtain a reference to the underlying reader
+ /// Obtains a reference to the underlying reader.
pub fn get_ref(&mut self) -> &R {
&self.reader
}