diff options
author | Joel Galenson <jgalenson@google.com> | 2021-10-14 18:01:55 +0000 |
---|---|---|
committer | Gerrit Code Review <noreply-gerritcodereview@google.com> | 2021-10-14 18:01:55 +0000 |
commit | 25df33f2351ed27dafa1a449ce2331d54292505f (patch) | |
tree | 3b2fc897370dbc19b8f0f0a0593a3dafa8b7665b | |
parent | 1a12f48ce417a106dc460825ebf99400bcd60c48 (diff) | |
parent | c129d53e3fa6917db60f9f596af11ee1951e7a59 (diff) | |
download | futures-25df33f2351ed27dafa1a449ce2331d54292505f.tar.gz |
Merge "Upgrade rust/crates/futures to 0.3.17"android-s-v2-preview-2android-s-v2-preview-1android-s-v2-beta-2android-s-v2-preview-1
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 23 | ||||
-rw-r--r-- | Cargo.toml | 16 | ||||
-rw-r--r-- | Cargo.toml.orig | 16 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | src/lib.rs | 20 | ||||
-rw-r--r-- | tests/async_await_macros.rs | 40 | ||||
-rw-r--r-- | tests/auto_traits.rs | 28 | ||||
-rw-r--r-- | tests/io_buf_reader.rs | 292 | ||||
-rw-r--r-- | tests/stream_peekable.rs | 19 |
10 files changed, 319 insertions, 145 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 99dc8b0..ffd4f55 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "ab38fd29d3f84f8fc028fa7883e53dba423da0ee" + "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010" } } @@ -41,6 +41,8 @@ rust_library { name: "libfutures", host_supported: true, crate_name: "futures", + cargo_env_compat: true, + cargo_pkg_version: "0.3.17", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -67,24 +69,3 @@ rust_library { ], min_sdk_version: "29", } - -// dependent_library ["feature_list"] -// autocfg-1.0.1 -// futures-channel-0.3.16 "alloc,futures-sink,sink,std" -// futures-core-0.3.16 "alloc,std" -// futures-executor-0.3.16 "std" -// futures-io-0.3.16 "std" -// futures-macro-0.3.16 -// futures-sink-0.3.16 "alloc,std" -// futures-task-0.3.16 "alloc,std" -// futures-util-0.3.16 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" -// memchr-2.4.0 "default,std" -// pin-project-lite-0.2.7 -// pin-utils-0.1.0 -// proc-macro-hack-0.5.19 -// proc-macro-nested-0.1.7 -// proc-macro2-1.0.28 "default,proc-macro" -// quote-1.0.9 "default,proc-macro" -// slab-0.4.4 "default,std" -// syn-1.0.74 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut" -// unicode-xid-0.2.2 "default" @@ -12,7 +12,7 @@ [package] edition = "2018" name = "futures" -version = "0.3.16" +version = "0.3.17" authors = ["Alex Crichton <alex@alexcrichton.com>"] description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n" homepage = "https://rust-lang.github.io/futures-rs" @@ -29,33 +29,33 @@ rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"] [dependencies.futures-channel] -version = "0.3.16" +version = "0.3.17" features = ["sink"] default-features = false [dependencies.futures-core] -version = "0.3.16" +version = "0.3.17" default-features = false [dependencies.futures-executor] -version = "0.3.16" +version = "0.3.17" optional = true default-features = false [dependencies.futures-io] -version = "0.3.16" +version = "0.3.17" default-features = false [dependencies.futures-sink] -version = "0.3.16" +version = "0.3.17" default-features = false [dependencies.futures-task] -version = "0.3.16" +version = "0.3.17" default-features = false [dependencies.futures-util] -version = "0.3.16" +version = "0.3.17" features = ["sink"] default-features = false [dev-dependencies.assert_matches] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 04cea8a..b01b12e 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,7 +1,7 @@ [package] name = "futures" edition = "2018" -version = "0.3.16" +version = "0.3.17" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" readme = "../README.md" @@ -16,13 +16,13 @@ composability, and iterator-like interfaces. categories = ["asynchronous"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.16", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.16", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.16", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.16", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.16", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.16", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.16", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.17", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.17", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.17", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.17", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.17", default-features = false, features = ["sink"] } [dev-dependencies] futures-executor = { path = "../futures-executor", features = ["thread-pool"] } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures/futures-0.3.16.crate" + value: "https://static.crates.io/crates/futures/futures-0.3.17.crate" } - version: "0.3.16" + version: "0.3.17" license_type: NOTICE last_upgrade_date { year: 2021 - month: 8 - day: 9 + month: 9 + day: 22 } } @@ -102,26 +102,26 @@ compile_error!("The `bilock` feature requires the `unstable` feature as an expli #[cfg(all(feature = "read-initializer", not(feature = "unstable")))] compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); -#[doc(hidden)] +#[doc(no_inline)] pub use futures_core::future::{Future, TryFuture}; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_util::future::{FutureExt, TryFutureExt}; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_core::stream::{Stream, TryStream}; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_util::stream::{StreamExt, TryStreamExt}; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_sink::Sink; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_util::sink::SinkExt; #[cfg(feature = "std")] -#[doc(hidden)] +#[doc(no_inline)] pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; #[cfg(feature = "std")] -#[doc(hidden)] +#[doc(no_inline)] pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; // Macro reexports @@ -137,6 +137,10 @@ pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-a #[doc(inline)] pub use futures_util::{future, never, sink, stream, task}; +#[cfg(feature = "std")] +#[cfg(feature = "async-await")] +pub use futures_util::stream_select; + #[cfg(feature = "alloc")] #[doc(inline)] pub use futures_channel as channel; diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs index 19833d0..ce1f3a3 100644 --- a/tests/async_await_macros.rs +++ b/tests/async_await_macros.rs @@ -4,7 +4,9 @@ use futures::future::{self, poll_fn, FutureExt}; use futures::sink::SinkExt; use futures::stream::StreamExt; use futures::task::{Context, Poll}; -use futures::{join, pending, pin_mut, poll, select, select_biased, try_join}; +use futures::{ + join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join, +}; use std::mem; #[test] @@ -309,6 +311,42 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { } #[test] +#[allow(unused_assignments)] +fn stream_select() { + // stream_select! macro + block_on(async { + let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()); + + let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending()); + assert_eq!(endless_ones.next().await, Some(1)); + assert_eq!(endless_ones.next().await, Some(1)); + + let mut finite_list = + stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter())); + assert_eq!(finite_list.next().await, Some(1)); + assert_eq!(finite_list.next().await, Some(1)); + assert_eq!(finite_list.next().await, None); + + let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3)); + // Take 1000, and assert a somewhat even distribution of values. + // The fairness is randomized, but over 1000 samples we should be pretty close to even. + // This test may be a bit flaky. Feel free to adjust the margins as you see fit. + let mut count = 0; + let results = endless_mixed + .take_while(move |_| { + count += 1; + let ret = count < 1000; + async move { ret } + }) + .collect::<Vec<_>>() + .await; + assert!(results.iter().filter(|x| **x == 1).count() >= 299); + assert!(results.iter().filter(|x| **x == 2).count() >= 299); + assert!(results.iter().filter(|x| **x == 3).count() >= 299); + }); +} + +#[test] fn join_size() { let fut = async { let ready = future::ready(0i32); diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs index e0192a1..b3d8b00 100644 --- a/tests/auto_traits.rs +++ b/tests/auto_traits.rs @@ -470,6 +470,13 @@ pub mod future { assert_not_impl!(PollFn<*const ()>: Sync); assert_impl!(PollFn<PhantomPinned>: Unpin); + assert_impl!(PollImmediate<SendStream>: Send); + assert_not_impl!(PollImmediate<LocalStream<()>>: Send); + assert_impl!(PollImmediate<SyncStream>: Sync); + assert_not_impl!(PollImmediate<LocalStream<()>>: Sync); + assert_impl!(PollImmediate<UnpinStream>: Unpin); + assert_not_impl!(PollImmediate<PinnedStream>: Unpin); + assert_impl!(Ready<()>: Send); assert_not_impl!(Ready<*const ()>: Send); assert_impl!(Ready<()>: Sync); @@ -810,6 +817,12 @@ pub mod io { assert_impl!(Seek<'_, ()>: Unpin); assert_not_impl!(Seek<'_, PhantomPinned>: Unpin); + assert_impl!(SeeKRelative<'_, ()>: Send); + assert_not_impl!(SeeKRelative<'_, *const ()>: Send); + assert_impl!(SeeKRelative<'_, ()>: Sync); + assert_not_impl!(SeeKRelative<'_, *const ()>: Sync); + assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin); + assert_impl!(Sink: Send); assert_impl!(Sink: Sync); assert_impl!(Sink: Unpin); @@ -1430,6 +1443,14 @@ pub mod stream { assert_not_impl!(Peek<'_, LocalStream<()>>: Sync); assert_impl!(Peek<'_, PinnedStream>: Unpin); + assert_impl!(PeekMut<'_, SendStream<()>>: Send); + assert_not_impl!(PeekMut<'_, SendStream>: Send); + assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send); + assert_impl!(PeekMut<'_, SyncStream<()>>: Sync); + assert_not_impl!(PeekMut<'_, SyncStream>: Sync); + assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync); + assert_impl!(PeekMut<'_, PinnedStream>: Unpin); + assert_impl!(Peekable<SendStream<()>>: Send); assert_not_impl!(Peekable<SendStream>: Send); assert_not_impl!(Peekable<LocalStream>: Send); @@ -1451,6 +1472,13 @@ pub mod stream { assert_not_impl!(PollFn<*const ()>: Sync); assert_impl!(PollFn<PhantomPinned>: Unpin); + assert_impl!(PollImmediate<SendStream>: Send); + assert_not_impl!(PollImmediate<LocalStream<()>>: Send); + assert_impl!(PollImmediate<SyncStream>: Sync); + assert_not_impl!(PollImmediate<LocalStream<()>>: Sync); + assert_impl!(PollImmediate<UnpinStream>: Unpin); + assert_not_impl!(PollImmediate<PinnedStream>: Unpin); + assert_impl!(ReadyChunks<SendStream<()>>: Send); assert_not_impl!(ReadyChunks<SendStream>: Send); assert_not_impl!(ReadyChunks<LocalStream>: Send); diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs index d60df87..717297c 100644 --- a/tests/io_buf_reader.rs +++ b/tests/io_buf_reader.rs @@ -2,25 +2,17 @@ use futures::executor::block_on; use futures::future::{Future, FutureExt}; use futures::io::{ AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, - BufReader, Cursor, SeekFrom, + BufReader, SeekFrom, }; +use futures::pin_mut; use futures::task::{Context, Poll}; use futures_test::task::noop_context; +use pin_project::pin_project; use std::cmp; use std::io; use std::pin::Pin; -macro_rules! run_fill_buf { - ($reader:expr) => {{ - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { - break x; - } - } - }}; -} - +// helper for maybe_pending_* tests fn run<F: Future + Unpin>(mut f: F) -> F::Output { let mut cx = noop_context(); loop { @@ -30,6 +22,49 @@ fn run<F: Future + Unpin>(mut f: F) -> F::Output { } } +// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719 +#[pin_project(!Unpin)] +struct Cursor<T> { + #[pin] + inner: futures::io::Cursor<T>, +} + +impl<T> Cursor<T> { + fn new(inner: T) -> Self { + Self { inner: futures::io::Cursor::new(inner) } + } +} + +impl AsyncRead for Cursor<&[u8]> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncBufRead for Cursor<&[u8]> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + self.project().inner.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().inner.consume(amt) + } +} + +impl AsyncSeek for Cursor<&[u8]> { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { + self.project().inner.poll_seek(cx, pos) + } +} + struct MaybePending<'a> { inner: &'a [u8], ready_read: bool, @@ -80,54 +115,119 @@ impl AsyncBufRead for MaybePending<'_> { #[test] fn test_buffered_reader() { - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, inner); - - let mut buf = [0, 0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 3); - assert_eq!(buf, [5, 6, 7]); - assert_eq!(reader.buffer(), []); - - let mut buf = [0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 2); - assert_eq!(buf, [0, 1]); - assert_eq!(reader.buffer(), []); - - let mut buf = [0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [2]); - assert_eq!(reader.buffer(), [3]); + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + }); +} - let mut buf = [0, 0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [3, 0, 0]); - assert_eq!(reader.buffer(), []); +#[test] +fn test_buffered_reader_seek() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]); + reader.as_mut().consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); + }); +} - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [4, 0, 0]); - assert_eq!(reader.buffer(), []); +#[test] +fn test_buffered_reader_seek_relative() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, Cursor::new(inner)); + pin_mut!(reader); + + assert!(reader.as_mut().seek_relative(3).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(0).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(1).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]); + assert!(reader.as_mut().seek_relative(-1).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(2).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]); + }); +} - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); +#[test] +fn test_buffered_reader_invalidated_after_read() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(3, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); + reader.as_mut().consume(3); + + let mut buffer = [0, 0, 0, 0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 5); + assert_eq!(buffer, [0, 1, 2, 3, 4]); + + assert!(reader.as_mut().seek_relative(-2).await.is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); + assert_eq!(buffer, [3, 4]); + }); } #[test] -fn test_buffered_reader_seek() { - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); - - assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); - Pin::new(&mut reader).consume(1); - assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); +fn test_buffered_reader_invalidated_after_seek() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(3, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); + reader.as_mut().consume(3); + + assert!(reader.seek(SeekFrom::Current(5)).await.is_ok()); + + assert!(reader.as_mut().seek_relative(-2).await.is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); + assert_eq!(buffer, [3, 4]); + }); } #[test] @@ -156,24 +256,27 @@ fn test_buffered_reader_seek_underflow() { self.pos = self.pos.wrapping_add(n as u64); } SeekFrom::End(n) => { - self.pos = u64::max_value().wrapping_add(n as u64); + self.pos = u64::MAX.wrapping_add(n as u64); } } Ok(self.pos) } } - let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..])); - assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value() - 5)); - assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); - // the following seek will require two underlying seeks - let expected = 9_223_372_036_854_775_802; - assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected)); - assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); - // seeking to 0 should empty the buffer. - assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected)); - assert_eq!(reader.get_ref().get_ref().pos, expected); + block_on(async { + let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); + pin_mut!(reader); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]); + assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5); + assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); + // the following seek will require two underlying seeks + let expected = 9_223_372_036_854_775_802; + assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected); + assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); + // seeking to 0 should empty the buffer. + assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected); + assert_eq!(reader.get_ref().get_ref().pos, expected); + }); } #[test] @@ -193,16 +296,18 @@ fn test_short_reads() { } } - let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; - let mut reader = BufReader::new(AllowStdIo::new(inner)); - let mut buf = [0, 0]; - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); + block_on(async { + let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; + let mut reader = BufReader::new(AllowStdIo::new(inner)); + let mut buf = [0, 0]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 2); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + }); } #[test] @@ -263,7 +368,9 @@ fn maybe_pending_buf_read() { // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 #[test] fn maybe_pending_seek() { + #[pin_project] struct MaybePendingSeek<'a> { + #[pin] inner: Cursor<&'a [u8]>, ready: bool, } @@ -276,25 +383,21 @@ fn maybe_pending_seek() { impl AsyncRead for MaybePendingSeek<'_> { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>> { - Pin::new(&mut self.inner).poll_read(cx, buf) + self.project().inner.poll_read(cx, buf) } } impl AsyncBufRead for MaybePendingSeek<'_> { - fn poll_fill_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<&[u8]>> { - let this: *mut Self = &mut *self as *mut _; - Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + self.project().inner.poll_fill_buf(cx) } - fn consume(mut self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut self.inner).consume(amt) + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().inner.consume(amt) } } @@ -305,24 +408,25 @@ fn maybe_pending_seek() { pos: SeekFrom, ) -> Poll<io::Result<u64>> { if self.ready { - self.ready = false; - Pin::new(&mut self.inner).poll_seek(cx, pos) + *self.as_mut().project().ready = false; + self.project().inner.poll_seek(cx, pos) } else { - self.ready = true; + *self.project().ready = true; Poll::Pending } } } let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + pin_mut!(reader); assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..])); Pin::new(&mut reader).consume(1); assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); } diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs index 2fa7f3a..153fcc2 100644 --- a/tests/stream_peekable.rs +++ b/tests/stream_peekable.rs @@ -9,6 +9,25 @@ fn peekable() { pin_mut!(peekable); assert_eq!(peekable.as_mut().peek().await, Some(&1u8)); assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]); + + let s = stream::once(async { 1 }).peekable(); + pin_mut!(s); + assert_eq!(s.as_mut().peek().await, Some(&1u8)); + assert_eq!(s.collect::<Vec<u8>>().await, vec![1]); + }); +} + +#[test] +fn peekable_mut() { + block_on(async { + let s = stream::iter(vec![1u8, 2, 3]).peekable(); + pin_mut!(s); + if let Some(p) = s.as_mut().peek_mut().await { + if *p == 1 { + *p = 5; + } + } + assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]); }); } |