aboutsummaryrefslogtreecommitdiff
path: root/src/io/inspect.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/inspect.rs')
-rw-r--r--src/io/inspect.rs134
1 files changed, 134 insertions, 0 deletions
diff --git a/src/io/inspect.rs b/src/io/inspect.rs
new file mode 100644
index 0000000..ec5bb97
--- /dev/null
+++ b/src/io/inspect.rs
@@ -0,0 +1,134 @@
+use futures_core::ready;
+use pin_project_lite::pin_project;
+use std::io::{IoSlice, Result};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+
+pin_project! {
+ /// An adapter that lets you inspect the data that's being read.
+ ///
+ /// This is useful for things like hashing data as it's read in.
+ pub struct InspectReader<R, F> {
+ #[pin]
+ reader: R,
+ f: F,
+ }
+}
+
+impl<R, F> InspectReader<R, F> {
+ /// Create a new InspectReader, wrapping `reader` and calling `f` for the
+ /// new data supplied by each read call.
+ ///
+ /// The closure will only be called with an empty slice if the inner reader
+ /// returns without reading data into the buffer. This happens at EOF, or if
+ /// `poll_read` is called with a zero-size buffer.
+ pub fn new(reader: R, f: F) -> InspectReader<R, F>
+ where
+ R: AsyncRead,
+ F: FnMut(&[u8]),
+ {
+ InspectReader { reader, f }
+ }
+
+ /// Consumes the `InspectReader`, returning the wrapped reader
+ pub fn into_inner(self) -> R {
+ self.reader
+ }
+}
+
+impl<R: AsyncRead, F: FnMut(&[u8])> AsyncRead for InspectReader<R, F> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<Result<()>> {
+ let me = self.project();
+ let filled_length = buf.filled().len();
+ ready!(me.reader.poll_read(cx, buf))?;
+ (me.f)(&buf.filled()[filled_length..]);
+ Poll::Ready(Ok(()))
+ }
+}
+
+pin_project! {
+ /// An adapter that lets you inspect the data that's being written.
+ ///
+ /// This is useful for things like hashing data as it's written out.
+ pub struct InspectWriter<W, F> {
+ #[pin]
+ writer: W,
+ f: F,
+ }
+}
+
+impl<W, F> InspectWriter<W, F> {
+ /// Create a new InspectWriter, wrapping `write` and calling `f` for the
+ /// data successfully written by each write call.
+ ///
+ /// The closure `f` will never be called with an empty slice. A vectored
+ /// write can result in multiple calls to `f` - at most one call to `f` per
+ /// buffer supplied to `poll_write_vectored`.
+ pub fn new(writer: W, f: F) -> InspectWriter<W, F>
+ where
+ W: AsyncWrite,
+ F: FnMut(&[u8]),
+ {
+ InspectWriter { writer, f }
+ }
+
+ /// Consumes the `InspectWriter`, returning the wrapped writer
+ pub fn into_inner(self) -> W {
+ self.writer
+ }
+}
+
+impl<W: AsyncWrite, F: FnMut(&[u8])> AsyncWrite for InspectWriter<W, F> {
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
+ let me = self.project();
+ let res = me.writer.poll_write(cx, buf);
+ if let Poll::Ready(Ok(count)) = res {
+ if count != 0 {
+ (me.f)(&buf[..count]);
+ }
+ }
+ res
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let me = self.project();
+ me.writer.poll_flush(cx)
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let me = self.project();
+ me.writer.poll_shutdown(cx)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<Result<usize>> {
+ let me = self.project();
+ let res = me.writer.poll_write_vectored(cx, bufs);
+ if let Poll::Ready(Ok(mut count)) = res {
+ for buf in bufs {
+ if count == 0 {
+ break;
+ }
+ let size = count.min(buf.len());
+ if size != 0 {
+ (me.f)(&buf[..size]);
+ count -= size;
+ }
+ }
+ }
+ res
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.writer.is_write_vectored()
+ }
+}