aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2023-07-07 04:43:27 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2023-07-07 04:43:27 +0000
commite3dfb1ddbf4cb2a5d2701a6d4a91fcc07b1677a7 (patch)
tree41a3064fbbb4b862c41a57421365ee8c362760bf
parentba5e88288d83aebdb552486f14a2c37c74d2b87e (diff)
parent61a284f21e182c391151d6a63a7be5a240931d6c (diff)
downloadcrossbeam-channel-aml_ads_341517040.tar.gz
Change-Id: Icc3b6c2df98179c9634d5b39456cb08ed6f7371a
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp8
-rw-r--r--CHANGELOG.md28
-rw-r--r--Cargo.lock63
-rw-r--r--Cargo.toml21
-rw-r--r--Cargo.toml.orig4
-rw-r--r--METADATA12
-rw-r--r--README.md4
-rw-r--r--TEST_MAPPING3
-rw-r--r--src/channel.rs33
-rw-r--r--src/err.rs4
-rw-r--r--src/flavors/array.rs78
-rw-r--r--src/flavors/at.rs8
-rw-r--r--src/flavors/list.rs28
-rw-r--r--src/flavors/never.rs1
-rw-r--r--src/flavors/tick.rs5
-rw-r--r--src/flavors/zero.rs71
-rw-r--r--src/select.rs25
-rw-r--r--src/select_macro.rs60
-rw-r--r--src/utils.rs56
-rw-r--r--src/waker.rs64
-rw-r--r--tests/array.rs94
-rw-r--r--tests/golang.rs628
-rw-r--r--tests/list.rs34
-rw-r--r--tests/mpsc.rs19
-rw-r--r--tests/ready.rs1
-rw-r--r--tests/select.rs17
-rw-r--r--tests/select_macro.rs14
-rw-r--r--tests/thread_locals.rs2
-rw-r--r--tests/zero.rs26
30 files changed, 1015 insertions, 398 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f15a046..548f675 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "f9cec068fa94bced547a66289cd288dca58c2e83"
+ "sha1": "721382b00b5dadd81954ed66764d547e2f1bb7a3"
},
"path_in_vcs": "crossbeam-channel"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index a5821d8..be32d00 100644
--- a/Android.bp
+++ b/Android.bp
@@ -47,7 +47,7 @@ rust_library {
host_supported: true,
crate_name: "crossbeam_channel",
cargo_env_compat: true,
- cargo_pkg_version: "0.5.2",
+ cargo_pkg_version: "0.5.7",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -59,4 +59,10 @@ rust_library {
"libcfg_if",
"libcrossbeam_utils",
],
+ apex_available: [
+ "//apex_available:platform",
+ "//apex_available:anyapex",
+ ],
+ product_available: true,
+ vendor_available: true,
}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6bfd923..3277f15 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,30 @@
+# Version 0.5.7
+
+- Improve handling of very large timeout. (#953)
+
+# Version 0.5.6
+
+- Bump the minimum supported Rust version to 1.38. (#877)
+
+# Version 0.5.5
+
+- Replace Spinlock with Mutex. (#835)
+
+# Version 0.5.4
+
+- Workaround a bug in upstream related to TLS access on AArch64 Linux. (#802)
+
+# Version 0.5.3
+
+**Note:** This release has been yanked. See [#802](https://github.com/crossbeam-rs/crossbeam/issues/802) for details.
+
+- Fix panic on very large timeout. (#798)
+
# Version 0.5.2
-- Fix stacked borrows violations. (#763, #764)
+**Note:** This release has been yanked. See [#802](https://github.com/crossbeam-rs/crossbeam/issues/802) for details.
+
+- Fix stacked borrows violations when `-Zmiri-tag-raw-pointers` is enabled. (#763, #764)
# Version 0.5.1
@@ -22,6 +46,8 @@
# Version 0.4.3
+**Note:** This release has been yanked. See [GHSA-v5m7-53cv-f3hx](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-v5m7-53cv-f3hx) for details.
+
- Change license to "MIT OR Apache-2.0".
# Version 0.4.2
diff --git a/Cargo.lock b/Cargo.lock
index feb6e75..e4b10f4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10,7 +10,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
-version = "0.5.2"
+version = "0.5.7"
dependencies = [
"cfg-if",
"crossbeam-utils",
@@ -21,19 +21,18 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
-version = "0.8.6"
+version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
+checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b"
dependencies = [
"cfg-if",
- "lazy_static",
]
[[package]]
name = "getrandom"
-version = "0.2.3"
+version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
+checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
dependencies = [
"cfg-if",
"libc",
@@ -42,30 +41,24 @@ dependencies = [
[[package]]
name = "hermit-abi"
-version = "0.1.19"
+version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
+checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
dependencies = [
"libc",
]
[[package]]
-name = "lazy_static"
-version = "1.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
-
-[[package]]
name = "libc"
-version = "0.2.112"
+version = "0.2.139"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
+checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
[[package]]
name = "num_cpus"
-version = "1.13.1"
+version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
+checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi",
"libc",
@@ -73,20 +66,19 @@ dependencies = [
[[package]]
name = "ppv-lite86"
-version = "0.2.16"
+version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
+checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "rand"
-version = "0.8.4"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
+checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
- "rand_hc",
]
[[package]]
@@ -101,27 +93,18 @@ dependencies = [
[[package]]
name = "rand_core"
-version = "0.6.3"
+version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
+checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
-name = "rand_hc"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
-dependencies = [
- "rand_core",
-]
-
-[[package]]
name = "signal-hook"
-version = "0.3.13"
+version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d"
+checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9"
dependencies = [
"libc",
"signal-hook-registry",
@@ -129,15 +112,15 @@ dependencies = [
[[package]]
name = "signal-hook-registry"
-version = "1.4.0"
+version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
+checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "wasi"
-version = "0.10.2+wasi-snapshot-preview1"
+version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
diff --git a/Cargo.toml b/Cargo.toml
index 87a889f..b0e95c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,15 +11,27 @@
[package]
edition = "2018"
-rust-version = "1.36"
+rust-version = "1.38"
name = "crossbeam-channel"
-version = "0.5.2"
+version = "0.5.7"
description = "Multi-producer multi-consumer channels for message passing"
homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel"
-keywords = ["channel", "mpmc", "select", "golang", "message"]
-categories = ["algorithms", "concurrency", "data-structures"]
+readme = "README.md"
+keywords = [
+ "channel",
+ "mpmc",
+ "select",
+ "golang",
+ "message",
+]
+categories = [
+ "algorithms",
+ "concurrency",
+ "data-structures",
+]
license = "MIT OR Apache-2.0"
repository = "https://github.com/crossbeam-rs/crossbeam"
+
[dependencies.cfg-if]
version = "1"
@@ -27,6 +39,7 @@ version = "1"
version = "0.8"
optional = true
default-features = false
+
[dev-dependencies.num_cpus]
version = "1.13.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 640a808..25c3678 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -4,9 +4,9 @@ name = "crossbeam-channel"
# - Update CHANGELOG.md
# - Update README.md
# - Create "crossbeam-channel-X.Y.Z" git tag
-version = "0.5.2"
+version = "0.5.7"
edition = "2018"
-rust-version = "1.36"
+rust-version = "1.38"
license = "MIT OR Apache-2.0"
repository = "https://github.com/crossbeam-rs/crossbeam"
homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel"
diff --git a/METADATA b/METADATA
index 6cbb05f..d824734 100644
--- a/METADATA
+++ b/METADATA
@@ -1,3 +1,7 @@
+# This project was upgraded with external_updater.
+# Usage: tools/external_updater/updater.sh update rust/crates/crossbeam-channel
+# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+
name: "crossbeam-channel"
description: "Multi-producer multi-consumer channels for message passing"
third_party {
@@ -7,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.2.crate"
+ value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.7.crate"
}
- version: "0.5.2"
+ version: "0.5.7"
license_type: NOTICE
last_upgrade_date {
- year: 2022
+ year: 2023
month: 3
- day: 1
+ day: 6
}
}
diff --git a/README.md b/README.md
index f5077c5..4c42d86 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel#license)
https://crates.io/crates/crossbeam-channel)
[![Documentation](https://docs.rs/crossbeam-channel/badge.svg)](
https://docs.rs/crossbeam-channel)
-[![Rust 1.36+](https://img.shields.io/badge/rust-1.36+-lightgray.svg)](
+[![Rust 1.38+](https://img.shields.io/badge/rust-1.38+-lightgray.svg)](
https://www.rust-lang.org)
[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ)
@@ -48,7 +48,7 @@ crossbeam-channel = "0.5"
Crossbeam Channel supports stable Rust releases going back at least six months,
and every time the minimum supported Rust version is increased, a new minor
-version is released. Currently, the minimum supported Rust version is 1.36.
+version is released. Currently, the minimum supported Rust version is 1.38.
## License
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 3cbd48d..55384f0 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -5,6 +5,9 @@
"path": "external/rust/crates/base64"
},
{
+ "path": "external/rust/crates/hashbrown"
+ },
+ {
"path": "external/rust/crates/tinytemplate"
},
{
diff --git a/src/channel.rs b/src/channel.rs
index 8988235..bd24115 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -159,7 +159,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
/// let ms = |ms| Duration::from_millis(ms);
///
/// // Returns `true` if `a` and `b` are very close `Instant`s.
-/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
+/// let eq = |a, b| a + ms(60) > b && b + ms(60) > a;
///
/// let start = Instant::now();
/// let r = after(ms(100));
@@ -171,8 +171,11 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
/// assert!(eq(Instant::now(), start + ms(500)));
/// ```
pub fn after(duration: Duration) -> Receiver<Instant> {
- Receiver {
- flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
+ match Instant::now().checked_add(duration) {
+ Some(deadline) => Receiver {
+ flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))),
+ },
+ None => never(),
}
}
@@ -232,6 +235,8 @@ pub fn at(when: Instant) -> Receiver<Instant> {
///
/// Using a `never` channel to optionally add a timeout to [`select!`]:
///
+/// [`select!`]: crate::select!
+///
/// ```
/// use std::thread;
/// use std::time::Duration;
@@ -297,7 +302,7 @@ pub fn never<T>() -> Receiver<T> {
/// let ms = |ms| Duration::from_millis(ms);
///
/// // Returns `true` if `a` and `b` are very close `Instant`s.
-/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
+/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
///
/// let start = Instant::now();
/// let r = tick(ms(100));
@@ -317,8 +322,14 @@ pub fn never<T>() -> Receiver<T> {
/// assert!(eq(Instant::now(), start + ms(700)));
/// ```
pub fn tick(duration: Duration) -> Receiver<Instant> {
- Receiver {
- flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
+ match Instant::now().checked_add(duration) {
+ Some(delivery_time) => Receiver {
+ flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(
+ delivery_time,
+ duration,
+ ))),
+ },
+ None => never(),
}
}
@@ -471,7 +482,10 @@ impl<T> Sender<T> {
/// );
/// ```
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
- self.send_deadline(msg, Instant::now() + timeout)
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.send_deadline(msg, deadline),
+ None => self.send(msg).map_err(SendTimeoutError::from),
+ }
}
/// Waits for a message to be sent into the channel, but only until a given deadline.
@@ -861,7 +875,10 @@ impl<T> Receiver<T> {
/// );
/// ```
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
- self.recv_deadline(Instant::now() + timeout)
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.recv_deadline(deadline),
+ None => self.recv().map_err(RecvTimeoutError::from),
+ }
}
/// Waits for a message to be received from the channel, but only before a given deadline.
diff --git a/src/err.rs b/src/err.rs
index 578acc6..18cb830 100644
--- a/src/err.rs
+++ b/src/err.rs
@@ -308,7 +308,6 @@ impl From<RecvError> for TryRecvError {
impl TryRecvError {
/// Returns `true` if the receive operation failed because the channel is empty.
- #[allow(clippy::trivially_copy_pass_by_ref)]
pub fn is_empty(&self) -> bool {
match self {
TryRecvError::Empty => true,
@@ -317,7 +316,6 @@ impl TryRecvError {
}
/// Returns `true` if the receive operation failed because the channel is disconnected.
- #[allow(clippy::trivially_copy_pass_by_ref)]
pub fn is_disconnected(&self) -> bool {
match self {
TryRecvError::Disconnected => true,
@@ -347,7 +345,6 @@ impl From<RecvError> for RecvTimeoutError {
impl RecvTimeoutError {
/// Returns `true` if the receive operation timed out.
- #[allow(clippy::trivially_copy_pass_by_ref)]
pub fn is_timeout(&self) -> bool {
match self {
RecvTimeoutError::Timeout => true,
@@ -356,7 +353,6 @@ impl RecvTimeoutError {
}
/// Returns `true` if the receive operation failed because the channel is disconnected.
- #[allow(clippy::trivially_copy_pass_by_ref)]
pub fn is_disconnected(&self) -> bool {
match self {
RecvTimeoutError::Disconnected => true,
diff --git a/src/flavors/array.rs b/src/flavors/array.rs
index 871768c..63b82eb 100644
--- a/src/flavors/array.rs
+++ b/src/flavors/array.rs
@@ -9,7 +9,6 @@
//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
use std::cell::UnsafeCell;
-use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
@@ -33,7 +32,7 @@ struct Slot<T> {
/// The token type for the array flavor.
#[derive(Debug)]
-pub struct ArrayToken {
+pub(crate) struct ArrayToken {
/// Slot to read from or write to.
slot: *const u8,
@@ -72,7 +71,7 @@ pub(crate) struct Channel<T> {
tail: CachePadded<AtomicUsize>,
/// The buffer holding slots.
- buffer: *mut Slot<T>,
+ buffer: Box<[Slot<T>]>,
/// The channel capacity.
cap: usize,
@@ -88,9 +87,6 @@ pub(crate) struct Channel<T> {
/// Receivers waiting while the channel is empty and not disconnected.
receivers: SyncWaker,
-
- /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
- _marker: PhantomData<T>,
}
impl<T> Channel<T> {
@@ -109,18 +105,15 @@ impl<T> Channel<T> {
// Allocate a buffer of `cap` slots initialized
// with stamps.
- let buffer = {
- let boxed: Box<[Slot<T>]> = (0..cap)
- .map(|i| {
- // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
- Slot {
- stamp: AtomicUsize::new(i),
- msg: UnsafeCell::new(MaybeUninit::uninit()),
- }
- })
- .collect();
- Box::into_raw(boxed) as *mut Slot<T>
- };
+ let buffer: Box<[Slot<T>]> = (0..cap)
+ .map(|i| {
+ // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
+ Slot {
+ stamp: AtomicUsize::new(i),
+ msg: UnsafeCell::new(MaybeUninit::uninit()),
+ }
+ })
+ .collect();
Channel {
buffer,
@@ -131,7 +124,6 @@ impl<T> Channel<T> {
tail: CachePadded::new(AtomicUsize::new(tail)),
senders: SyncWaker::new(),
receivers: SyncWaker::new(),
- _marker: PhantomData,
}
}
@@ -163,7 +155,8 @@ impl<T> Channel<T> {
let lap = tail & !(self.one_lap - 1);
// Inspect the corresponding slot.
- let slot = unsafe { &*self.buffer.add(index) };
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the tail and the stamp match, we may attempt to push.
@@ -223,7 +216,7 @@ impl<T> Channel<T> {
return Err(msg);
}
- let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+ let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
// Write the message into the slot and update the stamp.
slot.msg.get().write(MaybeUninit::new(msg));
@@ -245,7 +238,8 @@ impl<T> Channel<T> {
let lap = head & !(self.one_lap - 1);
// Inspect the corresponding slot.
- let slot = unsafe { &*self.buffer.add(index) };
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the the stamp is ahead of the head by 1, we may attempt to pop.
@@ -313,7 +307,7 @@ impl<T> Channel<T> {
return Err(());
}
- let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+ let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
// Read the message from the slot and update the stamp.
let msg = slot.msg.get().read().assume_init();
@@ -475,7 +469,6 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
pub(crate) fn capacity(&self) -> Option<usize> {
Some(self.cap)
}
@@ -528,10 +521,24 @@ impl<T> Channel<T> {
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
// Get the index of the head.
- let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
+ let head = *self.head.get_mut();
+ let tail = *self.tail.get_mut();
+
+ let hix = head & (self.mark_bit - 1);
+ let tix = tail & (self.mark_bit - 1);
+
+ let len = if hix < tix {
+ tix - hix
+ } else if hix > tix {
+ self.cap - hix + tix
+ } else if (tail & !self.mark_bit) == head {
+ 0
+ } else {
+ self.cap
+ };
// Loop over all slots that hold a message and drop them.
- for i in 0..self.len() {
+ for i in 0..len {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap {
hix + i
@@ -540,23 +547,12 @@ impl<T> Drop for Channel<T> {
};
unsafe {
- let p = {
- let slot = &mut *self.buffer.add(index);
- let msg = &mut *slot.msg.get();
- msg.as_mut_ptr()
- };
- p.drop_in_place();
+ debug_assert!(index < self.buffer.len());
+ let slot = self.buffer.get_unchecked_mut(index);
+ let msg = &mut *slot.msg.get();
+ msg.as_mut_ptr().drop_in_place();
}
}
-
- // Finally, deallocate the buffer, but don't run any destructors.
- unsafe {
- // Create a slice from the buffer to make
- // a fat pointer. Then, use Box::from_raw
- // to deallocate it.
- let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>];
- Box::from_raw(ptr);
- }
}
}
diff --git a/src/flavors/at.rs b/src/flavors/at.rs
index 4581edb..515c4e3 100644
--- a/src/flavors/at.rs
+++ b/src/flavors/at.rs
@@ -4,7 +4,7 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
-use std::time::{Duration, Instant};
+use std::time::Instant;
use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
@@ -32,11 +32,6 @@ impl Channel {
received: AtomicBool::new(false),
}
}
- /// Creates a channel that delivers a message after a certain duration of time.
- #[inline]
- pub(crate) fn new_timeout(dur: Duration) -> Self {
- Self::new_deadline(Instant::now() + dur)
- }
/// Attempts to receive a message without blocking.
#[inline]
@@ -142,7 +137,6 @@ impl Channel {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(1)
diff --git a/src/flavors/list.rs b/src/flavors/list.rs
index 5056aa4..6090b8d 100644
--- a/src/flavors/list.rs
+++ b/src/flavors/list.rs
@@ -49,6 +49,11 @@ struct Slot<T> {
}
impl<T> Slot<T> {
+ const UNINIT: Self = Self {
+ msg: UnsafeCell::new(MaybeUninit::uninit()),
+ state: AtomicUsize::new(0),
+ };
+
/// Waits until a message is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
@@ -72,13 +77,10 @@ struct Block<T> {
impl<T> Block<T> {
/// Creates an empty block.
fn new() -> Block<T> {
- // SAFETY: This is safe because:
- // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
- // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
- // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
- // holds a MaybeUninit.
- // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
- unsafe { MaybeUninit::zeroed().assume_init() }
+ Self {
+ next: AtomicPtr::new(ptr::null_mut()),
+ slots: [Slot::UNINIT; BLOCK_CAP],
+ }
}
/// Waits until the next pointer is set.
@@ -126,7 +128,7 @@ struct Position<T> {
/// The token type for the list flavor.
#[derive(Debug)]
-pub struct ListToken {
+pub(crate) struct ListToken {
/// The block of slots.
block: *const u8,
@@ -283,7 +285,7 @@ impl<T> Channel<T> {
}
// Write the message into the slot.
- let block = token.list.block as *mut Block<T>;
+ let block = token.list.block.cast::<Block<T>>();
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.msg.get().write(MaybeUninit::new(msg));
@@ -634,9 +636,9 @@ impl<T> Channel<T> {
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
- let mut head = self.head.index.load(Ordering::Relaxed);
- let mut tail = self.tail.index.load(Ordering::Relaxed);
- let mut block = self.head.block.load(Ordering::Relaxed);
+ let mut head = *self.head.index.get_mut();
+ let mut tail = *self.tail.index.get_mut();
+ let mut block = *self.head.block.get_mut();
// Erase the lower bits.
head &= !((1 << SHIFT) - 1);
@@ -654,7 +656,7 @@ impl<T> Drop for Channel<T> {
p.as_mut_ptr().drop_in_place();
} else {
// Deallocate the block and move to the next one.
- let next = (*block).next.load(Ordering::Relaxed);
+ let next = *(*block).next.get_mut();
drop(Box::from_raw(block));
block = next;
}
diff --git a/src/flavors/never.rs b/src/flavors/never.rs
index 1951e96..277a61d 100644
--- a/src/flavors/never.rs
+++ b/src/flavors/never.rs
@@ -65,7 +65,6 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
diff --git a/src/flavors/tick.rs b/src/flavors/tick.rs
index d4b1f6c..d38f6a5 100644
--- a/src/flavors/tick.rs
+++ b/src/flavors/tick.rs
@@ -26,9 +26,9 @@ pub(crate) struct Channel {
impl Channel {
/// Creates a channel that delivers messages periodically.
#[inline]
- pub(crate) fn new(dur: Duration) -> Self {
+ pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self {
Channel {
- delivery_time: AtomicCell::new(Instant::now() + dur),
+ delivery_time: AtomicCell::new(delivery_time),
duration: dur,
}
}
@@ -112,7 +112,6 @@ impl Channel {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(1)
diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs
index 4afbd8f..aae2ea3 100644
--- a/src/flavors/zero.rs
+++ b/src/flavors/zero.rs
@@ -5,6 +5,7 @@
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
use std::time::Instant;
use std::{fmt, ptr};
@@ -13,11 +14,10 @@ use crossbeam_utils::Backoff;
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
-use crate::utils::Spinlock;
use crate::waker::Waker;
/// A pointer to a packet.
-pub struct ZeroToken(*mut ());
+pub(crate) struct ZeroToken(*mut ());
impl Default for ZeroToken {
fn default() -> Self {
@@ -95,7 +95,7 @@ struct Inner {
/// Zero-capacity channel.
pub(crate) struct Channel<T> {
/// Inner representation of the channel.
- inner: Spinlock<Inner>,
+ inner: Mutex<Inner>,
/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
_marker: PhantomData<T>,
@@ -105,7 +105,7 @@ impl<T> Channel<T> {
/// Constructs a new zero-capacity channel.
pub(crate) fn new() -> Self {
Channel {
- inner: Spinlock::new(Inner {
+ inner: Mutex::new(Inner {
senders: Waker::new(),
receivers: Waker::new(),
is_disconnected: false,
@@ -126,7 +126,7 @@ impl<T> Channel<T> {
/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
@@ -155,7 +155,7 @@ impl<T> Channel<T> {
/// Attempts to pair up with a sender.
fn start_recv(&self, token: &mut Token) -> bool {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
@@ -190,7 +190,7 @@ impl<T> Channel<T> {
// heap-allocated packet.
packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap();
- drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
+ drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
Ok(msg)
}
}
@@ -198,7 +198,7 @@ impl<T> Channel<T> {
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
@@ -222,7 +222,7 @@ impl<T> Channel<T> {
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
@@ -254,12 +254,12 @@ impl<T> Channel<T> {
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
- self.inner.lock().senders.unregister(oper).unwrap();
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Timeout(msg))
}
Selected::Disconnected => {
- self.inner.lock().senders.unregister(oper).unwrap();
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Disconnected(msg))
}
@@ -275,7 +275,7 @@ impl<T> Channel<T> {
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
@@ -292,7 +292,7 @@ impl<T> Channel<T> {
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
@@ -325,11 +325,21 @@ impl<T> Channel<T> {
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
- self.inner.lock().receivers.unregister(oper).unwrap();
+ self.inner
+ .lock()
+ .unwrap()
+ .receivers
+ .unregister(oper)
+ .unwrap();
Err(RecvTimeoutError::Timeout)
}
Selected::Disconnected => {
- self.inner.lock().receivers.unregister(oper).unwrap();
+ self.inner
+ .lock()
+ .unwrap()
+ .receivers
+ .unregister(oper)
+ .unwrap();
Err(RecvTimeoutError::Disconnected)
}
Selected::Operation(_) => {
@@ -345,7 +355,7 @@ impl<T> Channel<T> {
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
if !inner.is_disconnected {
inner.is_disconnected = true;
@@ -363,7 +373,6 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
@@ -397,18 +406,18 @@ impl<T> SelectHandle for Receiver<'_, T> {
fn register(&self, oper: Operation, cx: &Context) -> bool {
let packet = Box::into_raw(Packet::<T>::empty_on_heap());
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner
.receivers
- .register_with_packet(oper, packet as *mut (), cx);
+ .register_with_packet(oper, packet.cast::<()>(), cx);
inner.senders.notify();
inner.senders.can_select() || inner.is_disconnected
}
fn unregister(&self, oper: Operation) {
- if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) {
+ if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
unsafe {
- drop(Box::from_raw(operation.packet as *mut Packet<T>));
+ drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
}
}
}
@@ -419,18 +428,18 @@ impl<T> SelectHandle for Receiver<'_, T> {
}
fn is_ready(&self) -> bool {
- let inner = self.0.inner.lock();
+ let inner = self.0.inner.lock().unwrap();
inner.senders.can_select() || inner.is_disconnected
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner.receivers.watch(oper, cx);
inner.senders.can_select() || inner.is_disconnected
}
fn unwatch(&self, oper: Operation) {
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner.receivers.unwatch(oper);
}
}
@@ -447,18 +456,18 @@ impl<T> SelectHandle for Sender<'_, T> {
fn register(&self, oper: Operation, cx: &Context) -> bool {
let packet = Box::into_raw(Packet::<T>::empty_on_heap());
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner
.senders
- .register_with_packet(oper, packet as *mut (), cx);
+ .register_with_packet(oper, packet.cast::<()>(), cx);
inner.receivers.notify();
inner.receivers.can_select() || inner.is_disconnected
}
fn unregister(&self, oper: Operation) {
- if let Some(operation) = self.0.inner.lock().senders.unregister(oper) {
+ if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
unsafe {
- drop(Box::from_raw(operation.packet as *mut Packet<T>));
+ drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
}
}
}
@@ -469,18 +478,18 @@ impl<T> SelectHandle for Sender<'_, T> {
}
fn is_ready(&self) -> bool {
- let inner = self.0.inner.lock();
+ let inner = self.0.inner.lock().unwrap();
inner.receivers.can_select() || inner.is_disconnected
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner.senders.watch(oper, cx);
inner.receivers.can_select() || inner.is_disconnected
}
fn unwatch(&self, oper: Operation) {
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner.senders.unwatch(oper);
}
}
diff --git a/src/select.rs b/src/select.rs
index 6103ef4..3eb0b97 100644
--- a/src/select.rs
+++ b/src/select.rs
@@ -22,12 +22,13 @@ use crate::utils;
// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
- pub at: flavors::at::AtToken,
- pub array: flavors::array::ArrayToken,
- pub list: flavors::list::ListToken,
- pub never: flavors::never::NeverToken,
- pub tick: flavors::tick::TickToken,
- pub zero: flavors::zero::ZeroToken,
+ pub(crate) at: flavors::at::AtToken,
+ pub(crate) array: flavors::array::ArrayToken,
+ pub(crate) list: flavors::list::ListToken,
+ #[allow(dead_code)]
+ pub(crate) never: flavors::never::NeverToken,
+ pub(crate) tick: flavors::tick::TickToken,
+ pub(crate) zero: flavors::zero::ZeroToken,
}
/// Identifier associated with an operation by a specific thread on a specific channel.
@@ -486,7 +487,10 @@ pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
timeout: Duration,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
- select_deadline(handles, Instant::now() + timeout)
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => select_deadline(handles, deadline),
+ None => Ok(select(handles)),
+ }
}
/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
@@ -518,6 +522,8 @@ pub(crate) fn select_deadline<'a>(
/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
/// dynamically created list of channel operations.
///
+/// [`select!`]: crate::select!
+///
/// Once a list of operations has been built with `Select`, there are two different ways of
/// proceeding:
///
@@ -1042,7 +1048,10 @@ impl<'a> Select<'a> {
/// }
/// ```
pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
- self.ready_deadline(Instant::now() + timeout)
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.ready_deadline(deadline),
+ None => Ok(self.ready()),
+ }
}
/// Blocks until a given deadline, or until one of the operations becomes ready.
diff --git a/src/select_macro.rs b/src/select_macro.rs
index f8b247e..efe0ae4 100644
--- a/src/select_macro.rs
+++ b/src/select_macro.rs
@@ -121,18 +121,7 @@ macro_rules! crossbeam_channel_internal {
};
// Only one case remains.
(@list
- ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr)
- ($($head:tt)*)
- ) => {
- $crate::crossbeam_channel_internal!(
- @list
- ()
- ($($head)* $case ($($args)*) $(-> $res)* => { $body },)
- )
- };
- // Accept a trailing comma at the end of the list.
- (@list
- ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr,)
+ ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr $(,)?)
($($head:tt)*)
) => {
$crate::crossbeam_channel_internal!(
@@ -373,20 +362,7 @@ macro_rules! crossbeam_channel_internal {
// Check the format of a recv case.
(@case
- (recv($r:expr) -> $res:pat => $body:tt, $($tail:tt)*)
- ($($cases:tt)*)
- $default:tt
- ) => {
- $crate::crossbeam_channel_internal!(
- @case
- ($($tail)*)
- ($($cases)* recv($r) -> $res => $body,)
- $default
- )
- };
- // Allow trailing comma...
- (@case
- (recv($r:expr,) -> $res:pat => $body:tt, $($tail:tt)*)
+ (recv($r:expr $(,)?) -> $res:pat => $body:tt, $($tail:tt)*)
($($cases:tt)*)
$default:tt
) => {
@@ -428,20 +404,7 @@ macro_rules! crossbeam_channel_internal {
// Check the format of a send case.
(@case
- (send($s:expr, $m:expr) -> $res:pat => $body:tt, $($tail:tt)*)
- ($($cases:tt)*)
- $default:tt
- ) => {
- $crate::crossbeam_channel_internal!(
- @case
- ($($tail)*)
- ($($cases)* send($s, $m) -> $res => $body,)
- $default
- )
- };
- // Allow trailing comma...
- (@case
- (send($s:expr, $m:expr,) -> $res:pat => $body:tt, $($tail:tt)*)
+ (send($s:expr, $m:expr $(,)?) -> $res:pat => $body:tt, $($tail:tt)*)
($($cases:tt)*)
$default:tt
) => {
@@ -496,20 +459,7 @@ macro_rules! crossbeam_channel_internal {
};
// Check the format of a default case with timeout.
(@case
- (default($timeout:expr) => $body:tt, $($tail:tt)*)
- $cases:tt
- ()
- ) => {
- $crate::crossbeam_channel_internal!(
- @case
- ($($tail)*)
- $cases
- (default($timeout) => $body,)
- )
- };
- // Allow trailing comma...
- (@case
- (default($timeout:expr,) => $body:tt, $($tail:tt)*)
+ (default($timeout:expr $(,)?) => $body:tt, $($tail:tt)*)
$cases:tt
()
) => {
@@ -1043,7 +993,7 @@ macro_rules! crossbeam_channel_internal {
/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
/// when it will simply return an error because the channel is disconnected.
///
-/// The `select` macro is a convenience wrapper around [`Select`]. However, it cannot select over a
+/// The `select!` macro is a convenience wrapper around [`Select`]. However, it cannot select over a
/// dynamically created list of channel operations.
///
/// [`Select`]: super::Select
diff --git a/src/utils.rs b/src/utils.rs
index 557b6a0..f623f27 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,14 +1,10 @@
//! Miscellaneous utilities.
-use std::cell::{Cell, UnsafeCell};
+use std::cell::Cell;
use std::num::Wrapping;
-use std::ops::{Deref, DerefMut};
-use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};
-use crossbeam_utils::Backoff;
-
/// Randomly shuffles a slice.
pub(crate) fn shuffle<T>(v: &mut [T]) {
let len = v.len();
@@ -60,53 +56,3 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) {
}
}
}
-
-/// A simple spinlock.
-pub(crate) struct Spinlock<T> {
- flag: AtomicBool,
- value: UnsafeCell<T>,
-}
-
-impl<T> Spinlock<T> {
- /// Returns a new spinlock initialized with `value`.
- pub(crate) fn new(value: T) -> Spinlock<T> {
- Spinlock {
- flag: AtomicBool::new(false),
- value: UnsafeCell::new(value),
- }
- }
-
- /// Locks the spinlock.
- pub(crate) fn lock(&self) -> SpinlockGuard<'_, T> {
- let backoff = Backoff::new();
- while self.flag.swap(true, Ordering::Acquire) {
- backoff.snooze();
- }
- SpinlockGuard { parent: self }
- }
-}
-
-/// A guard holding a spinlock locked.
-pub(crate) struct SpinlockGuard<'a, T> {
- parent: &'a Spinlock<T>,
-}
-
-impl<T> Drop for SpinlockGuard<'_, T> {
- fn drop(&mut self) {
- self.parent.flag.store(false, Ordering::Release);
- }
-}
-
-impl<T> Deref for SpinlockGuard<'_, T> {
- type Target = T;
-
- fn deref(&self) -> &T {
- unsafe { &*self.parent.value.get() }
- }
-}
-
-impl<T> DerefMut for SpinlockGuard<'_, T> {
- fn deref_mut(&mut self) -> &mut T {
- unsafe { &mut *self.parent.value.get() }
- }
-}
diff --git a/src/waker.rs b/src/waker.rs
index dec73a9..7eb58ba 100644
--- a/src/waker.rs
+++ b/src/waker.rs
@@ -2,11 +2,11 @@
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
use std::thread::{self, ThreadId};
use crate::context::Context;
use crate::select::{Operation, Selected};
-use crate::utils::Spinlock;
/// Represents a thread blocked on a specific channel operation.
pub(crate) struct Entry {
@@ -77,26 +77,32 @@ impl Waker {
/// Attempts to find another thread's entry, select the operation, and wake it up.
#[inline]
pub(crate) fn try_select(&mut self) -> Option<Entry> {
- self.selectors
- .iter()
- .position(|selector| {
- // Does the entry belong to a different thread?
- selector.cx.thread_id() != current_thread_id()
- && selector // Try selecting this operation.
- .cx
- .try_select(Selected::Operation(selector.oper))
- .is_ok()
- && {
- // Provide the packet.
- selector.cx.store_packet(selector.packet);
- // Wake the thread up.
- selector.cx.unpark();
- true
- }
- })
- // Remove the entry from the queue to keep it clean and improve
- // performance.
- .map(|pos| self.selectors.remove(pos))
+ if self.selectors.is_empty() {
+ None
+ } else {
+ let thread_id = current_thread_id();
+
+ self.selectors
+ .iter()
+ .position(|selector| {
+ // Does the entry belong to a different thread?
+ selector.cx.thread_id() != thread_id
+ && selector // Try selecting this operation.
+ .cx
+ .try_select(Selected::Operation(selector.oper))
+ .is_ok()
+ && {
+ // Provide the packet.
+ selector.cx.store_packet(selector.packet);
+ // Wake the thread up.
+ selector.cx.unpark();
+ true
+ }
+ })
+ // Remove the entry from the queue to keep it clean and improve
+ // performance.
+ .map(|pos| self.selectors.remove(pos))
+ }
}
/// Returns `true` if there is an entry which can be selected by the current thread.
@@ -170,7 +176,7 @@ impl Drop for Waker {
/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
pub(crate) struct SyncWaker {
/// The inner `Waker`.
- inner: Spinlock<Waker>,
+ inner: Mutex<Waker>,
/// `true` if the waker is empty.
is_empty: AtomicBool,
@@ -181,7 +187,7 @@ impl SyncWaker {
#[inline]
pub(crate) fn new() -> Self {
SyncWaker {
- inner: Spinlock::new(Waker::new()),
+ inner: Mutex::new(Waker::new()),
is_empty: AtomicBool::new(true),
}
}
@@ -189,7 +195,7 @@ impl SyncWaker {
/// Registers the current thread with an operation.
#[inline]
pub(crate) fn register(&self, oper: Operation, cx: &Context) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
inner.register(oper, cx);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
@@ -200,7 +206,7 @@ impl SyncWaker {
/// Unregisters an operation previously registered by the current thread.
#[inline]
pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
let entry = inner.unregister(oper);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
@@ -213,7 +219,7 @@ impl SyncWaker {
#[inline]
pub(crate) fn notify(&self) {
if !self.is_empty.load(Ordering::SeqCst) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
if !self.is_empty.load(Ordering::SeqCst) {
inner.try_select();
inner.notify();
@@ -228,7 +234,7 @@ impl SyncWaker {
/// Registers an operation waiting to be ready.
#[inline]
pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
inner.watch(oper, cx);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
@@ -239,7 +245,7 @@ impl SyncWaker {
/// Unregisters an operation waiting to be ready.
#[inline]
pub(crate) fn unwatch(&self, oper: Operation) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
inner.unwatch(oper);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
@@ -250,7 +256,7 @@ impl SyncWaker {
/// Notifies all threads that the channel is disconnected.
#[inline]
pub(crate) fn disconnect(&self) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
inner.disconnect();
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
diff --git a/tests/array.rs b/tests/array.rs
index bb2cebe..6fd8ffc 100644
--- a/tests/array.rs
+++ b/tests/array.rs
@@ -1,7 +1,5 @@
//! Tests for the array channel flavor.
-#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
-
use std::any::Any;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -254,7 +252,13 @@ fn recv_after_disconnect() {
#[test]
fn len() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
+ #[cfg(miri)]
+ const CAP: usize = 50;
+ #[cfg(not(miri))]
const CAP: usize = 1000;
let (s, r) = bounded(CAP);
@@ -347,6 +351,9 @@ fn disconnect_wakes_receiver() {
#[test]
fn spsc() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
let (s, r) = bounded(3);
@@ -369,6 +376,9 @@ fn spsc() {
#[test]
fn mpmc() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -401,6 +411,9 @@ fn mpmc() {
#[test]
fn stress_oneshot() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
for _ in 0..COUNT {
@@ -416,6 +429,9 @@ fn stress_oneshot() {
#[test]
fn stress_iter() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
let (request_s, request_r) = bounded(1);
@@ -483,7 +499,14 @@ fn stress_timeout_two_threads() {
#[test]
fn drops() {
+ #[cfg(miri)]
+ const RUNS: usize = 10;
+ #[cfg(not(miri))]
const RUNS: usize = 100;
+ #[cfg(miri)]
+ const STEPS: usize = 100;
+ #[cfg(not(miri))]
+ const STEPS: usize = 10_000;
static DROPS: AtomicUsize = AtomicUsize::new(0);
@@ -499,7 +522,7 @@ fn drops() {
let mut rng = thread_rng();
for _ in 0..RUNS {
- let steps = rng.gen_range(0..10_000);
+ let steps = rng.gen_range(0..STEPS);
let additional = rng.gen_range(0..50);
DROPS.store(0, Ordering::SeqCst);
@@ -533,6 +556,9 @@ fn drops() {
#[test]
fn linearizable() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -553,6 +579,9 @@ fn linearizable() {
#[test]
fn fairness() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded::<()>(COUNT);
@@ -575,6 +604,9 @@ fn fairness() {
#[test]
fn fairness_duplicates() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s, r) = bounded::<()>(COUNT);
@@ -619,6 +651,9 @@ fn recv_in_send() {
#[test]
fn channel_through_channel() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 1000;
type T = Box<dyn Any + Send>;
@@ -654,3 +689,56 @@ fn channel_through_channel() {
})
.unwrap();
}
+
+#[test]
+fn panic_on_drop() {
+ struct Msg1<'a>(&'a mut bool);
+ impl Drop for Msg1<'_> {
+ fn drop(&mut self) {
+ if *self.0 && !std::thread::panicking() {
+ panic!("double drop");
+ } else {
+ *self.0 = true;
+ }
+ }
+ }
+
+ struct Msg2<'a>(&'a mut bool);
+ impl Drop for Msg2<'_> {
+ fn drop(&mut self) {
+ if *self.0 {
+ panic!("double drop");
+ } else {
+ *self.0 = true;
+ panic!("first drop");
+ }
+ }
+ }
+
+ // normal
+ let (s, r) = bounded(2);
+ let (mut a, mut b) = (false, false);
+ s.send(Msg1(&mut a)).unwrap();
+ s.send(Msg1(&mut b)).unwrap();
+ drop(s);
+ drop(r);
+ assert!(a);
+ assert!(b);
+
+ // panic on drop
+ let (s, r) = bounded(2);
+ let (mut a, mut b) = (false, false);
+ s.send(Msg2(&mut a)).unwrap();
+ s.send(Msg2(&mut b)).unwrap();
+ drop(s);
+ let res = std::panic::catch_unwind(move || {
+ drop(r);
+ });
+ assert_eq!(
+ *res.unwrap_err().downcast_ref::<&str>().unwrap(),
+ "first drop"
+ );
+ assert!(a);
+ // Elements after the panicked element will leak.
+ assert!(!b);
+}
diff --git a/tests/golang.rs b/tests/golang.rs
index 05d67f6..41149f4 100644
--- a/tests/golang.rs
+++ b/tests/golang.rs
@@ -9,18 +9,18 @@
//! - https://golang.org/LICENSE
//! - https://golang.org/PATENTS
-#![allow(clippy::mutex_atomic, clippy::redundant_clone)]
+#![allow(clippy::redundant_clone)]
use std::alloc::{GlobalAlloc, Layout, System};
use std::any::Any;
use std::cell::Cell;
use std::collections::HashMap;
-use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
+use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
-use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender};
+use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender};
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
@@ -32,7 +32,13 @@ struct Chan<T> {
struct ChanInner<T> {
s: Option<Sender<T>>,
- r: Receiver<T>,
+ r: Option<Receiver<T>>,
+ // Receiver to use when r is None (Go blocks on receiving from nil)
+ nil_r: Receiver<T>,
+ // Sender to use when s is None (Go blocks on sending to nil)
+ nil_s: Sender<T>,
+ // Hold this receiver to prevent nil sender channel from disconnection
+ _nil_sr: Receiver<T>,
}
impl<T> Clone for Chan<T> {
@@ -57,35 +63,53 @@ impl<T> Chan<T> {
}
fn try_recv(&self) -> Option<T> {
- let r = self.inner.lock().unwrap().r.clone();
+ let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
r.try_recv().ok()
}
fn recv(&self) -> Option<T> {
- let r = self.inner.lock().unwrap().r.clone();
+ let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
r.recv().ok()
}
- fn close(&self) {
+ fn close_s(&self) {
self.inner
.lock()
.unwrap()
.s
.take()
- .expect("channel already closed");
+ .expect("channel sender already closed");
+ }
+
+ fn close_r(&self) {
+ self.inner
+ .lock()
+ .unwrap()
+ .r
+ .take()
+ .expect("channel receiver already closed");
+ }
+
+ fn has_rx(&self) -> bool {
+ self.inner.lock().unwrap().r.is_some()
+ }
+
+ fn has_tx(&self) -> bool {
+ self.inner.lock().unwrap().s.is_some()
}
fn rx(&self) -> Receiver<T> {
- self.inner.lock().unwrap().r.clone()
+ let inner = self.inner.lock().unwrap();
+ match inner.r.as_ref() {
+ None => inner.nil_r.clone(),
+ Some(r) => r.clone(),
+ }
}
fn tx(&self) -> Sender<T> {
- match self.inner.lock().unwrap().s.as_ref() {
- None => {
- let (s, r) = bounded(0);
- std::mem::forget(r);
- s
- }
+ let inner = self.inner.lock().unwrap();
+ match inner.s.as_ref() {
+ None => inner.nil_s.clone(),
Some(s) => s.clone(),
}
}
@@ -110,17 +134,32 @@ impl<'a, T> IntoIterator for &'a Chan<T> {
fn make<T>(cap: usize) -> Chan<T> {
let (s, r) = bounded(cap);
+ let (nil_s, _nil_sr) = bounded(0);
Chan {
- inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
+ inner: Arc::new(Mutex::new(ChanInner {
+ s: Some(s),
+ r: Some(r),
+ nil_r: never(),
+ nil_s,
+ _nil_sr,
+ })),
}
}
fn make_unbounded<T>() -> Chan<T> {
let (s, r) = unbounded();
+ let (nil_s, _nil_sr) = bounded(0);
Chan {
- inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
+ inner: Arc::new(Mutex::new(ChanInner {
+ s: Some(s),
+ r: Some(r),
+ nil_r: never(),
+ nil_s,
+ _nil_sr,
+ })),
}
}
+
#[derive(Clone)]
struct WaitGroup(Arc<WaitGroupInner>);
@@ -199,14 +238,6 @@ macro_rules! defer {
}
macro_rules! go {
- (@parse ref $v:ident, $($tail:tt)*) => {{
- let ref $v = $v;
- go!(@parse $($tail)*)
- }};
- (@parse move $v:ident, $($tail:tt)*) => {{
- let $v = $v;
- go!(@parse $($tail)*)
- }};
(@parse $v:ident, $($tail:tt)*) => {{
let $v = $v.clone();
go!(@parse $($tail)*)
@@ -240,10 +271,10 @@ mod doubleselect {
const ITERATIONS: i32 = 10_000;
fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) {
- defer! { c1.close() }
- defer! { c2.close() }
- defer! { c3.close() }
- defer! { c4.close() }
+ defer! { c1.close_s() }
+ defer! { c2.close_s() }
+ defer! { c3.close_s() }
+ defer! { c4.close_s() }
for i in 0..n {
select! {
@@ -292,7 +323,7 @@ mod doubleselect {
done.recv();
done.recv();
done.recv();
- cmux.close();
+ cmux.close_s();
});
recver(cmux);
}
@@ -697,7 +728,7 @@ mod select2 {
use super::*;
#[cfg(miri)]
- const N: i32 = 1000;
+ const N: i32 = 200;
#[cfg(not(miri))]
const N: i32 = 100000;
@@ -892,6 +923,9 @@ mod sieve1 {
2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83,
89, 97,
];
+ #[cfg(miri)]
+ let a = &a[..10];
+
for item in a.iter() {
let x = primes.recv().unwrap();
if x != *item {
@@ -925,10 +959,15 @@ mod chan_test {
#[test]
fn test_chan() {
#[cfg(miri)]
- const N: i32 = 20;
+ const N: i32 = 12;
#[cfg(not(miri))]
const N: i32 = 200;
+ #[cfg(miri)]
+ const MESSAGES_COUNT: i32 = 20;
+ #[cfg(not(miri))]
+ const MESSAGES_COUNT: i32 = 100;
+
for cap in 0..N {
{
// Ensure that receive from empty chan blocks.
@@ -999,7 +1038,7 @@ mod chan_test {
for i in 0..cap {
c.send(i);
}
- c.close();
+ c.close_s();
for i in 0..cap {
let v = c.recv();
@@ -1027,7 +1066,7 @@ mod chan_test {
});
thread::sleep(ms(1));
- c.close();
+ c.close_s();
if !done.recv().unwrap() {
panic!();
@@ -1035,15 +1074,15 @@ mod chan_test {
}
{
- // Send 100 integers,
+ // Send many integers,
// ensure that we receive them non-corrupted in FIFO order.
let c = make::<i32>(cap as usize);
go!(c, {
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
c.send(i);
}
});
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
if c.recv() != Some(i) {
panic!();
}
@@ -1051,11 +1090,11 @@ mod chan_test {
// Same, but using recv2.
go!(c, {
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
c.send(i);
}
});
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
if c.recv() != Some(i) {
panic!();
}
@@ -1082,7 +1121,7 @@ mod chan_test {
}
});
- c.close();
+ c.close_s();
c.recv();
t.join().unwrap();
}
@@ -1149,7 +1188,7 @@ mod chan_test {
done.send(true);
});
- c2.close();
+ c2.close_s();
select! {
recv(c1.rx()) -> _ => {}
default => {}
@@ -1378,7 +1417,7 @@ mod chan_test {
);
}
- done.close();
+ done.close_s();
wg.wait();
}
@@ -1450,7 +1489,7 @@ mod chan_test {
fn test_multi_consumer() {
const NWORK: usize = 23;
#[cfg(miri)]
- const NITER: usize = 100;
+ const NITER: usize = 50;
#[cfg(not(miri))]
const NITER: usize = 271828;
@@ -1481,9 +1520,9 @@ mod chan_test {
*expect.lock().unwrap() += v;
q.send(v);
}
- q.close();
+ q.close_s();
wg.wait();
- r.close();
+ r.close_s();
});
let mut n = 0;
@@ -1542,7 +1581,502 @@ mod race_chan_test {
// https://github.com/golang/go/blob/master/test/ken/chan.go
mod chan {
- // TODO
+ use super::*;
+
+ const MESSAGES_PER_CHANEL: u32 = 76;
+ const MESSAGES_RANGE_LEN: u32 = 100;
+ const END: i32 = 10000;
+
+ struct ChanWithVals {
+ chan: Chan<i32>,
+ /// Next value to send
+ sv: Arc<AtomicI32>,
+ /// Next value to receive
+ rv: Arc<AtomicI32>,
+ }
+
+ struct Totals {
+ /// Total sent messages
+ tots: u32,
+ /// Total received messages
+ totr: u32,
+ }
+
+ struct Context {
+ nproc: Arc<Mutex<i32>>,
+ cval: Arc<Mutex<i32>>,
+ tot: Arc<Mutex<Totals>>,
+ nc: ChanWithVals,
+ randx: Arc<Mutex<i32>>,
+ }
+
+ impl ChanWithVals {
+ fn with_capacity(capacity: usize) -> Self {
+ ChanWithVals {
+ chan: make(capacity),
+ sv: Arc::new(AtomicI32::new(0)),
+ rv: Arc::new(AtomicI32::new(0)),
+ }
+ }
+
+ fn closed() -> Self {
+ let ch = ChanWithVals::with_capacity(0);
+ ch.chan.close_r();
+ ch.chan.close_s();
+ ch
+ }
+
+ fn rv(&self) -> i32 {
+ self.rv.load(SeqCst)
+ }
+
+ fn sv(&self) -> i32 {
+ self.sv.load(SeqCst)
+ }
+
+ fn send(&mut self, tot: &Mutex<Totals>) -> bool {
+ {
+ let mut tot = tot.lock().unwrap();
+ tot.tots += 1
+ }
+ let esv = expect(self.sv(), self.sv());
+ self.sv.store(esv, SeqCst);
+ if self.sv() == END {
+ self.chan.close_s();
+ return true;
+ }
+ false
+ }
+
+ fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool {
+ {
+ let mut tot = tot.lock().unwrap();
+ tot.totr += 1
+ }
+ let erv = expect(self.rv(), v);
+ self.rv.store(erv, SeqCst);
+ if self.rv() == END {
+ self.chan.close_r();
+ return true;
+ }
+ false
+ }
+ }
+
+ impl Clone for ChanWithVals {
+ fn clone(&self) -> Self {
+ ChanWithVals {
+ chan: self.chan.clone(),
+ sv: self.sv.clone(),
+ rv: self.rv.clone(),
+ }
+ }
+ }
+
+ impl Context {
+ fn nproc(&self) -> &Mutex<i32> {
+ self.nproc.as_ref()
+ }
+
+ fn cval(&self) -> &Mutex<i32> {
+ self.cval.as_ref()
+ }
+
+ fn tot(&self) -> &Mutex<Totals> {
+ self.tot.as_ref()
+ }
+
+ fn randx(&self) -> &Mutex<i32> {
+ self.randx.as_ref()
+ }
+ }
+
+ impl Clone for Context {
+ fn clone(&self) -> Self {
+ Context {
+ nproc: self.nproc.clone(),
+ cval: self.cval.clone(),
+ tot: self.tot.clone(),
+ nc: self.nc.clone(),
+ randx: self.randx.clone(),
+ }
+ }
+ }
+
+ fn nrand(n: i32, randx: &Mutex<i32>) -> i32 {
+ let mut randx = randx.lock().unwrap();
+ *randx += 10007;
+ if *randx >= 1000000 {
+ *randx -= 1000000
+ }
+ *randx % n
+ }
+
+ fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 {
+ let mut nproc = nproc.lock().unwrap();
+ *nproc += adjust;
+ *nproc
+ }
+
+ fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> {
+ let mut ca = Vec::<ChanWithVals>::with_capacity(n);
+ let mut cval = cval.lock().unwrap();
+ for _ in 0..n {
+ *cval += MESSAGES_RANGE_LEN as i32;
+ let chl = ChanWithVals::with_capacity(c);
+ chl.sv.store(*cval, SeqCst);
+ chl.rv.store(*cval, SeqCst);
+ ca.push(chl);
+ }
+ ca
+ }
+
+ fn expect(v: i32, v0: i32) -> i32 {
+ if v == v0 {
+ return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 {
+ END
+ } else {
+ v + 1
+ };
+ }
+ panic!("got {}, expected {}", v, v0 + 1);
+ }
+
+ fn send(mut c: ChanWithVals, ctx: Context) {
+ loop {
+ for _ in 0..=nrand(10, ctx.randx()) {
+ thread::yield_now();
+ }
+ c.chan.tx().send(c.sv()).unwrap();
+ if c.send(ctx.tot()) {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ fn recv(mut c: ChanWithVals, ctx: Context) {
+ loop {
+ for _ in (0..nrand(10, ctx.randx())).rev() {
+ thread::yield_now();
+ }
+ let v = c.chan.rx().recv().unwrap();
+ if c.recv(v, ctx.tot()) {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ fn sel(
+ mut r0: ChanWithVals,
+ mut r1: ChanWithVals,
+ mut r2: ChanWithVals,
+ mut r3: ChanWithVals,
+ mut s0: ChanWithVals,
+ mut s1: ChanWithVals,
+ mut s2: ChanWithVals,
+ mut s3: ChanWithVals,
+ ctx: Context,
+ ) {
+ let mut a = 0; // local chans running
+
+ if r0.chan.has_rx() {
+ a += 1;
+ }
+ if r1.chan.has_rx() {
+ a += 1;
+ }
+ if r2.chan.has_rx() {
+ a += 1;
+ }
+ if r3.chan.has_rx() {
+ a += 1;
+ }
+ if s0.chan.has_tx() {
+ a += 1;
+ }
+ if s1.chan.has_tx() {
+ a += 1;
+ }
+ if s2.chan.has_tx() {
+ a += 1;
+ }
+ if s3.chan.has_tx() {
+ a += 1;
+ }
+
+ loop {
+ for _ in 0..=nrand(5, ctx.randx()) {
+ thread::yield_now();
+ }
+ select! {
+ recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 },
+ send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 },
+ send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 },
+ send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 },
+ }
+ if a == 0 {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals {
+ vec.get(idx).unwrap().clone()
+ }
+
+ /// Direct send to direct recv
+ fn test1(c: ChanWithVals, ctx: &mut Context) {
+ change_nproc(2, ctx.nproc());
+ go!(c, ctx, send(c, ctx));
+ go!(c, ctx, recv(c, ctx));
+ }
+
+ /// Direct send to select recv
+ fn test2(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, send(get(&ca, 0), ctx));
+ go!(ca, ctx, send(get(&ca, 1), ctx));
+ go!(ca, ctx, send(get(&ca, 2), ctx));
+ go!(ca, ctx, send(get(&ca, 3), ctx));
+
+ change_nproc(1, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to direct recv
+ fn test3(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, recv(get(&ca, 0), ctx));
+ go!(ca, ctx, recv(get(&ca, 1), ctx));
+ go!(ca, ctx, recv(get(&ca, 2), ctx));
+ go!(ca, ctx, recv(get(&ca, 3), ctx));
+
+ change_nproc(1, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to select recv, 4 channels
+ fn test4(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to select recv, 8 channels
+ fn test5(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 8, ctx.cval());
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ ctx,
+ )
+ );
+ }
+
+ // Direct and select send to direct and select recv
+ fn test6(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 12, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, send(get(&ca, 4), ctx));
+ go!(ca, ctx, send(get(&ca, 5), ctx));
+ go!(ca, ctx, send(get(&ca, 6), ctx));
+ go!(ca, ctx, send(get(&ca, 7), ctx));
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, recv(get(&ca, 8), ctx));
+ go!(ca, ctx, recv(get(&ca, 9), ctx));
+ go!(ca, ctx, recv(get(&ca, 10), ctx));
+ go!(ca, ctx, recv(get(&ca, 11), ctx));
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ get(&ca, 8),
+ get(&ca, 9),
+ get(&ca, 10),
+ get(&ca, 11),
+ ctx,
+ )
+ );
+ }
+
+ fn wait(ctx: &mut Context) {
+ thread::yield_now();
+ while change_nproc(0, ctx.nproc()) != 0 {
+ thread::yield_now();
+ }
+ }
+
+ fn tests(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+ test1(get(&ca, 0), ctx);
+ test1(get(&ca, 1), ctx);
+ test1(get(&ca, 2), ctx);
+ test1(get(&ca, 3), ctx);
+ wait(ctx);
+
+ test2(c, ctx);
+ wait(ctx);
+
+ test3(c, ctx);
+ wait(ctx);
+
+ test4(c, ctx);
+ wait(ctx);
+
+ test5(c, ctx);
+ wait(ctx);
+
+ test6(c, ctx);
+ wait(ctx);
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)] // Miri is too slow
+ fn main() {
+ let mut ctx = Context {
+ nproc: Arc::new(Mutex::new(0)),
+ cval: Arc::new(Mutex::new(0)),
+ tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })),
+ nc: ChanWithVals::closed(),
+ randx: Arc::new(Mutex::new(0)),
+ };
+
+ tests(0, &mut ctx);
+ tests(1, &mut ctx);
+ tests(10, &mut ctx);
+ tests(100, &mut ctx);
+
+ #[rustfmt::skip]
+ let t = 4 * // buffer sizes
+ (4*4 + // tests 1,2,3,4 channels
+ 8 + // test 5 channels
+ 12) * // test 6 channels
+ MESSAGES_PER_CHANEL; // sends/recvs on a channel
+
+ let tot = ctx.tot.lock().unwrap();
+ if tot.tots != t || tot.totr != t {
+ panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t);
+ }
+ }
}
// https://github.com/golang/go/blob/master/test/ken/chan1.go
@@ -1551,7 +2085,7 @@ mod chan1 {
// sent messages
#[cfg(miri)]
- const N: usize = 100;
+ const N: usize = 20;
#[cfg(not(miri))]
const N: usize = 1000;
// receiving "goroutines"
diff --git a/tests/list.rs b/tests/list.rs
index 619e1fc..ebe6f6f 100644
--- a/tests/list.rs
+++ b/tests/list.rs
@@ -67,6 +67,7 @@ fn len_empty_full() {
}
#[test]
+#[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them
fn try_recv() {
let (s, r) = unbounded();
@@ -132,8 +133,13 @@ fn recv_timeout() {
#[test]
fn try_send() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
+ const COUNT: usize = 1000;
+
let (s, r) = unbounded();
- for i in 0..1000 {
+ for i in 0..COUNT {
assert_eq!(s.try_send(i), Ok(()));
}
@@ -143,8 +149,13 @@ fn try_send() {
#[test]
fn send() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
+ const COUNT: usize = 1000;
+
let (s, r) = unbounded();
- for i in 0..1000 {
+ for i in 0..COUNT {
assert_eq!(s.send(i), Ok(()));
}
@@ -154,8 +165,13 @@ fn send() {
#[test]
fn send_timeout() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
+ const COUNT: usize = 1000;
+
let (s, r) = unbounded();
- for i in 0..1000 {
+ for i in 0..COUNT {
assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(()));
}
@@ -383,10 +399,16 @@ fn stress_timeout_two_threads() {
.unwrap();
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
+ #[cfg(miri)]
+ const RUNS: usize = 20;
+ #[cfg(not(miri))]
const RUNS: usize = 100;
+ #[cfg(miri)]
+ const STEPS: usize = 100;
+ #[cfg(not(miri))]
+ const STEPS: usize = 10_000;
static DROPS: AtomicUsize = AtomicUsize::new(0);
@@ -402,8 +424,8 @@ fn drops() {
let mut rng = thread_rng();
for _ in 0..RUNS {
- let steps = rng.gen_range(0..10_000);
- let additional = rng.gen_range(0..1000);
+ let steps = rng.gen_range(0..STEPS);
+ let additional = rng.gen_range(0..STEPS / 10);
DROPS.store(0, Ordering::SeqCst);
let (s, r) = unbounded::<DropCounter>();
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
index 4d6e179..d7cc8e2 100644
--- a/tests/mpsc.rs
+++ b/tests/mpsc.rs
@@ -321,7 +321,7 @@ mod channel_tests {
#[test]
fn stress() {
#[cfg(miri)]
- const COUNT: usize = 500;
+ const COUNT: usize = 100;
#[cfg(not(miri))]
const COUNT: usize = 10000;
@@ -339,25 +339,22 @@ mod channel_tests {
#[test]
fn stress_shared() {
- #[cfg(miri)]
- const AMT: u32 = 500;
- #[cfg(not(miri))]
- const AMT: u32 = 10000;
- const NTHREADS: u32 = 8;
+ let amt: u32 = if cfg!(miri) { 100 } else { 10_000 };
+ let nthreads: u32 = if cfg!(miri) { 4 } else { 8 };
let (tx, rx) = channel::<i32>();
let t = thread::spawn(move || {
- for _ in 0..AMT * NTHREADS {
+ for _ in 0..amt * nthreads {
assert_eq!(rx.recv().unwrap(), 1);
}
assert!(rx.try_recv().is_err());
});
- let mut ts = Vec::with_capacity(NTHREADS as usize);
- for _ in 0..NTHREADS {
+ let mut ts = Vec::with_capacity(nthreads as usize);
+ for _ in 0..nthreads {
let tx = tx.clone();
let t = thread::spawn(move || {
- for _ in 0..AMT {
+ for _ in 0..amt {
tx.send(1).unwrap();
}
});
@@ -747,7 +744,7 @@ mod channel_tests {
#[test]
fn recv_a_lot() {
#[cfg(miri)]
- const N: usize = 100;
+ const N: usize = 50;
#[cfg(not(miri))]
const N: usize = 10000;
diff --git a/tests/ready.rs b/tests/ready.rs
index d8dd6ce..6e3fb2b 100644
--- a/tests/ready.rs
+++ b/tests/ready.rs
@@ -229,6 +229,7 @@ fn default_when_disconnected() {
}
#[test]
+#[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them
fn default_only() {
let start = Instant::now();
diff --git a/tests/select.rs b/tests/select.rs
index f24aed8..bc5824d 100644
--- a/tests/select.rs
+++ b/tests/select.rs
@@ -408,7 +408,6 @@ fn both_ready() {
.unwrap();
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn loop_try() {
const RUNS: usize = 20;
@@ -694,7 +693,7 @@ fn nesting() {
#[test]
fn stress_recv() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -735,7 +734,7 @@ fn stress_recv() {
#[test]
fn stress_send() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -953,7 +952,7 @@ fn matching_with_leftover() {
#[test]
fn channel_through_channel() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 1000;
@@ -1014,7 +1013,7 @@ fn channel_through_channel() {
#[test]
fn linearizable_try() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
@@ -1069,7 +1068,7 @@ fn linearizable_try() {
#[test]
fn linearizable_timeout() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
@@ -1124,7 +1123,7 @@ fn linearizable_timeout() {
#[test]
fn fairness1() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -1173,7 +1172,7 @@ fn fairness1() {
#[test]
fn fairness2() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -1292,7 +1291,7 @@ fn send_and_clone() {
#[test]
fn reuse() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
diff --git a/tests/select_macro.rs b/tests/select_macro.rs
index 0b9a21a..119454c 100644
--- a/tests/select_macro.rs
+++ b/tests/select_macro.rs
@@ -284,7 +284,6 @@ fn both_ready() {
.unwrap();
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn loop_try() {
const RUNS: usize = 20;
@@ -488,7 +487,7 @@ fn panic_receiver() {
#[test]
fn stress_recv() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -1468,3 +1467,14 @@ fn disconnect_wakes_receiver() {
})
.unwrap();
}
+
+#[test]
+fn trailing_comma() {
+ let (s, r) = unbounded::<usize>();
+
+ select! {
+ send(s, 1,) -> _ => {},
+ recv(r,) -> _ => {},
+ default(ms(1000),) => {},
+ }
+}
diff --git a/tests/thread_locals.rs b/tests/thread_locals.rs
index effb6a1..fb4e577 100644
--- a/tests/thread_locals.rs
+++ b/tests/thread_locals.rs
@@ -1,6 +1,6 @@
//! Tests that make sure accessing thread-locals while exiting the thread doesn't cause panics.
-#![cfg(not(miri))] // error: abnormal termination: the evaluated program aborted execution
+#![cfg(not(miri))] // Miri detects that this test is buggy: the destructor of `FOO` uses `std::thread::current()`!
use std::thread;
use std::time::Duration;
diff --git a/tests/zero.rs b/tests/zero.rs
index ba41b1a..74c9a3e 100644
--- a/tests/zero.rs
+++ b/tests/zero.rs
@@ -188,7 +188,7 @@ fn send_timeout() {
#[test]
fn len() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 25_000;
@@ -253,7 +253,7 @@ fn disconnect_wakes_receiver() {
#[test]
fn spsc() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
@@ -278,7 +278,7 @@ fn spsc() {
#[test]
fn mpmc() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -313,7 +313,7 @@ fn mpmc() {
#[test]
fn stress_oneshot() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -328,9 +328,11 @@ fn stress_oneshot() {
}
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_iter() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
const COUNT: usize = 1000;
let (request_s, request_r) = bounded(0);
@@ -396,10 +398,16 @@ fn stress_timeout_two_threads() {
.unwrap();
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
+ #[cfg(miri)]
+ const RUNS: usize = 20;
+ #[cfg(not(miri))]
const RUNS: usize = 100;
+ #[cfg(miri)]
+ const STEPS: usize = 100;
+ #[cfg(not(miri))]
+ const STEPS: usize = 10_000;
static DROPS: AtomicUsize = AtomicUsize::new(0);
@@ -415,7 +423,7 @@ fn drops() {
let mut rng = thread_rng();
for _ in 0..RUNS {
- let steps = rng.gen_range(0..3_000);
+ let steps = rng.gen_range(0..STEPS);
DROPS.store(0, Ordering::SeqCst);
let (s, r) = bounded::<DropCounter>(0);
@@ -445,7 +453,7 @@ fn drops() {
#[test]
fn fairness() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -540,7 +548,7 @@ fn recv_in_send() {
#[test]
fn channel_through_channel() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 1000;