aboutsummaryrefslogtreecommitdiff
path: root/src/io/util/read_to_end.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/util/read_to_end.rs')
-rw-r--r--src/io/util/read_to_end.rs153
1 files changed, 78 insertions, 75 deletions
diff --git a/src/io/util/read_to_end.rs b/src/io/util/read_to_end.rs
index a2cd99b..a974625 100644
--- a/src/io/util/read_to_end.rs
+++ b/src/io/util/read_to_end.rs
@@ -1,92 +1,105 @@
-use crate::io::AsyncRead;
+use crate::io::{AsyncRead, ReadBuf};
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
-use std::mem::MaybeUninit;
+use std::marker::PhantomPinned;
+use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::task::{Context, Poll};
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
-pub struct ReadToEnd<'a, R: ?Sized> {
- reader: &'a mut R,
- buf: &'a mut Vec<u8>,
- start_len: usize,
+pin_project! {
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct ReadToEnd<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut Vec<u8>,
+ // The number of bytes appended to buf. This can be less than buf.len() if
+ // the buffer was not empty when the operation was started.
+ read: usize,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
}
-pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
+pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buffer: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
where
R: AsyncRead + Unpin + ?Sized,
{
- let start_len = buf.len();
ReadToEnd {
reader,
- buf,
- start_len,
+ buf: buffer,
+ read: 0,
+ _pin: PhantomPinned,
}
}
-struct Guard<'a> {
- buf: &'a mut Vec<u8>,
- len: usize,
-}
-
-impl Drop for Guard<'_> {
- fn drop(&mut self) {
- unsafe {
- self.buf.set_len(self.len);
+pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
+ buf: &mut Vec<u8>,
+ mut reader: Pin<&mut R>,
+ num_read: &mut usize,
+ cx: &mut Context<'_>,
+) -> Poll<io::Result<usize>> {
+ loop {
+ // safety: The caller promised to prepare the buffer.
+ let ret = ready!(poll_read_to_end(buf, reader.as_mut(), cx));
+ match ret {
+ Err(err) => return Poll::Ready(Err(err)),
+ Ok(0) => return Poll::Ready(Ok(mem::replace(num_read, 0))),
+ Ok(num) => {
+ *num_read += num;
+ }
}
}
}
-// This uses an adaptive system to extend the vector when it fills. We want to
-// avoid paying to allocate and zero a huge chunk of memory if the reader only
-// has 4 bytes while still making large reads if the reader does have a ton
-// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
-// time is 4,500 times (!) slower than this if the reader has a very small
-// amount of data to return.
-//
-// Because we're extending the buffer with uninitialized data for trusted
-// readers, we need to make sure to truncate that if any of this panics.
-pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
- mut rd: Pin<&mut R>,
- cx: &mut Context<'_>,
+/// Tries to read from the provided AsyncRead.
+///
+/// The length of the buffer is increased by the number of bytes read.
+fn poll_read_to_end<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
- start_len: usize,
+ read: Pin<&mut R>,
+ cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
- let mut g = Guard {
- len: buf.len(),
- buf,
- };
- let ret;
- loop {
- if g.len == g.buf.len() {
- unsafe {
- g.buf.reserve(32);
- let capacity = g.buf.capacity();
- g.buf.set_len(capacity);
+ // This uses an adaptive system to extend the vector when it fills. We want to
+ // avoid paying to allocate and zero a huge chunk of memory if the reader only
+ // has 4 bytes while still making large reads if the reader does have a ton
+ // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
+ // time is 4,500 times (!) slower than this if the reader has a very small
+ // amount of data to return.
+ reserve(buf, 32);
- let b = &mut *(&mut g.buf[g.len..] as *mut [u8] as *mut [MaybeUninit<u8>]);
+ let mut unused_capacity = ReadBuf::uninit(get_unused_capacity(buf));
- rd.prepare_uninitialized_buffer(b);
- }
- }
+ ready!(read.poll_read(cx, &mut unused_capacity))?;
- match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
- Ok(0) => {
- ret = Poll::Ready(Ok(g.len - start_len));
- break;
- }
- Ok(n) => g.len += n,
- Err(e) => {
- ret = Poll::Ready(Err(e));
- break;
- }
- }
+ let n = unused_capacity.filled().len();
+ let new_len = buf.len() + n;
+
+ // This should no longer even be possible in safe Rust. An implementor
+ // would need to have unsafely *replaced* the buffer inside `ReadBuf`,
+ // which... yolo?
+ assert!(new_len <= buf.capacity());
+ unsafe {
+ buf.set_len(new_len);
}
+ Poll::Ready(Ok(n))
+}
- ret
+/// Allocates more memory and ensures that the unused capacity is prepared for use
+/// with the `AsyncRead`.
+fn reserve(buf: &mut Vec<u8>, bytes: usize) {
+ if buf.capacity() - buf.len() >= bytes {
+ return;
+ }
+ buf.reserve(bytes);
+}
+
+/// Returns the unused capacity of the provided vector.
+fn get_unused_capacity(buf: &mut Vec<u8>) -> &mut [MaybeUninit<u8>] {
+ let uninit = bytes::BufMut::bytes_mut(buf);
+ unsafe { &mut *(uninit as *mut _ as *mut [MaybeUninit<u8>]) }
}
impl<A> Future for ReadToEnd<'_, A>
@@ -95,19 +108,9 @@ where
{
type Output = io::Result<usize>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let this = &mut *self;
- read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<ReadToEnd<'_, PhantomPinned>>();
+ read_to_end_internal(me.buf, Pin::new(*me.reader), me.read, cx)
}
}