diff options
Diffstat (limited to 'src/io/util/buf_writer.rs')
-rw-r--r-- | src/io/util/buf_writer.rs | 62 |
1 files changed, 60 insertions, 2 deletions
diff --git a/src/io/util/buf_writer.rs b/src/io/util/buf_writer.rs index 5e3d4b7..4e8e493 100644 --- a/src/io/util/buf_writer.rs +++ b/src/io/util/buf_writer.rs @@ -1,9 +1,9 @@ use crate::io::util::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, Write}; +use std::io::{self, SeekFrom, Write}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -34,6 +34,7 @@ pin_project! { pub(super) inner: W, pub(super) buf: Vec<u8>, pub(super) written: usize, + pub(super) seek_state: SeekState, } } @@ -50,6 +51,7 @@ impl<W: AsyncWrite> BufWriter<W> { inner, buf: Vec::with_capacity(cap), written: 0, + seek_state: SeekState::Init, } } @@ -142,6 +144,62 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { } } +#[derive(Debug, Clone, Copy)] +pub(super) enum SeekState { + /// start_seek has not been called. + Init, + /// start_seek has been called, but poll_complete has not yet been called. + Start(SeekFrom), + /// Waiting for completion of poll_complete. + Pending, +} + +/// Seek to the offset, in bytes, in the underlying writer. +/// +/// Seeking always writes out the internal buffer before seeking. +impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> { + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + // We need to flush the internal buffer before seeking. + // It receives a `Context` and returns a `Poll`, so it cannot be called + // inside `start_seek`. + *self.project().seek_state = SeekState::Start(pos); + Ok(()) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + let pos = match self.seek_state { + SeekState::Init => { + return self.project().inner.poll_complete(cx); + } + SeekState::Start(pos) => Some(pos), + SeekState::Pending => None, + }; + + // Flush the internal buffer before seeking. + ready!(self.as_mut().flush_buf(cx))?; + + let mut me = self.project(); + if let Some(pos) = pos { + // Ensure previous seeks have finished before starting a new one + ready!(me.inner.as_mut().poll_complete(cx))?; + if let Err(e) = me.inner.as_mut().start_seek(pos) { + *me.seek_state = SeekState::Init; + return Poll::Ready(Err(e)); + } + } + match me.inner.poll_complete(cx) { + Poll::Ready(res) => { + *me.seek_state = SeekState::Init; + Poll::Ready(res) + } + Poll::Pending => { + *me.seek_state = SeekState::Pending; + Poll::Pending + } + } + } +} + impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> { fn poll_read( self: Pin<&mut Self>, |