aboutsummaryrefslogtreecommitdiff
path: root/src/io
diff options
context:
space:
mode:
Diffstat (limited to 'src/io')
-rw-r--r--src/io/allow_std.rs11
-rw-r--r--src/io/buf_reader.rs8
-rw-r--r--src/io/buf_writer.rs65
-rw-r--r--src/io/chain.rs12
-rw-r--r--src/io/empty.rs8
-rw-r--r--src/io/line_writer.rs155
-rw-r--r--src/io/mod.rs15
-rw-r--r--src/io/repeat.rs8
-rw-r--r--src/io/take.rs7
9 files changed, 223 insertions, 66 deletions
diff --git a/src/io/allow_std.rs b/src/io/allow_std.rs
index 1d13e0c..ec30ee3 100644
--- a/src/io/allow_std.rs
+++ b/src/io/allow_std.rs
@@ -1,6 +1,4 @@
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
use std::pin::Pin;
use std::{fmt, io};
@@ -121,10 +119,6 @@ where
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0.read_vectored(bufs)
}
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- self.0.initializer()
- }
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
self.0.read_to_end(buf)
}
@@ -155,11 +149,6 @@ where
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs))))
}
-
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- self.0.initializer()
- }
}
impl<T> io::Seek for AllowStdIo<T>
diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs
index 2d585a9..0334a9f 100644
--- a/src/io/buf_reader.rs
+++ b/src/io/buf_reader.rs
@@ -2,8 +2,6 @@ use super::DEFAULT_BUF_SIZE;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
use pin_project_lite::pin_project;
use std::io::{self, Read};
@@ -144,12 +142,6 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
self.consume(nread);
Poll::Ready(Ok(nread))
}
-
- // we can't skip unconditionally because of the large buffer case in read.
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- self.inner.initializer()
- }
}
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs
index f292b87..cb74863 100644
--- a/src/io/buf_writer.rs
+++ b/src/io/buf_writer.rs
@@ -6,6 +6,7 @@ use pin_project_lite::pin_project;
use std::fmt;
use std::io::{self, Write};
use std::pin::Pin;
+use std::ptr;
pin_project! {
/// Wraps a writer and buffers its output.
@@ -49,7 +50,7 @@ impl<W: AsyncWrite> BufWriter<W> {
Self { inner, buf: Vec::with_capacity(cap), written: 0 }
}
- fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ pub(super) fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut this = self.project();
let len = this.buf.len();
@@ -83,6 +84,68 @@ impl<W: AsyncWrite> BufWriter<W> {
pub fn buffer(&self) -> &[u8] {
&self.buf
}
+
+ /// Capacity of `buf`. how many chars can be held in buffer
+ pub(super) fn capacity(&self) -> usize {
+ self.buf.capacity()
+ }
+
+ /// Remaining number of bytes to reach `buf` 's capacity
+ #[inline]
+ pub(super) fn spare_capacity(&self) -> usize {
+ self.buf.capacity() - self.buf.len()
+ }
+
+ /// Write a byte slice directly into buffer
+ ///
+ /// Will truncate the number of bytes written to `spare_capacity()` so you want to
+ /// calculate the size of your slice to avoid losing bytes
+ ///
+ /// Based on `std::io::BufWriter`
+ pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize {
+ let available = self.spare_capacity();
+ let amt_to_buffer = available.min(buf.len());
+
+ // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction.
+ unsafe {
+ self.write_to_buffer_unchecked(&buf[..amt_to_buffer]);
+ }
+
+ amt_to_buffer
+ }
+
+ /// Write byte slice directly into `self.buf`
+ ///
+ /// Based on `std::io::BufWriter`
+ #[inline]
+ unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) {
+ debug_assert!(buf.len() <= self.spare_capacity());
+ let this = self.project();
+ let old_len = this.buf.len();
+ let buf_len = buf.len();
+ let src = buf.as_ptr();
+ let dst = this.buf.as_mut_ptr().add(old_len);
+ ptr::copy_nonoverlapping(src, dst, buf_len);
+ this.buf.set_len(old_len + buf_len);
+ }
+
+ /// Write directly using `inner`, bypassing buffering
+ pub(super) fn inner_poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write(cx, buf)
+ }
+
+ /// Write directly using `inner`, bypassing buffering
+ pub(super) fn inner_poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write_vectored(cx, bufs)
+ }
}
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
diff --git a/src/io/chain.rs b/src/io/chain.rs
index a35c50d..728a3d2 100644
--- a/src/io/chain.rs
+++ b/src/io/chain.rs
@@ -1,7 +1,5 @@
use futures_core::ready;
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
use pin_project_lite::pin_project;
use std::fmt;
@@ -111,16 +109,6 @@ where
}
this.second.poll_read_vectored(cx, bufs)
}
-
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- let initializer = self.first.initializer();
- if initializer.should_initialize() {
- initializer
- } else {
- self.second.initializer()
- }
- }
}
impl<T, U> AsyncBufRead for Chain<T, U>
diff --git a/src/io/empty.rs b/src/io/empty.rs
index ab2395a..02f6103 100644
--- a/src/io/empty.rs
+++ b/src/io/empty.rs
@@ -1,6 +1,4 @@
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead};
use std::fmt;
use std::io;
@@ -43,12 +41,6 @@ impl AsyncRead for Empty {
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0))
}
-
- #[cfg(feature = "read-initializer")]
- #[inline]
- unsafe fn initializer(&self) -> Initializer {
- Initializer::nop()
- }
}
impl AsyncBufRead for Empty {
diff --git a/src/io/line_writer.rs b/src/io/line_writer.rs
new file mode 100644
index 0000000..71cd668
--- /dev/null
+++ b/src/io/line_writer.rs
@@ -0,0 +1,155 @@
+use super::buf_writer::BufWriter;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use futures_io::IoSlice;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+
+pin_project! {
+/// Wrap a writer, like [`BufWriter`] does, but prioritizes buffering lines
+///
+/// This was written based on `std::io::LineWriter` which goes into further details
+/// explaining the code.
+///
+/// Buffering is actually done using `BufWriter`. This class will leverage `BufWriter`
+/// to write on-each-line.
+#[derive(Debug)]
+pub struct LineWriter<W: AsyncWrite> {
+ #[pin]
+ buf_writer: BufWriter<W>,
+}
+}
+
+impl<W: AsyncWrite> LineWriter<W> {
+ /// Create a new `LineWriter` with default buffer capacity. The default is currently 1KB
+ /// which was taken from `std::io::LineWriter`
+ pub fn new(inner: W) -> LineWriter<W> {
+ LineWriter::with_capacity(1024, inner)
+ }
+
+ /// Creates a new `LineWriter` with the specified buffer capacity.
+ pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> {
+ LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) }
+ }
+
+ /// Flush `buf_writer` if last char is "new line"
+ fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let this = self.project();
+ match this.buf_writer.buffer().last().copied() {
+ Some(b'\n') => this.buf_writer.flush_buf(cx),
+ _ => Poll::Ready(Ok(())),
+ }
+ }
+
+ /// Returns a reference to `buf_writer`'s internally buffered data.
+ pub fn buffer(&self) -> &[u8] {
+ self.buf_writer.buffer()
+ }
+
+ /// Acquires a reference to the underlying sink or stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &W {
+ self.buf_writer.get_ref()
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for LineWriter<W> {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ let mut this = self.as_mut().project();
+ let newline_index = match memchr::memrchr(b'\n', buf) {
+ None => {
+ ready!(self.as_mut().flush_if_completed_line(cx)?);
+ return self.project().buf_writer.poll_write(cx, buf);
+ }
+ Some(newline_index) => newline_index + 1,
+ };
+
+ ready!(this.buf_writer.as_mut().poll_flush(cx)?);
+
+ let lines = &buf[..newline_index];
+
+ let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? };
+
+ if flushed == 0 {
+ return Poll::Ready(Ok(0));
+ }
+
+ let tail = if flushed >= newline_index {
+ &buf[flushed..]
+ } else if newline_index - flushed <= this.buf_writer.capacity() {
+ &buf[flushed..newline_index]
+ } else {
+ let scan_area = &buf[flushed..];
+ let scan_area = &scan_area[..this.buf_writer.capacity()];
+ match memchr::memrchr(b'\n', scan_area) {
+ Some(newline_index) => &scan_area[..newline_index + 1],
+ None => scan_area,
+ }
+ };
+
+ let buffered = this.buf_writer.as_mut().write_to_buf(tail);
+ Poll::Ready(Ok(flushed + buffered))
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let mut this = self.as_mut().project();
+ // `is_write_vectored()` is handled in original code, but not in this crate
+ // see https://github.com/rust-lang/rust/issues/70436
+
+ let last_newline_buf_idx = bufs
+ .iter()
+ .enumerate()
+ .rev()
+ .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
+ let last_newline_buf_idx = match last_newline_buf_idx {
+ None => {
+ ready!(self.as_mut().flush_if_completed_line(cx)?);
+ return self.project().buf_writer.poll_write_vectored(cx, bufs);
+ }
+ Some(i) => i,
+ };
+
+ ready!(this.buf_writer.as_mut().poll_flush(cx)?);
+
+ let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
+
+ let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? };
+ if flushed == 0 {
+ return Poll::Ready(Ok(0));
+ }
+
+ let lines_len = lines.iter().map(|buf| buf.len()).sum();
+ if flushed < lines_len {
+ return Poll::Ready(Ok(flushed));
+ }
+
+ let buffered: usize = tail
+ .iter()
+ .filter(|buf| !buf.is_empty())
+ .map(|buf| this.buf_writer.as_mut().write_to_buf(buf))
+ .take_while(|&n| n > 0)
+ .sum();
+
+ Poll::Ready(Ok(flushed + buffered))
+ }
+
+ /// Forward to `buf_writer` 's `BufWriter::poll_flush()`
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.as_mut().project().buf_writer.poll_flush(cx)
+ }
+
+ /// Forward to `buf_writer` 's `BufWriter::poll_close()`
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.as_mut().project().buf_writer.poll_close(cx)
+ }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 16cf5a7..4dd2e02 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -26,10 +26,6 @@ use std::{pin::Pin, ptr};
// Re-export some types from `std::io` so that users don't have to deal
// with conflicts when `use`ing `futures::io` and `std::io`.
#[doc(no_inline)]
-#[cfg(feature = "read-initializer")]
-#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
-pub use std::io::Initializer;
-#[doc(no_inline)]
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
@@ -40,15 +36,9 @@ const DEFAULT_BUF_SIZE: usize = 8 * 1024;
/// Initializes a buffer if necessary.
///
-/// A buffer is always initialized if `read-initializer` feature is disabled.
+/// A buffer is currently always initialized.
#[inline]
unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
- #[cfg(feature = "read-initializer")]
- {
- if !_reader.initializer().should_initialize() {
- return;
- }
- }
ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
}
@@ -61,6 +51,9 @@ pub use self::buf_reader::{BufReader, SeeKRelative};
mod buf_writer;
pub use self::buf_writer::BufWriter;
+mod line_writer;
+pub use self::line_writer::LineWriter;
+
mod chain;
pub use self::chain::Chain;
diff --git a/src/io/repeat.rs b/src/io/repeat.rs
index 4cefcb2..2828bf0 100644
--- a/src/io/repeat.rs
+++ b/src/io/repeat.rs
@@ -1,7 +1,5 @@
use futures_core::ready;
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncRead, IoSliceMut};
use std::fmt;
use std::io;
@@ -59,12 +57,6 @@ impl AsyncRead for Repeat {
}
Poll::Ready(Ok(nwritten))
}
-
- #[cfg(feature = "read-initializer")]
- #[inline]
- unsafe fn initializer(&self) -> Initializer {
- Initializer::nop()
- }
}
impl fmt::Debug for Repeat {
diff --git a/src/io/take.rs b/src/io/take.rs
index 0583020..2c49480 100644
--- a/src/io/take.rs
+++ b/src/io/take.rs
@@ -1,7 +1,5 @@
use futures_core::ready;
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead};
use pin_project_lite::pin_project;
use std::pin::Pin;
@@ -100,11 +98,6 @@ impl<R: AsyncRead> AsyncRead for Take<R> {
*this.limit -= n as u64;
Poll::Ready(Ok(n))
}
-
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- self.inner.initializer()
- }
}
impl<R: AsyncBufRead> AsyncBufRead for Take<R> {