aboutsummaryrefslogtreecommitdiff
path: root/src/io/util/buf_reader.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/util/buf_reader.rs')
-rw-r--r--src/io/util/buf_reader.rs59
1 files changed, 19 insertions, 40 deletions
diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs
index a1c5990..271f61b 100644
--- a/src/io/util/buf_reader.rs
+++ b/src/io/util/buf_reader.rs
@@ -1,10 +1,8 @@
use crate::io::util::DEFAULT_BUF_SIZE;
-use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
-use bytes::Buf;
use pin_project_lite::pin_project;
-use std::io::{self, Read};
-use std::mem::MaybeUninit;
+use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, fmt};
@@ -44,21 +42,12 @@ impl<R: AsyncRead> BufReader<R> {
/// Creates a new `BufReader` with the specified buffer capacity.
pub fn with_capacity(capacity: usize, inner: R) -> Self {
- unsafe {
- let mut buffer = Vec::with_capacity(capacity);
- buffer.set_len(capacity);
-
- {
- // Convert to MaybeUninit
- let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit<u8>]);
- inner.prepare_uninitialized_buffer(b);
- }
- Self {
- inner,
- buf: buffer.into_boxed_slice(),
- pos: 0,
- cap: 0,
- }
+ let buffer = vec![0; capacity];
+ Self {
+ inner,
+ buf: buffer.into_boxed_slice(),
+ pos: 0,
+ cap: 0,
}
}
@@ -110,25 +99,21 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
- if self.pos == self.cap && buf.len() >= self.buf.len() {
+ if self.pos == self.cap && buf.remaining() >= self.buf.len() {
let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
self.discard_buffer();
return Poll::Ready(res);
}
- let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
- let nread = rem.read(buf)?;
- self.consume(nread);
- Poll::Ready(Ok(nread))
- }
-
- // we can't skip unconditionally because of the large buffer case in read.
- unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
- self.inner.prepare_uninitialized_buffer(buf)
+ let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+ let amt = std::cmp::min(rem.len(), buf.remaining());
+ buf.put_slice(&rem[..amt]);
+ self.consume(amt);
+ Poll::Ready(Ok(()))
}
}
@@ -142,7 +127,9 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
// to tell the compiler that the pos..cap slice is always valid.
if *me.pos >= *me.cap {
debug_assert!(*me.pos == *me.cap);
- *me.cap = ready!(me.inner.poll_read(cx, me.buf))?;
+ let mut buf = ReadBuf::new(me.buf);
+ ready!(me.inner.poll_read(cx, &mut buf))?;
+ *me.cap = buf.filled().len();
*me.pos = 0;
}
Poll::Ready(Ok(&me.buf[*me.pos..*me.cap]))
@@ -163,14 +150,6 @@ impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
self.get_pin_mut().poll_write(cx, buf)
}
- fn poll_write_buf<B: Buf>(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut B,
- ) -> Poll<io::Result<usize>> {
- self.get_pin_mut().poll_write_buf(cx, buf)
- }
-
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}