aboutsummaryrefslogtreecommitdiff
path: root/src/fs/file.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/file.rs')
-rw-r--r--src/fs/file.rs296
1 files changed, 133 insertions, 163 deletions
diff --git a/src/fs/file.rs b/src/fs/file.rs
index f3bc985..7c71f48 100644
--- a/src/fs/file.rs
+++ b/src/fs/file.rs
@@ -5,7 +5,8 @@
use self::State::*;
use crate::fs::{asyncify, sys};
use crate::io::blocking::Buf;
-use crate::io::{AsyncRead, AsyncSeek, AsyncWrite};
+use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
+use crate::sync::Mutex;
use std::fmt;
use std::fs::{Metadata, Permissions};
@@ -80,12 +81,18 @@ use std::task::Poll::*;
/// ```
pub struct File {
std: Arc<sys::File>,
+ inner: Mutex<Inner>,
+}
+
+struct Inner {
state: State,
/// Errors from writes/flushes are returned in write/flush calls. If a write
/// error is observed while performing a read, it is saved until the next
/// write / flush call.
last_write_err: Option<io::ErrorKind>,
+
+ pos: u64,
}
#[derive(Debug)]
@@ -197,70 +204,11 @@ impl File {
pub fn from_std(std: sys::File) -> File {
File {
std: Arc::new(std),
- state: State::Idle(Some(Buf::with_capacity(0))),
- last_write_err: None,
- }
- }
-
- /// Seeks to an offset, in bytes, in a stream.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::fs::File;
- /// use tokio::prelude::*;
- ///
- /// use std::io::SeekFrom;
- ///
- /// # async fn dox() -> std::io::Result<()> {
- /// let mut file = File::open("foo.txt").await?;
- /// file.seek(SeekFrom::Start(6)).await?;
- ///
- /// let mut contents = vec![0u8; 10];
- /// file.read_exact(&mut contents).await?;
- /// # Ok(())
- /// # }
- /// ```
- ///
- /// The [`read_exact`] method is defined on the [`AsyncReadExt`] trait.
- ///
- /// [`read_exact`]: fn@crate::io::AsyncReadExt::read_exact
- /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
- pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
- self.complete_inflight().await;
-
- let mut buf = match self.state {
- Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
- _ => unreachable!(),
- };
-
- // Factor in any unread data from the buf
- if !buf.is_empty() {
- let n = buf.discard_read();
-
- if let SeekFrom::Current(ref mut offset) = pos {
- *offset += n;
- }
- }
-
- let std = self.std.clone();
-
- // Start the operation
- self.state = Busy(sys::run(move || {
- let res = (&*std).seek(pos);
- (Operation::Seek(res), buf)
- }));
-
- let (op, buf) = match self.state {
- Idle(_) => unreachable!(),
- Busy(ref mut rx) => rx.await.unwrap(),
- };
-
- self.state = Idle(Some(buf));
-
- match op {
- Operation::Seek(res) => res,
- _ => unreachable!(),
+ inner: Mutex::new(Inner {
+ state: State::Idle(Some(Buf::with_capacity(0))),
+ last_write_err: None,
+ pos: 0,
+ }),
}
}
@@ -287,8 +235,9 @@ impl File {
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
- pub async fn sync_all(&mut self) -> io::Result<()> {
- self.complete_inflight().await;
+ pub async fn sync_all(&self) -> io::Result<()> {
+ let mut inner = self.inner.lock().await;
+ inner.complete_inflight().await;
let std = self.std.clone();
asyncify(move || std.sync_all()).await
@@ -321,8 +270,9 @@ impl File {
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
- pub async fn sync_data(&mut self) -> io::Result<()> {
- self.complete_inflight().await;
+ pub async fn sync_data(&self) -> io::Result<()> {
+ let mut inner = self.inner.lock().await;
+ inner.complete_inflight().await;
let std = self.std.clone();
asyncify(move || std.sync_data()).await
@@ -358,10 +308,11 @@ impl File {
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
- pub async fn set_len(&mut self, size: u64) -> io::Result<()> {
- self.complete_inflight().await;
+ pub async fn set_len(&self, size: u64) -> io::Result<()> {
+ let mut inner = self.inner.lock().await;
+ inner.complete_inflight().await;
- let mut buf = match self.state {
+ let mut buf = match inner.state {
Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
_ => unreachable!(),
};
@@ -374,7 +325,7 @@ impl File {
let std = self.std.clone();
- self.state = Busy(sys::run(move || {
+ inner.state = Busy(sys::run(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| std.set_len(size))
} else {
@@ -386,15 +337,17 @@ impl File {
(Operation::Seek(res), buf)
}));
- let (op, buf) = match self.state {
+ let (op, buf) = match inner.state {
Idle(_) => unreachable!(),
Busy(ref mut rx) => rx.await?,
};
- self.state = Idle(Some(buf));
+ inner.state = Idle(Some(buf));
match op {
- Operation::Seek(res) => res.map(|_| ()),
+ Operation::Seek(res) => res.map(|pos| {
+ inner.pos = pos;
+ }),
_ => unreachable!(),
}
}
@@ -459,7 +412,7 @@ impl File {
/// # }
/// ```
pub async fn into_std(mut self) -> sys::File {
- self.complete_inflight().await;
+ self.inner.get_mut().complete_inflight().await;
Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
}
@@ -526,42 +479,32 @@ impl File {
let std = self.std.clone();
asyncify(move || std.set_permissions(perm)).await
}
-
- async fn complete_inflight(&mut self) {
- use crate::future::poll_fn;
-
- if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
- self.last_write_err = Some(e.kind());
- }
- }
}
impl AsyncRead for File {
- unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [std::mem::MaybeUninit<u8>]) -> bool {
- // https://github.com/rust-lang/rust/blob/09c817eeb29e764cfc12d0a8d94841e3ffe34023/src/libstd/fs.rs#L668
- false
- }
-
fn poll_read(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
- dst: &mut [u8],
- ) -> Poll<io::Result<usize>> {
+ dst: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ let me = self.get_mut();
+ let inner = me.inner.get_mut();
+
loop {
- match self.state {
+ match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
if !buf.is_empty() {
- let n = buf.copy_to(dst);
+ buf.copy_to(dst);
*buf_cell = Some(buf);
- return Ready(Ok(n));
+ return Ready(Ok(()));
}
buf.ensure_capacity_for(dst);
- let std = self.std.clone();
+ let std = me.std.clone();
- self.state = Busy(sys::run(move || {
+ inner.state = Busy(sys::run(move || {
let res = buf.read_from(&mut &*std);
(Operation::Read(res), buf)
}));
@@ -571,29 +514,32 @@ impl AsyncRead for File {
match op {
Operation::Read(Ok(_)) => {
- let n = buf.copy_to(dst);
- self.state = Idle(Some(buf));
- return Ready(Ok(n));
+ buf.copy_to(dst);
+ inner.state = Idle(Some(buf));
+ return Ready(Ok(()));
}
Operation::Read(Err(e)) => {
assert!(buf.is_empty());
- self.state = Idle(Some(buf));
+ inner.state = Idle(Some(buf));
return Ready(Err(e));
}
Operation::Write(Ok(_)) => {
assert!(buf.is_empty());
- self.state = Idle(Some(buf));
+ inner.state = Idle(Some(buf));
continue;
}
Operation::Write(Err(e)) => {
- assert!(self.last_write_err.is_none());
- self.last_write_err = Some(e.kind());
- self.state = Idle(Some(buf));
+ assert!(inner.last_write_err.is_none());
+ inner.last_write_err = Some(e.kind());
+ inner.state = Idle(Some(buf));
}
- Operation::Seek(_) => {
+ Operation::Seek(result) => {
assert!(buf.is_empty());
- self.state = Idle(Some(buf));
+ inner.state = Idle(Some(buf));
+ if let Ok(pos) = result {
+ inner.pos = pos;
+ }
continue;
}
}
@@ -604,13 +550,13 @@ impl AsyncRead for File {
}
impl AsyncSeek for File {
- fn start_seek(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- mut pos: SeekFrom,
- ) -> Poll<io::Result<()>> {
+ fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
+ let me = self.get_mut();
+ let inner = me.inner.get_mut();
+
loop {
- match self.state {
+ match inner.state {
+ Busy(_) => panic!("must wait for poll_complete before calling start_seek"),
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
@@ -623,49 +569,41 @@ impl AsyncSeek for File {
}
}
- let std = self.std.clone();
+ let std = me.std.clone();
- self.state = Busy(sys::run(move || {
+ inner.state = Busy(sys::run(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
-
- return Ready(Ok(()));
- }
- Busy(ref mut rx) => {
- let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
- self.state = Idle(Some(buf));
-
- match op {
- Operation::Read(_) => {}
- Operation::Write(Err(e)) => {
- assert!(self.last_write_err.is_none());
- self.last_write_err = Some(e.kind());
- }
- Operation::Write(_) => {}
- Operation::Seek(_) => {}
- }
+ return Ok(());
}
}
}
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ let inner = self.inner.get_mut();
+
loop {
- match self.state {
- Idle(_) => panic!("must call start_seek before calling poll_complete"),
+ match inner.state {
+ Idle(_) => return Poll::Ready(Ok(inner.pos)),
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
- self.state = Idle(Some(buf));
+ inner.state = Idle(Some(buf));
match op {
Operation::Read(_) => {}
Operation::Write(Err(e)) => {
- assert!(self.last_write_err.is_none());
- self.last_write_err = Some(e.kind());
+ assert!(inner.last_write_err.is_none());
+ inner.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
- Operation::Seek(res) => return Ready(res),
+ Operation::Seek(res) => {
+ if let Ok(pos) = res {
+ inner.pos = pos;
+ }
+ return Ready(res);
+ }
}
}
}
@@ -675,16 +613,19 @@ impl AsyncSeek for File {
impl AsyncWrite for File {
fn poll_write(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
src: &[u8],
) -> Poll<io::Result<usize>> {
- if let Some(e) = self.last_write_err.take() {
+ let me = self.get_mut();
+ let inner = me.inner.get_mut();
+
+ if let Some(e) = inner.last_write_err.take() {
return Ready(Err(e.into()));
}
loop {
- match self.state {
+ match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
@@ -695,9 +636,9 @@ impl AsyncWrite for File {
};
let n = buf.copy_from(src);
- let std = self.std.clone();
+ let std = me.std.clone();
- self.state = Busy(sys::run(move || {
+ inner.state = Busy(sys::run(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
@@ -711,7 +652,7 @@ impl AsyncWrite for File {
}
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
- self.state = Idle(Some(buf));
+ inner.state = Idle(Some(buf));
match op {
Operation::Read(_) => {
@@ -737,27 +678,12 @@ impl AsyncWrite for File {
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
- if let Some(e) = self.last_write_err.take() {
- return Ready(Err(e.into()));
- }
-
- let (op, buf) = match self.state {
- Idle(_) => return Ready(Ok(())),
- Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
- };
-
- // The buffer is not used here
- self.state = Idle(Some(buf));
-
- match op {
- Operation::Read(_) => Ready(Ok(())),
- Operation::Write(res) => Ready(res),
- Operation::Seek(_) => Ready(Ok(())),
- }
+ let inner = self.inner.get_mut();
+ inner.poll_flush(cx)
}
- fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
- Poll::Ready(Ok(()))
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ self.poll_flush(cx)
}
}
@@ -782,9 +708,53 @@ impl std::os::unix::io::AsRawFd for File {
}
}
+#[cfg(unix)]
+impl std::os::unix::io::FromRawFd for File {
+ unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
+ sys::File::from_raw_fd(fd).into()
+ }
+}
+
#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for File {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
self.std.as_raw_handle()
}
}
+
+#[cfg(windows)]
+impl std::os::windows::io::FromRawHandle for File {
+ unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
+ sys::File::from_raw_handle(handle).into()
+ }
+}
+
+impl Inner {
+ async fn complete_inflight(&mut self) {
+ use crate::future::poll_fn;
+
+ if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
+ self.last_write_err = Some(e.kind());
+ }
+ }
+
+ fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ if let Some(e) = self.last_write_err.take() {
+ return Ready(Err(e.into()));
+ }
+
+ let (op, buf) = match self.state {
+ Idle(_) => return Ready(Ok(())),
+ Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
+ };
+
+ // The buffer is not used here
+ self.state = Idle(Some(buf));
+
+ match op {
+ Operation::Read(_) => Ready(Ok(())),
+ Operation::Write(res) => Ready(res),
+ Operation::Seek(_) => Ready(Ok(())),
+ }
+ }
+}