path: root/src/io
author     Luke Huang <huangluke@google.com>    2021-05-26 23:24:32 +0800
committer  Luke Huang <huangluke@google.com>    2021-05-27 16:32:57 +0800
commit     b81c80b31414735c1c2056834bf3d5fb678668e9 (patch)
tree       d4202b122faca63ebf59cafaf87ae56c64c63898 /src/io
parent     f1a1047f595f273f50d8bc39e3b9eea74a0b73b4 (diff)
download   tokio-b81c80b31414735c1c2056834bf3d5fb678668e9.tar.gz
Upgrade rust/crates/tokio to 1.6.0 and use cargo2android.json to generate bp file
1. Only generate libtokio via cargo2android.json.
2. Put all the test targets in the patch, which should make future upgrades easier.
3. Add back some tests removed by the previous upgrade.
4. Disable some tests that don't work on Android.

Test: atest
Bug: 189140417
Change-Id: I141d548e667cbf33966e868a6eedbe4b50ab56ed
Diffstat (limited to 'src/io')
-rw-r--r--  src/io/driver/mod.rs              |   2
-rw-r--r--  src/io/driver/registration.rs     |   7
-rw-r--r--  src/io/poll_evented.rs            |  26
-rw-r--r--  src/io/util/async_write_ext.rs    |  58
-rw-r--r--  src/io/util/buf_reader.rs         | 124
-rw-r--r--  src/io/util/buf_stream.rs         |   4
-rw-r--r--  src/io/util/buf_writer.rs         |  62
-rw-r--r--  src/io/util/copy_bidirectional.rs |   1
-rw-r--r--  src/io/util/lines.rs              |   2
-rw-r--r--  src/io/util/mem.rs                |  26
-rw-r--r--  src/io/util/mod.rs                |   1
-rw-r--r--  src/io/util/read_line.rs          |   4
-rw-r--r--  src/io/util/read_to_string.rs     |   2
-rw-r--r--  src/io/util/split.rs              |   2
-rw-r--r--  src/io/util/write_all_buf.rs      |  56
15 files changed, 342 insertions(+), 35 deletions(-)
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
index fa2d420..52451c6 100644
--- a/src/io/driver/mod.rs
+++ b/src/io/driver/mod.rs
@@ -273,7 +273,7 @@ cfg_not_rt! {
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
pub(super) fn current() -> Self {
- panic!(crate::util::error::CONTEXT_MISSING_ERROR)
+ panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
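The one-line change above reflects the newer panic macro rules: passing a non-literal expression as the sole argument to `panic!` triggers the `non_fmt_panics` lint on recent toolchains and is rejected in the 2021 edition, because a single argument is treated as a format string. A minimal sketch of the two forms (the constant here is a stand-in for tokio's `crate::util::error::CONTEXT_MISSING_ERROR`; the surrounding functions are illustrative only):

```rust
// Hypothetical stand-in for crate::util::error::CONTEXT_MISSING_ERROR.
const CONTEXT_MISSING_ERROR: &str = "there is no reactor running";

fn old_style() {
    // Lints on newer compilers and is an error in edition 2021,
    // because the single argument is not a string literal:
    // panic!(CONTEXT_MISSING_ERROR);
}

fn new_style() {
    // Explicit format string: valid in every edition.
    panic!("{}", CONTEXT_MISSING_ERROR);
}

fn main() {
    old_style();
    // new_style() would panic with the message above.
}
```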
diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs
index 8251fe6..7350be6 100644
--- a/src/io/driver/registration.rs
+++ b/src/io/driver/registration.rs
@@ -14,8 +14,9 @@ cfg_io_driver! {
/// that it will receive task notifications on readiness. This is the lowest
/// level API for integrating with a reactor.
///
- /// The association between an I/O resource is made by calling [`new`]. Once
- /// the association is established, it remains established until the
+ /// The association between an I/O resource is made by calling
+ /// [`new_with_interest_and_handle`].
+ /// Once the association is established, it remains established until the
/// registration instance is dropped.
///
/// A registration instance represents two separate readiness streams. One
@@ -36,7 +37,7 @@ cfg_io_driver! {
/// stream. The write readiness event stream is only for `Ready::writable()`
/// events.
///
- /// [`new`]: method@Self::new
+ /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
/// [`poll_read_ready`]: method@Self::poll_read_ready`
/// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)]
diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs
index 47ae558..a31e6db 100644
--- a/src/io/poll_evented.rs
+++ b/src/io/poll_evented.rs
@@ -10,10 +10,10 @@ cfg_io_driver! {
/// [`std::io::Write`] traits with the reactor that drives it.
///
/// `PollEvented` uses [`Registration`] internally to take a type that
- /// implements [`mio::Evented`] as well as [`std::io::Read`] and or
+ /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or
/// [`std::io::Write`] and associate it with a reactor that will drive it.
///
- /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be
+ /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
/// used from within the future's execution model. As such, the
/// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
/// implementations using the underlying I/O resource as well as readiness
@@ -40,13 +40,13 @@ cfg_io_driver! {
/// [`poll_read_ready`] again will also indicate read readiness.
///
/// When the operation is attempted and is unable to succeed due to the I/O
- /// resource not being ready, the caller must call [`clear_read_ready`] or
- /// [`clear_write_ready`]. This clears the readiness state until a new
+ /// resource not being ready, the caller must call `clear_read_ready` or
+ /// `clear_write_ready`. This clears the readiness state until a new
/// readiness event is received.
///
/// This allows the caller to implement additional functions. For example,
/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
- /// [`clear_read_ready`].
+ /// `clear_read_ready`.
///
/// ## Platform-specific events
///
@@ -54,17 +54,11 @@ cfg_io_driver! {
/// These events are included as part of the read readiness event stream. The
/// write readiness event stream is only for `Ready::writable()` events.
///
- /// [`std::io::Read`]: trait@std::io::Read
- /// [`std::io::Write`]: trait@std::io::Write
- /// [`AsyncRead`]: trait@AsyncRead
- /// [`AsyncWrite`]: trait@AsyncWrite
- /// [`mio::Evented`]: trait@mio::Evented
- /// [`Registration`]: struct@Registration
- /// [`TcpListener`]: struct@crate::net::TcpListener
- /// [`clear_read_ready`]: method@Self::clear_read_ready
- /// [`clear_write_ready`]: method@Self::clear_write_ready
- /// [`poll_read_ready`]: method@Self::poll_read_ready
- /// [`poll_write_ready`]: method@Self::poll_write_ready
+ /// [`AsyncRead`]: crate::io::AsyncRead
+ /// [`AsyncWrite`]: crate::io::AsyncWrite
+ /// [`TcpListener`]: crate::net::TcpListener
+ /// [`poll_read_ready`]: Registration::poll_read_ready
+ /// [`poll_write_ready`]: Registration::poll_write_ready
pub(crate) struct PollEvented<E: Source> {
io: Option<E>,
registration: Registration,
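`PollEvented` itself is crate-private, but the readiness model described in the doc comment above is visible through tokio's public networking types: wait for a readiness event, attempt the non-blocking operation, and treat `WouldBlock` as readiness having been cleared, so the task must wait again. A minimal sketch using `TcpStream::readable` and `try_read` (the peer address is a placeholder):

```rust
use tokio::io;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Placeholder address, for illustration only.
    let stream = TcpStream::connect("127.0.0.1:8080").await?;
    let mut buf = vec![0u8; 4096];

    loop {
        // Wait for the read-readiness event stream to fire.
        stream.readable().await?;

        // Attempt the non-blocking read. Readiness can be a false positive;
        // WouldBlock means the readiness state was cleared, so wait again.
        match stream.try_read(&mut buf) {
            Ok(0) => break, // EOF
            Ok(n) => println!("read {} bytes", n),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```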
diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs
index d011d82..2510ccd 100644
--- a/src/io/util/async_write_ext.rs
+++ b/src/io/util/async_write_ext.rs
@@ -2,6 +2,7 @@ use crate::io::util::flush::{flush, Flush};
use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
+use crate::io::util::write_all_buf::{write_all_buf, WriteAllBuf};
use crate::io::util::write_buf::{write_buf, WriteBuf};
use crate::io::util::write_int::{
WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le,
@@ -159,7 +160,6 @@ cfg_io_util! {
write_vectored(self, bufs)
}
-
/// Writes a buffer into this writer, advancing the buffer's internal
/// cursor.
///
@@ -197,10 +197,11 @@ cfg_io_util! {
///
/// # Examples
///
- /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]:
+ /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]:
///
/// [`File`]: crate::fs::File
/// [`Buf`]: bytes::Buf
+ /// [`Cursor`]: std::io::Cursor
///
/// ```no_run
/// use tokio::io::{self, AsyncWriteExt};
@@ -233,6 +234,59 @@ cfg_io_util! {
write_buf(self, src)
}
+ /// Attempts to write an entire buffer into this writer
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn write_all_buf(&mut self, buf: impl Buf) -> Result<(), io::Error> {
+ /// while buf.has_remaining() {
+ /// self.write_buf(&mut buf).await?;
+ /// }
+ /// }
+ /// ```
+ ///
+ /// This method will continuously call [`write`] until
+ /// [`buf.has_remaining()`](bytes::Buf::has_remaining) returns false. This method will not
+ /// return until the entire buffer has been successfully written or an error occurs. The
+ /// first error generated will be returned.
+ ///
+ /// The buffer is advanced after each chunk is successfully written. After failure,
+ /// `src.chunk()` will return the chunk that failed to write.
+ ///
+ /// # Examples
+ ///
+ /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]:
+ ///
+ /// [`File`]: crate::fs::File
+ /// [`Buf`]: bytes::Buf
+ /// [`Cursor`]: std::io::Cursor
+ ///
+ /// ```no_run
+ /// use tokio::io::{self, AsyncWriteExt};
+ /// use tokio::fs::File;
+ ///
+ /// use std::io::Cursor;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// let mut buffer = Cursor::new(b"data to write");
+ ///
+ /// file.write_all_buf(&mut buffer).await?;
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// [`write`]: AsyncWriteExt::write
+ fn write_all_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteAllBuf<'a, Self, B>
+ where
+ Self: Sized + Unpin,
+ B: Buf,
+ {
+ write_all_buf(self, src)
+ }
+
/// Attempts to write an entire buffer into this writer.
///
/// Equivalent to:
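Beyond the `Cursor` example in the new documentation, `write_all_buf` works with any `bytes::Buf`, such as `bytes::Bytes`, because the buffer's internal cursor is advanced across partial writes. A hedged sketch (the file name is arbitrary; assumes the `fs` and `io-util` features plus the `bytes` crate):

```rust
use bytes::Bytes;
use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("out.bin").await?; // arbitrary path
    let mut payload = Bytes::from_static(b"payload that may be written in several chunks");

    // Keeps calling write_buf until payload.has_remaining() is false,
    // returning the first error encountered (if any).
    file.write_all_buf(&mut payload).await?;
    file.flush().await?;
    Ok(())
}
```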
diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs
index 271f61b..cc65ef2 100644
--- a/src/io/util/buf_reader.rs
+++ b/src/io/util/buf_reader.rs
@@ -1,11 +1,11 @@
use crate::io::util::DEFAULT_BUF_SIZE;
-use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use pin_project_lite::pin_project;
-use std::io;
+use std::io::{self, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
-use std::{cmp, fmt};
+use std::{cmp, fmt, mem};
pin_project! {
/// The `BufReader` struct adds buffering to any reader.
@@ -30,6 +30,7 @@ pin_project! {
pub(super) buf: Box<[u8]>,
pub(super) pos: usize,
pub(super) cap: usize,
+ pub(super) seek_state: SeekState,
}
}
@@ -48,6 +49,7 @@ impl<R: AsyncRead> BufReader<R> {
buf: buffer.into_boxed_slice(),
pos: 0,
cap: 0,
+ seek_state: SeekState::Init,
}
}
@@ -141,6 +143,122 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
}
}
+#[derive(Debug, Clone, Copy)]
+pub(super) enum SeekState {
+ /// start_seek has not been called.
+ Init,
+ /// start_seek has been called, but poll_complete has not yet been called.
+ Start(SeekFrom),
+ /// Waiting for completion of the first poll_complete in the `n.checked_sub(remainder).is_none()` branch.
+ PendingOverflowed(i64),
+ /// Waiting for completion of poll_complete.
+ Pending,
+}
+
+/// Seek to an offset, in bytes, in the underlying reader.
+///
+/// The position used for seeking with `SeekFrom::Current(_)` is the
+/// position the underlying reader would be at if the `BufReader` had no
+/// internal buffer.
+///
+/// Seeking always discards the internal buffer, even if the seek position
+/// would otherwise fall within it. This guarantees that calling
+/// `.into_inner()` immediately after a seek yields the underlying reader
+/// at the same position.
+///
+/// See [`AsyncSeek`] for more details.
+///
+/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
+/// where `n` minus the internal buffer length overflows an `i64`, two
+/// seeks will be performed instead of one. If the second seek returns
+/// `Err`, the underlying reader will be left at the same position it would
+/// have if you called `seek` with `SeekFrom::Current(0)`.
+impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ // We need to call the seek operation multiple times.
+ // And we should always call both start_seek and poll_complete,
+ // as start_seek alone cannot guarantee that the operation will be completed.
+ // poll_complete receives a Context and returns a Poll, so it cannot be called
+ // inside start_seek.
+ *self.project().seek_state = SeekState::Start(pos);
+ Ok(())
+ }
+
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ let res = match mem::replace(self.as_mut().project().seek_state, SeekState::Init) {
+ SeekState::Init => {
+ // 1.x AsyncSeek recommends calling poll_complete before start_seek.
+ // We don't have to guarantee that the value returned by
+ // poll_complete called without start_seek is correct,
+ // so we'll return 0.
+ return Poll::Ready(Ok(0));
+ }
+ SeekState::Start(SeekFrom::Current(n)) => {
+ let remainder = (self.cap - self.pos) as i64;
+ // it should be safe to assume that remainder fits within an i64 as the alternative
+ // means we managed to allocate 8 exbibytes and that's absurd.
+ // But it's not out of the realm of possibility for some weird underlying reader to
+ // support seeking by i64::min_value() so we need to handle underflow when subtracting
+ // remainder.
+ if let Some(offset) = n.checked_sub(remainder) {
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(offset))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ } else {
+ // seek backwards by our remainder, and then by the offset
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(-remainder))?;
+ if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
+ *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n);
+ return Poll::Pending;
+ }
+
+ // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676
+ self.as_mut().discard_buffer();
+
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(n))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ }
+ SeekState::PendingOverflowed(n) => {
+ if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
+ *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n);
+ return Poll::Pending;
+ }
+
+ // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676
+ self.as_mut().discard_buffer();
+
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(n))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ SeekState::Start(pos) => {
+ // Seeking with Start/End doesn't care about our buffer length.
+ self.as_mut().get_pin_mut().start_seek(pos)?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ SeekState::Pending => self.as_mut().get_pin_mut().poll_complete(cx)?,
+ };
+
+ match res {
+ Poll::Ready(res) => {
+ self.discard_buffer();
+ Poll::Ready(Ok(res))
+ }
+ Poll::Pending => {
+ *self.as_mut().project().seek_state = SeekState::Pending;
+ Poll::Pending
+ }
+ }
+ }
+}
+
impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
fn poll_write(
self: Pin<&mut Self>,
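With this change `BufReader<R>` forwards `AsyncSeek` to the inner reader while discarding its internal buffer, so seeking behaves as if no buffering were present. A minimal sketch using the public `AsyncSeekExt`/`AsyncBufReadExt` helpers (the file name is arbitrary; assumes the `fs` and `io-util` features):

```rust
use std::io::SeekFrom;
use tokio::fs::File;
use tokio::io::{self, AsyncBufReadExt, AsyncSeekExt, BufReader};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader = BufReader::new(File::open("data.txt").await?); // arbitrary path

    let mut first = String::new();
    reader.read_line(&mut first).await?; // fills the internal buffer

    // Seeking discards the internal buffer and repositions the underlying
    // file, so the next read starts from the requested offset.
    let pos = reader.seek(SeekFrom::Start(0)).await?;
    assert_eq!(pos, 0);

    let mut again = String::new();
    reader.read_line(&mut again).await?;
    assert_eq!(first, again);
    Ok(())
}
```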
diff --git a/src/io/util/buf_stream.rs b/src/io/util/buf_stream.rs
index cc857e2..9238665 100644
--- a/src/io/util/buf_stream.rs
+++ b/src/io/util/buf_stream.rs
@@ -94,9 +94,11 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
buf: rbuf,
pos,
cap,
+ seek_state: rseek_state,
},
buf: wbuf,
written,
+ seek_state: wseek_state,
} = b;
BufStream {
@@ -105,10 +107,12 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
inner,
buf: wbuf,
written,
+ seek_state: wseek_state,
},
buf: rbuf,
pos,
cap,
+ seek_state: rseek_state,
},
}
}
diff --git a/src/io/util/buf_writer.rs b/src/io/util/buf_writer.rs
index 5e3d4b7..4e8e493 100644
--- a/src/io/util/buf_writer.rs
+++ b/src/io/util/buf_writer.rs
@@ -1,9 +1,9 @@
use crate::io::util::DEFAULT_BUF_SIZE;
-use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use pin_project_lite::pin_project;
use std::fmt;
-use std::io::{self, Write};
+use std::io::{self, SeekFrom, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -34,6 +34,7 @@ pin_project! {
pub(super) inner: W,
pub(super) buf: Vec<u8>,
pub(super) written: usize,
+ pub(super) seek_state: SeekState,
}
}
@@ -50,6 +51,7 @@ impl<W: AsyncWrite> BufWriter<W> {
inner,
buf: Vec::with_capacity(cap),
written: 0,
+ seek_state: SeekState::Init,
}
}
@@ -142,6 +144,62 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
}
}
+#[derive(Debug, Clone, Copy)]
+pub(super) enum SeekState {
+ /// start_seek has not been called.
+ Init,
+ /// start_seek has been called, but poll_complete has not yet been called.
+ Start(SeekFrom),
+ /// Waiting for completion of poll_complete.
+ Pending,
+}
+
+/// Seek to the offset, in bytes, in the underlying writer.
+///
+/// Seeking always writes out the internal buffer before seeking.
+impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ // We need to flush the internal buffer before seeking.
+ // It receives a `Context` and returns a `Poll`, so it cannot be called
+ // inside `start_seek`.
+ *self.project().seek_state = SeekState::Start(pos);
+ Ok(())
+ }
+
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ let pos = match self.seek_state {
+ SeekState::Init => {
+ return self.project().inner.poll_complete(cx);
+ }
+ SeekState::Start(pos) => Some(pos),
+ SeekState::Pending => None,
+ };
+
+ // Flush the internal buffer before seeking.
+ ready!(self.as_mut().flush_buf(cx))?;
+
+ let mut me = self.project();
+ if let Some(pos) = pos {
+ // Ensure previous seeks have finished before starting a new one
+ ready!(me.inner.as_mut().poll_complete(cx))?;
+ if let Err(e) = me.inner.as_mut().start_seek(pos) {
+ *me.seek_state = SeekState::Init;
+ return Poll::Ready(Err(e));
+ }
+ }
+ match me.inner.poll_complete(cx) {
+ Poll::Ready(res) => {
+ *me.seek_state = SeekState::Init;
+ Poll::Ready(res)
+ }
+ Poll::Pending => {
+ *me.seek_state = SeekState::Pending;
+ Poll::Pending
+ }
+ }
+ }
+}
+
impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> {
fn poll_read(
self: Pin<&mut Self>,
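As the new doc comment states, `BufWriter` flushes its internal buffer before delegating the seek, so buffered bytes are never written at the wrong offset. A hedged sketch of the user-facing behaviour (arbitrary file name; assumes the `fs` and `io-util` features):

```rust
use std::io::SeekFrom;
use tokio::fs::File;
use tokio::io::{self, AsyncSeekExt, AsyncWriteExt, BufWriter};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut writer = BufWriter::new(File::create("out.txt").await?); // arbitrary path

    writer.write_all(b"hello world").await?;

    // The buffered "hello world" is flushed at offset 0 before the seek
    // takes effect, then the overwrite happens at offset 6.
    writer.seek(SeekFrom::Start(6)).await?;
    writer.write_all(b"tokio").await?;

    writer.flush().await?; // file now contains "hello tokio"
    Ok(())
}
```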
diff --git a/src/io/util/copy_bidirectional.rs b/src/io/util/copy_bidirectional.rs
index cc43f0f..c93060b 100644
--- a/src/io/util/copy_bidirectional.rs
+++ b/src/io/util/copy_bidirectional.rs
@@ -104,6 +104,7 @@ where
/// # Return value
///
/// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error>
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
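The attribute added above only affects how docs.rs renders the `io-util` feature requirement; `copy_bidirectional` itself is typically used to splice two full-duplex streams together, for example in a small TCP proxy. A sketch under assumed addresses (both are placeholders):

```rust
use tokio::io::copy_bidirectional;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Placeholder listen and upstream addresses.
    let listener = TcpListener::bind("127.0.0.1:9000").await?;

    loop {
        let (mut client, _) = listener.accept().await?;
        tokio::spawn(async move {
            let mut upstream = match TcpStream::connect("127.0.0.1:9001").await {
                Ok(s) => s,
                Err(_) => return,
            };
            // Copies client -> upstream and upstream -> client concurrently,
            // returning the byte counts once both directions reach EOF.
            if let Ok((to_upstream, to_client)) =
                copy_bidirectional(&mut client, &mut upstream).await
            {
                println!("{} bytes up, {} bytes down", to_upstream, to_client);
            }
        });
    }
}
```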
diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs
index ed6a944..d02a453 100644
--- a/src/io/util/lines.rs
+++ b/src/io/util/lines.rs
@@ -128,7 +128,7 @@ where
}
}
- Poll::Ready(Ok(Some(mem::replace(me.buf, String::new()))))
+ Poll::Ready(Ok(Some(mem::take(me.buf))))
}
}
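The `mem::replace(me.buf, String::new())` to `mem::take(me.buf)` changes here (and in read_line.rs, read_to_string.rs and split.rs below) are behaviour-preserving: `mem::take` is simply `mem::replace` with `Default::default()` as the replacement value. A tiny illustration:

```rust
use std::mem;

fn main() {
    let mut a = String::from("hello");
    let mut b = String::from("hello");

    let taken = mem::take(&mut a);
    let replaced = mem::replace(&mut b, String::new());

    // Both calls move the contents out and leave an empty String behind.
    assert_eq!(taken, replaced);
    assert_eq!(a, "");
    assert_eq!(b, "");
}
```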
diff --git a/src/io/util/mem.rs b/src/io/util/mem.rs
index e91a932..4eefe7b 100644
--- a/src/io/util/mem.rs
+++ b/src/io/util/mem.rs
@@ -16,6 +16,14 @@ use std::{
/// that can be used as in-memory IO types. Writing to one of the pairs will
/// allow that data to be read from the other, and vice versa.
///
+/// # Closing a `DuplexStream`
+///
+/// If one end of the `DuplexStream` channel is dropped, any pending reads on
+/// the other side will continue to read data until the buffer is drained, then
+/// they will signal EOF by returning 0 bytes. Any writes to the other side,
+/// including pending ones (that are waiting for free space in the buffer) will
+/// return `Err(BrokenPipe)` immediately.
+///
/// # Example
///
/// ```
@@ -37,6 +45,7 @@ use std::{
/// # }
/// ```
#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct DuplexStream {
read: Arc<Mutex<Pipe>>,
write: Arc<Mutex<Pipe>>,
@@ -72,6 +81,7 @@ struct Pipe {
///
/// The `max_buf_size` argument is the maximum amount of bytes that can be
/// written to a side before the write returns `Poll::Pending`.
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
@@ -134,7 +144,8 @@ impl AsyncWrite for DuplexStream {
impl Drop for DuplexStream {
fn drop(&mut self) {
// notify the other side of the closure
- self.write.lock().close();
+ self.write.lock().close_write();
+ self.read.lock().close_read();
}
}
@@ -151,12 +162,21 @@ impl Pipe {
}
}
- fn close(&mut self) {
+ fn close_write(&mut self) {
self.is_closed = true;
+ // needs to notify any readers that no more data will come
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
}
+
+ fn close_read(&mut self) {
+ self.is_closed = true;
+ // needs to notify any writers that they have to abort
+ if let Some(waker) = self.write_waker.take() {
+ waker.wake();
+ }
+ }
}
impl AsyncRead for Pipe {
@@ -217,7 +237,7 @@ impl AsyncWrite for Pipe {
mut self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
- self.close();
+ self.close_write();
Poll::Ready(Ok(()))
}
}
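The split of `close` into `close_write`/`close_read` is what implements the documented shutdown behaviour: dropping one end lets the peer drain any buffered data and then observe EOF, while its writes fail fast. A sketch of that contract (the buffer size is arbitrary):

```rust
use tokio::io::{self, duplex, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let (mut a, mut b) = duplex(64); // arbitrary max_buf_size

    a.write_all(b"last words").await?;
    drop(a); // closes both directions of `a`

    // Buffered data is still readable on the surviving end,
    // and read_to_end stops at the EOF signalled by the drop.
    let mut buf = Vec::new();
    b.read_to_end(&mut buf).await?;
    assert_eq!(buf, b"last words");

    // ...but writes towards the dropped end fail immediately.
    let err = b.write_all(b"anyone there?").await.unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
    Ok(())
}
```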
diff --git a/src/io/util/mod.rs b/src/io/util/mod.rs
index ab38664..fd3dd0d 100644
--- a/src/io/util/mod.rs
+++ b/src/io/util/mod.rs
@@ -77,6 +77,7 @@ cfg_io_util! {
mod write_vectored;
mod write_all;
mod write_buf;
+ mod write_all_buf;
mod write_int;
diff --git a/src/io/util/read_line.rs b/src/io/util/read_line.rs
index d38ffaf..e641f51 100644
--- a/src/io/util/read_line.rs
+++ b/src/io/util/read_line.rs
@@ -36,7 +36,7 @@ where
{
ReadLine {
reader,
- buf: mem::replace(string, String::new()).into_bytes(),
+ buf: mem::take(string).into_bytes(),
output: string,
read: 0,
_pin: PhantomPinned,
@@ -99,7 +99,7 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
read: &mut usize,
) -> Poll<io::Result<usize>> {
let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
- let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));
+ let utf8_res = String::from_utf8(mem::take(buf));
// At this point both buf and output are empty. The allocation is in utf8_res.
diff --git a/src/io/util/read_to_string.rs b/src/io/util/read_to_string.rs
index 2c17383..b3d82a2 100644
--- a/src/io/util/read_to_string.rs
+++ b/src/io/util/read_to_string.rs
@@ -37,7 +37,7 @@ pub(crate) fn read_to_string<'a, R>(
where
R: AsyncRead + ?Sized + Unpin,
{
- let buf = mem::replace(string, String::new()).into_bytes();
+ let buf = mem::take(string).into_bytes();
ReadToString {
reader,
buf: VecWithInitialized::new(buf),
diff --git a/src/io/util/split.rs b/src/io/util/split.rs
index 4f3ce4e..9c2bb05 100644
--- a/src/io/util/split.rs
+++ b/src/io/util/split.rs
@@ -106,7 +106,7 @@ where
me.buf.pop();
}
- Poll::Ready(Ok(Some(mem::replace(me.buf, Vec::new()))))
+ Poll::Ready(Ok(Some(mem::take(me.buf))))
}
}
diff --git a/src/io/util/write_all_buf.rs b/src/io/util/write_all_buf.rs
new file mode 100644
index 0000000..05af7fe
--- /dev/null
+++ b/src/io/util/write_all_buf.rs
@@ -0,0 +1,56 @@
+use crate::io::AsyncWrite;
+
+use bytes::Buf;
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::io;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// A future to write some of the buffer to an `AsyncWrite`.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct WriteAllBuf<'a, W, B> {
+ writer: &'a mut W,
+ buf: &'a mut B,
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+/// Tries to write some bytes from the given `buf` to the writer in an
+/// asynchronous manner, returning a future.
+pub(crate) fn write_all_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteAllBuf<'a, W, B>
+where
+ W: AsyncWrite + Unpin,
+ B: Buf,
+{
+ WriteAllBuf {
+ writer,
+ buf,
+ _pin: PhantomPinned,
+ }
+}
+
+impl<W, B> Future for WriteAllBuf<'_, W, B>
+where
+ W: AsyncWrite + Unpin,
+ B: Buf,
+{
+ type Output = io::Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let me = self.project();
+ while me.buf.has_remaining() {
+ let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf.chunk())?);
+ me.buf.advance(n);
+ if n == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}