aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.cargo_vcs_info.json7
-rw-r--r--Android.bp11
-rw-r--r--CHANGELOG.md8
-rw-r--r--Cargo.lock88
-rw-r--r--Cargo.toml19
-rw-r--r--Cargo.toml.orig12
-rw-r--r--LICENSE-THIRD-PARTY32
-rw-r--r--METADATA10
-rw-r--r--README.md10
-rw-r--r--TEST_MAPPING17
-rw-r--r--benches/crossbeam.rs2
-rw-r--r--cargo2android.json4
-rw-r--r--examples/fibonacci.rs2
-rw-r--r--examples/stopwatch.rs10
-rw-r--r--src/channel.rs10
-rw-r--r--src/context.rs20
-rw-r--r--src/counter.rs14
-rw-r--r--src/flavors/array.rs58
-rw-r--r--src/flavors/at.rs23
-rw-r--r--src/flavors/list.rs116
-rw-r--r--src/flavors/mod.rs12
-rw-r--r--src/flavors/never.rs21
-rw-r--r--src/flavors/tick.rs21
-rw-r--r--src/flavors/zero.rs102
-rw-r--r--src/lib.rs11
-rw-r--r--src/select.rs10
-rw-r--r--src/utils.rs12
-rw-r--r--src/waker.rs93
-rw-r--r--tests/after.rs16
-rw-r--r--tests/array.rs38
-rw-r--r--tests/golang.rs124
-rw-r--r--tests/iter.rs4
-rw-r--r--tests/list.rs57
-rw-r--r--tests/mpsc.rs82
-rw-r--r--tests/never.rs4
-rw-r--r--tests/ready.rs28
-rw-r--r--tests/select.rs70
-rw-r--r--tests/select_macro.rs34
-rw-r--r--tests/thread_locals.rs2
-rw-r--r--tests/tick.rs16
-rw-r--r--tests/zero.rs45
41 files changed, 800 insertions, 475 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 08632f0..f15a046 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "d9dfc9e1ffabcb3c01addad14878f16c2795c371"
- }
-}
+ "sha1": "f9cec068fa94bced547a66289cd288dca58c2e83"
+ },
+ "path_in_vcs": "crossbeam-channel"
+} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 97ef73e..a5821d8 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,4 @@
-// This file is generated by cargo2android.py --run --device --dependencies.
+// This file is generated by cargo2android.py --config cargo2android.json.
// Do not modify this file as changes will be overridden on upgrade.
package {
@@ -44,9 +44,10 @@ license {
rust_library {
name: "libcrossbeam_channel",
- // has rustc warnings
host_supported: true,
crate_name: "crossbeam_channel",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.5.2",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -59,9 +60,3 @@ rust_library {
"libcrossbeam_utils",
],
}
-
-// dependent_library ["feature_list"]
-// autocfg-1.0.1
-// cfg-if-1.0.0
-// crossbeam-utils-0.8.3 "lazy_static,std"
-// lazy_static-1.4.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a53e8a..6bfd923 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+# Version 0.5.2
+
+- Fix stacked borrows violations. (#763, #764)
+
+# Version 0.5.1
+
+- Fix memory leak in unbounded channel. (#669)
+
# Version 0.5.0
- Bump the minimum supported Rust version to 1.36.
diff --git a/Cargo.lock b/Cargo.lock
index 91cb58a..feb6e75 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,22 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
-[[package]]
-name = "arc-swap"
-version = "0.4.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
-
-[[package]]
-name = "autocfg"
-version = "1.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
-
-[[package]]
-name = "cfg-if"
-version = "0.1.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
+version = 3
[[package]]
name = "cfg-if"
@@ -25,16 +9,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
-name = "const_fn"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce90df4c658c62f12d78f7508cf92f9173e5184a539c10bfe54a3107b3ffd0f2"
-
-[[package]]
name = "crossbeam-channel"
-version = "0.5.0"
+version = "0.5.2"
dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
"crossbeam-utils",
"num_cpus",
"rand",
@@ -43,32 +21,30 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
-version = "0.8.0"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec91540d98355f690a86367e566ecad2e9e579f230230eb7c21398372be73ea5"
+checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
dependencies = [
- "autocfg",
- "cfg-if 1.0.0",
- "const_fn",
+ "cfg-if",
"lazy_static",
]
[[package]]
name = "getrandom"
-version = "0.1.15"
+version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6"
+checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
- "cfg-if 0.1.10",
+ "cfg-if",
"libc",
"wasi",
]
[[package]]
name = "hermit-abi"
-version = "0.1.17"
+version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8"
+checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
@@ -81,15 +57,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
-version = "0.2.79"
+version = "0.2.112"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743"
+checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
[[package]]
name = "num_cpus"
-version = "1.13.0"
+version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
+checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"libc",
@@ -97,17 +73,16 @@ dependencies = [
[[package]]
name = "ppv-lite86"
-version = "0.2.9"
+version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20"
+checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "rand"
-version = "0.7.3"
+version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
+checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
dependencies = [
- "getrandom",
"libc",
"rand_chacha",
"rand_core",
@@ -116,9 +91,9 @@ dependencies = [
[[package]]
name = "rand_chacha"
-version = "0.2.2"
+version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
+checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
@@ -126,27 +101,27 @@ dependencies = [
[[package]]
name = "rand_core"
-version = "0.5.1"
+version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
+checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
-version = "0.2.0"
+version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
+checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
dependencies = [
"rand_core",
]
[[package]]
name = "signal-hook"
-version = "0.1.16"
+version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "604508c1418b99dfe1925ca9224829bb2a8a9a04dda655cc01fcad46f4ab05ed"
+checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d"
dependencies = [
"libc",
"signal-hook-registry",
@@ -154,16 +129,15 @@ dependencies = [
[[package]]
name = "signal-hook-registry"
-version = "1.2.1"
+version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3e12110bc539e657a646068aaf5eb5b63af9d0c1f7b29c97113fad80e15f035"
+checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
- "arc-swap",
"libc",
]
[[package]]
name = "wasi"
-version = "0.9.0+wasi-snapshot-preview1"
+version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
+checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
diff --git a/Cargo.toml b/Cargo.toml
index a5c9964..87a889f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,22 +3,19 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
edition = "2018"
+rust-version = "1.36"
name = "crossbeam-channel"
-version = "0.5.0"
-authors = ["The Crossbeam Project Developers"]
+version = "0.5.2"
description = "Multi-producer multi-consumer channels for message passing"
homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel"
-documentation = "https://docs.rs/crossbeam-channel"
-readme = "README.md"
keywords = ["channel", "mpmc", "select", "golang", "message"]
categories = ["algorithms", "concurrency", "data-structures"]
license = "MIT OR Apache-2.0"
@@ -34,10 +31,10 @@ default-features = false
version = "1.13.0"
[dev-dependencies.rand]
-version = "0.7.3"
+version = "0.8"
[dev-dependencies.signal-hook]
-version = "0.1.15"
+version = "0.3"
[features]
default = ["std"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 2931d21..640a808 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -4,14 +4,12 @@ name = "crossbeam-channel"
# - Update CHANGELOG.md
# - Update README.md
# - Create "crossbeam-channel-X.Y.Z" git tag
-version = "0.5.0"
-authors = ["The Crossbeam Project Developers"]
+version = "0.5.2"
edition = "2018"
+rust-version = "1.36"
license = "MIT OR Apache-2.0"
-readme = "README.md"
repository = "https://github.com/crossbeam-rs/crossbeam"
homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel"
-documentation = "https://docs.rs/crossbeam-channel"
description = "Multi-producer multi-consumer channels for message passing"
keywords = ["channel", "mpmc", "select", "golang", "message"]
categories = ["algorithms", "concurrency", "data-structures"]
@@ -21,6 +19,8 @@ default = ["std"]
# Enable to use APIs that require `std`.
# This is enabled by default.
+#
+# NOTE: Disabling `std` feature is not supported yet.
std = ["crossbeam-utils/std"]
[dependencies]
@@ -34,5 +34,5 @@ optional = true
[dev-dependencies]
num_cpus = "1.13.0"
-rand = "0.7.3"
-signal-hook = "0.1.15"
+rand = "0.8"
+signal-hook = "0.3"
diff --git a/LICENSE-THIRD-PARTY b/LICENSE-THIRD-PARTY
index d15e32b..ed4df76 100644
--- a/LICENSE-THIRD-PARTY
+++ b/LICENSE-THIRD-PARTY
@@ -1,37 +1,5 @@
===============================================================================
-Bounded MPMC queue
-http://www.1024cores.net/home/code-license
-
-Copyright (c) 2010-2011 Dmitry Vyukov.
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
-2. Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
-WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
-EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
-OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
-ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-The views and conclusions contained in the software and documentation are those
-of the authors and should not be interpreted as representing official policies,
-either expressed or implied, of Dmitry Vyukov.
-
-===============================================================================
-
matching.go
https://creativecommons.org/licenses/by/3.0/legalcode
diff --git a/METADATA b/METADATA
index 00d9dde..6cbb05f 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.0.crate"
+ value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.2.crate"
}
- version: "0.5.0"
+ version: "0.5.2"
license_type: NOTICE
last_upgrade_date {
- year: 2020
- month: 12
- day: 21
+ year: 2022
+ month: 3
+ day: 1
}
}
diff --git a/README.md b/README.md
index eab623a..f5077c5 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[![Build Status](https://github.com/crossbeam-rs/crossbeam/workflows/CI/badge.svg)](
https://github.com/crossbeam-rs/crossbeam/actions)
-[![License](https://img.shields.io/badge/license-MIT%20OR%20Apache--2.0-blue.svg)](
+[![License](https://img.shields.io/badge/license-MIT_OR_Apache--2.0-blue.svg)](
https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel#license)
[![Cargo](https://img.shields.io/crates/v/crossbeam-channel.svg)](
https://crates.io/crates/crossbeam-channel)
@@ -10,7 +10,7 @@ https://crates.io/crates/crossbeam-channel)
https://docs.rs/crossbeam-channel)
[![Rust 1.36+](https://img.shields.io/badge/rust-1.36+-lightgray.svg)](
https://www.rust-lang.org)
-[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.gg/BBYwKq)
+[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ)
This crate provides multi-producer multi-consumer channels for message passing.
It is an alternative to [`std::sync::mpsc`] with more features and better performance.
@@ -41,7 +41,7 @@ Add this to your `Cargo.toml`:
```toml
[dependencies]
-crossbeam-channel = "0.4"
+crossbeam-channel = "0.5"
```
## Compatibility
@@ -73,10 +73,6 @@ This product includes copies and modifications of software developed by third pa
[matching.go](http://www.nada.kth.se/~snilsson/concurrency/src/matching.go) by Stefan Nilsson,
licensed under Creative Commons Attribution 3.0 Unported License.
-* [src/flavors/array.rs](src/flavors/array.rs) is based on
- [Bounded MPMC queue](http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue)
- by Dmitry Vyukov, licensed under the Simplified BSD License and the Apache License, Version 2.0.
-
* [tests/mpsc.rs](tests/mpsc.rs) includes modifications of code from The Rust Programming Language,
licensed under the MIT License and the Apache License, Version 2.0.
diff --git a/TEST_MAPPING b/TEST_MAPPING
new file mode 100644
index 0000000..3cbd48d
--- /dev/null
+++ b/TEST_MAPPING
@@ -0,0 +1,17 @@
+// Generated by update_crate_tests.py for tests that depend on this crate.
+{
+ "imports": [
+ {
+ "path": "external/rust/crates/base64"
+ },
+ {
+ "path": "external/rust/crates/tinytemplate"
+ },
+ {
+ "path": "external/rust/crates/tinyvec"
+ },
+ {
+ "path": "external/rust/crates/unicode-xid"
+ }
+ ]
+}
diff --git a/benches/crossbeam.rs b/benches/crossbeam.rs
index 9870c98..1c05222 100644
--- a/benches/crossbeam.rs
+++ b/benches/crossbeam.rs
@@ -13,7 +13,7 @@ mod unbounded {
#[bench]
fn create(b: &mut Bencher) {
- b.iter(|| unbounded::<i32>());
+ b.iter(unbounded::<i32>);
}
#[bench]
diff --git a/cargo2android.json b/cargo2android.json
new file mode 100644
index 0000000..bf78496
--- /dev/null
+++ b/cargo2android.json
@@ -0,0 +1,4 @@
+{
+ "device": true,
+ "run": true
+} \ No newline at end of file
diff --git a/examples/fibonacci.rs b/examples/fibonacci.rs
index cf22b7a..e6f5e89 100644
--- a/examples/fibonacci.rs
+++ b/examples/fibonacci.rs
@@ -10,7 +10,7 @@ fn fibonacci(sender: Sender<u64>) {
while sender.send(x).is_ok() {
let tmp = x;
x = y;
- y = tmp + y;
+ y += tmp;
}
}
diff --git a/examples/stopwatch.rs b/examples/stopwatch.rs
index 6a67c9e..3a7578e 100644
--- a/examples/stopwatch.rs
+++ b/examples/stopwatch.rs
@@ -12,13 +12,13 @@ fn main() {
use std::time::{Duration, Instant};
use crossbeam_channel::{bounded, select, tick, Receiver};
+ use signal_hook::consts::SIGINT;
use signal_hook::iterator::Signals;
- use signal_hook::SIGINT;
// Creates a channel that gets a message every time `SIGINT` is signalled.
fn sigint_notifier() -> io::Result<Receiver<()>> {
let (s, r) = bounded(100);
- let signals = Signals::new(&[SIGINT])?;
+ let mut signals = Signals::new(&[SIGINT])?;
thread::spawn(move || {
for _ in signals.forever() {
@@ -33,11 +33,7 @@ fn main() {
// Prints the elapsed time.
fn show(dur: Duration) {
- println!(
- "Elapsed: {}.{:03} sec",
- dur.as_secs(),
- dur.subsec_nanos() / 1_000_000
- );
+ println!("Elapsed: {}.{:03} sec", dur.as_secs(), dur.subsec_millis());
}
let start = Instant::now();
diff --git a/src/channel.rs b/src/channel.rs
index ebcd652..8988235 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -257,8 +257,6 @@ pub fn at(when: Instant) -> Receiver<Instant> {
/// recv(timeout) -> _ => println!("timed out"),
/// }
/// ```
-///
-/// [`select!`]: macro.select.html
pub fn never<T>() -> Receiver<T> {
Receiver {
flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
@@ -645,7 +643,7 @@ impl<T> Drop for Sender<T> {
unsafe {
match &self.flavor {
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
- SenderFlavor::List(chan) => chan.release(|c| c.disconnect()),
+ SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}
}
@@ -1137,7 +1135,7 @@ impl<T> Drop for Receiver<T> {
unsafe {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
- ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()),
+ ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::At(_) => {}
ReceiverFlavor::Tick(_) => {}
@@ -1485,7 +1483,7 @@ impl<T> SelectHandle for Receiver<T> {
}
/// Writes a message into the channel.
-pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
+pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
match &s.flavor {
SenderFlavor::Array(chan) => chan.write(token, msg),
SenderFlavor::List(chan) => chan.write(token, msg),
@@ -1494,7 +1492,7 @@ pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T
}
/// Reads a message from the channel.
-pub unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
+pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
match &r.flavor {
ReceiverFlavor::Array(chan) => chan.read(token),
ReceiverFlavor::List(chan) => chan.read(token),
diff --git a/src/context.rs b/src/context.rs
index e2e8480..7467b80 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -1,7 +1,8 @@
//! Thread-local context used in select.
use std::cell::Cell;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::ptr;
+use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread, ThreadId};
use std::time::Instant;
@@ -11,6 +12,7 @@ use crossbeam_utils::Backoff;
use crate::select::Selected;
/// Thread-local context used in select.
+// This is a private API that is used by the select macro.
#[derive(Debug, Clone)]
pub struct Context {
inner: Arc<Inner>,
@@ -23,7 +25,7 @@ struct Inner {
select: AtomicUsize,
/// A slot into which another thread may store a pointer to its `Packet`.
- packet: AtomicUsize,
+ packet: AtomicPtr<()>,
/// Thread handle.
thread: Thread,
@@ -45,7 +47,7 @@ impl Context {
}
let mut f = Some(f);
- let mut f = move |cx: &Context| -> R {
+ let mut f = |cx: &Context| -> R {
let f = f.take().unwrap();
f(cx)
};
@@ -69,7 +71,7 @@ impl Context {
Context {
inner: Arc::new(Inner {
select: AtomicUsize::new(Selected::Waiting.into()),
- packet: AtomicUsize::new(0),
+ packet: AtomicPtr::new(ptr::null_mut()),
thread: thread::current(),
thread_id: thread::current().id(),
}),
@@ -82,7 +84,7 @@ impl Context {
self.inner
.select
.store(Selected::Waiting.into(), Ordering::Release);
- self.inner.packet.store(0, Ordering::Release);
+ self.inner.packet.store(ptr::null_mut(), Ordering::Release);
}
/// Attempts to select an operation.
@@ -112,19 +114,19 @@ impl Context {
///
/// This method must be called after `try_select` succeeds and there is a packet to provide.
#[inline]
- pub fn store_packet(&self, packet: usize) {
- if packet != 0 {
+ pub fn store_packet(&self, packet: *mut ()) {
+ if !packet.is_null() {
self.inner.packet.store(packet, Ordering::Release);
}
}
/// Waits until a packet is provided and returns it.
#[inline]
- pub fn wait_packet(&self) -> usize {
+ pub fn wait_packet(&self) -> *mut () {
let backoff = Backoff::new();
loop {
let packet = self.inner.packet.load(Ordering::Acquire);
- if packet != 0 {
+ if !packet.is_null() {
return packet;
}
backoff.snooze();
diff --git a/src/counter.rs b/src/counter.rs
index 2eaf067..2c27f7c 100644
--- a/src/counter.rs
+++ b/src/counter.rs
@@ -21,7 +21,7 @@ struct Counter<C> {
}
/// Wraps a channel into the reference counter.
-pub fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
+pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
let counter = Box::into_raw(Box::new(Counter {
senders: AtomicUsize::new(1),
receivers: AtomicUsize::new(1),
@@ -34,7 +34,7 @@ pub fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
}
/// The sending side.
-pub struct Sender<C> {
+pub(crate) struct Sender<C> {
counter: *mut Counter<C>,
}
@@ -45,7 +45,7 @@ impl<C> Sender<C> {
}
/// Acquires another sender reference.
- pub fn acquire(&self) -> Sender<C> {
+ pub(crate) fn acquire(&self) -> Sender<C> {
let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
// Cloning senders and calling `mem::forget` on the clones could potentially overflow the
@@ -63,7 +63,7 @@ impl<C> Sender<C> {
/// Releases the sender reference.
///
/// Function `disconnect` will be called if this is the last sender reference.
- pub unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+ pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
disconnect(&self.counter().chan);
@@ -89,7 +89,7 @@ impl<C> PartialEq for Sender<C> {
}
/// The receiving side.
-pub struct Receiver<C> {
+pub(crate) struct Receiver<C> {
counter: *mut Counter<C>,
}
@@ -100,7 +100,7 @@ impl<C> Receiver<C> {
}
/// Acquires another receiver reference.
- pub fn acquire(&self) -> Receiver<C> {
+ pub(crate) fn acquire(&self) -> Receiver<C> {
let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
// Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
@@ -118,7 +118,7 @@ impl<C> Receiver<C> {
/// Releases the receiver reference.
///
/// Function `disconnect` will be called if this is the last receiver reference.
- pub unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+ pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
disconnect(&self.counter().chan);
diff --git a/src/flavors/array.rs b/src/flavors/array.rs
index 323a200..871768c 100644
--- a/src/flavors/array.rs
+++ b/src/flavors/array.rs
@@ -5,17 +5,12 @@
//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
-//! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
-//! - https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub
-//!
-//! Copyright & License:
-//! - Copyright (c) 2010-2011 Dmitry Vyukov
-//! - Simplified BSD License and Apache License, Version 2.0
-//! - http://www.1024cores.net/home/code-license
+//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
+//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
use std::cell::UnsafeCell;
use std::marker::PhantomData;
-use std::mem::{self, MaybeUninit};
+use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::time::Instant;
@@ -57,7 +52,7 @@ impl Default for ArrayToken {
}
/// Bounded channel based on a preallocated array.
-pub struct Channel<T> {
+pub(crate) struct Channel<T> {
/// The head of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
@@ -100,7 +95,7 @@ pub struct Channel<T> {
impl<T> Channel<T> {
/// Creates a bounded channel of capacity `cap`.
- pub fn with_capacity(cap: usize) -> Self {
+ pub(crate) fn with_capacity(cap: usize) -> Self {
assert!(cap > 0, "capacity must be positive");
// Compute constants `mark_bit` and `one_lap`.
@@ -115,7 +110,7 @@ impl<T> Channel<T> {
// Allocate a buffer of `cap` slots initialized
// with stamps.
let buffer = {
- let mut boxed: Box<[Slot<T>]> = (0..cap)
+ let boxed: Box<[Slot<T>]> = (0..cap)
.map(|i| {
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
Slot {
@@ -124,9 +119,7 @@ impl<T> Channel<T> {
}
})
.collect();
- let ptr = boxed.as_mut_ptr();
- mem::forget(boxed);
- ptr
+ Box::into_raw(boxed) as *mut Slot<T>
};
Channel {
@@ -143,12 +136,12 @@ impl<T> Channel<T> {
}
/// Returns a receiver handle to the channel.
- pub fn receiver(&self) -> Receiver<'_, T> {
+ pub(crate) fn receiver(&self) -> Receiver<'_, T> {
Receiver(self)
}
/// Returns a sender handle to the channel.
- pub fn sender(&self) -> Sender<'_, T> {
+ pub(crate) fn sender(&self) -> Sender<'_, T> {
Sender(self)
}
@@ -224,7 +217,7 @@ impl<T> Channel<T> {
}
/// Writes a message into the channel.
- pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.array.slot.is_null() {
return Err(msg);
@@ -314,7 +307,7 @@ impl<T> Channel<T> {
}
/// Reads a message from the channel.
- pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.array.slot.is_null() {
// The channel is disconnected.
return Err(());
@@ -332,7 +325,7 @@ impl<T> Channel<T> {
}
/// Attempts to send a message into the channel.
- pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
if self.start_send(token) {
unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
@@ -342,7 +335,11 @@ impl<T> Channel<T> {
}
/// Sends a message into the channel.
- pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
loop {
// Try sending a message several times.
@@ -391,7 +388,7 @@ impl<T> Channel<T> {
}
/// Attempts to receive a message without blocking.
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
if self.start_recv(token) {
@@ -402,7 +399,7 @@ impl<T> Channel<T> {
}
/// Receives a message from the channel.
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
loop {
// Try receiving a message several times.
@@ -453,7 +450,7 @@ impl<T> Channel<T> {
}
/// Returns the current number of messages inside the channel.
- pub fn len(&self) -> usize {
+ pub(crate) fn len(&self) -> usize {
loop {
// Load the tail, then load the head.
let tail = self.tail.load(Ordering::SeqCst);
@@ -478,14 +475,15 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- pub fn capacity(&self) -> Option<usize> {
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
+ pub(crate) fn capacity(&self) -> Option<usize> {
Some(self.cap)
}
/// Disconnects the channel and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
- pub fn disconnect(&self) -> bool {
+ pub(crate) fn disconnect(&self) -> bool {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
if tail & self.mark_bit == 0 {
@@ -498,12 +496,12 @@ impl<T> Channel<T> {
}
/// Returns `true` if the channel is disconnected.
- pub fn is_disconnected(&self) -> bool {
+ pub(crate) fn is_disconnected(&self) -> bool {
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
}
/// Returns `true` if the channel is empty.
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
@@ -515,7 +513,7 @@ impl<T> Channel<T> {
}
/// Returns `true` if the channel is full.
- pub fn is_full(&self) -> bool {
+ pub(crate) fn is_full(&self) -> bool {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
@@ -563,10 +561,10 @@ impl<T> Drop for Channel<T> {
}
/// Receiver handle to a channel.
-pub struct Receiver<'a, T>(&'a Channel<T>);
+pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
-pub struct Sender<'a, T>(&'a Channel<T>);
+pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
diff --git a/src/flavors/at.rs b/src/flavors/at.rs
index a2b1b57..4581edb 100644
--- a/src/flavors/at.rs
+++ b/src/flavors/at.rs
@@ -12,10 +12,10 @@ use crate::select::{Operation, SelectHandle, Token};
use crate::utils;
/// Result of a receive operation.
-pub type AtToken = Option<Instant>;
+pub(crate) type AtToken = Option<Instant>;
/// Channel that delivers a message at a certain moment in time
-pub struct Channel {
+pub(crate) struct Channel {
/// The instant at which the message will be delivered.
delivery_time: Instant,
@@ -26,7 +26,7 @@ pub struct Channel {
impl Channel {
/// Creates a channel that delivers a message at a certain instant in time.
#[inline]
- pub fn new_deadline(when: Instant) -> Self {
+ pub(crate) fn new_deadline(when: Instant) -> Self {
Channel {
delivery_time: when,
received: AtomicBool::new(false),
@@ -34,13 +34,13 @@ impl Channel {
}
/// Creates a channel that delivers a message after a certain duration of time.
#[inline]
- pub fn new_timeout(dur: Duration) -> Self {
+ pub(crate) fn new_timeout(dur: Duration) -> Self {
Self::new_deadline(Instant::now() + dur)
}
/// Attempts to receive a message without blocking.
#[inline]
- pub fn try_recv(&self) -> Result<Instant, TryRecvError> {
+ pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
// We use relaxed ordering because this is just an optional optimistic check.
if self.received.load(Ordering::Relaxed) {
// The message has already been received.
@@ -64,7 +64,7 @@ impl Channel {
/// Receives a message from the channel.
#[inline]
- pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
// We use relaxed ordering because this is just an optional optimistic check.
if self.received.load(Ordering::Relaxed) {
// The message has already been received.
@@ -103,13 +103,13 @@ impl Channel {
/// Reads a message from the channel.
#[inline]
- pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
token.at.ok_or(())
}
/// Returns `true` if the channel is empty.
#[inline]
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
// We use relaxed ordering because this is just an optional optimistic check.
if self.received.load(Ordering::Relaxed) {
return true;
@@ -127,13 +127,13 @@ impl Channel {
/// Returns `true` if the channel is full.
#[inline]
- pub fn is_full(&self) -> bool {
+ pub(crate) fn is_full(&self) -> bool {
!self.is_empty()
}
/// Returns the number of messages in the channel.
#[inline]
- pub fn len(&self) -> usize {
+ pub(crate) fn len(&self) -> usize {
if self.is_empty() {
0
} else {
@@ -142,8 +142,9 @@ impl Channel {
}
/// Returns the capacity of the channel.
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
- pub fn capacity(&self) -> Option<usize> {
+ pub(crate) fn capacity(&self) -> Option<usize> {
Some(1)
}
}
diff --git a/src/flavors/list.rs b/src/flavors/list.rs
index 532e8b6..5056aa4 100644
--- a/src/flavors/list.rs
+++ b/src/flavors/list.rs
@@ -151,7 +151,7 @@ impl Default for ListToken {
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
-pub struct Channel<T> {
+pub(crate) struct Channel<T> {
/// The head of the channel.
head: CachePadded<Position<T>>,
@@ -167,7 +167,7 @@ pub struct Channel<T> {
impl<T> Channel<T> {
/// Creates a new unbounded channel.
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
Channel {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
@@ -183,12 +183,12 @@ impl<T> Channel<T> {
}
/// Returns a receiver handle to the channel.
- pub fn receiver(&self) -> Receiver<'_, T> {
+ pub(crate) fn receiver(&self) -> Receiver<'_, T> {
Receiver(self)
}
/// Returns a sender handle to the channel.
- pub fn sender(&self) -> Sender<'_, T> {
+ pub(crate) fn sender(&self) -> Sender<'_, T> {
Sender(self)
}
@@ -231,8 +231,8 @@ impl<T> Channel<T> {
if self
.tail
.block
- .compare_and_swap(block, new, Ordering::Release)
- == block
+ .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
{
self.head.block.store(new, Ordering::Release);
block = new;
@@ -276,7 +276,7 @@ impl<T> Channel<T> {
}
/// Writes a message into the channel.
- pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.list.block.is_null() {
return Err(msg);
@@ -380,7 +380,7 @@ impl<T> Channel<T> {
}
/// Reads a message from the channel.
- pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.list.block.is_null() {
// The channel is disconnected.
return Err(());
@@ -405,7 +405,7 @@ impl<T> Channel<T> {
}
/// Attempts to send a message into the channel.
- pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.send(msg, None).map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
@@ -413,7 +413,11 @@ impl<T> Channel<T> {
}
/// Sends a message into the channel.
- pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ _deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
assert!(self.start_send(token));
unsafe {
@@ -423,7 +427,7 @@ impl<T> Channel<T> {
}
/// Attempts to receive a message without blocking.
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
if self.start_recv(token) {
@@ -434,7 +438,7 @@ impl<T> Channel<T> {
}
/// Receives a message from the channel.
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
loop {
// Try receiving a message several times.
@@ -486,7 +490,7 @@ impl<T> Channel<T> {
}
/// Returns the current number of messages inside the channel.
- pub fn len(&self) -> usize {
+ pub(crate) fn len(&self) -> usize {
loop {
// Load the tail index, then load the head index.
let mut tail = self.tail.index.load(Ordering::SeqCst);
@@ -522,14 +526,14 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- pub fn capacity(&self) -> Option<usize> {
+ pub(crate) fn capacity(&self) -> Option<usize> {
None
}
- /// Disconnects the channel and wakes up all blocked receivers.
+ /// Disconnects senders and wakes up all blocked receivers.
///
/// Returns `true` if this call disconnected the channel.
- pub fn disconnect(&self) -> bool {
+ pub(crate) fn disconnect_senders(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
if tail & MARK_BIT == 0 {
@@ -540,20 +544,90 @@ impl<T> Channel<T> {
}
}
+ /// Disconnects receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect_receivers(&self) -> bool {
+ let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+ if tail & MARK_BIT == 0 {
+ // If receivers are dropped first, discard all messages to free
+ // memory eagerly.
+ self.discard_all_messages();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Discards all messages.
+ ///
+ /// This method should only be called when all receivers are dropped.
+ fn discard_all_messages(&self) {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.index.load(Ordering::Acquire);
+ loop {
+ let offset = (tail >> SHIFT) % LAP;
+ if offset != BLOCK_CAP {
+ break;
+ }
+
+ // New updates to tail will be rejected by MARK_BIT and aborted unless it's
+ // at boundary. We need to wait for the updates take affect otherwise there
+ // can be memory leaks.
+ backoff.snooze();
+ tail = self.tail.index.load(Ordering::Acquire);
+ }
+
+ let mut head = self.head.index.load(Ordering::Acquire);
+ let mut block = self.head.block.load(Ordering::Acquire);
+
+ unsafe {
+ // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+ while head >> SHIFT != tail >> SHIFT {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the message in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let p = &mut *slot.msg.get();
+ p.as_mut_ptr().drop_in_place();
+ } else {
+ (*block).wait_next();
+ // Deallocate the block and move to the next one.
+ let next = (*block).next.load(Ordering::Acquire);
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ if !block.is_null() {
+ drop(Box::from_raw(block));
+ }
+ }
+ head &= !MARK_BIT;
+ self.head.block.store(ptr::null_mut(), Ordering::Release);
+ self.head.index.store(head, Ordering::Release);
+ }
+
/// Returns `true` if the channel is disconnected.
- pub fn is_disconnected(&self) -> bool {
+ pub(crate) fn is_disconnected(&self) -> bool {
self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
}
/// Returns `true` if the channel is empty.
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
let head = self.head.index.load(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::SeqCst);
head >> SHIFT == tail >> SHIFT
}
/// Returns `true` if the channel is full.
- pub fn is_full(&self) -> bool {
+ pub(crate) fn is_full(&self) -> bool {
false
}
}
@@ -597,10 +671,10 @@ impl<T> Drop for Channel<T> {
}
/// Receiver handle to a channel.
-pub struct Receiver<'a, T>(&'a Channel<T>);
+pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
-pub struct Sender<'a, T>(&'a Channel<T>);
+pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
diff --git a/src/flavors/mod.rs b/src/flavors/mod.rs
index 299e78f..0314bf4 100644
--- a/src/flavors/mod.rs
+++ b/src/flavors/mod.rs
@@ -9,9 +9,9 @@
//! 5. `tick` - Channel that delivers messages periodically.
//! 6. `zero` - Zero-capacity channel.
-pub mod array;
-pub mod at;
-pub mod list;
-pub mod never;
-pub mod tick;
-pub mod zero;
+pub(crate) mod array;
+pub(crate) mod at;
+pub(crate) mod list;
+pub(crate) mod never;
+pub(crate) mod tick;
+pub(crate) mod zero;
diff --git a/src/flavors/never.rs b/src/flavors/never.rs
index e49d214..1951e96 100644
--- a/src/flavors/never.rs
+++ b/src/flavors/never.rs
@@ -11,17 +11,17 @@ use crate::select::{Operation, SelectHandle, Token};
use crate::utils;
/// This flavor doesn't need a token.
-pub type NeverToken = ();
+pub(crate) type NeverToken = ();
/// Channel that never delivers messages.
-pub struct Channel<T> {
+pub(crate) struct Channel<T> {
_marker: PhantomData<T>,
}
impl<T> Channel<T> {
/// Creates a channel that never delivers messages.
#[inline]
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
Channel {
_marker: PhantomData,
}
@@ -29,44 +29,45 @@ impl<T> Channel<T> {
/// Attempts to receive a message without blocking.
#[inline]
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
Err(TryRecvError::Empty)
}
/// Receives a message from the channel.
#[inline]
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
utils::sleep_until(deadline);
Err(RecvTimeoutError::Timeout)
}
/// Reads a message from the channel.
#[inline]
- pub unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> {
+ pub(crate) unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> {
Err(())
}
/// Returns `true` if the channel is empty.
#[inline]
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
true
}
/// Returns `true` if the channel is full.
#[inline]
- pub fn is_full(&self) -> bool {
+ pub(crate) fn is_full(&self) -> bool {
true
}
/// Returns the number of messages in the channel.
#[inline]
- pub fn len(&self) -> usize {
+ pub(crate) fn len(&self) -> usize {
0
}
/// Returns the capacity of the channel.
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
- pub fn capacity(&self) -> Option<usize> {
+ pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
}
diff --git a/src/flavors/tick.rs b/src/flavors/tick.rs
index e8e7020..d4b1f6c 100644
--- a/src/flavors/tick.rs
+++ b/src/flavors/tick.rs
@@ -12,10 +12,10 @@ use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
/// Result of a receive operation.
-pub type TickToken = Option<Instant>;
+pub(crate) type TickToken = Option<Instant>;
/// Channel that delivers messages periodically.
-pub struct Channel {
+pub(crate) struct Channel {
/// The instant at which the next message will be delivered.
delivery_time: AtomicCell<Instant>,
@@ -26,7 +26,7 @@ pub struct Channel {
impl Channel {
/// Creates a channel that delivers messages periodically.
#[inline]
- pub fn new(dur: Duration) -> Self {
+ pub(crate) fn new(dur: Duration) -> Self {
Channel {
delivery_time: AtomicCell::new(Instant::now() + dur),
duration: dur,
@@ -35,7 +35,7 @@ impl Channel {
/// Attempts to receive a message without blocking.
#[inline]
- pub fn try_recv(&self) -> Result<Instant, TryRecvError> {
+ pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
loop {
let now = Instant::now();
let delivery_time = self.delivery_time.load();
@@ -56,7 +56,7 @@ impl Channel {
/// Receives a message from the channel.
#[inline]
- pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
loop {
let delivery_time = self.delivery_time.load();
let now = Instant::now();
@@ -85,25 +85,25 @@ impl Channel {
/// Reads a message from the channel.
#[inline]
- pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
token.tick.ok_or(())
}
/// Returns `true` if the channel is empty.
#[inline]
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
Instant::now() < self.delivery_time.load()
}
/// Returns `true` if the channel is full.
#[inline]
- pub fn is_full(&self) -> bool {
+ pub(crate) fn is_full(&self) -> bool {
!self.is_empty()
}
/// Returns the number of messages in the channel.
#[inline]
- pub fn len(&self) -> usize {
+ pub(crate) fn len(&self) -> usize {
if self.is_empty() {
0
} else {
@@ -112,8 +112,9 @@ impl Channel {
}
/// Returns the capacity of the channel.
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
- pub fn capacity(&self) -> Option<usize> {
+ pub(crate) fn capacity(&self) -> Option<usize> {
Some(1)
}
}
diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs
index be647b5..4afbd8f 100644
--- a/src/flavors/zero.rs
+++ b/src/flavors/zero.rs
@@ -6,6 +6,7 @@ use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
+use std::{fmt, ptr};
use crossbeam_utils::Backoff;
@@ -16,7 +17,19 @@ use crate::utils::Spinlock;
use crate::waker::Waker;
/// A pointer to a packet.
-pub type ZeroToken = usize;
+pub struct ZeroToken(*mut ());
+
+impl Default for ZeroToken {
+ fn default() -> Self {
+ Self(ptr::null_mut())
+ }
+}
+
+impl fmt::Debug for ZeroToken {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&(self.0 as usize), f)
+ }
+}
/// A slot for passing one message from a sender to a receiver.
struct Packet<T> {
@@ -80,7 +93,7 @@ struct Inner {
}
/// Zero-capacity channel.
-pub struct Channel<T> {
+pub(crate) struct Channel<T> {
/// Inner representation of the channel.
inner: Spinlock<Inner>,
@@ -90,7 +103,7 @@ pub struct Channel<T> {
impl<T> Channel<T> {
/// Constructs a new zero-capacity channel.
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
Channel {
inner: Spinlock::new(Inner {
senders: Waker::new(),
@@ -102,12 +115,12 @@ impl<T> Channel<T> {
}
/// Returns a receiver handle to the channel.
- pub fn receiver(&self) -> Receiver<'_, T> {
+ pub(crate) fn receiver(&self) -> Receiver<'_, T> {
Receiver(self)
}
/// Returns a sender handle to the channel.
- pub fn sender(&self) -> Sender<'_, T> {
+ pub(crate) fn sender(&self) -> Sender<'_, T> {
Sender(self)
}
@@ -117,10 +130,10 @@ impl<T> Channel<T> {
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
- token.zero = 0;
+ token.zero.0 = ptr::null_mut();
true
} else {
false
@@ -128,13 +141,13 @@ impl<T> Channel<T> {
}
/// Writes a message into the packet.
- pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no packet, the channel is disconnected.
- if token.zero == 0 {
+ if token.zero.0.is_null() {
return Err(msg);
}
- let packet = &*(token.zero as *const Packet<T>);
+ let packet = &*(token.zero.0 as *const Packet<T>);
packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
Ok(())
@@ -146,10 +159,10 @@ impl<T> Channel<T> {
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
- token.zero = 0;
+ token.zero.0 = ptr::null_mut();
true
} else {
false
@@ -157,13 +170,13 @@ impl<T> Channel<T> {
}
/// Reads a message from the packet.
- pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
// If there is no packet, the channel is disconnected.
- if token.zero == 0 {
+ if token.zero.0.is_null() {
return Err(());
}
- let packet = &*(token.zero as *const Packet<T>);
+ let packet = &*(token.zero.0 as *const Packet<T>);
if packet.on_stack {
// The message has been in the packet from the beginning, so there is no need to wait
@@ -177,19 +190,19 @@ impl<T> Channel<T> {
// heap-allocated packet.
packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap();
- drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>));
+ drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
Ok(msg)
}
}
/// Attempts to send a message into the channel.
- pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
@@ -203,13 +216,17 @@ impl<T> Channel<T> {
}
/// Sends a message into the channel.
- pub fn send(&self, msg: T, deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
@@ -224,10 +241,10 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
- let packet = Packet::<T>::message_on_stack(msg);
+ let mut packet = Packet::<T>::message_on_stack(msg);
inner
.senders
- .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
+ .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
inner.receivers.notify();
drop(inner);
@@ -256,13 +273,13 @@ impl<T> Channel<T> {
}
/// Attempts to receive a message without blocking.
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else if inner.is_disconnected {
@@ -273,13 +290,13 @@ impl<T> Channel<T> {
}
/// Receives a message from the channel.
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
@@ -293,10 +310,12 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
- let packet = Packet::<T>::empty_on_stack();
- inner
- .receivers
- .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
+ let mut packet = Packet::<T>::empty_on_stack();
+ inner.receivers.register_with_packet(
+ oper,
+ &mut packet as *mut Packet<T> as *mut (),
+ cx,
+ );
inner.senders.notify();
drop(inner);
@@ -325,7 +344,7 @@ impl<T> Channel<T> {
/// Disconnects the channel and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
- pub fn disconnect(&self) -> bool {
+ pub(crate) fn disconnect(&self) -> bool {
let mut inner = self.inner.lock();
if !inner.is_disconnected {
@@ -339,31 +358,32 @@ impl<T> Channel<T> {
}
/// Returns the current number of messages inside the channel.
- pub fn len(&self) -> usize {
+ pub(crate) fn len(&self) -> usize {
0
}
/// Returns the capacity of the channel.
- pub fn capacity(&self) -> Option<usize> {
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
+ pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
/// Returns `true` if the channel is empty.
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
true
}
/// Returns `true` if the channel is full.
- pub fn is_full(&self) -> bool {
+ pub(crate) fn is_full(&self) -> bool {
true
}
}
/// Receiver handle to a channel.
-pub struct Receiver<'a, T>(&'a Channel<T>);
+pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
-pub struct Sender<'a, T>(&'a Channel<T>);
+pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
@@ -380,7 +400,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
let mut inner = self.0.inner.lock();
inner
.receivers
- .register_with_packet(oper, packet as usize, cx);
+ .register_with_packet(oper, packet as *mut (), cx);
inner.senders.notify();
inner.senders.can_select() || inner.is_disconnected
}
@@ -394,7 +414,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
- token.zero = cx.wait_packet();
+ token.zero.0 = cx.wait_packet();
true
}
@@ -430,7 +450,7 @@ impl<T> SelectHandle for Sender<'_, T> {
let mut inner = self.0.inner.lock();
inner
.senders
- .register_with_packet(oper, packet as usize, cx);
+ .register_with_packet(oper, packet as *mut (), cx);
inner.receivers.notify();
inner.receivers.can_select() || inner.is_disconnected
}
@@ -444,7 +464,7 @@ impl<T> SelectHandle for Sender<'_, T> {
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
- token.zero = cx.wait_packet();
+ token.zero.0 = cx.wait_packet();
true
}
diff --git a/src/lib.rs b/src/lib.rs
index e08ac08..cc1ef11 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -294,7 +294,7 @@
//!
//! * [`after`] creates a channel that delivers a single message after a certain duration of time.
//! * [`tick`] creates a channel that delivers messages periodically.
-//! * [`never`] creates a channel that never delivers messages.
+//! * [`never`](never()) creates a channel that never delivers messages.
//!
//! These channels are very efficient because messages get lazily generated on receive operations.
//!
@@ -328,10 +328,13 @@
allow(dead_code, unused_assignments, unused_variables)
)
))]
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
+#![warn(
+ missing_docs,
+ missing_debug_implementations,
+ rust_2018_idioms,
+ unreachable_pub
+)]
#![cfg_attr(not(feature = "std"), no_std)]
-// matches! requires Rust 1.42
-#![allow(clippy::match_like_matches_macro)]
use cfg_if::cfg_if;
diff --git a/src/select.rs b/src/select.rs
index 1488f80..6103ef4 100644
--- a/src/select.rs
+++ b/src/select.rs
@@ -19,6 +19,7 @@ use crate::utils;
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor.
+// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
pub at: flavors::at::AtToken,
@@ -93,6 +94,7 @@ impl Into<usize> for Selected {
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
+// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
pub trait SelectHandle {
/// Attempts to select an operation and returns `true` on success.
fn try_select(&self, token: &mut Token) -> bool;
@@ -442,6 +444,7 @@ fn run_ready(
}
/// Attempts to select one of the operations without blocking.
+// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn try_select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
@@ -458,6 +461,7 @@ pub fn try_select<'a>(
}
/// Blocks until one of the operations becomes ready and selects it.
+// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
@@ -476,6 +480,7 @@ pub fn select<'a>(
}
/// Blocks for a limited time until one of the operations becomes ready and selects it.
+// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
@@ -486,7 +491,7 @@ pub fn select_timeout<'a>(
/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
#[inline]
-pub fn select_deadline<'a>(
+pub(crate) fn select_deadline<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
deadline: Instant,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
@@ -864,9 +869,6 @@ impl<'a> Select<'a> {
/// The selected operation must be completed with [`SelectedOperation::send`]
/// or [`SelectedOperation::recv`].
///
- /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
- /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
- ///
/// # Examples
///
/// ```
diff --git a/src/utils.rs b/src/utils.rs
index 3fe171b..557b6a0 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use crossbeam_utils::Backoff;
/// Randomly shuffles a slice.
-pub fn shuffle<T>(v: &mut [T]) {
+pub(crate) fn shuffle<T>(v: &mut [T]) {
let len = v.len();
if len <= 1 {
return;
@@ -46,7 +46,7 @@ pub fn shuffle<T>(v: &mut [T]) {
}
/// Sleeps until the deadline, or forever if the deadline isn't specified.
-pub fn sleep_until(deadline: Option<Instant>) {
+pub(crate) fn sleep_until(deadline: Option<Instant>) {
loop {
match deadline {
None => thread::sleep(Duration::from_secs(1000)),
@@ -62,14 +62,14 @@ pub fn sleep_until(deadline: Option<Instant>) {
}
/// A simple spinlock.
-pub struct Spinlock<T> {
+pub(crate) struct Spinlock<T> {
flag: AtomicBool,
value: UnsafeCell<T>,
}
impl<T> Spinlock<T> {
/// Returns a new spinlock initialized with `value`.
- pub fn new(value: T) -> Spinlock<T> {
+ pub(crate) fn new(value: T) -> Spinlock<T> {
Spinlock {
flag: AtomicBool::new(false),
value: UnsafeCell::new(value),
@@ -77,7 +77,7 @@ impl<T> Spinlock<T> {
}
/// Locks the spinlock.
- pub fn lock(&self) -> SpinlockGuard<'_, T> {
+ pub(crate) fn lock(&self) -> SpinlockGuard<'_, T> {
let backoff = Backoff::new();
while self.flag.swap(true, Ordering::Acquire) {
backoff.snooze();
@@ -87,7 +87,7 @@ impl<T> Spinlock<T> {
}
/// A guard holding a spinlock locked.
-pub struct SpinlockGuard<'a, T> {
+pub(crate) struct SpinlockGuard<'a, T> {
parent: &'a Spinlock<T>,
}
diff --git a/src/waker.rs b/src/waker.rs
index 3d0af26..dec73a9 100644
--- a/src/waker.rs
+++ b/src/waker.rs
@@ -1,5 +1,6 @@
//! Waking mechanism for threads blocked on channel operations.
+use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, ThreadId};
@@ -8,22 +9,22 @@ use crate::select::{Operation, Selected};
use crate::utils::Spinlock;
/// Represents a thread blocked on a specific channel operation.
-pub struct Entry {
+pub(crate) struct Entry {
/// The operation.
- pub oper: Operation,
+ pub(crate) oper: Operation,
/// Optional packet.
- pub packet: usize,
+ pub(crate) packet: *mut (),
/// Context associated with the thread owning this operation.
- pub cx: Context,
+ pub(crate) cx: Context,
}
/// A queue of threads blocked on channel operations.
///
/// This data structure is used by threads to register blocking operations and get woken up once
/// an operation becomes ready.
-pub struct Waker {
+pub(crate) struct Waker {
/// A list of select operations.
selectors: Vec<Entry>,
@@ -34,7 +35,7 @@ pub struct Waker {
impl Waker {
/// Creates a new `Waker`.
#[inline]
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
Waker {
selectors: Vec::new(),
observers: Vec::new(),
@@ -43,13 +44,13 @@ impl Waker {
/// Registers a select operation.
#[inline]
- pub fn register(&mut self, oper: Operation, cx: &Context) {
- self.register_with_packet(oper, 0, cx);
+ pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
+ self.register_with_packet(oper, ptr::null_mut(), cx);
}
/// Registers a select operation and a packet.
#[inline]
- pub fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
+ pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
self.selectors.push(Entry {
oper,
packet,
@@ -59,7 +60,7 @@ impl Waker {
/// Unregisters a select operation.
#[inline]
- pub fn unregister(&mut self, oper: Operation) -> Option<Entry> {
+ pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
if let Some((i, _)) = self
.selectors
.iter()
@@ -75,40 +76,32 @@ impl Waker {
/// Attempts to find another thread's entry, select the operation, and wake it up.
#[inline]
- pub fn try_select(&mut self) -> Option<Entry> {
- let mut entry = None;
-
- if !self.selectors.is_empty() {
- let thread_id = current_thread_id();
-
- for i in 0..self.selectors.len() {
+ pub(crate) fn try_select(&mut self) -> Option<Entry> {
+ self.selectors
+ .iter()
+ .position(|selector| {
// Does the entry belong to a different thread?
- if self.selectors[i].cx.thread_id() != thread_id {
- // Try selecting this operation.
- let sel = Selected::Operation(self.selectors[i].oper);
- let res = self.selectors[i].cx.try_select(sel);
-
- if res.is_ok() {
+ selector.cx.thread_id() != current_thread_id()
+ && selector // Try selecting this operation.
+ .cx
+ .try_select(Selected::Operation(selector.oper))
+ .is_ok()
+ && {
// Provide the packet.
- self.selectors[i].cx.store_packet(self.selectors[i].packet);
+ selector.cx.store_packet(selector.packet);
// Wake the thread up.
- self.selectors[i].cx.unpark();
-
- // Remove the entry from the queue to keep it clean and improve
- // performance.
- entry = Some(self.selectors.remove(i));
- break;
+ selector.cx.unpark();
+ true
}
- }
- }
- }
-
- entry
+ })
+ // Remove the entry from the queue to keep it clean and improve
+ // performance.
+ .map(|pos| self.selectors.remove(pos))
}
/// Returns `true` if there is an entry which can be selected by the current thread.
#[inline]
- pub fn can_select(&self) -> bool {
+ pub(crate) fn can_select(&self) -> bool {
if self.selectors.is_empty() {
false
} else {
@@ -122,23 +115,23 @@ impl Waker {
/// Registers an operation waiting to be ready.
#[inline]
- pub fn watch(&mut self, oper: Operation, cx: &Context) {
+ pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
self.observers.push(Entry {
oper,
- packet: 0,
+ packet: ptr::null_mut(),
cx: cx.clone(),
});
}
/// Unregisters an operation waiting to be ready.
#[inline]
- pub fn unwatch(&mut self, oper: Operation) {
+ pub(crate) fn unwatch(&mut self, oper: Operation) {
self.observers.retain(|e| e.oper != oper);
}
/// Notifies all operations waiting to be ready.
#[inline]
- pub fn notify(&mut self) {
+ pub(crate) fn notify(&mut self) {
for entry in self.observers.drain(..) {
if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
entry.cx.unpark();
@@ -148,7 +141,7 @@ impl Waker {
/// Notifies all registered operations that the channel is disconnected.
#[inline]
- pub fn disconnect(&mut self) {
+ pub(crate) fn disconnect(&mut self) {
for entry in self.selectors.iter() {
if entry.cx.try_select(Selected::Disconnected).is_ok() {
// Wake the thread up.
@@ -175,7 +168,7 @@ impl Drop for Waker {
/// A waker that can be shared among threads without locking.
///
/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
-pub struct SyncWaker {
+pub(crate) struct SyncWaker {
/// The inner `Waker`.
inner: Spinlock<Waker>,
@@ -186,7 +179,7 @@ pub struct SyncWaker {
impl SyncWaker {
/// Creates a new `SyncWaker`.
#[inline]
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
SyncWaker {
inner: Spinlock::new(Waker::new()),
is_empty: AtomicBool::new(true),
@@ -195,7 +188,7 @@ impl SyncWaker {
/// Registers the current thread with an operation.
#[inline]
- pub fn register(&self, oper: Operation, cx: &Context) {
+ pub(crate) fn register(&self, oper: Operation, cx: &Context) {
let mut inner = self.inner.lock();
inner.register(oper, cx);
self.is_empty.store(
@@ -206,7 +199,7 @@ impl SyncWaker {
/// Unregisters an operation previously registered by the current thread.
#[inline]
- pub fn unregister(&self, oper: Operation) -> Option<Entry> {
+ pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
let mut inner = self.inner.lock();
let entry = inner.unregister(oper);
self.is_empty.store(
@@ -218,7 +211,7 @@ impl SyncWaker {
/// Attempts to find one thread (not the current one), select its operation, and wake it up.
#[inline]
- pub fn notify(&self) {
+ pub(crate) fn notify(&self) {
if !self.is_empty.load(Ordering::SeqCst) {
let mut inner = self.inner.lock();
if !self.is_empty.load(Ordering::SeqCst) {
@@ -234,7 +227,7 @@ impl SyncWaker {
/// Registers an operation waiting to be ready.
#[inline]
- pub fn watch(&self, oper: Operation, cx: &Context) {
+ pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
let mut inner = self.inner.lock();
inner.watch(oper, cx);
self.is_empty.store(
@@ -245,7 +238,7 @@ impl SyncWaker {
/// Unregisters an operation waiting to be ready.
#[inline]
- pub fn unwatch(&self, oper: Operation) {
+ pub(crate) fn unwatch(&self, oper: Operation) {
let mut inner = self.inner.lock();
inner.unwatch(oper);
self.is_empty.store(
@@ -256,7 +249,7 @@ impl SyncWaker {
/// Notifies all threads that the channel is disconnected.
#[inline]
- pub fn disconnect(&self) {
+ pub(crate) fn disconnect(&self) {
let mut inner = self.inner.lock();
inner.disconnect();
self.is_empty.store(
@@ -269,7 +262,7 @@ impl SyncWaker {
impl Drop for SyncWaker {
#[inline]
fn drop(&mut self) {
- debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
+ debug_assert!(self.is_empty.load(Ordering::SeqCst));
}
}
diff --git a/tests/after.rs b/tests/after.rs
index 20670dc..678a8c6 100644
--- a/tests/after.rs
+++ b/tests/after.rs
@@ -1,5 +1,7 @@
//! Tests for the after channel flavor.
+#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
+
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::thread;
@@ -56,20 +58,20 @@ fn len_empty_full() {
let r = after(ms(50));
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), false);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
thread::sleep(ms(100));
assert_eq!(r.len(), 1);
- assert_eq!(r.is_empty(), false);
- assert_eq!(r.is_full(), true);
+ assert!(!r.is_empty());
+ assert!(r.is_full());
r.try_recv().unwrap();
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), false);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
}
#[test]
@@ -211,7 +213,7 @@ fn select() {
break;
}
i => {
- oper.recv(&v[i]).unwrap();
+ oper.recv(v[i]).unwrap();
hits.fetch_add(1, Ordering::SeqCst);
}
}
diff --git a/tests/array.rs b/tests/array.rs
index a7ae323..bb2cebe 100644
--- a/tests/array.rs
+++ b/tests/array.rs
@@ -1,5 +1,7 @@
//! Tests for the array channel flavor.
+#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
+
use std::any::Any;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -43,38 +45,38 @@ fn len_empty_full() {
let (s, r) = bounded(2);
assert_eq!(s.len(), 0);
- assert_eq!(s.is_empty(), true);
- assert_eq!(s.is_full(), false);
+ assert!(s.is_empty());
+ assert!(!s.is_full());
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), false);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
s.send(()).unwrap();
assert_eq!(s.len(), 1);
- assert_eq!(s.is_empty(), false);
- assert_eq!(s.is_full(), false);
+ assert!(!s.is_empty());
+ assert!(!s.is_full());
assert_eq!(r.len(), 1);
- assert_eq!(r.is_empty(), false);
- assert_eq!(r.is_full(), false);
+ assert!(!r.is_empty());
+ assert!(!r.is_full());
s.send(()).unwrap();
assert_eq!(s.len(), 2);
- assert_eq!(s.is_empty(), false);
- assert_eq!(s.is_full(), true);
+ assert!(!s.is_empty());
+ assert!(s.is_full());
assert_eq!(r.len(), 2);
- assert_eq!(r.is_empty(), false);
- assert_eq!(r.is_full(), true);
+ assert!(!r.is_empty());
+ assert!(r.is_full());
r.recv().unwrap();
assert_eq!(s.len(), 1);
- assert_eq!(s.is_empty(), false);
- assert_eq!(s.is_full(), false);
+ assert!(!s.is_empty());
+ assert!(!s.is_full());
assert_eq!(r.len(), 1);
- assert_eq!(r.is_empty(), false);
- assert_eq!(r.is_full(), false);
+ assert!(!r.is_empty());
+ assert!(!r.is_full());
}
#[test]
@@ -497,8 +499,8 @@ fn drops() {
let mut rng = thread_rng();
for _ in 0..RUNS {
- let steps = rng.gen_range(0, 10_000);
- let additional = rng.gen_range(0, 50);
+ let steps = rng.gen_range(0..10_000);
+ let additional = rng.gen_range(0..50);
DROPS.store(0, Ordering::SeqCst);
let (s, r) = bounded::<DropCounter>(50);
diff --git a/tests/golang.rs b/tests/golang.rs
index 69a9315..05d67f6 100644
--- a/tests/golang.rs
+++ b/tests/golang.rs
@@ -9,6 +9,8 @@
//! - https://golang.org/LICENSE
//! - https://golang.org/PATENTS
+#![allow(clippy::mutex_atomic, clippy::redundant_clone)]
+
use std::alloc::{GlobalAlloc, Layout, System};
use std::any::Any;
use std::cell::Cell;
@@ -176,7 +178,7 @@ unsafe impl GlobalAlloc for Counter {
if !ret.is_null() {
ALLOCATED.fetch_add(layout.size(), SeqCst);
}
- return ret;
+ ret
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
@@ -232,6 +234,9 @@ macro_rules! go {
mod doubleselect {
use super::*;
+ #[cfg(miri)]
+ const ITERATIONS: i32 = 100;
+ #[cfg(not(miri))]
const ITERATIONS: i32 = 10_000;
fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) {
@@ -315,7 +320,7 @@ mod fifo {
fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) {
inp.recv();
if ch.recv() != Some(val) {
- panic!(val);
+ panic!("{}", val);
}
out.send(1);
}
@@ -691,6 +696,11 @@ mod select {
mod select2 {
use super::*;
+ #[cfg(miri)]
+ const N: i32 = 1000;
+ #[cfg(not(miri))]
+ const N: i32 = 100000;
+
#[test]
fn main() {
fn sender(c: &Chan<i32>, n: i32) {
@@ -702,9 +712,7 @@ mod select2 {
fn receiver(c: &Chan<i32>, dummy: &Chan<i32>, n: i32) {
for _ in 0..n {
select! {
- recv(c.rx()) -> _ => {
- ()
- }
+ recv(c.rx()) -> _ => {}
recv(dummy.rx()) -> _ => {
panic!("dummy");
}
@@ -717,15 +725,18 @@ mod select2 {
ALLOCATED.store(0, SeqCst);
- go!(c, sender(&c, 100000));
- receiver(&c, &dummy, 100000);
+ go!(c, sender(&c, N));
+ receiver(&c, &dummy, N);
let alloc = ALLOCATED.load(SeqCst);
- go!(c, sender(&c, 100000));
- receiver(&c, &dummy, 100000);
+ go!(c, sender(&c, N));
+ receiver(&c, &dummy, N);
- assert!(!(ALLOCATED.load(SeqCst) > alloc && (ALLOCATED.load(SeqCst) - alloc) > 110000))
+ assert!(
+ !(ALLOCATED.load(SeqCst) > alloc
+ && (ALLOCATED.load(SeqCst) - alloc) > (N as usize + 10000))
+ )
}
}
@@ -913,6 +924,9 @@ mod chan_test {
#[test]
fn test_chan() {
+ #[cfg(miri)]
+ const N: i32 = 20;
+ #[cfg(not(miri))]
const N: i32 = 200;
for cap in 0..N {
@@ -1052,6 +1066,9 @@ mod chan_test {
#[test]
fn test_nonblock_recv_race() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
const N: usize = 1000;
for _ in 0..N {
@@ -1073,6 +1090,9 @@ mod chan_test {
#[test]
fn test_nonblock_select_race() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
const N: usize = 1000;
let done = make::<bool>(1);
@@ -1106,6 +1126,9 @@ mod chan_test {
#[test]
fn test_nonblock_select_race2() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
const N: usize = 1000;
let done = make::<bool>(1);
@@ -1142,6 +1165,11 @@ mod chan_test {
// Ensure that send/recv on the same chan in select
// does not crash nor deadlock.
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 1000;
+
for &cap in &[0, 10] {
let wg = WaitGroup::new();
wg.add(2);
@@ -1151,7 +1179,7 @@ mod chan_test {
let p = p;
go!(wg, p, c, {
defer! { wg.done() }
- for i in 0..1000 {
+ for i in 0..N {
if p == 0 || i % 2 == 0 {
select! {
send(c.tx(), p) -> _ => {}
@@ -1180,6 +1208,11 @@ mod chan_test {
#[test]
fn test_select_stress() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 10000;
+
let c = vec![
make::<i32>(0),
make::<i32>(0),
@@ -1187,8 +1220,6 @@ mod chan_test {
make::<i32>(3),
];
- const N: usize = 10000;
-
// There are 4 goroutines that send N values on each of the chans,
// + 4 goroutines that receive N values on each of the chans,
// + 1 goroutine that sends N values on each of the chans in a single select,
@@ -1286,6 +1317,9 @@ mod chan_test {
#[test]
fn test_select_fairness() {
+ #[cfg(miri)]
+ const TRIALS: usize = 100;
+ #[cfg(not(miri))]
const TRIALS: usize = 10000;
let c1 = make::<u8>(TRIALS + 1);
@@ -1369,6 +1403,9 @@ mod chan_test {
#[test]
fn test_pseudo_random_send() {
+ #[cfg(miri)]
+ const N: usize = 20;
+ #[cfg(not(miri))]
const N: usize = 100;
for cap in 0..N {
@@ -1412,6 +1449,9 @@ mod chan_test {
#[test]
fn test_multi_consumer() {
const NWORK: usize = 23;
+ #[cfg(miri)]
+ const NITER: usize = 100;
+ #[cfg(not(miri))]
const NITER: usize = 271828;
let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31];
@@ -1507,5 +1547,61 @@ mod chan {
// https://github.com/golang/go/blob/master/test/ken/chan1.go
mod chan1 {
- // TODO
+ use super::*;
+
+ // sent messages
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 1000;
+ // receiving "goroutines"
+ const M: usize = 10;
+ // channel buffering
+ const W: usize = 2;
+
+ fn r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>) {
+ loop {
+ select! {
+ recv(c.rx()) -> rr => {
+ let r = rr.unwrap();
+ let mut data = h.lock().unwrap();
+ if data[r] != 1 {
+ println!("r\nm={}\nr={}\nh={}\n", m, r, data[r]);
+ panic!("fail")
+ }
+ data[r] = 2;
+ }
+ }
+ }
+ }
+
+ fn s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>) {
+ for n in 0..N {
+ let r = n;
+ let mut data = h.lock().unwrap();
+ if data[r] != 0 {
+ println!("s");
+ panic!("fail");
+ }
+ data[r] = 1;
+ // https://github.com/crossbeam-rs/crossbeam/pull/615#discussion_r550281094
+ drop(data);
+ c.send(r);
+ }
+ }
+
+ #[test]
+ fn main() {
+ let h = Arc::new(Mutex::new([0usize; N]));
+ let c = make::<usize>(W);
+ for m in 0..M {
+ go!(c, h, {
+ r(c, m, h);
+ });
+ thread::yield_now();
+ }
+ thread::yield_now();
+ thread::yield_now();
+ s(c, h);
+ }
}
diff --git a/tests/iter.rs b/tests/iter.rs
index 38bcac2..463f3b0 100644
--- a/tests/iter.rs
+++ b/tests/iter.rs
@@ -93,7 +93,7 @@ fn recv_into_iter_owned() {
assert_eq!(iter.next().unwrap(), 1);
assert_eq!(iter.next().unwrap(), 2);
- assert_eq!(iter.next().is_none(), true);
+ assert!(iter.next().is_none());
}
#[test]
@@ -106,5 +106,5 @@ fn recv_into_iter_borrowed() {
let mut iter = (&r).into_iter();
assert_eq!(iter.next().unwrap(), 1);
assert_eq!(iter.next().unwrap(), 2);
- assert_eq!(iter.next().is_none(), true);
+ assert!(iter.next().is_none());
}
diff --git a/tests/list.rs b/tests/list.rs
index 8b84105..619e1fc 100644
--- a/tests/list.rs
+++ b/tests/list.rs
@@ -41,29 +41,29 @@ fn len_empty_full() {
let (s, r) = unbounded();
assert_eq!(s.len(), 0);
- assert_eq!(s.is_empty(), true);
- assert_eq!(s.is_full(), false);
+ assert!(s.is_empty());
+ assert!(!s.is_full());
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), false);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
s.send(()).unwrap();
assert_eq!(s.len(), 1);
- assert_eq!(s.is_empty(), false);
- assert_eq!(s.is_full(), false);
+ assert!(!s.is_empty());
+ assert!(!s.is_full());
assert_eq!(r.len(), 1);
- assert_eq!(r.is_empty(), false);
- assert_eq!(r.is_full(), false);
+ assert!(!r.is_empty());
+ assert!(!r.is_full());
r.recv().unwrap();
assert_eq!(s.len(), 0);
- assert_eq!(s.is_empty(), true);
- assert_eq!(s.is_full(), false);
+ assert!(s.is_empty());
+ assert!(!s.is_full());
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), false);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
}
#[test]
@@ -239,6 +239,9 @@ fn disconnect_wakes_receiver() {
#[test]
fn spsc() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
let (s, r) = unbounded();
@@ -261,6 +264,9 @@ fn spsc() {
#[test]
fn mpmc() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -295,6 +301,9 @@ fn mpmc() {
#[test]
fn stress_oneshot() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
for _ in 0..COUNT {
@@ -310,6 +319,9 @@ fn stress_oneshot() {
#[test]
fn stress_iter() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
let (request_s, request_r) = unbounded();
@@ -371,8 +383,11 @@ fn stress_timeout_two_threads() {
.unwrap();
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
+ const RUNS: usize = 100;
+
static DROPS: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, PartialEq)]
@@ -386,9 +401,9 @@ fn drops() {
let mut rng = thread_rng();
- for _ in 0..100 {
- let steps = rng.gen_range(0, 10_000);
- let additional = rng.gen_range(0, 1000);
+ for _ in 0..RUNS {
+ let steps = rng.gen_range(0..10_000);
+ let additional = rng.gen_range(0..1000);
DROPS.store(0, Ordering::SeqCst);
let (s, r) = unbounded::<DropCounter>();
@@ -421,6 +436,9 @@ fn drops() {
#[test]
fn linearizable() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -441,6 +459,9 @@ fn linearizable() {
#[test]
fn fairness() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = unbounded::<()>();
@@ -463,6 +484,9 @@ fn fairness() {
#[test]
fn fairness_duplicates() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s, r) = unbounded();
@@ -496,6 +520,9 @@ fn recv_in_send() {
#[test]
fn channel_through_channel() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 1000;
type T = Box<dyn Any + Send>;
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
index 2a0786a..4d6e179 100644
--- a/tests/mpsc.rs
+++ b/tests/mpsc.rs
@@ -20,6 +20,12 @@
//! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
//! - https://www.rust-lang.org/en-US/legal.html
+#![allow(
+ clippy::drop_copy,
+ clippy::match_single_binding,
+ clippy::redundant_clone
+)]
+
use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
use std::sync::mpsc::{SendError, TrySendError};
use std::thread::JoinHandle;
@@ -176,7 +182,7 @@ macro_rules! select {
) => ({
cc::crossbeam_channel_internal! {
$(
- recv(($rx).inner) -> res => {
+ $meth(($rx).inner) -> res => {
let $name = res.map_err(|_| ::std::sync::mpsc::RecvError);
$code
}
@@ -314,13 +320,18 @@ mod channel_tests {
#[test]
fn stress() {
+ #[cfg(miri)]
+ const COUNT: usize = 500;
+ #[cfg(not(miri))]
+ const COUNT: usize = 10000;
+
let (tx, rx) = channel::<i32>();
let t = thread::spawn(move || {
- for _ in 0..10000 {
+ for _ in 0..COUNT {
tx.send(1).unwrap();
}
});
- for _ in 0..10000 {
+ for _ in 0..COUNT {
assert_eq!(rx.recv().unwrap(), 1);
}
t.join().ok().unwrap();
@@ -328,6 +339,9 @@ mod channel_tests {
#[test]
fn stress_shared() {
+ #[cfg(miri)]
+ const AMT: u32 = 500;
+ #[cfg(not(miri))]
const AMT: u32 = 10000;
const NTHREADS: u32 = 8;
let (tx, rx) = channel::<i32>();
@@ -336,10 +350,7 @@ mod channel_tests {
for _ in 0..AMT * NTHREADS {
assert_eq!(rx.recv().unwrap(), 1);
}
- match rx.try_recv() {
- Ok(..) => panic!(),
- _ => {}
- }
+ assert!(rx.try_recv().is_err());
});
let mut ts = Vec::with_capacity(NTHREADS as usize);
@@ -735,12 +746,17 @@ mod channel_tests {
#[test]
fn recv_a_lot() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 10000;
+
// Regression test that we don't run out of stack in scheduler context
let (tx, rx) = channel();
- for _ in 0..10000 {
+ for _ in 0..N {
tx.send(()).unwrap();
}
- for _ in 0..10000 {
+ for _ in 0..N {
rx.recv().unwrap();
}
}
@@ -880,7 +896,7 @@ mod channel_tests {
};
assert_eq!(iter.next().unwrap(), 1);
assert_eq!(iter.next().unwrap(), 2);
- assert_eq!(iter.next().is_none(), true);
+ assert!(iter.next().is_none());
}
#[test]
@@ -892,7 +908,7 @@ mod channel_tests {
let mut iter = (&rx).into_iter();
assert_eq!(iter.next().unwrap(), 1);
assert_eq!(iter.next().unwrap(), 2);
- assert_eq!(iter.next().is_none(), true);
+ assert!(iter.next().is_none());
}
#[test]
@@ -1079,13 +1095,18 @@ mod sync_channel_tests {
#[test]
fn stress() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 10000;
+
let (tx, rx) = sync_channel::<i32>(0);
let t = thread::spawn(move || {
- for _ in 0..10000 {
+ for _ in 0..N {
tx.send(1).unwrap();
}
});
- for _ in 0..10000 {
+ for _ in 0..N {
assert_eq!(rx.recv().unwrap(), 1);
}
t.join().unwrap();
@@ -1093,10 +1114,15 @@ mod sync_channel_tests {
#[test]
fn stress_recv_timeout_two_threads() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 10000;
+
let (tx, rx) = sync_channel::<i32>(0);
let t = thread::spawn(move || {
- for _ in 0..10000 {
+ for _ in 0..N {
tx.send(1).unwrap();
}
});
@@ -1113,12 +1139,15 @@ mod sync_channel_tests {
}
}
- assert_eq!(recv_count, 10000);
+ assert_eq!(recv_count, N);
t.join().unwrap();
}
#[test]
fn stress_recv_timeout_shared() {
+ #[cfg(miri)]
+ const AMT: u32 = 100;
+ #[cfg(not(miri))]
const AMT: u32 = 1000;
const NTHREADS: u32 = 8;
let (tx, rx) = sync_channel::<i32>(0);
@@ -1165,6 +1194,9 @@ mod sync_channel_tests {
#[test]
fn stress_shared() {
+ #[cfg(miri)]
+ const AMT: u32 = 100;
+ #[cfg(not(miri))]
const AMT: u32 = 1000;
const NTHREADS: u32 = 8;
let (tx, rx) = sync_channel::<i32>(0);
@@ -1174,10 +1206,7 @@ mod sync_channel_tests {
for _ in 0..AMT * NTHREADS {
assert_eq!(rx.recv().unwrap(), 1);
}
- match rx.try_recv() {
- Ok(..) => panic!(),
- _ => {}
- }
+ assert!(rx.try_recv().is_err());
dtx.send(()).unwrap();
});
@@ -1449,12 +1478,17 @@ mod sync_channel_tests {
#[test]
fn recv_a_lot() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 10000;
+
// Regression test that we don't run out of stack in scheduler context
- let (tx, rx) = sync_channel(10000);
- for _ in 0..10000 {
+ let (tx, rx) = sync_channel(N);
+ for _ in 0..N {
tx.send(()).unwrap();
}
- for _ in 0..10000 {
+ for _ in 0..N {
rx.recv().unwrap();
}
}
@@ -1792,7 +1826,11 @@ mod select_tests {
#[test]
fn stress() {
+ #[cfg(miri)]
+ const AMT: i32 = 100;
+ #[cfg(not(miri))]
const AMT: i32 = 10000;
+
let (tx1, rx1) = channel::<i32>();
let (tx2, rx2) = channel::<i32>();
let (tx3, rx3) = channel::<()>();
diff --git a/tests/never.rs b/tests/never.rs
index 31cebf6..f275126 100644
--- a/tests/never.rs
+++ b/tests/never.rs
@@ -65,8 +65,8 @@ fn capacity() {
fn len_empty_full() {
let r = never::<i32>();
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), true);
+ assert!(r.is_empty());
+ assert!(r.is_full());
}
#[test]
diff --git a/tests/ready.rs b/tests/ready.rs
index 700f487..d8dd6ce 100644
--- a/tests/ready.rs
+++ b/tests/ready.rs
@@ -1,5 +1,7 @@
//! Tests for channel readiness using the `Select` struct.
+#![allow(clippy::drop_copy)]
+
use std::any::Any;
use std::cell::Cell;
use std::thread;
@@ -490,6 +492,9 @@ fn nesting() {
#[test]
fn stress_recv() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = unbounded();
@@ -527,6 +532,9 @@ fn stress_recv() {
#[test]
fn stress_send() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded(0);
@@ -561,6 +569,9 @@ fn stress_send() {
#[test]
fn stress_mixed() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded(0);
@@ -606,8 +617,7 @@ fn stress_timeout_two_threads() {
thread::sleep(ms(500));
}
- let done = false;
- while !done {
+ loop {
let mut sel = Select::new();
sel.send(&s);
match sel.ready_timeout(ms(100)) {
@@ -628,15 +638,14 @@ fn stress_timeout_two_threads() {
thread::sleep(ms(500));
}
- let mut done = false;
- while !done {
+ loop {
let mut sel = Select::new();
sel.recv(&r);
match sel.ready_timeout(ms(100)) {
Err(_) => {}
Ok(0) => {
assert_eq!(r.try_recv(), Ok(i));
- done = true;
+ break;
}
Ok(_) => panic!(),
}
@@ -668,6 +677,9 @@ fn send_recv_same_channel() {
#[test]
fn channel_through_channel() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 1000;
type T = Box<dyn Any + Send>;
@@ -724,6 +736,9 @@ fn channel_through_channel() {
#[test]
fn fairness1() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded::<()>(COUNT);
@@ -769,6 +784,9 @@ fn fairness1() {
#[test]
fn fairness2() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
let (s1, r1) = unbounded::<()>();
diff --git a/tests/select.rs b/tests/select.rs
index 4cf08b6..f24aed8 100644
--- a/tests/select.rs
+++ b/tests/select.rs
@@ -1,5 +1,7 @@
//! Tests for channel selection using the `Select` struct.
+#![allow(clippy::drop_copy)]
+
use std::any::Any;
use std::cell::Cell;
use std::thread;
@@ -406,6 +408,7 @@ fn both_ready() {
.unwrap();
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn loop_try() {
const RUNS: usize = 20;
@@ -690,6 +693,9 @@ fn nesting() {
#[test]
fn stress_recv() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = unbounded();
@@ -728,6 +734,9 @@ fn stress_recv() {
#[test]
fn stress_send() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded(0);
@@ -763,6 +772,9 @@ fn stress_send() {
#[test]
fn stress_mixed() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded(0);
@@ -809,8 +821,7 @@ fn stress_timeout_two_threads() {
thread::sleep(ms(500));
}
- let done = false;
- while !done {
+ loop {
let mut sel = Select::new();
let oper1 = sel.send(&s);
let oper = sel.select_timeout(ms(100));
@@ -834,8 +845,7 @@ fn stress_timeout_two_threads() {
thread::sleep(ms(500));
}
- let mut done = false;
- while !done {
+ loop {
let mut sel = Select::new();
let oper1 = sel.recv(&r);
let oper = sel.select_timeout(ms(100));
@@ -844,7 +854,7 @@ fn stress_timeout_two_threads() {
Ok(oper) => match oper.index() {
ix if ix == oper1 => {
assert_eq!(oper.recv(&r), Ok(i));
- done = true;
+ break;
}
_ => unreachable!(),
},
@@ -897,12 +907,12 @@ fn matching() {
for i in 0..THREADS {
scope.spawn(move |_| {
let mut sel = Select::new();
- let oper1 = sel.recv(&r);
- let oper2 = sel.send(&s);
+ let oper1 = sel.recv(r);
+ let oper2 = sel.send(s);
let oper = sel.select();
match oper.index() {
- ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
- ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
+ ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
+ ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
_ => unreachable!(),
}
});
@@ -923,12 +933,12 @@ fn matching_with_leftover() {
for i in 0..THREADS {
scope.spawn(move |_| {
let mut sel = Select::new();
- let oper1 = sel.recv(&r);
- let oper2 = sel.send(&s);
+ let oper1 = sel.recv(r);
+ let oper2 = sel.send(s);
let oper = sel.select();
match oper.index() {
- ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
- ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
+ ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
+ ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
_ => unreachable!(),
}
});
@@ -942,6 +952,9 @@ fn matching_with_leftover() {
#[test]
fn channel_through_channel() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 1000;
type T = Box<dyn Any + Send>;
@@ -1000,6 +1013,9 @@ fn channel_through_channel() {
#[test]
fn linearizable_try() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
for step in 0..2 {
@@ -1052,6 +1068,9 @@ fn linearizable_try() {
#[test]
fn linearizable_timeout() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
for step in 0..2 {
@@ -1104,6 +1123,9 @@ fn linearizable_timeout() {
#[test]
fn fairness1() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded::<()>(COUNT);
@@ -1150,6 +1172,9 @@ fn fairness1() {
#[test]
fn fairness2() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = unbounded::<()>();
@@ -1214,8 +1239,8 @@ fn sync_and_clone() {
let (s, r) = &bounded::<usize>(0);
let mut sel = Select::new();
- let oper1 = sel.recv(&r);
- let oper2 = sel.send(&s);
+ let oper1 = sel.recv(r);
+ let oper2 = sel.send(s);
let sel = &sel;
scope(|scope| {
@@ -1224,8 +1249,8 @@ fn sync_and_clone() {
let mut sel = sel.clone();
let oper = sel.select();
match oper.index() {
- ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
- ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
+ ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
+ ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
_ => unreachable!(),
}
});
@@ -1243,8 +1268,8 @@ fn send_and_clone() {
let (s, r) = &bounded::<usize>(0);
let mut sel = Select::new();
- let oper1 = sel.recv(&r);
- let oper2 = sel.send(&s);
+ let oper1 = sel.recv(r);
+ let oper2 = sel.send(s);
scope(|scope| {
for i in 0..THREADS {
@@ -1252,8 +1277,8 @@ fn send_and_clone() {
scope.spawn(move |_| {
let oper = sel.select();
match oper.index() {
- ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
- ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
+ ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
+ ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
_ => unreachable!(),
}
});
@@ -1266,6 +1291,9 @@ fn send_and_clone() {
#[test]
fn reuse() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded(0);
diff --git a/tests/select_macro.rs b/tests/select_macro.rs
index c05f7a0..0b9a21a 100644
--- a/tests/select_macro.rs
+++ b/tests/select_macro.rs
@@ -1,6 +1,7 @@
//! Tests for the `select!` macro.
#![forbid(unsafe_code)] // select! is safe.
+#![allow(clippy::drop_copy, clippy::match_single_binding)]
use std::any::Any;
use std::cell::Cell;
@@ -283,6 +284,7 @@ fn both_ready() {
.unwrap();
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn loop_try() {
const RUNS: usize = 20;
@@ -485,6 +487,9 @@ fn panic_receiver() {
#[test]
fn stress_recv() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = unbounded();
@@ -518,6 +523,9 @@ fn stress_recv() {
#[test]
fn stress_send() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded(0);
@@ -548,6 +556,9 @@ fn stress_send() {
#[test]
fn stress_mixed() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded(0);
@@ -681,6 +692,9 @@ fn matching_with_leftover() {
#[test]
fn channel_through_channel() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 1000;
type T = Box<dyn Any + Send>;
@@ -726,6 +740,9 @@ fn channel_through_channel() {
#[test]
fn linearizable_default() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
for step in 0..2 {
@@ -770,6 +787,9 @@ fn linearizable_default() {
#[test]
fn linearizable_timeout() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
for step in 0..2 {
@@ -814,6 +834,9 @@ fn linearizable_timeout() {
#[test]
fn fairness1() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded::<()>(COUNT);
@@ -838,6 +861,9 @@ fn fairness1() {
#[test]
fn fairness2() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = unbounded::<()>();
@@ -875,6 +901,9 @@ fn fairness2() {
#[test]
fn fairness_recv() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded::<()>(COUNT);
@@ -897,6 +926,9 @@ fn fairness_recv() {
#[test]
fn fairness_send() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, _r1) = bounded::<()>(COUNT);
@@ -912,6 +944,7 @@ fn fairness_send() {
assert!(hits.iter().all(|x| *x >= COUNT / 4));
}
+#[allow(clippy::or_fun_call)] // This is intentional.
#[test]
fn references() {
let (s, r) = unbounded::<i32>();
@@ -958,6 +991,7 @@ fn case_blocks() {
drop(s);
}
+#[allow(clippy::redundant_closure_call)] // This is intentional.
#[test]
fn move_handles() {
let (s, r) = unbounded::<i32>();
diff --git a/tests/thread_locals.rs b/tests/thread_locals.rs
index 9e27146..effb6a1 100644
--- a/tests/thread_locals.rs
+++ b/tests/thread_locals.rs
@@ -1,5 +1,7 @@
//! Tests that make sure accessing thread-locals while exiting the thread doesn't cause panics.
+#![cfg(not(miri))] // error: abnormal termination: the evaluated program aborted execution
+
use std::thread;
use std::time::Duration;
diff --git a/tests/tick.rs b/tests/tick.rs
index 5dc8730..23bbb1f 100644
--- a/tests/tick.rs
+++ b/tests/tick.rs
@@ -1,5 +1,7 @@
//! Tests for the tick channel flavor.
+#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
+
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::thread;
@@ -78,20 +80,20 @@ fn len_empty_full() {
let r = tick(ms(50));
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), false);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
thread::sleep(ms(100));
assert_eq!(r.len(), 1);
- assert_eq!(r.is_empty(), false);
- assert_eq!(r.is_full(), true);
+ assert!(!r.is_empty());
+ assert!(r.is_full());
r.try_recv().unwrap();
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), false);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
}
#[test]
@@ -127,6 +129,7 @@ fn recv() {
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
+#[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow
#[test]
fn recv_timeout() {
let start = Instant::now();
@@ -251,6 +254,7 @@ fn select() {
assert_eq!(hits.load(Ordering::SeqCst), 8);
}
+#[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow
#[test]
fn ready() {
const THREADS: usize = 4;
diff --git a/tests/zero.rs b/tests/zero.rs
index 66dcc1e..ba41b1a 100644
--- a/tests/zero.rs
+++ b/tests/zero.rs
@@ -35,11 +35,11 @@ fn len_empty_full() {
let (s, r) = bounded(0);
assert_eq!(s.len(), 0);
- assert_eq!(s.is_empty(), true);
- assert_eq!(s.is_full(), true);
+ assert!(s.is_empty());
+ assert!(s.is_full());
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), true);
+ assert!(r.is_empty());
+ assert!(r.is_full());
scope(|scope| {
scope.spawn(|_| s.send(0).unwrap());
@@ -48,11 +48,11 @@ fn len_empty_full() {
.unwrap();
assert_eq!(s.len(), 0);
- assert_eq!(s.is_empty(), true);
- assert_eq!(s.is_full(), true);
+ assert!(s.is_empty());
+ assert!(s.is_full());
assert_eq!(r.len(), 0);
- assert_eq!(r.is_empty(), true);
- assert_eq!(r.is_full(), true);
+ assert!(r.is_empty());
+ assert!(r.is_full());
}
#[test]
@@ -187,6 +187,9 @@ fn send_timeout() {
#[test]
fn len() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
let (s, r) = bounded(0);
@@ -249,6 +252,9 @@ fn disconnect_wakes_receiver() {
#[test]
fn spsc() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
let (s, r) = bounded(0);
@@ -271,6 +277,9 @@ fn spsc() {
#[test]
fn mpmc() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -303,6 +312,9 @@ fn mpmc() {
#[test]
fn stress_oneshot() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
for _ in 0..COUNT {
@@ -316,6 +328,7 @@ fn stress_oneshot() {
}
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_iter() {
const COUNT: usize = 1000;
@@ -383,8 +396,11 @@ fn stress_timeout_two_threads() {
.unwrap();
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
+ const RUNS: usize = 100;
+
static DROPS: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, PartialEq)]
@@ -398,8 +414,8 @@ fn drops() {
let mut rng = thread_rng();
- for _ in 0..100 {
- let steps = rng.gen_range(0, 3_000);
+ for _ in 0..RUNS {
+ let steps = rng.gen_range(0..3_000);
DROPS.store(0, Ordering::SeqCst);
let (s, r) = bounded::<DropCounter>(0);
@@ -428,6 +444,9 @@ fn drops() {
#[test]
fn fairness() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded::<()>(0);
@@ -459,6 +478,9 @@ fn fairness() {
#[test]
fn fairness_duplicates() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s, r) = bounded::<()>(0);
@@ -517,6 +539,9 @@ fn recv_in_send() {
#[test]
fn channel_through_channel() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 1000;
type T = Box<dyn Any + Send>;