diff options
41 files changed, 800 insertions, 475 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 08632f0..f15a046 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "d9dfc9e1ffabcb3c01addad14878f16c2795c371" - } -} + "sha1": "f9cec068fa94bced547a66289cd288dca58c2e83" + }, + "path_in_vcs": "crossbeam-channel" +}
\ No newline at end of file @@ -1,4 +1,4 @@ -// This file is generated by cargo2android.py --run --device --dependencies. +// This file is generated by cargo2android.py --config cargo2android.json. // Do not modify this file as changes will be overridden on upgrade. package { @@ -44,9 +44,10 @@ license { rust_library { name: "libcrossbeam_channel", - // has rustc warnings host_supported: true, crate_name: "crossbeam_channel", + cargo_env_compat: true, + cargo_pkg_version: "0.5.2", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -59,9 +60,3 @@ rust_library { "libcrossbeam_utils", ], } - -// dependent_library ["feature_list"] -// autocfg-1.0.1 -// cfg-if-1.0.0 -// crossbeam-utils-0.8.3 "lazy_static,std" -// lazy_static-1.4.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a53e8a..6bfd923 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# Version 0.5.2 + +- Fix stacked borrows violations. (#763, #764) + +# Version 0.5.1 + +- Fix memory leak in unbounded channel. (#669) + # Version 0.5.0 - Bump the minimum supported Rust version to 1.36. @@ -1,22 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -[[package]] -name = "arc-swap" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" - -[[package]] -name = "autocfg" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" - -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +version = 3 [[package]] name = "cfg-if" @@ -25,16 +9,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] -name = "const_fn" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce90df4c658c62f12d78f7508cf92f9173e5184a539c10bfe54a3107b3ffd0f2" - -[[package]] name = "crossbeam-channel" -version = "0.5.0" +version = "0.5.2" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", "num_cpus", "rand", @@ -43,32 +21,30 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.0" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec91540d98355f690a86367e566ecad2e9e579f230230eb7c21398372be73ea5" +checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120" dependencies = [ - "autocfg", - "cfg-if 1.0.0", - "const_fn", + "cfg-if", "lazy_static", ] [[package]] name = "getrandom" -version = "0.1.15" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" +checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ - "cfg-if 0.1.10", + "cfg-if", "libc", "wasi", ] [[package]] name = "hermit-abi" -version = "0.1.17" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" dependencies = [ "libc", ] @@ -81,15 +57,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.79" +version = "0.2.112" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743" +checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125" [[package]] name = "num_cpus" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" dependencies = [ "hermit-abi", "libc", @@ -97,17 +73,16 @@ dependencies = [ [[package]] name = "ppv-lite86" -version = "0.2.9" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "rand" -version = "0.7.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" dependencies = [ - "getrandom", "libc", "rand_chacha", "rand_core", @@ -116,9 +91,9 @@ dependencies = [ [[package]] name = "rand_chacha" -version = "0.2.2" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", "rand_core", @@ -126,27 +101,27 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.5.1" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ "getrandom", ] [[package]] name = "rand_hc" -version = "0.2.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" dependencies = [ "rand_core", ] [[package]] name = "signal-hook" -version = "0.1.16" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "604508c1418b99dfe1925ca9224829bb2a8a9a04dda655cc01fcad46f4ab05ed" +checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d" dependencies = [ "libc", "signal-hook-registry", @@ -154,16 +129,15 @@ dependencies = [ [[package]] name = "signal-hook-registry" -version = "1.2.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e12110bc539e657a646068aaf5eb5b63af9d0c1f7b29c97113fad80e15f035" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" dependencies = [ - "arc-swap", "libc", ] [[package]] name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" +version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" @@ -3,22 +3,19 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" +rust-version = "1.36" name = "crossbeam-channel" -version = "0.5.0" -authors = ["The Crossbeam Project Developers"] +version = "0.5.2" description = "Multi-producer multi-consumer channels for message passing" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel" -documentation = "https://docs.rs/crossbeam-channel" -readme = "README.md" keywords = ["channel", "mpmc", "select", "golang", "message"] categories = ["algorithms", "concurrency", "data-structures"] license = "MIT OR Apache-2.0" @@ -34,10 +31,10 @@ default-features = false version = "1.13.0" [dev-dependencies.rand] -version = "0.7.3" +version = "0.8" [dev-dependencies.signal-hook] -version = "0.1.15" +version = "0.3" [features] default = ["std"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 2931d21..640a808 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,14 +4,12 @@ name = "crossbeam-channel" # - Update CHANGELOG.md # - Update README.md # - Create "crossbeam-channel-X.Y.Z" git tag -version = "0.5.0" -authors = ["The Crossbeam Project Developers"] +version = "0.5.2" edition = "2018" +rust-version = "1.36" license = "MIT OR Apache-2.0" -readme = "README.md" repository = "https://github.com/crossbeam-rs/crossbeam" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel" -documentation = "https://docs.rs/crossbeam-channel" description = "Multi-producer multi-consumer channels for message passing" keywords = ["channel", "mpmc", "select", "golang", "message"] categories = ["algorithms", "concurrency", "data-structures"] @@ -21,6 +19,8 @@ default = ["std"] # Enable to use APIs that require `std`. # This is enabled by default. +# +# NOTE: Disabling `std` feature is not supported yet. std = ["crossbeam-utils/std"] [dependencies] @@ -34,5 +34,5 @@ optional = true [dev-dependencies] num_cpus = "1.13.0" -rand = "0.7.3" -signal-hook = "0.1.15" +rand = "0.8" +signal-hook = "0.3" diff --git a/LICENSE-THIRD-PARTY b/LICENSE-THIRD-PARTY index d15e32b..ed4df76 100644 --- a/LICENSE-THIRD-PARTY +++ b/LICENSE-THIRD-PARTY @@ -1,37 +1,5 @@ =============================================================================== -Bounded MPMC queue -http://www.1024cores.net/home/code-license - -Copyright (c) 2010-2011 Dmitry Vyukov. -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED -WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO -EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE -OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those -of the authors and should not be interpreted as representing official policies, -either expressed or implied, of Dmitry Vyukov. - -=============================================================================== - matching.go https://creativecommons.org/licenses/by/3.0/legalcode @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.0.crate" + value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.2.crate" } - version: "0.5.0" + version: "0.5.2" license_type: NOTICE last_upgrade_date { - year: 2020 - month: 12 - day: 21 + year: 2022 + month: 3 + day: 1 } } @@ -2,7 +2,7 @@ [![Build Status](https://github.com/crossbeam-rs/crossbeam/workflows/CI/badge.svg)]( https://github.com/crossbeam-rs/crossbeam/actions) -[![License](https://img.shields.io/badge/license-MIT%20OR%20Apache--2.0-blue.svg)]( +[![License](https://img.shields.io/badge/license-MIT_OR_Apache--2.0-blue.svg)]( https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel#license) [![Cargo](https://img.shields.io/crates/v/crossbeam-channel.svg)]( https://crates.io/crates/crossbeam-channel) @@ -10,7 +10,7 @@ https://crates.io/crates/crossbeam-channel) https://docs.rs/crossbeam-channel) [![Rust 1.36+](https://img.shields.io/badge/rust-1.36+-lightgray.svg)]( https://www.rust-lang.org) -[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.gg/BBYwKq) +[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ) This crate provides multi-producer multi-consumer channels for message passing. It is an alternative to [`std::sync::mpsc`] with more features and better performance. @@ -41,7 +41,7 @@ Add this to your `Cargo.toml`: ```toml [dependencies] -crossbeam-channel = "0.4" +crossbeam-channel = "0.5" ``` ## Compatibility @@ -73,10 +73,6 @@ This product includes copies and modifications of software developed by third pa [matching.go](http://www.nada.kth.se/~snilsson/concurrency/src/matching.go) by Stefan Nilsson, licensed under Creative Commons Attribution 3.0 Unported License. -* [src/flavors/array.rs](src/flavors/array.rs) is based on - [Bounded MPMC queue](http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue) - by Dmitry Vyukov, licensed under the Simplified BSD License and the Apache License, Version 2.0. - * [tests/mpsc.rs](tests/mpsc.rs) includes modifications of code from The Rust Programming Language, licensed under the MIT License and the Apache License, Version 2.0. diff --git a/TEST_MAPPING b/TEST_MAPPING new file mode 100644 index 0000000..3cbd48d --- /dev/null +++ b/TEST_MAPPING @@ -0,0 +1,17 @@ +// Generated by update_crate_tests.py for tests that depend on this crate. +{ + "imports": [ + { + "path": "external/rust/crates/base64" + }, + { + "path": "external/rust/crates/tinytemplate" + }, + { + "path": "external/rust/crates/tinyvec" + }, + { + "path": "external/rust/crates/unicode-xid" + } + ] +} diff --git a/benches/crossbeam.rs b/benches/crossbeam.rs index 9870c98..1c05222 100644 --- a/benches/crossbeam.rs +++ b/benches/crossbeam.rs @@ -13,7 +13,7 @@ mod unbounded { #[bench] fn create(b: &mut Bencher) { - b.iter(|| unbounded::<i32>()); + b.iter(unbounded::<i32>); } #[bench] diff --git a/cargo2android.json b/cargo2android.json new file mode 100644 index 0000000..bf78496 --- /dev/null +++ b/cargo2android.json @@ -0,0 +1,4 @@ +{ + "device": true, + "run": true +}
\ No newline at end of file diff --git a/examples/fibonacci.rs b/examples/fibonacci.rs index cf22b7a..e6f5e89 100644 --- a/examples/fibonacci.rs +++ b/examples/fibonacci.rs @@ -10,7 +10,7 @@ fn fibonacci(sender: Sender<u64>) { while sender.send(x).is_ok() { let tmp = x; x = y; - y = tmp + y; + y += tmp; } } diff --git a/examples/stopwatch.rs b/examples/stopwatch.rs index 6a67c9e..3a7578e 100644 --- a/examples/stopwatch.rs +++ b/examples/stopwatch.rs @@ -12,13 +12,13 @@ fn main() { use std::time::{Duration, Instant}; use crossbeam_channel::{bounded, select, tick, Receiver}; + use signal_hook::consts::SIGINT; use signal_hook::iterator::Signals; - use signal_hook::SIGINT; // Creates a channel that gets a message every time `SIGINT` is signalled. fn sigint_notifier() -> io::Result<Receiver<()>> { let (s, r) = bounded(100); - let signals = Signals::new(&[SIGINT])?; + let mut signals = Signals::new(&[SIGINT])?; thread::spawn(move || { for _ in signals.forever() { @@ -33,11 +33,7 @@ fn main() { // Prints the elapsed time. fn show(dur: Duration) { - println!( - "Elapsed: {}.{:03} sec", - dur.as_secs(), - dur.subsec_nanos() / 1_000_000 - ); + println!("Elapsed: {}.{:03} sec", dur.as_secs(), dur.subsec_millis()); } let start = Instant::now(); diff --git a/src/channel.rs b/src/channel.rs index ebcd652..8988235 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -257,8 +257,6 @@ pub fn at(when: Instant) -> Receiver<Instant> { /// recv(timeout) -> _ => println!("timed out"), /// } /// ``` -/// -/// [`select!`]: macro.select.html pub fn never<T>() -> Receiver<T> { Receiver { flavor: ReceiverFlavor::Never(flavors::never::Channel::new()), @@ -645,7 +643,7 @@ impl<T> Drop for Sender<T> { unsafe { match &self.flavor { SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), - SenderFlavor::List(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } } @@ -1137,7 +1135,7 @@ impl<T> Drop for Receiver<T> { unsafe { match &self.flavor { ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), - ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), ReceiverFlavor::At(_) => {} ReceiverFlavor::Tick(_) => {} @@ -1485,7 +1483,7 @@ impl<T> SelectHandle for Receiver<T> { } /// Writes a message into the channel. -pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> { +pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> { match &s.flavor { SenderFlavor::Array(chan) => chan.write(token, msg), SenderFlavor::List(chan) => chan.write(token, msg), @@ -1494,7 +1492,7 @@ pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T } /// Reads a message from the channel. -pub unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> { +pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> { match &r.flavor { ReceiverFlavor::Array(chan) => chan.read(token), ReceiverFlavor::List(chan) => chan.read(token), diff --git a/src/context.rs b/src/context.rs index e2e8480..7467b80 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,7 +1,8 @@ //! Thread-local context used in select. use std::cell::Cell; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::ptr; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use std::sync::Arc; use std::thread::{self, Thread, ThreadId}; use std::time::Instant; @@ -11,6 +12,7 @@ use crossbeam_utils::Backoff; use crate::select::Selected; /// Thread-local context used in select. +// This is a private API that is used by the select macro. #[derive(Debug, Clone)] pub struct Context { inner: Arc<Inner>, @@ -23,7 +25,7 @@ struct Inner { select: AtomicUsize, /// A slot into which another thread may store a pointer to its `Packet`. - packet: AtomicUsize, + packet: AtomicPtr<()>, /// Thread handle. thread: Thread, @@ -45,7 +47,7 @@ impl Context { } let mut f = Some(f); - let mut f = move |cx: &Context| -> R { + let mut f = |cx: &Context| -> R { let f = f.take().unwrap(); f(cx) }; @@ -69,7 +71,7 @@ impl Context { Context { inner: Arc::new(Inner { select: AtomicUsize::new(Selected::Waiting.into()), - packet: AtomicUsize::new(0), + packet: AtomicPtr::new(ptr::null_mut()), thread: thread::current(), thread_id: thread::current().id(), }), @@ -82,7 +84,7 @@ impl Context { self.inner .select .store(Selected::Waiting.into(), Ordering::Release); - self.inner.packet.store(0, Ordering::Release); + self.inner.packet.store(ptr::null_mut(), Ordering::Release); } /// Attempts to select an operation. @@ -112,19 +114,19 @@ impl Context { /// /// This method must be called after `try_select` succeeds and there is a packet to provide. #[inline] - pub fn store_packet(&self, packet: usize) { - if packet != 0 { + pub fn store_packet(&self, packet: *mut ()) { + if !packet.is_null() { self.inner.packet.store(packet, Ordering::Release); } } /// Waits until a packet is provided and returns it. #[inline] - pub fn wait_packet(&self) -> usize { + pub fn wait_packet(&self) -> *mut () { let backoff = Backoff::new(); loop { let packet = self.inner.packet.load(Ordering::Acquire); - if packet != 0 { + if !packet.is_null() { return packet; } backoff.snooze(); diff --git a/src/counter.rs b/src/counter.rs index 2eaf067..2c27f7c 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -21,7 +21,7 @@ struct Counter<C> { } /// Wraps a channel into the reference counter. -pub fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { +pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { let counter = Box::into_raw(Box::new(Counter { senders: AtomicUsize::new(1), receivers: AtomicUsize::new(1), @@ -34,7 +34,7 @@ pub fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { } /// The sending side. -pub struct Sender<C> { +pub(crate) struct Sender<C> { counter: *mut Counter<C>, } @@ -45,7 +45,7 @@ impl<C> Sender<C> { } /// Acquires another sender reference. - pub fn acquire(&self) -> Sender<C> { + pub(crate) fn acquire(&self) -> Sender<C> { let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); // Cloning senders and calling `mem::forget` on the clones could potentially overflow the @@ -63,7 +63,7 @@ impl<C> Sender<C> { /// Releases the sender reference. /// /// Function `disconnect` will be called if this is the last sender reference. - pub unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { + pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { disconnect(&self.counter().chan); @@ -89,7 +89,7 @@ impl<C> PartialEq for Sender<C> { } /// The receiving side. -pub struct Receiver<C> { +pub(crate) struct Receiver<C> { counter: *mut Counter<C>, } @@ -100,7 +100,7 @@ impl<C> Receiver<C> { } /// Acquires another receiver reference. - pub fn acquire(&self) -> Receiver<C> { + pub(crate) fn acquire(&self) -> Receiver<C> { let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the @@ -118,7 +118,7 @@ impl<C> Receiver<C> { /// Releases the receiver reference. /// /// Function `disconnect` will be called if this is the last receiver reference. - pub unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { + pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { disconnect(&self.counter().chan); diff --git a/src/flavors/array.rs b/src/flavors/array.rs index 323a200..871768c 100644 --- a/src/flavors/array.rs +++ b/src/flavors/array.rs @@ -5,17 +5,12 @@ //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. //! //! Source: -//! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue -//! - https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub -//! -//! Copyright & License: -//! - Copyright (c) 2010-2011 Dmitry Vyukov -//! - Simplified BSD License and Apache License, Version 2.0 -//! - http://www.1024cores.net/home/code-license +//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue> +//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub> use std::cell::UnsafeCell; use std::marker::PhantomData; -use std::mem::{self, MaybeUninit}; +use std::mem::MaybeUninit; use std::ptr; use std::sync::atomic::{self, AtomicUsize, Ordering}; use std::time::Instant; @@ -57,7 +52,7 @@ impl Default for ArrayToken { } /// Bounded channel based on a preallocated array. -pub struct Channel<T> { +pub(crate) struct Channel<T> { /// The head of the channel. /// /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but @@ -100,7 +95,7 @@ pub struct Channel<T> { impl<T> Channel<T> { /// Creates a bounded channel of capacity `cap`. - pub fn with_capacity(cap: usize) -> Self { + pub(crate) fn with_capacity(cap: usize) -> Self { assert!(cap > 0, "capacity must be positive"); // Compute constants `mark_bit` and `one_lap`. @@ -115,7 +110,7 @@ impl<T> Channel<T> { // Allocate a buffer of `cap` slots initialized // with stamps. let buffer = { - let mut boxed: Box<[Slot<T>]> = (0..cap) + let boxed: Box<[Slot<T>]> = (0..cap) .map(|i| { // Set the stamp to `{ lap: 0, mark: 0, index: i }`. Slot { @@ -124,9 +119,7 @@ impl<T> Channel<T> { } }) .collect(); - let ptr = boxed.as_mut_ptr(); - mem::forget(boxed); - ptr + Box::into_raw(boxed) as *mut Slot<T> }; Channel { @@ -143,12 +136,12 @@ impl<T> Channel<T> { } /// Returns a receiver handle to the channel. - pub fn receiver(&self) -> Receiver<'_, T> { + pub(crate) fn receiver(&self) -> Receiver<'_, T> { Receiver(self) } /// Returns a sender handle to the channel. - pub fn sender(&self) -> Sender<'_, T> { + pub(crate) fn sender(&self) -> Sender<'_, T> { Sender(self) } @@ -224,7 +217,7 @@ impl<T> Channel<T> { } /// Writes a message into the channel. - pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no slot, the channel is disconnected. if token.array.slot.is_null() { return Err(msg); @@ -314,7 +307,7 @@ impl<T> Channel<T> { } /// Reads a message from the channel. - pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { if token.array.slot.is_null() { // The channel is disconnected. return Err(()); @@ -332,7 +325,7 @@ impl<T> Channel<T> { } /// Attempts to send a message into the channel. - pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { let token = &mut Token::default(); if self.start_send(token) { unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } @@ -342,7 +335,11 @@ impl<T> Channel<T> { } /// Sends a message into the channel. - pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { + pub(crate) fn send( + &self, + msg: T, + deadline: Option<Instant>, + ) -> Result<(), SendTimeoutError<T>> { let token = &mut Token::default(); loop { // Try sending a message several times. @@ -391,7 +388,7 @@ impl<T> Channel<T> { } /// Attempts to receive a message without blocking. - pub fn try_recv(&self) -> Result<T, TryRecvError> { + pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { let token = &mut Token::default(); if self.start_recv(token) { @@ -402,7 +399,7 @@ impl<T> Channel<T> { } /// Receives a message from the channel. - pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { let token = &mut Token::default(); loop { // Try receiving a message several times. @@ -453,7 +450,7 @@ impl<T> Channel<T> { } /// Returns the current number of messages inside the channel. - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { loop { // Load the tail, then load the head. let tail = self.tail.load(Ordering::SeqCst); @@ -478,14 +475,15 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - pub fn capacity(&self) -> Option<usize> { + #[allow(clippy::unnecessary_wraps)] // This is intentional. + pub(crate) fn capacity(&self) -> Option<usize> { Some(self.cap) } /// Disconnects the channel and wakes up all blocked senders and receivers. /// /// Returns `true` if this call disconnected the channel. - pub fn disconnect(&self) -> bool { + pub(crate) fn disconnect(&self) -> bool { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); if tail & self.mark_bit == 0 { @@ -498,12 +496,12 @@ impl<T> Channel<T> { } /// Returns `true` if the channel is disconnected. - pub fn is_disconnected(&self) -> bool { + pub(crate) fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 } /// Returns `true` if the channel is empty. - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { let head = self.head.load(Ordering::SeqCst); let tail = self.tail.load(Ordering::SeqCst); @@ -515,7 +513,7 @@ impl<T> Channel<T> { } /// Returns `true` if the channel is full. - pub fn is_full(&self) -> bool { + pub(crate) fn is_full(&self) -> bool { let tail = self.tail.load(Ordering::SeqCst); let head = self.head.load(Ordering::SeqCst); @@ -563,10 +561,10 @@ impl<T> Drop for Channel<T> { } /// Receiver handle to a channel. -pub struct Receiver<'a, T>(&'a Channel<T>); +pub(crate) struct Receiver<'a, T>(&'a Channel<T>); /// Sender handle to a channel. -pub struct Sender<'a, T>(&'a Channel<T>); +pub(crate) struct Sender<'a, T>(&'a Channel<T>); impl<T> SelectHandle for Receiver<'_, T> { fn try_select(&self, token: &mut Token) -> bool { diff --git a/src/flavors/at.rs b/src/flavors/at.rs index a2b1b57..4581edb 100644 --- a/src/flavors/at.rs +++ b/src/flavors/at.rs @@ -12,10 +12,10 @@ use crate::select::{Operation, SelectHandle, Token}; use crate::utils; /// Result of a receive operation. -pub type AtToken = Option<Instant>; +pub(crate) type AtToken = Option<Instant>; /// Channel that delivers a message at a certain moment in time -pub struct Channel { +pub(crate) struct Channel { /// The instant at which the message will be delivered. delivery_time: Instant, @@ -26,7 +26,7 @@ pub struct Channel { impl Channel { /// Creates a channel that delivers a message at a certain instant in time. #[inline] - pub fn new_deadline(when: Instant) -> Self { + pub(crate) fn new_deadline(when: Instant) -> Self { Channel { delivery_time: when, received: AtomicBool::new(false), @@ -34,13 +34,13 @@ impl Channel { } /// Creates a channel that delivers a message after a certain duration of time. #[inline] - pub fn new_timeout(dur: Duration) -> Self { + pub(crate) fn new_timeout(dur: Duration) -> Self { Self::new_deadline(Instant::now() + dur) } /// Attempts to receive a message without blocking. #[inline] - pub fn try_recv(&self) -> Result<Instant, TryRecvError> { + pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { // We use relaxed ordering because this is just an optional optimistic check. if self.received.load(Ordering::Relaxed) { // The message has already been received. @@ -64,7 +64,7 @@ impl Channel { /// Receives a message from the channel. #[inline] - pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { // We use relaxed ordering because this is just an optional optimistic check. if self.received.load(Ordering::Relaxed) { // The message has already been received. @@ -103,13 +103,13 @@ impl Channel { /// Reads a message from the channel. #[inline] - pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { token.at.ok_or(()) } /// Returns `true` if the channel is empty. #[inline] - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { // We use relaxed ordering because this is just an optional optimistic check. if self.received.load(Ordering::Relaxed) { return true; @@ -127,13 +127,13 @@ impl Channel { /// Returns `true` if the channel is full. #[inline] - pub fn is_full(&self) -> bool { + pub(crate) fn is_full(&self) -> bool { !self.is_empty() } /// Returns the number of messages in the channel. #[inline] - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { if self.is_empty() { 0 } else { @@ -142,8 +142,9 @@ impl Channel { } /// Returns the capacity of the channel. + #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] - pub fn capacity(&self) -> Option<usize> { + pub(crate) fn capacity(&self) -> Option<usize> { Some(1) } } diff --git a/src/flavors/list.rs b/src/flavors/list.rs index 532e8b6..5056aa4 100644 --- a/src/flavors/list.rs +++ b/src/flavors/list.rs @@ -151,7 +151,7 @@ impl Default for ListToken { /// /// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and /// improve cache efficiency. -pub struct Channel<T> { +pub(crate) struct Channel<T> { /// The head of the channel. head: CachePadded<Position<T>>, @@ -167,7 +167,7 @@ pub struct Channel<T> { impl<T> Channel<T> { /// Creates a new unbounded channel. - pub fn new() -> Self { + pub(crate) fn new() -> Self { Channel { head: CachePadded::new(Position { block: AtomicPtr::new(ptr::null_mut()), @@ -183,12 +183,12 @@ impl<T> Channel<T> { } /// Returns a receiver handle to the channel. - pub fn receiver(&self) -> Receiver<'_, T> { + pub(crate) fn receiver(&self) -> Receiver<'_, T> { Receiver(self) } /// Returns a sender handle to the channel. - pub fn sender(&self) -> Sender<'_, T> { + pub(crate) fn sender(&self) -> Sender<'_, T> { Sender(self) } @@ -231,8 +231,8 @@ impl<T> Channel<T> { if self .tail .block - .compare_and_swap(block, new, Ordering::Release) - == block + .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) + .is_ok() { self.head.block.store(new, Ordering::Release); block = new; @@ -276,7 +276,7 @@ impl<T> Channel<T> { } /// Writes a message into the channel. - pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no slot, the channel is disconnected. if token.list.block.is_null() { return Err(msg); @@ -380,7 +380,7 @@ impl<T> Channel<T> { } /// Reads a message from the channel. - pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { if token.list.block.is_null() { // The channel is disconnected. return Err(()); @@ -405,7 +405,7 @@ impl<T> Channel<T> { } /// Attempts to send a message into the channel. - pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { self.send(msg, None).map_err(|err| match err { SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), SendTimeoutError::Timeout(_) => unreachable!(), @@ -413,7 +413,11 @@ impl<T> Channel<T> { } /// Sends a message into the channel. - pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { + pub(crate) fn send( + &self, + msg: T, + _deadline: Option<Instant>, + ) -> Result<(), SendTimeoutError<T>> { let token = &mut Token::default(); assert!(self.start_send(token)); unsafe { @@ -423,7 +427,7 @@ impl<T> Channel<T> { } /// Attempts to receive a message without blocking. - pub fn try_recv(&self) -> Result<T, TryRecvError> { + pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { let token = &mut Token::default(); if self.start_recv(token) { @@ -434,7 +438,7 @@ impl<T> Channel<T> { } /// Receives a message from the channel. - pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { let token = &mut Token::default(); loop { // Try receiving a message several times. @@ -486,7 +490,7 @@ impl<T> Channel<T> { } /// Returns the current number of messages inside the channel. - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { loop { // Load the tail index, then load the head index. let mut tail = self.tail.index.load(Ordering::SeqCst); @@ -522,14 +526,14 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - pub fn capacity(&self) -> Option<usize> { + pub(crate) fn capacity(&self) -> Option<usize> { None } - /// Disconnects the channel and wakes up all blocked receivers. + /// Disconnects senders and wakes up all blocked receivers. /// /// Returns `true` if this call disconnected the channel. - pub fn disconnect(&self) -> bool { + pub(crate) fn disconnect_senders(&self) -> bool { let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); if tail & MARK_BIT == 0 { @@ -540,20 +544,90 @@ impl<T> Channel<T> { } } + /// Disconnects receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub(crate) fn disconnect_receivers(&self) -> bool { + let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); + + if tail & MARK_BIT == 0 { + // If receivers are dropped first, discard all messages to free + // memory eagerly. + self.discard_all_messages(); + true + } else { + false + } + } + + /// Discards all messages. + /// + /// This method should only be called when all receivers are dropped. + fn discard_all_messages(&self) { + let backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + loop { + let offset = (tail >> SHIFT) % LAP; + if offset != BLOCK_CAP { + break; + } + + // New updates to tail will be rejected by MARK_BIT and aborted unless it's + // at boundary. We need to wait for the updates take affect otherwise there + // can be memory leaks. + backoff.snooze(); + tail = self.tail.index.load(Ordering::Acquire); + } + + let mut head = self.head.index.load(Ordering::Acquire); + let mut block = self.head.block.load(Ordering::Acquire); + + unsafe { + // Drop all messages between head and tail and deallocate the heap-allocated blocks. + while head >> SHIFT != tail >> SHIFT { + let offset = (head >> SHIFT) % LAP; + + if offset < BLOCK_CAP { + // Drop the message in the slot. + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); + let p = &mut *slot.msg.get(); + p.as_mut_ptr().drop_in_place(); + } else { + (*block).wait_next(); + // Deallocate the block and move to the next one. + let next = (*block).next.load(Ordering::Acquire); + drop(Box::from_raw(block)); + block = next; + } + + head = head.wrapping_add(1 << SHIFT); + } + + // Deallocate the last remaining block. + if !block.is_null() { + drop(Box::from_raw(block)); + } + } + head &= !MARK_BIT; + self.head.block.store(ptr::null_mut(), Ordering::Release); + self.head.index.store(head, Ordering::Release); + } + /// Returns `true` if the channel is disconnected. - pub fn is_disconnected(&self) -> bool { + pub(crate) fn is_disconnected(&self) -> bool { self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 } /// Returns `true` if the channel is empty. - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { let head = self.head.index.load(Ordering::SeqCst); let tail = self.tail.index.load(Ordering::SeqCst); head >> SHIFT == tail >> SHIFT } /// Returns `true` if the channel is full. - pub fn is_full(&self) -> bool { + pub(crate) fn is_full(&self) -> bool { false } } @@ -597,10 +671,10 @@ impl<T> Drop for Channel<T> { } /// Receiver handle to a channel. -pub struct Receiver<'a, T>(&'a Channel<T>); +pub(crate) struct Receiver<'a, T>(&'a Channel<T>); /// Sender handle to a channel. -pub struct Sender<'a, T>(&'a Channel<T>); +pub(crate) struct Sender<'a, T>(&'a Channel<T>); impl<T> SelectHandle for Receiver<'_, T> { fn try_select(&self, token: &mut Token) -> bool { diff --git a/src/flavors/mod.rs b/src/flavors/mod.rs index 299e78f..0314bf4 100644 --- a/src/flavors/mod.rs +++ b/src/flavors/mod.rs @@ -9,9 +9,9 @@ //! 5. `tick` - Channel that delivers messages periodically. //! 6. `zero` - Zero-capacity channel. -pub mod array; -pub mod at; -pub mod list; -pub mod never; -pub mod tick; -pub mod zero; +pub(crate) mod array; +pub(crate) mod at; +pub(crate) mod list; +pub(crate) mod never; +pub(crate) mod tick; +pub(crate) mod zero; diff --git a/src/flavors/never.rs b/src/flavors/never.rs index e49d214..1951e96 100644 --- a/src/flavors/never.rs +++ b/src/flavors/never.rs @@ -11,17 +11,17 @@ use crate::select::{Operation, SelectHandle, Token}; use crate::utils; /// This flavor doesn't need a token. -pub type NeverToken = (); +pub(crate) type NeverToken = (); /// Channel that never delivers messages. -pub struct Channel<T> { +pub(crate) struct Channel<T> { _marker: PhantomData<T>, } impl<T> Channel<T> { /// Creates a channel that never delivers messages. #[inline] - pub fn new() -> Self { + pub(crate) fn new() -> Self { Channel { _marker: PhantomData, } @@ -29,44 +29,45 @@ impl<T> Channel<T> { /// Attempts to receive a message without blocking. #[inline] - pub fn try_recv(&self) -> Result<T, TryRecvError> { + pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { Err(TryRecvError::Empty) } /// Receives a message from the channel. #[inline] - pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { utils::sleep_until(deadline); Err(RecvTimeoutError::Timeout) } /// Reads a message from the channel. #[inline] - pub unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> { + pub(crate) unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> { Err(()) } /// Returns `true` if the channel is empty. #[inline] - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { true } /// Returns `true` if the channel is full. #[inline] - pub fn is_full(&self) -> bool { + pub(crate) fn is_full(&self) -> bool { true } /// Returns the number of messages in the channel. #[inline] - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { 0 } /// Returns the capacity of the channel. + #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] - pub fn capacity(&self) -> Option<usize> { + pub(crate) fn capacity(&self) -> Option<usize> { Some(0) } } diff --git a/src/flavors/tick.rs b/src/flavors/tick.rs index e8e7020..d4b1f6c 100644 --- a/src/flavors/tick.rs +++ b/src/flavors/tick.rs @@ -12,10 +12,10 @@ use crate::err::{RecvTimeoutError, TryRecvError}; use crate::select::{Operation, SelectHandle, Token}; /// Result of a receive operation. -pub type TickToken = Option<Instant>; +pub(crate) type TickToken = Option<Instant>; /// Channel that delivers messages periodically. -pub struct Channel { +pub(crate) struct Channel { /// The instant at which the next message will be delivered. delivery_time: AtomicCell<Instant>, @@ -26,7 +26,7 @@ pub struct Channel { impl Channel { /// Creates a channel that delivers messages periodically. #[inline] - pub fn new(dur: Duration) -> Self { + pub(crate) fn new(dur: Duration) -> Self { Channel { delivery_time: AtomicCell::new(Instant::now() + dur), duration: dur, @@ -35,7 +35,7 @@ impl Channel { /// Attempts to receive a message without blocking. #[inline] - pub fn try_recv(&self) -> Result<Instant, TryRecvError> { + pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { loop { let now = Instant::now(); let delivery_time = self.delivery_time.load(); @@ -56,7 +56,7 @@ impl Channel { /// Receives a message from the channel. #[inline] - pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { loop { let delivery_time = self.delivery_time.load(); let now = Instant::now(); @@ -85,25 +85,25 @@ impl Channel { /// Reads a message from the channel. #[inline] - pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { token.tick.ok_or(()) } /// Returns `true` if the channel is empty. #[inline] - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { Instant::now() < self.delivery_time.load() } /// Returns `true` if the channel is full. #[inline] - pub fn is_full(&self) -> bool { + pub(crate) fn is_full(&self) -> bool { !self.is_empty() } /// Returns the number of messages in the channel. #[inline] - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { if self.is_empty() { 0 } else { @@ -112,8 +112,9 @@ impl Channel { } /// Returns the capacity of the channel. + #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] - pub fn capacity(&self) -> Option<usize> { + pub(crate) fn capacity(&self) -> Option<usize> { Some(1) } } diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs index be647b5..4afbd8f 100644 --- a/src/flavors/zero.rs +++ b/src/flavors/zero.rs @@ -6,6 +6,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Instant; +use std::{fmt, ptr}; use crossbeam_utils::Backoff; @@ -16,7 +17,19 @@ use crate::utils::Spinlock; use crate::waker::Waker; /// A pointer to a packet. -pub type ZeroToken = usize; +pub struct ZeroToken(*mut ()); + +impl Default for ZeroToken { + fn default() -> Self { + Self(ptr::null_mut()) + } +} + +impl fmt::Debug for ZeroToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&(self.0 as usize), f) + } +} /// A slot for passing one message from a sender to a receiver. struct Packet<T> { @@ -80,7 +93,7 @@ struct Inner { } /// Zero-capacity channel. -pub struct Channel<T> { +pub(crate) struct Channel<T> { /// Inner representation of the channel. inner: Spinlock<Inner>, @@ -90,7 +103,7 @@ pub struct Channel<T> { impl<T> Channel<T> { /// Constructs a new zero-capacity channel. - pub fn new() -> Self { + pub(crate) fn new() -> Self { Channel { inner: Spinlock::new(Inner { senders: Waker::new(), @@ -102,12 +115,12 @@ impl<T> Channel<T> { } /// Returns a receiver handle to the channel. - pub fn receiver(&self) -> Receiver<'_, T> { + pub(crate) fn receiver(&self) -> Receiver<'_, T> { Receiver(self) } /// Returns a sender handle to the channel. - pub fn sender(&self) -> Sender<'_, T> { + pub(crate) fn sender(&self) -> Sender<'_, T> { Sender(self) } @@ -117,10 +130,10 @@ impl<T> Channel<T> { // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; true } else if inner.is_disconnected { - token.zero = 0; + token.zero.0 = ptr::null_mut(); true } else { false @@ -128,13 +141,13 @@ impl<T> Channel<T> { } /// Writes a message into the packet. - pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no packet, the channel is disconnected. - if token.zero == 0 { + if token.zero.0.is_null() { return Err(msg); } - let packet = &*(token.zero as *const Packet<T>); + let packet = &*(token.zero.0 as *const Packet<T>); packet.msg.get().write(Some(msg)); packet.ready.store(true, Ordering::Release); Ok(()) @@ -146,10 +159,10 @@ impl<T> Channel<T> { // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; true } else if inner.is_disconnected { - token.zero = 0; + token.zero.0 = ptr::null_mut(); true } else { false @@ -157,13 +170,13 @@ impl<T> Channel<T> { } /// Reads a message from the packet. - pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { // If there is no packet, the channel is disconnected. - if token.zero == 0 { + if token.zero.0.is_null() { return Err(()); } - let packet = &*(token.zero as *const Packet<T>); + let packet = &*(token.zero.0 as *const Packet<T>); if packet.on_stack { // The message has been in the packet from the beginning, so there is no need to wait @@ -177,19 +190,19 @@ impl<T> Channel<T> { // heap-allocated packet. packet.wait_ready(); let msg = packet.msg.get().replace(None).unwrap(); - drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>)); + drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); Ok(msg) } } /// Attempts to send a message into the channel. - pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { let token = &mut Token::default(); let mut inner = self.inner.lock(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.write(token, msg).ok().unwrap(); @@ -203,13 +216,17 @@ impl<T> Channel<T> { } /// Sends a message into the channel. - pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { + pub(crate) fn send( + &self, + msg: T, + deadline: Option<Instant>, + ) -> Result<(), SendTimeoutError<T>> { let token = &mut Token::default(); let mut inner = self.inner.lock(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.write(token, msg).ok().unwrap(); @@ -224,10 +241,10 @@ impl<T> Channel<T> { Context::with(|cx| { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); - let packet = Packet::<T>::message_on_stack(msg); + let mut packet = Packet::<T>::message_on_stack(msg); inner .senders - .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); inner.receivers.notify(); drop(inner); @@ -256,13 +273,13 @@ impl<T> Channel<T> { } /// Attempts to receive a message without blocking. - pub fn try_recv(&self) -> Result<T, TryRecvError> { + pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { let token = &mut Token::default(); let mut inner = self.inner.lock(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } } else if inner.is_disconnected { @@ -273,13 +290,13 @@ impl<T> Channel<T> { } /// Receives a message from the channel. - pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { let token = &mut Token::default(); let mut inner = self.inner.lock(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); @@ -293,10 +310,12 @@ impl<T> Channel<T> { Context::with(|cx| { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); - let packet = Packet::<T>::empty_on_stack(); - inner - .receivers - .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + let mut packet = Packet::<T>::empty_on_stack(); + inner.receivers.register_with_packet( + oper, + &mut packet as *mut Packet<T> as *mut (), + cx, + ); inner.senders.notify(); drop(inner); @@ -325,7 +344,7 @@ impl<T> Channel<T> { /// Disconnects the channel and wakes up all blocked senders and receivers. /// /// Returns `true` if this call disconnected the channel. - pub fn disconnect(&self) -> bool { + pub(crate) fn disconnect(&self) -> bool { let mut inner = self.inner.lock(); if !inner.is_disconnected { @@ -339,31 +358,32 @@ impl<T> Channel<T> { } /// Returns the current number of messages inside the channel. - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { 0 } /// Returns the capacity of the channel. - pub fn capacity(&self) -> Option<usize> { + #[allow(clippy::unnecessary_wraps)] // This is intentional. + pub(crate) fn capacity(&self) -> Option<usize> { Some(0) } /// Returns `true` if the channel is empty. - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { true } /// Returns `true` if the channel is full. - pub fn is_full(&self) -> bool { + pub(crate) fn is_full(&self) -> bool { true } } /// Receiver handle to a channel. -pub struct Receiver<'a, T>(&'a Channel<T>); +pub(crate) struct Receiver<'a, T>(&'a Channel<T>); /// Sender handle to a channel. -pub struct Sender<'a, T>(&'a Channel<T>); +pub(crate) struct Sender<'a, T>(&'a Channel<T>); impl<T> SelectHandle for Receiver<'_, T> { fn try_select(&self, token: &mut Token) -> bool { @@ -380,7 +400,7 @@ impl<T> SelectHandle for Receiver<'_, T> { let mut inner = self.0.inner.lock(); inner .receivers - .register_with_packet(oper, packet as usize, cx); + .register_with_packet(oper, packet as *mut (), cx); inner.senders.notify(); inner.senders.can_select() || inner.is_disconnected } @@ -394,7 +414,7 @@ impl<T> SelectHandle for Receiver<'_, T> { } fn accept(&self, token: &mut Token, cx: &Context) -> bool { - token.zero = cx.wait_packet(); + token.zero.0 = cx.wait_packet(); true } @@ -430,7 +450,7 @@ impl<T> SelectHandle for Sender<'_, T> { let mut inner = self.0.inner.lock(); inner .senders - .register_with_packet(oper, packet as usize, cx); + .register_with_packet(oper, packet as *mut (), cx); inner.receivers.notify(); inner.receivers.can_select() || inner.is_disconnected } @@ -444,7 +464,7 @@ impl<T> SelectHandle for Sender<'_, T> { } fn accept(&self, token: &mut Token, cx: &Context) -> bool { - token.zero = cx.wait_packet(); + token.zero.0 = cx.wait_packet(); true } @@ -294,7 +294,7 @@ //! //! * [`after`] creates a channel that delivers a single message after a certain duration of time. //! * [`tick`] creates a channel that delivers messages periodically. -//! * [`never`] creates a channel that never delivers messages. +//! * [`never`](never()) creates a channel that never delivers messages. //! //! These channels are very efficient because messages get lazily generated on receive operations. //! @@ -328,10 +328,13 @@ allow(dead_code, unused_assignments, unused_variables) ) ))] -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![warn( + missing_docs, + missing_debug_implementations, + rust_2018_idioms, + unreachable_pub +)] #![cfg_attr(not(feature = "std"), no_std)] -// matches! requires Rust 1.42 -#![allow(clippy::match_like_matches_macro)] use cfg_if::cfg_if; diff --git a/src/select.rs b/src/select.rs index 1488f80..6103ef4 100644 --- a/src/select.rs +++ b/src/select.rs @@ -19,6 +19,7 @@ use crate::utils; /// `read` or `write`. /// /// Each field contains data associated with a specific channel flavor. +// This is a private API that is used by the select macro. #[derive(Debug, Default)] pub struct Token { pub at: flavors::at::AtToken, @@ -93,6 +94,7 @@ impl Into<usize> for Selected { /// /// This is a handle that assists select in executing an operation, registration, deciding on the /// appropriate deadline for blocking, etc. +// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. pub trait SelectHandle { /// Attempts to select an operation and returns `true` on success. fn try_select(&self, token: &mut Token) -> bool; @@ -442,6 +444,7 @@ fn run_ready( } /// Attempts to select one of the operations without blocking. +// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], @@ -458,6 +461,7 @@ pub fn try_select<'a>( } /// Blocks until one of the operations becomes ready and selects it. +// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], @@ -476,6 +480,7 @@ pub fn select<'a>( } /// Blocks for a limited time until one of the operations becomes ready and selects it. +// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], @@ -486,7 +491,7 @@ pub fn select_timeout<'a>( /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. #[inline] -pub fn select_deadline<'a>( +pub(crate) fn select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { @@ -864,9 +869,6 @@ impl<'a> Select<'a> { /// The selected operation must be completed with [`SelectedOperation::send`] /// or [`SelectedOperation::recv`]. /// - /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send - /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv - /// /// # Examples /// /// ``` diff --git a/src/utils.rs b/src/utils.rs index 3fe171b..557b6a0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -10,7 +10,7 @@ use std::time::{Duration, Instant}; use crossbeam_utils::Backoff; /// Randomly shuffles a slice. -pub fn shuffle<T>(v: &mut [T]) { +pub(crate) fn shuffle<T>(v: &mut [T]) { let len = v.len(); if len <= 1 { return; @@ -46,7 +46,7 @@ pub fn shuffle<T>(v: &mut [T]) { } /// Sleeps until the deadline, or forever if the deadline isn't specified. -pub fn sleep_until(deadline: Option<Instant>) { +pub(crate) fn sleep_until(deadline: Option<Instant>) { loop { match deadline { None => thread::sleep(Duration::from_secs(1000)), @@ -62,14 +62,14 @@ pub fn sleep_until(deadline: Option<Instant>) { } /// A simple spinlock. -pub struct Spinlock<T> { +pub(crate) struct Spinlock<T> { flag: AtomicBool, value: UnsafeCell<T>, } impl<T> Spinlock<T> { /// Returns a new spinlock initialized with `value`. - pub fn new(value: T) -> Spinlock<T> { + pub(crate) fn new(value: T) -> Spinlock<T> { Spinlock { flag: AtomicBool::new(false), value: UnsafeCell::new(value), @@ -77,7 +77,7 @@ impl<T> Spinlock<T> { } /// Locks the spinlock. - pub fn lock(&self) -> SpinlockGuard<'_, T> { + pub(crate) fn lock(&self) -> SpinlockGuard<'_, T> { let backoff = Backoff::new(); while self.flag.swap(true, Ordering::Acquire) { backoff.snooze(); @@ -87,7 +87,7 @@ impl<T> Spinlock<T> { } /// A guard holding a spinlock locked. -pub struct SpinlockGuard<'a, T> { +pub(crate) struct SpinlockGuard<'a, T> { parent: &'a Spinlock<T>, } diff --git a/src/waker.rs b/src/waker.rs index 3d0af26..dec73a9 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -1,5 +1,6 @@ //! Waking mechanism for threads blocked on channel operations. +use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{self, ThreadId}; @@ -8,22 +9,22 @@ use crate::select::{Operation, Selected}; use crate::utils::Spinlock; /// Represents a thread blocked on a specific channel operation. -pub struct Entry { +pub(crate) struct Entry { /// The operation. - pub oper: Operation, + pub(crate) oper: Operation, /// Optional packet. - pub packet: usize, + pub(crate) packet: *mut (), /// Context associated with the thread owning this operation. - pub cx: Context, + pub(crate) cx: Context, } /// A queue of threads blocked on channel operations. /// /// This data structure is used by threads to register blocking operations and get woken up once /// an operation becomes ready. -pub struct Waker { +pub(crate) struct Waker { /// A list of select operations. selectors: Vec<Entry>, @@ -34,7 +35,7 @@ pub struct Waker { impl Waker { /// Creates a new `Waker`. #[inline] - pub fn new() -> Self { + pub(crate) fn new() -> Self { Waker { selectors: Vec::new(), observers: Vec::new(), @@ -43,13 +44,13 @@ impl Waker { /// Registers a select operation. #[inline] - pub fn register(&mut self, oper: Operation, cx: &Context) { - self.register_with_packet(oper, 0, cx); + pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { + self.register_with_packet(oper, ptr::null_mut(), cx); } /// Registers a select operation and a packet. #[inline] - pub fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) { + pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { self.selectors.push(Entry { oper, packet, @@ -59,7 +60,7 @@ impl Waker { /// Unregisters a select operation. #[inline] - pub fn unregister(&mut self, oper: Operation) -> Option<Entry> { + pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> { if let Some((i, _)) = self .selectors .iter() @@ -75,40 +76,32 @@ impl Waker { /// Attempts to find another thread's entry, select the operation, and wake it up. #[inline] - pub fn try_select(&mut self) -> Option<Entry> { - let mut entry = None; - - if !self.selectors.is_empty() { - let thread_id = current_thread_id(); - - for i in 0..self.selectors.len() { + pub(crate) fn try_select(&mut self) -> Option<Entry> { + self.selectors + .iter() + .position(|selector| { // Does the entry belong to a different thread? - if self.selectors[i].cx.thread_id() != thread_id { - // Try selecting this operation. - let sel = Selected::Operation(self.selectors[i].oper); - let res = self.selectors[i].cx.try_select(sel); - - if res.is_ok() { + selector.cx.thread_id() != current_thread_id() + && selector // Try selecting this operation. + .cx + .try_select(Selected::Operation(selector.oper)) + .is_ok() + && { // Provide the packet. - self.selectors[i].cx.store_packet(self.selectors[i].packet); + selector.cx.store_packet(selector.packet); // Wake the thread up. - self.selectors[i].cx.unpark(); - - // Remove the entry from the queue to keep it clean and improve - // performance. - entry = Some(self.selectors.remove(i)); - break; + selector.cx.unpark(); + true } - } - } - } - - entry + }) + // Remove the entry from the queue to keep it clean and improve + // performance. + .map(|pos| self.selectors.remove(pos)) } /// Returns `true` if there is an entry which can be selected by the current thread. #[inline] - pub fn can_select(&self) -> bool { + pub(crate) fn can_select(&self) -> bool { if self.selectors.is_empty() { false } else { @@ -122,23 +115,23 @@ impl Waker { /// Registers an operation waiting to be ready. #[inline] - pub fn watch(&mut self, oper: Operation, cx: &Context) { + pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) { self.observers.push(Entry { oper, - packet: 0, + packet: ptr::null_mut(), cx: cx.clone(), }); } /// Unregisters an operation waiting to be ready. #[inline] - pub fn unwatch(&mut self, oper: Operation) { + pub(crate) fn unwatch(&mut self, oper: Operation) { self.observers.retain(|e| e.oper != oper); } /// Notifies all operations waiting to be ready. #[inline] - pub fn notify(&mut self) { + pub(crate) fn notify(&mut self) { for entry in self.observers.drain(..) { if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() { entry.cx.unpark(); @@ -148,7 +141,7 @@ impl Waker { /// Notifies all registered operations that the channel is disconnected. #[inline] - pub fn disconnect(&mut self) { + pub(crate) fn disconnect(&mut self) { for entry in self.selectors.iter() { if entry.cx.try_select(Selected::Disconnected).is_ok() { // Wake the thread up. @@ -175,7 +168,7 @@ impl Drop for Waker { /// A waker that can be shared among threads without locking. /// /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. -pub struct SyncWaker { +pub(crate) struct SyncWaker { /// The inner `Waker`. inner: Spinlock<Waker>, @@ -186,7 +179,7 @@ pub struct SyncWaker { impl SyncWaker { /// Creates a new `SyncWaker`. #[inline] - pub fn new() -> Self { + pub(crate) fn new() -> Self { SyncWaker { inner: Spinlock::new(Waker::new()), is_empty: AtomicBool::new(true), @@ -195,7 +188,7 @@ impl SyncWaker { /// Registers the current thread with an operation. #[inline] - pub fn register(&self, oper: Operation, cx: &Context) { + pub(crate) fn register(&self, oper: Operation, cx: &Context) { let mut inner = self.inner.lock(); inner.register(oper, cx); self.is_empty.store( @@ -206,7 +199,7 @@ impl SyncWaker { /// Unregisters an operation previously registered by the current thread. #[inline] - pub fn unregister(&self, oper: Operation) -> Option<Entry> { + pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> { let mut inner = self.inner.lock(); let entry = inner.unregister(oper); self.is_empty.store( @@ -218,7 +211,7 @@ impl SyncWaker { /// Attempts to find one thread (not the current one), select its operation, and wake it up. #[inline] - pub fn notify(&self) { + pub(crate) fn notify(&self) { if !self.is_empty.load(Ordering::SeqCst) { let mut inner = self.inner.lock(); if !self.is_empty.load(Ordering::SeqCst) { @@ -234,7 +227,7 @@ impl SyncWaker { /// Registers an operation waiting to be ready. #[inline] - pub fn watch(&self, oper: Operation, cx: &Context) { + pub(crate) fn watch(&self, oper: Operation, cx: &Context) { let mut inner = self.inner.lock(); inner.watch(oper, cx); self.is_empty.store( @@ -245,7 +238,7 @@ impl SyncWaker { /// Unregisters an operation waiting to be ready. #[inline] - pub fn unwatch(&self, oper: Operation) { + pub(crate) fn unwatch(&self, oper: Operation) { let mut inner = self.inner.lock(); inner.unwatch(oper); self.is_empty.store( @@ -256,7 +249,7 @@ impl SyncWaker { /// Notifies all threads that the channel is disconnected. #[inline] - pub fn disconnect(&self) { + pub(crate) fn disconnect(&self) { let mut inner = self.inner.lock(); inner.disconnect(); self.is_empty.store( @@ -269,7 +262,7 @@ impl SyncWaker { impl Drop for SyncWaker { #[inline] fn drop(&mut self) { - debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true); + debug_assert!(self.is_empty.load(Ordering::SeqCst)); } } diff --git a/tests/after.rs b/tests/after.rs index 20670dc..678a8c6 100644 --- a/tests/after.rs +++ b/tests/after.rs @@ -1,5 +1,7 @@ //! Tests for the after channel flavor. +#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow + use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::thread; @@ -56,20 +58,20 @@ fn len_empty_full() { let r = after(ms(50)); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); + assert!(r.is_empty()); + assert!(!r.is_full()); thread::sleep(ms(100)); assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), true); + assert!(!r.is_empty()); + assert!(r.is_full()); r.try_recv().unwrap(); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); + assert!(r.is_empty()); + assert!(!r.is_full()); } #[test] @@ -211,7 +213,7 @@ fn select() { break; } i => { - oper.recv(&v[i]).unwrap(); + oper.recv(v[i]).unwrap(); hits.fetch_add(1, Ordering::SeqCst); } } diff --git a/tests/array.rs b/tests/array.rs index a7ae323..bb2cebe 100644 --- a/tests/array.rs +++ b/tests/array.rs @@ -1,5 +1,7 @@ //! Tests for the array channel flavor. +#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow + use std::any::Any; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -43,38 +45,38 @@ fn len_empty_full() { let (s, r) = bounded(2); assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), false); + assert!(s.is_empty()); + assert!(!s.is_full()); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); + assert!(r.is_empty()); + assert!(!r.is_full()); s.send(()).unwrap(); assert_eq!(s.len(), 1); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), false); + assert!(!s.is_empty()); + assert!(!s.is_full()); assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), false); + assert!(!r.is_empty()); + assert!(!r.is_full()); s.send(()).unwrap(); assert_eq!(s.len(), 2); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), true); + assert!(!s.is_empty()); + assert!(s.is_full()); assert_eq!(r.len(), 2); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), true); + assert!(!r.is_empty()); + assert!(r.is_full()); r.recv().unwrap(); assert_eq!(s.len(), 1); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), false); + assert!(!s.is_empty()); + assert!(!s.is_full()); assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), false); + assert!(!r.is_empty()); + assert!(!r.is_full()); } #[test] @@ -497,8 +499,8 @@ fn drops() { let mut rng = thread_rng(); for _ in 0..RUNS { - let steps = rng.gen_range(0, 10_000); - let additional = rng.gen_range(0, 50); + let steps = rng.gen_range(0..10_000); + let additional = rng.gen_range(0..50); DROPS.store(0, Ordering::SeqCst); let (s, r) = bounded::<DropCounter>(50); diff --git a/tests/golang.rs b/tests/golang.rs index 69a9315..05d67f6 100644 --- a/tests/golang.rs +++ b/tests/golang.rs @@ -9,6 +9,8 @@ //! - https://golang.org/LICENSE //! - https://golang.org/PATENTS +#![allow(clippy::mutex_atomic, clippy::redundant_clone)] + use std::alloc::{GlobalAlloc, Layout, System}; use std::any::Any; use std::cell::Cell; @@ -176,7 +178,7 @@ unsafe impl GlobalAlloc for Counter { if !ret.is_null() { ALLOCATED.fetch_add(layout.size(), SeqCst); } - return ret; + ret } unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { @@ -232,6 +234,9 @@ macro_rules! go { mod doubleselect { use super::*; + #[cfg(miri)] + const ITERATIONS: i32 = 100; + #[cfg(not(miri))] const ITERATIONS: i32 = 10_000; fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) { @@ -315,7 +320,7 @@ mod fifo { fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) { inp.recv(); if ch.recv() != Some(val) { - panic!(val); + panic!("{}", val); } out.send(1); } @@ -691,6 +696,11 @@ mod select { mod select2 { use super::*; + #[cfg(miri)] + const N: i32 = 1000; + #[cfg(not(miri))] + const N: i32 = 100000; + #[test] fn main() { fn sender(c: &Chan<i32>, n: i32) { @@ -702,9 +712,7 @@ mod select2 { fn receiver(c: &Chan<i32>, dummy: &Chan<i32>, n: i32) { for _ in 0..n { select! { - recv(c.rx()) -> _ => { - () - } + recv(c.rx()) -> _ => {} recv(dummy.rx()) -> _ => { panic!("dummy"); } @@ -717,15 +725,18 @@ mod select2 { ALLOCATED.store(0, SeqCst); - go!(c, sender(&c, 100000)); - receiver(&c, &dummy, 100000); + go!(c, sender(&c, N)); + receiver(&c, &dummy, N); let alloc = ALLOCATED.load(SeqCst); - go!(c, sender(&c, 100000)); - receiver(&c, &dummy, 100000); + go!(c, sender(&c, N)); + receiver(&c, &dummy, N); - assert!(!(ALLOCATED.load(SeqCst) > alloc && (ALLOCATED.load(SeqCst) - alloc) > 110000)) + assert!( + !(ALLOCATED.load(SeqCst) > alloc + && (ALLOCATED.load(SeqCst) - alloc) > (N as usize + 10000)) + ) } } @@ -913,6 +924,9 @@ mod chan_test { #[test] fn test_chan() { + #[cfg(miri)] + const N: i32 = 20; + #[cfg(not(miri))] const N: i32 = 200; for cap in 0..N { @@ -1052,6 +1066,9 @@ mod chan_test { #[test] fn test_nonblock_recv_race() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] const N: usize = 1000; for _ in 0..N { @@ -1073,6 +1090,9 @@ mod chan_test { #[test] fn test_nonblock_select_race() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] const N: usize = 1000; let done = make::<bool>(1); @@ -1106,6 +1126,9 @@ mod chan_test { #[test] fn test_nonblock_select_race2() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] const N: usize = 1000; let done = make::<bool>(1); @@ -1142,6 +1165,11 @@ mod chan_test { // Ensure that send/recv on the same chan in select // does not crash nor deadlock. + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 1000; + for &cap in &[0, 10] { let wg = WaitGroup::new(); wg.add(2); @@ -1151,7 +1179,7 @@ mod chan_test { let p = p; go!(wg, p, c, { defer! { wg.done() } - for i in 0..1000 { + for i in 0..N { if p == 0 || i % 2 == 0 { select! { send(c.tx(), p) -> _ => {} @@ -1180,6 +1208,11 @@ mod chan_test { #[test] fn test_select_stress() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + let c = vec![ make::<i32>(0), make::<i32>(0), @@ -1187,8 +1220,6 @@ mod chan_test { make::<i32>(3), ]; - const N: usize = 10000; - // There are 4 goroutines that send N values on each of the chans, // + 4 goroutines that receive N values on each of the chans, // + 1 goroutine that sends N values on each of the chans in a single select, @@ -1286,6 +1317,9 @@ mod chan_test { #[test] fn test_select_fairness() { + #[cfg(miri)] + const TRIALS: usize = 100; + #[cfg(not(miri))] const TRIALS: usize = 10000; let c1 = make::<u8>(TRIALS + 1); @@ -1369,6 +1403,9 @@ mod chan_test { #[test] fn test_pseudo_random_send() { + #[cfg(miri)] + const N: usize = 20; + #[cfg(not(miri))] const N: usize = 100; for cap in 0..N { @@ -1412,6 +1449,9 @@ mod chan_test { #[test] fn test_multi_consumer() { const NWORK: usize = 23; + #[cfg(miri)] + const NITER: usize = 100; + #[cfg(not(miri))] const NITER: usize = 271828; let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31]; @@ -1507,5 +1547,61 @@ mod chan { // https://github.com/golang/go/blob/master/test/ken/chan1.go mod chan1 { - // TODO + use super::*; + + // sent messages + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 1000; + // receiving "goroutines" + const M: usize = 10; + // channel buffering + const W: usize = 2; + + fn r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>) { + loop { + select! { + recv(c.rx()) -> rr => { + let r = rr.unwrap(); + let mut data = h.lock().unwrap(); + if data[r] != 1 { + println!("r\nm={}\nr={}\nh={}\n", m, r, data[r]); + panic!("fail") + } + data[r] = 2; + } + } + } + } + + fn s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>) { + for n in 0..N { + let r = n; + let mut data = h.lock().unwrap(); + if data[r] != 0 { + println!("s"); + panic!("fail"); + } + data[r] = 1; + // https://github.com/crossbeam-rs/crossbeam/pull/615#discussion_r550281094 + drop(data); + c.send(r); + } + } + + #[test] + fn main() { + let h = Arc::new(Mutex::new([0usize; N])); + let c = make::<usize>(W); + for m in 0..M { + go!(c, h, { + r(c, m, h); + }); + thread::yield_now(); + } + thread::yield_now(); + thread::yield_now(); + s(c, h); + } } diff --git a/tests/iter.rs b/tests/iter.rs index 38bcac2..463f3b0 100644 --- a/tests/iter.rs +++ b/tests/iter.rs @@ -93,7 +93,7 @@ fn recv_into_iter_owned() { assert_eq!(iter.next().unwrap(), 1); assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); + assert!(iter.next().is_none()); } #[test] @@ -106,5 +106,5 @@ fn recv_into_iter_borrowed() { let mut iter = (&r).into_iter(); assert_eq!(iter.next().unwrap(), 1); assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); + assert!(iter.next().is_none()); } diff --git a/tests/list.rs b/tests/list.rs index 8b84105..619e1fc 100644 --- a/tests/list.rs +++ b/tests/list.rs @@ -41,29 +41,29 @@ fn len_empty_full() { let (s, r) = unbounded(); assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), false); + assert!(s.is_empty()); + assert!(!s.is_full()); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); + assert!(r.is_empty()); + assert!(!r.is_full()); s.send(()).unwrap(); assert_eq!(s.len(), 1); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), false); + assert!(!s.is_empty()); + assert!(!s.is_full()); assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), false); + assert!(!r.is_empty()); + assert!(!r.is_full()); r.recv().unwrap(); assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), false); + assert!(s.is_empty()); + assert!(!s.is_full()); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); + assert!(r.is_empty()); + assert!(!r.is_full()); } #[test] @@ -239,6 +239,9 @@ fn disconnect_wakes_receiver() { #[test] fn spsc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; let (s, r) = unbounded(); @@ -261,6 +264,9 @@ fn spsc() { #[test] fn mpmc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -295,6 +301,9 @@ fn mpmc() { #[test] fn stress_oneshot() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; for _ in 0..COUNT { @@ -310,6 +319,9 @@ fn stress_oneshot() { #[test] fn stress_iter() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; let (request_s, request_r) = unbounded(); @@ -371,8 +383,11 @@ fn stress_timeout_two_threads() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { + const RUNS: usize = 100; + static DROPS: AtomicUsize = AtomicUsize::new(0); #[derive(Debug, PartialEq)] @@ -386,9 +401,9 @@ fn drops() { let mut rng = thread_rng(); - for _ in 0..100 { - let steps = rng.gen_range(0, 10_000); - let additional = rng.gen_range(0, 1000); + for _ in 0..RUNS { + let steps = rng.gen_range(0..10_000); + let additional = rng.gen_range(0..1000); DROPS.store(0, Ordering::SeqCst); let (s, r) = unbounded::<DropCounter>(); @@ -421,6 +436,9 @@ fn drops() { #[test] fn linearizable() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -441,6 +459,9 @@ fn linearizable() { #[test] fn fairness() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = unbounded::<()>(); @@ -463,6 +484,9 @@ fn fairness() { #[test] fn fairness_duplicates() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s, r) = unbounded(); @@ -496,6 +520,9 @@ fn recv_in_send() { #[test] fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 1000; type T = Box<dyn Any + Send>; diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 2a0786a..4d6e179 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -20,6 +20,12 @@ //! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT //! - https://www.rust-lang.org/en-US/legal.html +#![allow( + clippy::drop_copy, + clippy::match_single_binding, + clippy::redundant_clone +)] + use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; use std::sync::mpsc::{SendError, TrySendError}; use std::thread::JoinHandle; @@ -176,7 +182,7 @@ macro_rules! select { ) => ({ cc::crossbeam_channel_internal! { $( - recv(($rx).inner) -> res => { + $meth(($rx).inner) -> res => { let $name = res.map_err(|_| ::std::sync::mpsc::RecvError); $code } @@ -314,13 +320,18 @@ mod channel_tests { #[test] fn stress() { + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] + const COUNT: usize = 10000; + let (tx, rx) = channel::<i32>(); let t = thread::spawn(move || { - for _ in 0..10000 { + for _ in 0..COUNT { tx.send(1).unwrap(); } }); - for _ in 0..10000 { + for _ in 0..COUNT { assert_eq!(rx.recv().unwrap(), 1); } t.join().ok().unwrap(); @@ -328,6 +339,9 @@ mod channel_tests { #[test] fn stress_shared() { + #[cfg(miri)] + const AMT: u32 = 500; + #[cfg(not(miri))] const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = channel::<i32>(); @@ -336,10 +350,7 @@ mod channel_tests { for _ in 0..AMT * NTHREADS { assert_eq!(rx.recv().unwrap(), 1); } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } + assert!(rx.try_recv().is_err()); }); let mut ts = Vec::with_capacity(NTHREADS as usize); @@ -735,12 +746,17 @@ mod channel_tests { #[test] fn recv_a_lot() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + // Regression test that we don't run out of stack in scheduler context let (tx, rx) = channel(); - for _ in 0..10000 { + for _ in 0..N { tx.send(()).unwrap(); } - for _ in 0..10000 { + for _ in 0..N { rx.recv().unwrap(); } } @@ -880,7 +896,7 @@ mod channel_tests { }; assert_eq!(iter.next().unwrap(), 1); assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); + assert!(iter.next().is_none()); } #[test] @@ -892,7 +908,7 @@ mod channel_tests { let mut iter = (&rx).into_iter(); assert_eq!(iter.next().unwrap(), 1); assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); + assert!(iter.next().is_none()); } #[test] @@ -1079,13 +1095,18 @@ mod sync_channel_tests { #[test] fn stress() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + let (tx, rx) = sync_channel::<i32>(0); let t = thread::spawn(move || { - for _ in 0..10000 { + for _ in 0..N { tx.send(1).unwrap(); } }); - for _ in 0..10000 { + for _ in 0..N { assert_eq!(rx.recv().unwrap(), 1); } t.join().unwrap(); @@ -1093,10 +1114,15 @@ mod sync_channel_tests { #[test] fn stress_recv_timeout_two_threads() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + let (tx, rx) = sync_channel::<i32>(0); let t = thread::spawn(move || { - for _ in 0..10000 { + for _ in 0..N { tx.send(1).unwrap(); } }); @@ -1113,12 +1139,15 @@ mod sync_channel_tests { } } - assert_eq!(recv_count, 10000); + assert_eq!(recv_count, N); t.join().unwrap(); } #[test] fn stress_recv_timeout_shared() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] const AMT: u32 = 1000; const NTHREADS: u32 = 8; let (tx, rx) = sync_channel::<i32>(0); @@ -1165,6 +1194,9 @@ mod sync_channel_tests { #[test] fn stress_shared() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] const AMT: u32 = 1000; const NTHREADS: u32 = 8; let (tx, rx) = sync_channel::<i32>(0); @@ -1174,10 +1206,7 @@ mod sync_channel_tests { for _ in 0..AMT * NTHREADS { assert_eq!(rx.recv().unwrap(), 1); } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } + assert!(rx.try_recv().is_err()); dtx.send(()).unwrap(); }); @@ -1449,12 +1478,17 @@ mod sync_channel_tests { #[test] fn recv_a_lot() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = sync_channel(10000); - for _ in 0..10000 { + let (tx, rx) = sync_channel(N); + for _ in 0..N { tx.send(()).unwrap(); } - for _ in 0..10000 { + for _ in 0..N { rx.recv().unwrap(); } } @@ -1792,7 +1826,11 @@ mod select_tests { #[test] fn stress() { + #[cfg(miri)] + const AMT: i32 = 100; + #[cfg(not(miri))] const AMT: i32 = 10000; + let (tx1, rx1) = channel::<i32>(); let (tx2, rx2) = channel::<i32>(); let (tx3, rx3) = channel::<()>(); diff --git a/tests/never.rs b/tests/never.rs index 31cebf6..f275126 100644 --- a/tests/never.rs +++ b/tests/never.rs @@ -65,8 +65,8 @@ fn capacity() { fn len_empty_full() { let r = never::<i32>(); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), true); + assert!(r.is_empty()); + assert!(r.is_full()); } #[test] diff --git a/tests/ready.rs b/tests/ready.rs index 700f487..d8dd6ce 100644 --- a/tests/ready.rs +++ b/tests/ready.rs @@ -1,5 +1,7 @@ //! Tests for channel readiness using the `Select` struct. +#![allow(clippy::drop_copy)] + use std::any::Any; use std::cell::Cell; use std::thread; @@ -490,6 +492,9 @@ fn nesting() { #[test] fn stress_recv() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = unbounded(); @@ -527,6 +532,9 @@ fn stress_recv() { #[test] fn stress_send() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); @@ -561,6 +569,9 @@ fn stress_send() { #[test] fn stress_mixed() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); @@ -606,8 +617,7 @@ fn stress_timeout_two_threads() { thread::sleep(ms(500)); } - let done = false; - while !done { + loop { let mut sel = Select::new(); sel.send(&s); match sel.ready_timeout(ms(100)) { @@ -628,15 +638,14 @@ fn stress_timeout_two_threads() { thread::sleep(ms(500)); } - let mut done = false; - while !done { + loop { let mut sel = Select::new(); sel.recv(&r); match sel.ready_timeout(ms(100)) { Err(_) => {} Ok(0) => { assert_eq!(r.try_recv(), Ok(i)); - done = true; + break; } Ok(_) => panic!(), } @@ -668,6 +677,9 @@ fn send_recv_same_channel() { #[test] fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 1000; type T = Box<dyn Any + Send>; @@ -724,6 +736,9 @@ fn channel_through_channel() { #[test] fn fairness1() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded::<()>(COUNT); @@ -769,6 +784,9 @@ fn fairness1() { #[test] fn fairness2() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; let (s1, r1) = unbounded::<()>(); diff --git a/tests/select.rs b/tests/select.rs index 4cf08b6..f24aed8 100644 --- a/tests/select.rs +++ b/tests/select.rs @@ -1,5 +1,7 @@ //! Tests for channel selection using the `Select` struct. +#![allow(clippy::drop_copy)] + use std::any::Any; use std::cell::Cell; use std::thread; @@ -406,6 +408,7 @@ fn both_ready() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn loop_try() { const RUNS: usize = 20; @@ -690,6 +693,9 @@ fn nesting() { #[test] fn stress_recv() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = unbounded(); @@ -728,6 +734,9 @@ fn stress_recv() { #[test] fn stress_send() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); @@ -763,6 +772,9 @@ fn stress_send() { #[test] fn stress_mixed() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); @@ -809,8 +821,7 @@ fn stress_timeout_two_threads() { thread::sleep(ms(500)); } - let done = false; - while !done { + loop { let mut sel = Select::new(); let oper1 = sel.send(&s); let oper = sel.select_timeout(ms(100)); @@ -834,8 +845,7 @@ fn stress_timeout_two_threads() { thread::sleep(ms(500)); } - let mut done = false; - while !done { + loop { let mut sel = Select::new(); let oper1 = sel.recv(&r); let oper = sel.select_timeout(ms(100)); @@ -844,7 +854,7 @@ fn stress_timeout_two_threads() { Ok(oper) => match oper.index() { ix if ix == oper1 => { assert_eq!(oper.recv(&r), Ok(i)); - done = true; + break; } _ => unreachable!(), }, @@ -897,12 +907,12 @@ fn matching() { for i in 0..THREADS { scope.spawn(move |_| { let mut sel = Select::new(); - let oper1 = sel.recv(&r); - let oper2 = sel.send(&s); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); let oper = sel.select(); match oper.index() { - ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), - ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), _ => unreachable!(), } }); @@ -923,12 +933,12 @@ fn matching_with_leftover() { for i in 0..THREADS { scope.spawn(move |_| { let mut sel = Select::new(); - let oper1 = sel.recv(&r); - let oper2 = sel.send(&s); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); let oper = sel.select(); match oper.index() { - ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), - ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), _ => unreachable!(), } }); @@ -942,6 +952,9 @@ fn matching_with_leftover() { #[test] fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 1000; type T = Box<dyn Any + Send>; @@ -1000,6 +1013,9 @@ fn channel_through_channel() { #[test] fn linearizable_try() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; for step in 0..2 { @@ -1052,6 +1068,9 @@ fn linearizable_try() { #[test] fn linearizable_timeout() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; for step in 0..2 { @@ -1104,6 +1123,9 @@ fn linearizable_timeout() { #[test] fn fairness1() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded::<()>(COUNT); @@ -1150,6 +1172,9 @@ fn fairness1() { #[test] fn fairness2() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = unbounded::<()>(); @@ -1214,8 +1239,8 @@ fn sync_and_clone() { let (s, r) = &bounded::<usize>(0); let mut sel = Select::new(); - let oper1 = sel.recv(&r); - let oper2 = sel.send(&s); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); let sel = &sel; scope(|scope| { @@ -1224,8 +1249,8 @@ fn sync_and_clone() { let mut sel = sel.clone(); let oper = sel.select(); match oper.index() { - ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), - ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), _ => unreachable!(), } }); @@ -1243,8 +1268,8 @@ fn send_and_clone() { let (s, r) = &bounded::<usize>(0); let mut sel = Select::new(); - let oper1 = sel.recv(&r); - let oper2 = sel.send(&s); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); scope(|scope| { for i in 0..THREADS { @@ -1252,8 +1277,8 @@ fn send_and_clone() { scope.spawn(move |_| { let oper = sel.select(); match oper.index() { - ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)), - ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()), + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), _ => unreachable!(), } }); @@ -1266,6 +1291,9 @@ fn send_and_clone() { #[test] fn reuse() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); diff --git a/tests/select_macro.rs b/tests/select_macro.rs index c05f7a0..0b9a21a 100644 --- a/tests/select_macro.rs +++ b/tests/select_macro.rs @@ -1,6 +1,7 @@ //! Tests for the `select!` macro. #![forbid(unsafe_code)] // select! is safe. +#![allow(clippy::drop_copy, clippy::match_single_binding)] use std::any::Any; use std::cell::Cell; @@ -283,6 +284,7 @@ fn both_ready() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn loop_try() { const RUNS: usize = 20; @@ -485,6 +487,9 @@ fn panic_receiver() { #[test] fn stress_recv() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = unbounded(); @@ -518,6 +523,9 @@ fn stress_recv() { #[test] fn stress_send() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); @@ -548,6 +556,9 @@ fn stress_send() { #[test] fn stress_mixed() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); @@ -681,6 +692,9 @@ fn matching_with_leftover() { #[test] fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 1000; type T = Box<dyn Any + Send>; @@ -726,6 +740,9 @@ fn channel_through_channel() { #[test] fn linearizable_default() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; for step in 0..2 { @@ -770,6 +787,9 @@ fn linearizable_default() { #[test] fn linearizable_timeout() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; for step in 0..2 { @@ -814,6 +834,9 @@ fn linearizable_timeout() { #[test] fn fairness1() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded::<()>(COUNT); @@ -838,6 +861,9 @@ fn fairness1() { #[test] fn fairness2() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = unbounded::<()>(); @@ -875,6 +901,9 @@ fn fairness2() { #[test] fn fairness_recv() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded::<()>(COUNT); @@ -897,6 +926,9 @@ fn fairness_recv() { #[test] fn fairness_send() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, _r1) = bounded::<()>(COUNT); @@ -912,6 +944,7 @@ fn fairness_send() { assert!(hits.iter().all(|x| *x >= COUNT / 4)); } +#[allow(clippy::or_fun_call)] // This is intentional. #[test] fn references() { let (s, r) = unbounded::<i32>(); @@ -958,6 +991,7 @@ fn case_blocks() { drop(s); } +#[allow(clippy::redundant_closure_call)] // This is intentional. #[test] fn move_handles() { let (s, r) = unbounded::<i32>(); diff --git a/tests/thread_locals.rs b/tests/thread_locals.rs index 9e27146..effb6a1 100644 --- a/tests/thread_locals.rs +++ b/tests/thread_locals.rs @@ -1,5 +1,7 @@ //! Tests that make sure accessing thread-locals while exiting the thread doesn't cause panics. +#![cfg(not(miri))] // error: abnormal termination: the evaluated program aborted execution + use std::thread; use std::time::Duration; diff --git a/tests/tick.rs b/tests/tick.rs index 5dc8730..23bbb1f 100644 --- a/tests/tick.rs +++ b/tests/tick.rs @@ -1,5 +1,7 @@ //! Tests for the tick channel flavor. +#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow + use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::thread; @@ -78,20 +80,20 @@ fn len_empty_full() { let r = tick(ms(50)); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); + assert!(r.is_empty()); + assert!(!r.is_full()); thread::sleep(ms(100)); assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), true); + assert!(!r.is_empty()); + assert!(r.is_full()); r.try_recv().unwrap(); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); + assert!(r.is_empty()); + assert!(!r.is_full()); } #[test] @@ -127,6 +129,7 @@ fn recv() { assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } +#[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow #[test] fn recv_timeout() { let start = Instant::now(); @@ -251,6 +254,7 @@ fn select() { assert_eq!(hits.load(Ordering::SeqCst), 8); } +#[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow #[test] fn ready() { const THREADS: usize = 4; diff --git a/tests/zero.rs b/tests/zero.rs index 66dcc1e..ba41b1a 100644 --- a/tests/zero.rs +++ b/tests/zero.rs @@ -35,11 +35,11 @@ fn len_empty_full() { let (s, r) = bounded(0); assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), true); + assert!(s.is_empty()); + assert!(s.is_full()); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), true); + assert!(r.is_empty()); + assert!(r.is_full()); scope(|scope| { scope.spawn(|_| s.send(0).unwrap()); @@ -48,11 +48,11 @@ fn len_empty_full() { .unwrap(); assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), true); + assert!(s.is_empty()); + assert!(s.is_full()); assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), true); + assert!(r.is_empty()); + assert!(r.is_full()); } #[test] @@ -187,6 +187,9 @@ fn send_timeout() { #[test] fn len() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 25_000; let (s, r) = bounded(0); @@ -249,6 +252,9 @@ fn disconnect_wakes_receiver() { #[test] fn spsc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; let (s, r) = bounded(0); @@ -271,6 +277,9 @@ fn spsc() { #[test] fn mpmc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -303,6 +312,9 @@ fn mpmc() { #[test] fn stress_oneshot() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; for _ in 0..COUNT { @@ -316,6 +328,7 @@ fn stress_oneshot() { } } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_iter() { const COUNT: usize = 1000; @@ -383,8 +396,11 @@ fn stress_timeout_two_threads() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { + const RUNS: usize = 100; + static DROPS: AtomicUsize = AtomicUsize::new(0); #[derive(Debug, PartialEq)] @@ -398,8 +414,8 @@ fn drops() { let mut rng = thread_rng(); - for _ in 0..100 { - let steps = rng.gen_range(0, 3_000); + for _ in 0..RUNS { + let steps = rng.gen_range(0..3_000); DROPS.store(0, Ordering::SeqCst); let (s, r) = bounded::<DropCounter>(0); @@ -428,6 +444,9 @@ fn drops() { #[test] fn fairness() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded::<()>(0); @@ -459,6 +478,9 @@ fn fairness() { #[test] fn fairness_duplicates() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s, r) = bounded::<()>(0); @@ -517,6 +539,9 @@ fn recv_in_send() { #[test] fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 1000; type T = Box<dyn Any + Send>; |