diff options
author | Joel Galenson <jgalenson@google.com> | 2021-05-25 13:18:57 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-05-25 13:18:57 +0000 |
commit | 42adae9f5068706bf1825d38d7744ce1f1b07a76 (patch) | |
tree | 0cab22263910d38d37989cc6012cfde7b4c6756b | |
parent | de595d83f7c89c21e4f97ce775b94c5029424e80 (diff) | |
parent | 1aa349d12fd4316668ce00cf2e6d1aafd132b8f9 (diff) | |
download | futures-channel-42adae9f5068706bf1825d38d7744ce1f1b07a76.tar.gz |
Upgrade rust/crates/futures-channel to 0.3.15 am: cc0890a4cf am: 1aa349d12f
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-channel/+/1712421
Change-Id: I4bb6d234592a29e36285e4641d0bd2ab5760ffa6
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 2 | ||||
-rw-r--r-- | Cargo.toml | 10 | ||||
-rw-r--r-- | Cargo.toml.orig | 15 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | TEST_MAPPING | 126 | ||||
-rw-r--r-- | benches/sync_mpsc.rs | 15 | ||||
-rw-r--r-- | build.rs | 42 | ||||
-rw-r--r-- | no_atomic_cas.rs | 11 | ||||
-rw-r--r-- | src/lib.rs | 8 | ||||
-rw-r--r-- | src/lock.rs | 7 | ||||
-rw-r--r-- | src/mpsc/mod.rs | 193 | ||||
-rw-r--r-- | src/mpsc/queue.rs | 22 | ||||
-rw-r--r-- | src/mpsc/sink_impl.rs | 58 | ||||
-rw-r--r-- | src/oneshot.rs | 39 | ||||
-rw-r--r-- | tests/channel.rs | 10 | ||||
-rw-r--r-- | tests/mpsc-close.rs | 50 | ||||
-rw-r--r-- | tests/mpsc.rs | 33 | ||||
-rw-r--r-- | tests/oneshot.rs | 4 |
19 files changed, 363 insertions, 292 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f3ad3ab..ec6442e 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "c91f8691672c7401b1923ab00bf138975c99391a" + "sha1": "fc080d153bc7bf00429ec5e2b91e2f21f2243846" } } @@ -62,4 +62,4 @@ rust_library { } // dependent_library ["feature_list"] -// futures-core-0.3.14 "alloc,std" +// futures-core-0.3.15 "alloc,std" @@ -13,7 +13,7 @@ [package] edition = "2018" name = "futures-channel" -version = "0.3.13" +version = "0.3.15" authors = ["Alex Crichton <alex@alexcrichton.com>"] description = "Channels for asynchronous communication using futures-rs.\n" homepage = "https://rust-lang.github.io/futures-rs" @@ -24,11 +24,11 @@ repository = "https://github.com/rust-lang/futures-rs" all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies.futures-core] -version = "0.3.13" +version = "0.3.15" default-features = false [dependencies.futures-sink] -version = "0.3.13" +version = "0.3.15" optional = true default-features = false @@ -36,8 +36,8 @@ default-features = false [features] alloc = ["futures-core/alloc"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"] +cfg-target-has-atomic = [] default = ["std"] sink = ["futures-sink"] std = ["alloc", "futures-core/std"] -unstable = ["futures-core/unstable"] +unstable = [] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 9a33320..fae78a0 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,7 +1,7 @@ [package] name = "futures-channel" edition = "2018" -version = "0.3.13" +version = "0.3.15" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" @@ -17,15 +17,14 @@ std = ["alloc", "futures-core/std"] alloc = ["futures-core/alloc"] sink = ["futures-sink"] -# Unstable features -# These features are outside of the normal semver guarantees and require the -# `unstable` feature as an explicit opt-in to unstable API. -unstable = ["futures-core/unstable"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"] +# These features are no longer used. +# TODO: remove in the next major version. +unstable = [] +cfg-target-has-atomic = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.13", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.15", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.15", default-features = false, optional = true } [dev-dependencies] futures = { path = "../futures", default-features = true } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.13.crate" + value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.15.crate" } - version: "0.3.13" + version: "0.3.15" license_type: NOTICE last_upgrade_date { year: 2021 - month: 4 - day: 1 + month: 5 + day: 19 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index 6798806..7facd80 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -2,55 +2,157 @@ { "presubmit": [ { + "name": "ZipFuseTest" + }, + { + "name": "anyhow_device_test_src_lib" + }, + { + "name": "anyhow_device_test_tests_test_autotrait" + }, + { "name": "anyhow_device_test_tests_test_boxed" }, { + "name": "anyhow_device_test_tests_test_chain" + }, + { + "name": "anyhow_device_test_tests_test_context" + }, + { "name": "anyhow_device_test_tests_test_convert" }, { + "name": "anyhow_device_test_tests_test_downcast" + }, + { "name": "anyhow_device_test_tests_test_ffi" }, { - "name": "anyhow_device_test_tests_test_repr" + "name": "anyhow_device_test_tests_test_fmt" }, { - "name": "tokio-test_device_test_tests_block_on" + "name": "anyhow_device_test_tests_test_macros" }, { - "name": "anyhow_device_test_tests_test_chain" + "name": "anyhow_device_test_tests_test_repr" }, { "name": "anyhow_device_test_tests_test_source" }, { + "name": "authfs_device_test_src_lib" + }, + { + "name": "futures-util_device_test_src_lib" + }, + { + "name": "tokio-test_device_test_src_lib" + }, + { + "name": "tokio-test_device_test_tests_block_on" + }, + { "name": "tokio-test_device_test_tests_io" }, { - "name": "anyhow_device_test_tests_test_autotrait" + "name": "tokio-test_device_test_tests_macros" }, { - "name": "anyhow_device_test_src_lib" + "name": "tokio_device_test_tests_buffered" }, { - "name": "anyhow_device_test_tests_test_context" + "name": "tokio_device_test_tests_io_async_read" }, { - "name": "anyhow_device_test_tests_test_downcast" + "name": "tokio_device_test_tests_io_copy_bidirectional" }, { - "name": "anyhow_device_test_tests_test_macros" + "name": "tokio_device_test_tests_io_lines" }, { - "name": "futures-util_device_test_src_lib" + "name": "tokio_device_test_tests_io_mem_stream" }, { - "name": "anyhow_device_test_tests_test_fmt" + "name": "tokio_device_test_tests_io_read" }, { - "name": "tokio-test_device_test_tests_macros" + "name": "tokio_device_test_tests_io_read_buf" }, { - "name": "tokio-test_device_test_src_lib" + "name": "tokio_device_test_tests_io_read_to_end" + }, + { + "name": "tokio_device_test_tests_io_take" + }, + { + "name": "tokio_device_test_tests_io_write" + }, + { + "name": "tokio_device_test_tests_io_write_all" + }, + { + "name": "tokio_device_test_tests_io_write_buf" + }, + { + "name": "tokio_device_test_tests_io_write_int" + }, + { + "name": "tokio_device_test_tests_macros_join" + }, + { + "name": "tokio_device_test_tests_no_rt" + }, + { + "name": "tokio_device_test_tests_rt_basic" + }, + { + "name": "tokio_device_test_tests_rt_threaded" + }, + { + "name": "tokio_device_test_tests_sync_barrier" + }, + { + "name": "tokio_device_test_tests_sync_broadcast" + }, + { + "name": "tokio_device_test_tests_sync_errors" + }, + { + "name": "tokio_device_test_tests_sync_mpsc" + }, + { + "name": "tokio_device_test_tests_sync_mutex_owned" + }, + { + "name": "tokio_device_test_tests_sync_rwlock" + }, + { + "name": "tokio_device_test_tests_sync_watch" + }, + { + "name": "tokio_device_test_tests_task_local" + }, + { + "name": "tokio_device_test_tests_task_local_set" + }, + { + "name": "tokio_device_test_tests_tcp_accept" + }, + { + "name": "tokio_device_test_tests_tcp_echo" + }, + { + "name": "tokio_device_test_tests_tcp_into_std" + }, + { + "name": "tokio_device_test_tests_tcp_shutdown" + }, + { + "name": "tokio_device_test_tests_time_rt" + }, + { + "name": "tokio_device_test_tests_uds_split" } ] } diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs index e22fe60..7c3c3d3 100644 --- a/benches/sync_mpsc.rs +++ b/benches/sync_mpsc.rs @@ -7,8 +7,8 @@ use { futures::{ channel::mpsc::{self, Sender, UnboundedSender}, ready, - stream::{Stream, StreamExt}, sink::Sink, + stream::{Stream, StreamExt}, task::{Context, Poll}, }, futures_test::task::noop_context, @@ -25,7 +25,6 @@ fn unbounded_1_tx(b: &mut Bencher) { // 1000 iterations to avoid measuring overhead of initialization // Result should be divided by 1000 for i in 0..1000 { - // Poll, not ready, park assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); @@ -73,7 +72,6 @@ fn unbounded_uncontended(b: &mut Bencher) { }) } - /// A Stream that continuously sends incrementing number of the queue struct TestSender { tx: Sender<u32>, @@ -84,9 +82,7 @@ struct TestSender { impl Stream for TestSender { type Item = u32; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<Option<Self::Item>> - { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this = &mut *self; let mut tx = Pin::new(&mut this.tx); @@ -123,12 +119,7 @@ fn bounded_100_tx(b: &mut Bencher) { // Each sender can send one item after specified capacity let (tx, mut rx) = mpsc::channel(0); - let mut tx: Vec<_> = (0..100).map(|_| { - TestSender { - tx: tx.clone(), - last: 0 - } - }).collect(); + let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect(); for i in 0..10 { for x in &mut tx { diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..c4f341d --- /dev/null +++ b/build.rs @@ -0,0 +1,42 @@ +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use std::env; + +include!("no_atomic_cas.rs"); + +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `futures_no_atomic_cas` +// Assume the target does not have atomic CAS (compare-and-swap). +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg strings below are +// *not* public API. Please let us know by opening a GitHub issue if your build +// environment requires some way to enable these cfgs other than by executing +// our build script. +fn main() { + let target = match env::var("TARGET") { + Ok(target) => target, + Err(e) => { + println!( + "cargo:warning={}: unable to get TARGET environment variable: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Note that this is `no_*`, not `has_*`. This allows treating + // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't + // run. This is needed for compatibility with non-cargo build systems that + // don't run the build script. + if NO_ATOMIC_CAS_TARGETS.contains(&&*target) { + println!("cargo:rustc-cfg=futures_no_atomic_cas"); + } + + println!("cargo:rerun-if-changed=no_atomic_cas.rs"); +} diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs new file mode 100644 index 0000000..0819af1 --- /dev/null +++ b/no_atomic_cas.rs @@ -0,0 +1,11 @@ +// This file is @generated by no_atomic_cas.sh. +// It is not intended for manual editing. + +const NO_ATOMIC_CAS_TARGETS: &[&str] = &[ + "avr-unknown-gnu-atmega328", + "msp430-none-elf", + "riscv32i-unknown-none-elf", + "riscv32imc-unknown-none-elf", + "thumbv4t-none-eabi", + "thumbv6m-none-eabi", +]; @@ -11,22 +11,16 @@ //! All items are only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] - #![cfg_attr(not(feature = "std"), no_std)] - #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] // It cannot be included in the published code because this lints have false positives in the minimum required version. #![cfg_attr(test, warn(single_use_lifetimes))] #![warn(clippy::all)] #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - macro_rules! cfg_target_has_atomic { ($($item:item)*) => {$( - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] $item )*}; } diff --git a/src/lock.rs b/src/lock.rs index 5eecdd9..b328d0f 100644 --- a/src/lock.rs +++ b/src/lock.rs @@ -6,8 +6,8 @@ use core::cell::UnsafeCell; use core::ops::{Deref, DerefMut}; -use core::sync::atomic::Ordering::SeqCst; use core::sync::atomic::AtomicBool; +use core::sync::atomic::Ordering::SeqCst; /// A "mutex" around a value, similar to `std::sync::Mutex<T>`. /// @@ -37,10 +37,7 @@ unsafe impl<T: Send> Sync for Lock<T> {} impl<T> Lock<T> { /// Creates a new lock around the given value. pub(crate) fn new(t: T) -> Self { - Self { - locked: AtomicBool::new(false), - data: UnsafeCell::new(t), - } + Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) } } /// Attempts to acquire this lock, returning whether the lock was acquired or diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs index dd50343..28612da 100644 --- a/src/mpsc/mod.rs +++ b/src/mpsc/mod.rs @@ -79,13 +79,13 @@ // by the queue structure. use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll, Waker}; use futures_core::task::__internal::AtomicWaker; +use futures_core::task::{Context, Poll, Waker}; use std::fmt; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; use std::thread; use crate::mpsc::queue::Queue; @@ -209,9 +209,7 @@ impl SendError { impl<T> fmt::Debug for TrySendError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TrySendError") - .field("kind", &self.err.kind) - .finish() + f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() } } @@ -251,8 +249,7 @@ impl<T> TrySendError<T> { impl fmt::Debug for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("TryRecvError") - .finish() + f.debug_tuple("TryRecvError").finish() } } @@ -335,10 +332,7 @@ struct SenderTask { impl SenderTask { fn new() -> Self { - Self { - task: None, - is_parked: false, - } + Self { task: None, is_parked: false } } fn notify(&mut self) { @@ -381,9 +375,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { maybe_parked: false, }; - let rx = Receiver { - inner: Some(inner), - }; + let rx = Receiver { inner: Some(inner) }; (Sender(Some(tx)), rx) } @@ -399,7 +391,6 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { - let inner = Arc::new(UnboundedInner { state: AtomicUsize::new(INIT_STATE), message_queue: Queue::new(), @@ -407,13 +398,9 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { recv_task: AtomicWaker::new(), }); - let tx = UnboundedSenderInner { - inner: inner.clone(), - }; + let tx = UnboundedSenderInner { inner: inner.clone() }; - let rx = UnboundedReceiver { - inner: Some(inner), - }; + let rx = UnboundedReceiver { inner: Some(inner) }; (UnboundedSender(Some(tx)), rx) } @@ -430,13 +417,10 @@ impl<T> UnboundedSenderInner<T> { if state.is_open { Poll::Ready(Ok(())) } else { - Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })) + Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) } } - // Push message to the queue and signal to the receiver fn queue_push_and_signal(&self, msg: T) { // Push the message onto the message queue @@ -462,16 +446,17 @@ impl<T> UnboundedSenderInner<T> { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -516,12 +501,7 @@ impl<T> BoundedSenderInner<T> { fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { // If the sender is currently blocked, reject the message if !self.poll_unparked(None).is_ready() { - return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Full, - }, - val: msg, - }); + return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); } // The channel has capacity to accept the message, so send it @@ -531,9 +511,7 @@ impl<T> BoundedSenderInner<T> { // Do the send without failing. // Can be called only by bounded sender. #[allow(clippy::debug_assert_with_mut_call)] - fn do_send_b(&mut self, msg: T) - -> Result<(), TrySendError<T>> - { + fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { // Anyone callig do_send *should* make sure there is room first, // but assert here for tests as a sanity check. debug_assert!(self.poll_unparked(None).is_ready()); @@ -551,12 +529,12 @@ impl<T> BoundedSenderInner<T> { // the configured buffer size num_messages > self.inner.buffer } - None => return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }), + None => { + return Err(TrySendError { + err: SendError { kind: SendErrorKind::Disconnected }, + val: msg, + }) + } }; // If the channel has reached capacity, then the sender task needs to @@ -600,16 +578,17 @@ impl<T> BoundedSenderInner<T> { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -644,15 +623,10 @@ impl<T> BoundedSenderInner<T> { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { let state = decode_state(self.inner.state.load(SeqCst)); if !state.is_open { - return Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })); + return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); } self.poll_unparked(Some(cx)).map(Ok) @@ -699,7 +673,7 @@ impl<T> BoundedSenderInner<T> { if !task.is_parked { self.maybe_parked = false; - return Poll::Ready(()) + return Poll::Ready(()); } // At this point, an unpark request is pending, so there will be an @@ -724,12 +698,7 @@ impl<T> Sender<T> { if let Some(inner) = &mut self.0 { inner.try_send(msg) } else { - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } } @@ -739,8 +708,7 @@ impl<T> Sender<T> { /// [`poll_ready`](Sender::poll_ready) has reported that the channel is /// ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.try_send(msg) - .map_err(|e| e.err) + self.try_send(msg).map_err(|e| e.err) } /// Polls the channel to determine if there is guaranteed capacity to send @@ -755,13 +723,8 @@ impl<T> Sender<T> { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - pub fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { - let inner = self.0.as_mut().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready(cx) } @@ -799,7 +762,10 @@ impl<T> Sender<T> { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver<H>(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -809,13 +775,8 @@ impl<T> Sender<T> { impl<T> UnboundedSender<T> { /// Check if the channel is ready to receive a message. - pub fn poll_ready( - &self, - _: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { - let inner = self.0.as_ref().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready_nb() } @@ -845,12 +806,7 @@ impl<T> UnboundedSender<T> { } } - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } /// Send a message on the channel. @@ -858,8 +814,7 @@ impl<T> UnboundedSender<T> { /// This method should only be called after `poll_ready` has been used to /// verify that the channel is ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.do_send_nb(msg) - .map_err(|e| e.err) + self.do_send_nb(msg).map_err(|e| e.err) } /// Sends a message along this channel. @@ -888,7 +843,10 @@ impl<T> UnboundedSender<T> { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver<H>(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -928,9 +886,7 @@ impl<T> Clone for UnboundedSenderInner<T> { Ok(_) => { // The ABA problem doesn't matter here. We only care that the // number of senders never exceeds the maximum. - return Self { - inner: self.inner.clone(), - }; + return Self { inner: self.inner.clone() }; } Err(actual) => curr = actual, } @@ -1021,19 +977,22 @@ impl<T> Receiver<T> { /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. /// - /// This function will panic if called after `try_next` or `poll_next` has - /// returned `None`. + /// This function returns: + /// * `Ok(Some(t))` when message is fetched + /// * `Ok(None)` when channel is closed and no messages left in the queue + /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } fn next_message(&mut self) -> Poll<Option<T>> { - let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1098,18 +1057,15 @@ impl<T> FusedStream for Receiver<T> { impl<T> Stream for Receiver<T> { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T>> { - // Try to read a message off of the message queue. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { if msg.is_none() { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1169,19 +1125,22 @@ impl<T> UnboundedReceiver<T> { /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. /// - /// This function will panic if called after `try_next` or `poll_next` has - /// returned `None`. + /// This function returns: + /// * `Ok(Some(t))` when message is fetched + /// * `Ok(None)` when channel is closed and no messages left in the queue + /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } fn next_message(&mut self) -> Poll<Option<T>> { - let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1230,10 +1189,7 @@ impl<T> FusedStream for UnboundedReceiver<T> { impl<T> Stream for UnboundedReceiver<T> { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { @@ -1241,7 +1197,7 @@ impl<T> Stream for UnboundedReceiver<T> { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1339,10 +1295,7 @@ impl State { */ fn decode_state(num: usize) -> State { - State { - is_open: num & OPEN_MASK == OPEN_MASK, - num_messages: num & MAX_CAPACITY, - } + State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } } fn encode_state(state: &State) -> usize { diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs index b00e1b1..57dc7f5 100644 --- a/src/mpsc/queue.rs +++ b/src/mpsc/queue.rs @@ -43,10 +43,10 @@ pub(super) use self::PopResult::*; -use std::thread; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; +use std::thread; /// A result of the `pop` function. pub(super) enum PopResult<T> { @@ -76,15 +76,12 @@ pub(super) struct Queue<T> { tail: UnsafeCell<*mut Node<T>>, } -unsafe impl<T: Send> Send for Queue<T> { } -unsafe impl<T: Send> Sync for Queue<T> { } +unsafe impl<T: Send> Send for Queue<T> {} +unsafe impl<T: Send> Sync for Queue<T> {} impl<T> Node<T> { unsafe fn new(v: Option<T>) -> *mut Self { - Box::into_raw(Box::new(Self { - next: AtomicPtr::new(ptr::null_mut()), - value: v, - })) + Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v })) } } @@ -93,10 +90,7 @@ impl<T> Queue<T> { /// one consumer. pub(super) fn new() -> Self { let stub = unsafe { Node::new(None) }; - Self { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } + Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } } /// Pushes a new value onto this queue. @@ -133,7 +127,11 @@ impl<T> Queue<T> { return Data(ret); } - if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} + if self.head.load(Ordering::Acquire) == tail { + Empty + } else { + Inconsistent + } } /// Pop an element similarly to `pop` function, but spin-wait on inconsistent diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs index 4ce66b4..1be2016 100644 --- a/src/mpsc/sink_impl.rs +++ b/src/mpsc/sink_impl.rs @@ -6,24 +6,15 @@ use std::pin::Pin; impl<T> Sink<T> for Sender<T> { type Error = SendError; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { (*self).poll_ready(cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { (*self).start_send(msg) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match (*self).poll_ready(cx) { Poll::Ready(Err(ref e)) if e.is_disconnected() => { // If the receiver disconnected, we consider the sink to be flushed. @@ -33,10 +24,7 @@ impl<T> Sink<T> for Sender<T> { } } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.disconnect(); Poll::Ready(Ok(())) } @@ -45,31 +33,19 @@ impl<T> Sink<T> for Sender<T> { impl<T> Sink<T> for UnboundedSender<T> { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Self::poll_ready(&*self, cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { Self::start_send(&mut *self, msg) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.disconnect(); Poll::Ready(Ok(())) } @@ -78,29 +54,19 @@ impl<T> Sink<T> for UnboundedSender<T> { impl<T> Sink<T> for &UnboundedSender<T> { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { UnboundedSender::poll_ready(*self, cx) } fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.unbounded_send(msg) - .map_err(TrySendError::into_send_error) + self.unbounded_send(msg).map_err(TrySendError::into_send_error) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.close_channel(); Poll::Ready(Ok(())) } diff --git a/src/oneshot.rs b/src/oneshot.rs index dbbce81..5af651b 100644 --- a/src/oneshot.rs +++ b/src/oneshot.rs @@ -7,7 +7,7 @@ use core::fmt; use core::pin::Pin; use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering::SeqCst; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use crate::lock::Lock; @@ -16,7 +16,6 @@ use crate::lock::Lock; /// /// This is created by the [`channel`](channel) function. #[must_use = "futures do nothing unless you `.await` or poll them"] -#[derive(Debug)] pub struct Receiver<T> { inner: Arc<Inner<T>>, } @@ -24,7 +23,6 @@ pub struct Receiver<T> { /// A means of transmitting a single value to another task. /// /// This is created by the [`channel`](channel) function. -#[derive(Debug)] pub struct Sender<T> { inner: Arc<Inner<T>>, } @@ -35,7 +33,6 @@ impl<T> Unpin for Sender<T> {} /// Internal state of the `Receiver`/`Sender` pair above. This is all used as /// the internal synchronization between the two for send/recv operations. -#[derive(Debug)] struct Inner<T> { /// Indicates whether this oneshot is complete yet. This is filled in both /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it @@ -106,12 +103,8 @@ struct Inner<T> { /// ``` pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Arc::new(Inner::new()); - let receiver = Receiver { - inner: inner.clone(), - }; - let sender = Sender { - inner, - }; + let receiver = Receiver { inner: inner.clone() }; + let sender = Sender { inner }; (sender, receiver) } @@ -127,7 +120,7 @@ impl<T> Inner<T> { fn send(&self, t: T) -> Result<(), T> { if self.complete.load(SeqCst) { - return Err(t) + return Err(t); } // Note that this lock acquisition may fail if the receiver @@ -164,7 +157,7 @@ impl<T> Inner<T> { // destructor, but our destructor hasn't run yet so if it's set then the // oneshot is gone. if self.complete.load(SeqCst) { - return Poll::Ready(()) + return Poll::Ready(()); } // If our other half is not gone then we need to park our current task @@ -273,7 +266,10 @@ impl<T> Inner<T> { } else { let task = cx.waker().clone(); match self.rx_task.try_lock() { - Some(mut slot) => { *slot = Some(task); false }, + Some(mut slot) => { + *slot = Some(task); + false + } None => true, } }; @@ -394,6 +390,12 @@ impl<T> Drop for Sender<T> { } } +impl<T: fmt::Debug> fmt::Debug for Sender<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Sender").field("complete", &self.inner.complete).finish() + } +} + /// A future that resolves when the receiving end of a channel has hung up. /// /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). @@ -453,10 +455,7 @@ impl<T> Receiver<T> { impl<T> Future for Receiver<T> { type Output = Result<T, Canceled>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<T, Canceled>> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { self.inner.recv(cx) } } @@ -481,3 +480,9 @@ impl<T> Drop for Receiver<T> { self.inner.drop_rx() } } + +impl<T: fmt::Debug> fmt::Debug for Receiver<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Receiver").field("complete", &self.inner.complete).finish() + } +} diff --git a/tests/channel.rs b/tests/channel.rs index 73dac64..5f01a8e 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -1,8 +1,8 @@ use futures::channel::mpsc; use futures::executor::block_on; use futures::future::poll_fn; -use futures::stream::StreamExt; use futures::sink::SinkExt; +use futures::stream::StreamExt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; @@ -11,9 +11,7 @@ fn sequence() { let (tx, rx) = mpsc::channel(1); let amt = 20; - let t = thread::spawn(move || { - block_on(send_sequence(amt, tx)) - }); + let t = thread::spawn(move || block_on(send_sequence(amt, tx))); let list: Vec<_> = block_on(rx.collect()); let mut list = list.into_iter(); for i in (1..=amt).rev() { @@ -34,9 +32,7 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) { fn drop_sender() { let (tx, mut rx) = mpsc::channel::<u32>(1); drop(tx); - let f = poll_fn(|cx| { - rx.poll_next_unpin(cx) - }); + let f = poll_fn(|cx| rx.poll_next_unpin(cx)); assert_eq!(block_on(f), None) } diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs index 9eb5296..81203d3 100644 --- a/tests/mpsc-close.rs +++ b/tests/mpsc-close.rs @@ -13,9 +13,7 @@ use std::time::{Duration, Instant}; fn smoke() { let (mut sender, receiver) = mpsc::channel(1); - let t = thread::spawn(move || { - while let Ok(()) = block_on(sender.send(42)) {} - }); + let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {}); // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join. block_on(receiver.take(3).for_each(|_| futures::future::ready(()))); @@ -166,7 +164,7 @@ fn stress_try_send_as_receiver_closes() { struct TestRx { rx: mpsc::Receiver<Arc<()>>, // The number of times to query `rx` before dropping it. - poll_count: usize + poll_count: usize, } struct TestTask { command_rx: mpsc::Receiver<TestRx>, @@ -190,14 +188,11 @@ fn stress_try_send_as_receiver_closes() { impl Future for TestTask { type Output = (); - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Poll the test channel, if one is present. if let Some(rx) = &mut self.test_rx { if let Poll::Ready(v) = rx.poll_next_unpin(cx) { - let _ = v.expect("test finished unexpectedly!"); + let _ = v.expect("test finished unexpectedly!"); } self.countdown -= 1; // Busy-poll until the countdown is finished. @@ -209,9 +204,9 @@ fn stress_try_send_as_receiver_closes() { self.test_rx = Some(rx); self.countdown = poll_count; cx.waker().wake_by_ref(); - }, + } Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => {}, + Poll::Pending => {} } if self.countdown == 0 { // Countdown complete -- drop the Receiver. @@ -255,10 +250,14 @@ fn stress_try_send_as_receiver_closes() { if prev_weak.upgrade().is_none() { break; } - assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), + assert!( + t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), "item not dropped on iteration {} after \ {} sends ({} successful). spin=({})", - i, attempted_sends, successful_sends, spins + i, + attempted_sends, + successful_sends, + spins ); spins += 1; thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); @@ -273,6 +272,27 @@ fn stress_try_send_as_receiver_closes() { } } drop(cmd_tx); - bg.join() - .expect("background thread join"); + bg.join().expect("background thread join"); +} + +#[test] +fn unbounded_try_next_after_none() { + let (tx, mut rx) = mpsc::unbounded::<String>(); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); +} + +#[test] +fn bounded_try_next_after_none() { + let (tx, mut rx) = mpsc::channel::<String>(17); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); } diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 61c5a50..88cdef1 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -1,13 +1,13 @@ use futures::channel::{mpsc, oneshot}; use futures::executor::{block_on, block_on_stream}; -use futures::future::{FutureExt, poll_fn}; -use futures::stream::{Stream, StreamExt}; +use futures::future::{poll_fn, FutureExt}; +use futures::pin_mut; use futures::sink::{Sink, SinkExt}; +use futures::stream::{Stream, StreamExt}; use futures::task::{Context, Poll}; -use futures::pin_mut; use futures_test::task::{new_count_waker, noop_context}; -use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use std::thread; trait AssertSend: Send {} @@ -77,7 +77,7 @@ fn send_shared_recv() { fn send_recv_threads() { let (mut tx, rx) = mpsc::channel::<i32>(16); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { block_on(tx.send(1)).unwrap(); }); @@ -204,7 +204,7 @@ fn stress_shared_unbounded() { const NTHREADS: u32 = 8; let (tx, rx) = mpsc::unbounded::<i32>(); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { let result: Vec<_> = block_on(rx.collect()); assert_eq!(result.len(), (AMT * NTHREADS) as usize); for item in result { @@ -215,7 +215,7 @@ fn stress_shared_unbounded() { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { for _ in 0..AMT { tx.unbounded_send(1).unwrap(); } @@ -233,7 +233,7 @@ fn stress_shared_bounded_hard() { const NTHREADS: u32 = 8; let (tx, rx) = mpsc::channel::<i32>(0); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { let result: Vec<_> = block_on(rx.collect()); assert_eq!(result.len(), (AMT * NTHREADS) as usize); for item in result { @@ -297,9 +297,9 @@ fn stress_receiver_multi_task_bounded_hard() { } Poll::Ready(None) => { *rx_opt = None; - break - }, - Poll::Pending => {}, + break; + } + Poll::Pending => {} } } } else { @@ -311,7 +311,6 @@ fn stress_receiver_multi_task_bounded_hard() { th.push(t); } - for i in 0..AMT { block_on(tx.send(i)).unwrap(); } @@ -328,7 +327,7 @@ fn stress_receiver_multi_task_bounded_hard() { /// after sender dropped. #[test] fn stress_drop_sender() { - fn list() -> impl Stream<Item=i32> { + fn list() -> impl Stream<Item = i32> { let (tx, rx) = mpsc::channel(1); thread::spawn(move || { block_on(send_one_two_three(tx)); @@ -407,9 +406,7 @@ fn stress_poll_ready() { let mut threads = Vec::new(); for _ in 0..NTHREADS { let sender = tx.clone(); - threads.push(thread::spawn(move || { - block_on(stress_poll_ready_sender(sender, AMT)) - })); + threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT)))); } drop(tx); @@ -436,7 +433,7 @@ fn try_send_1() { for i in 0..N { loop { if tx.try_send(i).is_ok() { - break + break; } } } @@ -542,8 +539,8 @@ fn is_connected_to() { #[test] fn hash_receiver() { - use std::hash::Hasher; use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; let mut hasher_a1 = DefaultHasher::new(); let mut hasher_a2 = DefaultHasher::new(); diff --git a/tests/oneshot.rs b/tests/oneshot.rs index a22d039..979cd8a 100644 --- a/tests/oneshot.rs +++ b/tests/oneshot.rs @@ -1,6 +1,6 @@ use futures::channel::oneshot::{self, Sender}; use futures::executor::block_on; -use futures::future::{FutureExt, poll_fn}; +use futures::future::{poll_fn, FutureExt}; use futures::task::{Context, Poll}; use futures_test::task::panic_waker_ref; use std::sync::mpsc; @@ -70,7 +70,7 @@ fn close() { rx.close(); block_on(poll_fn(|cx| { match rx.poll_unpin(cx) { - Poll::Ready(Err(_)) => {}, + Poll::Ready(Err(_)) => {} _ => panic!(), }; assert!(tx.poll_canceled(cx).is_ready()); |