diff options
Diffstat (limited to 'src/fs/file.rs')
-rw-r--r-- | src/fs/file.rs | 296 |
1 files changed, 133 insertions, 163 deletions
diff --git a/src/fs/file.rs b/src/fs/file.rs index f3bc985..7c71f48 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -5,7 +5,8 @@ use self::State::*; use crate::fs::{asyncify, sys}; use crate::io::blocking::Buf; -use crate::io::{AsyncRead, AsyncSeek, AsyncWrite}; +use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; +use crate::sync::Mutex; use std::fmt; use std::fs::{Metadata, Permissions}; @@ -80,12 +81,18 @@ use std::task::Poll::*; /// ``` pub struct File { std: Arc<sys::File>, + inner: Mutex<Inner>, +} + +struct Inner { state: State, /// Errors from writes/flushes are returned in write/flush calls. If a write /// error is observed while performing a read, it is saved until the next /// write / flush call. last_write_err: Option<io::ErrorKind>, + + pos: u64, } #[derive(Debug)] @@ -197,70 +204,11 @@ impl File { pub fn from_std(std: sys::File) -> File { File { std: Arc::new(std), - state: State::Idle(Some(Buf::with_capacity(0))), - last_write_err: None, - } - } - - /// Seeks to an offset, in bytes, in a stream. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::fs::File; - /// use tokio::prelude::*; - /// - /// use std::io::SeekFrom; - /// - /// # async fn dox() -> std::io::Result<()> { - /// let mut file = File::open("foo.txt").await?; - /// file.seek(SeekFrom::Start(6)).await?; - /// - /// let mut contents = vec![0u8; 10]; - /// file.read_exact(&mut contents).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// The [`read_exact`] method is defined on the [`AsyncReadExt`] trait. - /// - /// [`read_exact`]: fn@crate::io::AsyncReadExt::read_exact - /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt - pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> { - self.complete_inflight().await; - - let mut buf = match self.state { - Idle(ref mut buf_cell) => buf_cell.take().unwrap(), - _ => unreachable!(), - }; - - // Factor in any unread data from the buf - if !buf.is_empty() { - let n = buf.discard_read(); - - if let SeekFrom::Current(ref mut offset) = pos { - *offset += n; - } - } - - let std = self.std.clone(); - - // Start the operation - self.state = Busy(sys::run(move || { - let res = (&*std).seek(pos); - (Operation::Seek(res), buf) - })); - - let (op, buf) = match self.state { - Idle(_) => unreachable!(), - Busy(ref mut rx) => rx.await.unwrap(), - }; - - self.state = Idle(Some(buf)); - - match op { - Operation::Seek(res) => res, - _ => unreachable!(), + inner: Mutex::new(Inner { + state: State::Idle(Some(Buf::with_capacity(0))), + last_write_err: None, + pos: 0, + }), } } @@ -287,8 +235,9 @@ impl File { /// /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt - pub async fn sync_all(&mut self) -> io::Result<()> { - self.complete_inflight().await; + pub async fn sync_all(&self) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; let std = self.std.clone(); asyncify(move || std.sync_all()).await @@ -321,8 +270,9 @@ impl File { /// /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt - pub async fn sync_data(&mut self) -> io::Result<()> { - self.complete_inflight().await; + pub async fn sync_data(&self) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; let std = self.std.clone(); asyncify(move || std.sync_data()).await @@ -358,10 +308,11 @@ impl File { /// /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt - pub async fn set_len(&mut self, size: u64) -> io::Result<()> { - self.complete_inflight().await; + pub async fn set_len(&self, size: u64) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; - let mut buf = match self.state { + let mut buf = match inner.state { Idle(ref mut buf_cell) => buf_cell.take().unwrap(), _ => unreachable!(), }; @@ -374,7 +325,7 @@ impl File { let std = self.std.clone(); - self.state = Busy(sys::run(move || { + inner.state = Busy(sys::run(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| std.set_len(size)) } else { @@ -386,15 +337,17 @@ impl File { (Operation::Seek(res), buf) })); - let (op, buf) = match self.state { + let (op, buf) = match inner.state { Idle(_) => unreachable!(), Busy(ref mut rx) => rx.await?, }; - self.state = Idle(Some(buf)); + inner.state = Idle(Some(buf)); match op { - Operation::Seek(res) => res.map(|_| ()), + Operation::Seek(res) => res.map(|pos| { + inner.pos = pos; + }), _ => unreachable!(), } } @@ -459,7 +412,7 @@ impl File { /// # } /// ``` pub async fn into_std(mut self) -> sys::File { - self.complete_inflight().await; + self.inner.get_mut().complete_inflight().await; Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed") } @@ -526,42 +479,32 @@ impl File { let std = self.std.clone(); asyncify(move || std.set_permissions(perm)).await } - - async fn complete_inflight(&mut self) { - use crate::future::poll_fn; - - if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await { - self.last_write_err = Some(e.kind()); - } - } } impl AsyncRead for File { - unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [std::mem::MaybeUninit<u8>]) -> bool { - // https://github.com/rust-lang/rust/blob/09c817eeb29e764cfc12d0a8d94841e3ffe34023/src/libstd/fs.rs#L668 - false - } - fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, - dst: &mut [u8], - ) -> Poll<io::Result<usize>> { + dst: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + loop { - match self.state { + match inner.state { Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); if !buf.is_empty() { - let n = buf.copy_to(dst); + buf.copy_to(dst); *buf_cell = Some(buf); - return Ready(Ok(n)); + return Ready(Ok(())); } buf.ensure_capacity_for(dst); - let std = self.std.clone(); + let std = me.std.clone(); - self.state = Busy(sys::run(move || { + inner.state = Busy(sys::run(move || { let res = buf.read_from(&mut &*std); (Operation::Read(res), buf) })); @@ -571,29 +514,32 @@ impl AsyncRead for File { match op { Operation::Read(Ok(_)) => { - let n = buf.copy_to(dst); - self.state = Idle(Some(buf)); - return Ready(Ok(n)); + buf.copy_to(dst); + inner.state = Idle(Some(buf)); + return Ready(Ok(())); } Operation::Read(Err(e)) => { assert!(buf.is_empty()); - self.state = Idle(Some(buf)); + inner.state = Idle(Some(buf)); return Ready(Err(e)); } Operation::Write(Ok(_)) => { assert!(buf.is_empty()); - self.state = Idle(Some(buf)); + inner.state = Idle(Some(buf)); continue; } Operation::Write(Err(e)) => { - assert!(self.last_write_err.is_none()); - self.last_write_err = Some(e.kind()); - self.state = Idle(Some(buf)); + assert!(inner.last_write_err.is_none()); + inner.last_write_err = Some(e.kind()); + inner.state = Idle(Some(buf)); } - Operation::Seek(_) => { + Operation::Seek(result) => { assert!(buf.is_empty()); - self.state = Idle(Some(buf)); + inner.state = Idle(Some(buf)); + if let Ok(pos) = result { + inner.pos = pos; + } continue; } } @@ -604,13 +550,13 @@ impl AsyncRead for File { } impl AsyncSeek for File { - fn start_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut pos: SeekFrom, - ) -> Poll<io::Result<()>> { + fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + loop { - match self.state { + match inner.state { + Busy(_) => panic!("must wait for poll_complete before calling start_seek"), Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); @@ -623,49 +569,41 @@ impl AsyncSeek for File { } } - let std = self.std.clone(); + let std = me.std.clone(); - self.state = Busy(sys::run(move || { + inner.state = Busy(sys::run(move || { let res = (&*std).seek(pos); (Operation::Seek(res), buf) })); - - return Ready(Ok(())); - } - Busy(ref mut rx) => { - let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); - - match op { - Operation::Read(_) => {} - Operation::Write(Err(e)) => { - assert!(self.last_write_err.is_none()); - self.last_write_err = Some(e.kind()); - } - Operation::Write(_) => {} - Operation::Seek(_) => {} - } + return Ok(()); } } } } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + let inner = self.inner.get_mut(); + loop { - match self.state { - Idle(_) => panic!("must call start_seek before calling poll_complete"), + match inner.state { + Idle(_) => return Poll::Ready(Ok(inner.pos)), Busy(ref mut rx) => { let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); + inner.state = Idle(Some(buf)); match op { Operation::Read(_) => {} Operation::Write(Err(e)) => { - assert!(self.last_write_err.is_none()); - self.last_write_err = Some(e.kind()); + assert!(inner.last_write_err.is_none()); + inner.last_write_err = Some(e.kind()); } Operation::Write(_) => {} - Operation::Seek(res) => return Ready(res), + Operation::Seek(res) => { + if let Ok(pos) = res { + inner.pos = pos; + } + return Ready(res); + } } } } @@ -675,16 +613,19 @@ impl AsyncSeek for File { impl AsyncWrite for File { fn poll_write( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, src: &[u8], ) -> Poll<io::Result<usize>> { - if let Some(e) = self.last_write_err.take() { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + if let Some(e) = inner.last_write_err.take() { return Ready(Err(e.into())); } loop { - match self.state { + match inner.state { Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); @@ -695,9 +636,9 @@ impl AsyncWrite for File { }; let n = buf.copy_from(src); - let std = self.std.clone(); + let std = me.std.clone(); - self.state = Busy(sys::run(move || { + inner.state = Busy(sys::run(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) } else { @@ -711,7 +652,7 @@ impl AsyncWrite for File { } Busy(ref mut rx) => { let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); + inner.state = Idle(Some(buf)); match op { Operation::Read(_) => { @@ -737,27 +678,12 @@ impl AsyncWrite for File { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - if let Some(e) = self.last_write_err.take() { - return Ready(Err(e.into())); - } - - let (op, buf) = match self.state { - Idle(_) => return Ready(Ok(())), - Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, - }; - - // The buffer is not used here - self.state = Idle(Some(buf)); - - match op { - Operation::Read(_) => Ready(Ok(())), - Operation::Write(res) => Ready(res), - Operation::Seek(_) => Ready(Ok(())), - } + let inner = self.inner.get_mut(); + inner.poll_flush(cx) } - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - Poll::Ready(Ok(())) + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + self.poll_flush(cx) } } @@ -782,9 +708,53 @@ impl std::os::unix::io::AsRawFd for File { } } +#[cfg(unix)] +impl std::os::unix::io::FromRawFd for File { + unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self { + sys::File::from_raw_fd(fd).into() + } +} + #[cfg(windows)] impl std::os::windows::io::AsRawHandle for File { fn as_raw_handle(&self) -> std::os::windows::io::RawHandle { self.std.as_raw_handle() } } + +#[cfg(windows)] +impl std::os::windows::io::FromRawHandle for File { + unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self { + sys::File::from_raw_handle(handle).into() + } +} + +impl Inner { + async fn complete_inflight(&mut self) { + use crate::future::poll_fn; + + if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await { + self.last_write_err = Some(e.kind()); + } + } + + fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + if let Some(e) = self.last_write_err.take() { + return Ready(Err(e.into())); + } + + let (op, buf) = match self.state { + Idle(_) => return Ready(Ok(())), + Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, + }; + + // The buffer is not used here + self.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => Ready(Ok(())), + Operation::Write(res) => Ready(res), + Operation::Seek(_) => Ready(Ok(())), + } + } +} |