diff options
Diffstat (limited to 'src/io')
-rw-r--r-- | src/io/allow_std.rs | 11 | ||||
-rw-r--r-- | src/io/buf_reader.rs | 8 | ||||
-rw-r--r-- | src/io/buf_writer.rs | 65 | ||||
-rw-r--r-- | src/io/chain.rs | 12 | ||||
-rw-r--r-- | src/io/empty.rs | 8 | ||||
-rw-r--r-- | src/io/line_writer.rs | 155 | ||||
-rw-r--r-- | src/io/mod.rs | 15 | ||||
-rw-r--r-- | src/io/repeat.rs | 8 | ||||
-rw-r--r-- | src/io/take.rs | 7 |
9 files changed, 223 insertions, 66 deletions
diff --git a/src/io/allow_std.rs b/src/io/allow_std.rs index 1d13e0c..ec30ee3 100644 --- a/src/io/allow_std.rs +++ b/src/io/allow_std.rs @@ -1,6 +1,4 @@ use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; use std::pin::Pin; use std::{fmt, io}; @@ -121,10 +119,6 @@ where fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { self.0.read_vectored(bufs) } - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.0.initializer() - } fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { self.0.read_to_end(buf) } @@ -155,11 +149,6 @@ where ) -> Poll<io::Result<usize>> { Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs)))) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.0.initializer() - } } impl<T> io::Seek for AllowStdIo<T> diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 2d585a9..0334a9f 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -2,8 +2,6 @@ use super::DEFAULT_BUF_SIZE; use futures_core::future::Future; use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; use pin_project_lite::pin_project; use std::io::{self, Read}; @@ -144,12 +142,6 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> { self.consume(nread); Poll::Ready(Ok(nread)) } - - // we can't skip unconditionally because of the large buffer case in read. - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.inner.initializer() - } } impl<R: AsyncRead> AsyncBufRead for BufReader<R> { diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index f292b87..cb74863 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -6,6 +6,7 @@ use pin_project_lite::pin_project; use std::fmt; use std::io::{self, Write}; use std::pin::Pin; +use std::ptr; pin_project! { /// Wraps a writer and buffers its output. @@ -49,7 +50,7 @@ impl<W: AsyncWrite> BufWriter<W> { Self { inner, buf: Vec::with_capacity(cap), written: 0 } } - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + pub(super) fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { let mut this = self.project(); let len = this.buf.len(); @@ -83,6 +84,68 @@ impl<W: AsyncWrite> BufWriter<W> { pub fn buffer(&self) -> &[u8] { &self.buf } + + /// Capacity of `buf`. how many chars can be held in buffer + pub(super) fn capacity(&self) -> usize { + self.buf.capacity() + } + + /// Remaining number of bytes to reach `buf` 's capacity + #[inline] + pub(super) fn spare_capacity(&self) -> usize { + self.buf.capacity() - self.buf.len() + } + + /// Write a byte slice directly into buffer + /// + /// Will truncate the number of bytes written to `spare_capacity()` so you want to + /// calculate the size of your slice to avoid losing bytes + /// + /// Based on `std::io::BufWriter` + pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize { + let available = self.spare_capacity(); + let amt_to_buffer = available.min(buf.len()); + + // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction. + unsafe { + self.write_to_buffer_unchecked(&buf[..amt_to_buffer]); + } + + amt_to_buffer + } + + /// Write byte slice directly into `self.buf` + /// + /// Based on `std::io::BufWriter` + #[inline] + unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) { + debug_assert!(buf.len() <= self.spare_capacity()); + let this = self.project(); + let old_len = this.buf.len(); + let buf_len = buf.len(); + let src = buf.as_ptr(); + let dst = this.buf.as_mut_ptr().add(old_len); + ptr::copy_nonoverlapping(src, dst, buf_len); + this.buf.set_len(old_len + buf_len); + } + + /// Write directly using `inner`, bypassing buffering + pub(super) fn inner_poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_write(cx, buf) + } + + /// Write directly using `inner`, bypassing buffering + pub(super) fn inner_poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_write_vectored(cx, bufs) + } } impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { diff --git a/src/io/chain.rs b/src/io/chain.rs index a35c50d..728a3d2 100644 --- a/src/io/chain.rs +++ b/src/io/chain.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut}; use pin_project_lite::pin_project; use std::fmt; @@ -111,16 +109,6 @@ where } this.second.poll_read_vectored(cx, bufs) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - let initializer = self.first.initializer(); - if initializer.should_initialize() { - initializer - } else { - self.second.initializer() - } - } } impl<T, U> AsyncBufRead for Chain<T, U> diff --git a/src/io/empty.rs b/src/io/empty.rs index ab2395a..02f6103 100644 --- a/src/io/empty.rs +++ b/src/io/empty.rs @@ -1,6 +1,4 @@ use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead}; use std::fmt; use std::io; @@ -43,12 +41,6 @@ impl AsyncRead for Empty { ) -> Poll<io::Result<usize>> { Poll::Ready(Ok(0)) } - - #[cfg(feature = "read-initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } impl AsyncBufRead for Empty { diff --git a/src/io/line_writer.rs b/src/io/line_writer.rs new file mode 100644 index 0000000..71cd668 --- /dev/null +++ b/src/io/line_writer.rs @@ -0,0 +1,155 @@ +use super::buf_writer::BufWriter; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncWrite; +use futures_io::IoSlice; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; + +pin_project! { +/// Wrap a writer, like [`BufWriter`] does, but prioritizes buffering lines +/// +/// This was written based on `std::io::LineWriter` which goes into further details +/// explaining the code. +/// +/// Buffering is actually done using `BufWriter`. This class will leverage `BufWriter` +/// to write on-each-line. +#[derive(Debug)] +pub struct LineWriter<W: AsyncWrite> { + #[pin] + buf_writer: BufWriter<W>, +} +} + +impl<W: AsyncWrite> LineWriter<W> { + /// Create a new `LineWriter` with default buffer capacity. The default is currently 1KB + /// which was taken from `std::io::LineWriter` + pub fn new(inner: W) -> LineWriter<W> { + LineWriter::with_capacity(1024, inner) + } + + /// Creates a new `LineWriter` with the specified buffer capacity. + pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> { + LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) } + } + + /// Flush `buf_writer` if last char is "new line" + fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + let this = self.project(); + match this.buf_writer.buffer().last().copied() { + Some(b'\n') => this.buf_writer.flush_buf(cx), + _ => Poll::Ready(Ok(())), + } + } + + /// Returns a reference to `buf_writer`'s internally buffered data. + pub fn buffer(&self) -> &[u8] { + self.buf_writer.buffer() + } + + /// Acquires a reference to the underlying sink or stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &W { + self.buf_writer.get_ref() + } +} + +impl<W: AsyncWrite> AsyncWrite for LineWriter<W> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + let mut this = self.as_mut().project(); + let newline_index = match memchr::memrchr(b'\n', buf) { + None => { + ready!(self.as_mut().flush_if_completed_line(cx)?); + return self.project().buf_writer.poll_write(cx, buf); + } + Some(newline_index) => newline_index + 1, + }; + + ready!(this.buf_writer.as_mut().poll_flush(cx)?); + + let lines = &buf[..newline_index]; + + let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? }; + + if flushed == 0 { + return Poll::Ready(Ok(0)); + } + + let tail = if flushed >= newline_index { + &buf[flushed..] + } else if newline_index - flushed <= this.buf_writer.capacity() { + &buf[flushed..newline_index] + } else { + let scan_area = &buf[flushed..]; + let scan_area = &scan_area[..this.buf_writer.capacity()]; + match memchr::memrchr(b'\n', scan_area) { + Some(newline_index) => &scan_area[..newline_index + 1], + None => scan_area, + } + }; + + let buffered = this.buf_writer.as_mut().write_to_buf(tail); + Poll::Ready(Ok(flushed + buffered)) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + let mut this = self.as_mut().project(); + // `is_write_vectored()` is handled in original code, but not in this crate + // see https://github.com/rust-lang/rust/issues/70436 + + let last_newline_buf_idx = bufs + .iter() + .enumerate() + .rev() + .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i)); + let last_newline_buf_idx = match last_newline_buf_idx { + None => { + ready!(self.as_mut().flush_if_completed_line(cx)?); + return self.project().buf_writer.poll_write_vectored(cx, bufs); + } + Some(i) => i, + }; + + ready!(this.buf_writer.as_mut().poll_flush(cx)?); + + let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); + + let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? }; + if flushed == 0 { + return Poll::Ready(Ok(0)); + } + + let lines_len = lines.iter().map(|buf| buf.len()).sum(); + if flushed < lines_len { + return Poll::Ready(Ok(flushed)); + } + + let buffered: usize = tail + .iter() + .filter(|buf| !buf.is_empty()) + .map(|buf| this.buf_writer.as_mut().write_to_buf(buf)) + .take_while(|&n| n > 0) + .sum(); + + Poll::Ready(Ok(flushed + buffered)) + } + + /// Forward to `buf_writer` 's `BufWriter::poll_flush()` + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.as_mut().project().buf_writer.poll_flush(cx) + } + + /// Forward to `buf_writer` 's `BufWriter::poll_close()` + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.as_mut().project().buf_writer.poll_close(cx) + } +} diff --git a/src/io/mod.rs b/src/io/mod.rs index 16cf5a7..4dd2e02 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -26,10 +26,6 @@ use std::{pin::Pin, ptr}; // Re-export some types from `std::io` so that users don't have to deal // with conflicts when `use`ing `futures::io` and `std::io`. #[doc(no_inline)] -#[cfg(feature = "read-initializer")] -#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] -pub use std::io::Initializer; -#[doc(no_inline)] pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; @@ -40,15 +36,9 @@ const DEFAULT_BUF_SIZE: usize = 8 * 1024; /// Initializes a buffer if necessary. /// -/// A buffer is always initialized if `read-initializer` feature is disabled. +/// A buffer is currently always initialized. #[inline] unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) { - #[cfg(feature = "read-initializer")] - { - if !_reader.initializer().should_initialize() { - return; - } - } ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) } @@ -61,6 +51,9 @@ pub use self::buf_reader::{BufReader, SeeKRelative}; mod buf_writer; pub use self::buf_writer::BufWriter; +mod line_writer; +pub use self::line_writer::LineWriter; + mod chain; pub use self::chain::Chain; diff --git a/src/io/repeat.rs b/src/io/repeat.rs index 4cefcb2..2828bf0 100644 --- a/src/io/repeat.rs +++ b/src/io/repeat.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncRead, IoSliceMut}; use std::fmt; use std::io; @@ -59,12 +57,6 @@ impl AsyncRead for Repeat { } Poll::Ready(Ok(nwritten)) } - - #[cfg(feature = "read-initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } impl fmt::Debug for Repeat { diff --git a/src/io/take.rs b/src/io/take.rs index 0583020..2c49480 100644 --- a/src/io/take.rs +++ b/src/io/take.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead}; use pin_project_lite::pin_project; use std::pin::Pin; @@ -100,11 +98,6 @@ impl<R: AsyncRead> AsyncRead for Take<R> { *this.limit -= n as u64; Poll::Ready(Ok(n)) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.inner.initializer() - } } impl<R: AsyncBufRead> AsyncBufRead for Take<R> { |