aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid LeGare <legare@google.com>2022-06-29 18:47:01 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2022-06-29 18:47:01 +0000
commit8430278912a461368d1ae6a1444ab51932f4b486 (patch)
tree75a4e401ccfe098595fc5f85f17b162d1ea55998
parent134994aa018e129c4bca68272a1cb9f6144de116 (diff)
parent4ad4ad1d55bd53c183401441b7d36ae72e237817 (diff)
downloadcrossbeam-channel-8430278912a461368d1ae6a1444ab51932f4b486.tar.gz
Upgrade rust/crates/crossbeam-channel to 0.5.5 am: 45db964b0c am: b113b5566a am: dd3b0286c9 am: 4ad4ad1d55
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/crossbeam-channel/+/2137795 Change-Id: I476ec84f44c522de0b958e36bf66099637fc00fb Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp6
-rw-r--r--CHANGELOG.md20
-rw-r--r--Cargo.lock50
-rw-r--r--Cargo.toml19
-rw-r--r--Cargo.toml.orig2
-rw-r--r--METADATA8
-rw-r--r--src/channel.rs9
-rw-r--r--src/err.rs4
-rw-r--r--src/flavors/array.rs74
-rw-r--r--src/flavors/at.rs3
-rw-r--r--src/flavors/list.rs10
-rw-r--r--src/flavors/never.rs1
-rw-r--r--src/flavors/tick.rs4
-rw-r--r--src/flavors/zero.rs61
-rw-r--r--src/select.rs19
-rw-r--r--src/select_macro.rs60
-rw-r--r--src/utils.rs58
-rw-r--r--src/waker.rs64
-rw-r--r--tests/array.rs98
-rw-r--r--tests/golang.rs623
-rw-r--r--tests/list.rs35
-rw-r--r--tests/mpsc.rs6
-rw-r--r--tests/select.rs16
-rw-r--r--tests/select_macro.rs13
-rw-r--r--tests/zero.rs24
26 files changed, 948 insertions, 341 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f15a046..1ea95d0 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "f9cec068fa94bced547a66289cd288dca58c2e83"
+ "sha1": "16769892dc5e1ba14c8ea14e92e85f442e25f4aa"
},
"path_in_vcs": "crossbeam-channel"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index a5821d8..740fcd8 100644
--- a/Android.bp
+++ b/Android.bp
@@ -47,7 +47,7 @@ rust_library {
host_supported: true,
crate_name: "crossbeam_channel",
cargo_env_compat: true,
- cargo_pkg_version: "0.5.2",
+ cargo_pkg_version: "0.5.5",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -59,4 +59,8 @@ rust_library {
"libcfg_if",
"libcrossbeam_utils",
],
+ apex_available: [
+ "//apex_available:platform",
+ "//apex_available:anyapex",
+ ],
}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6bfd923..2f627cf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,22 @@
+# Version 0.5.5
+
+- Replace Spinlock with Mutex. (#835)
+
+# Version 0.5.4
+
+- Workaround a bug in upstream related to TLS access on AArch64 Linux. (#802)
+
+# Version 0.5.3
+
+**Note:** This release has been yanked. See [#802](https://github.com/crossbeam-rs/crossbeam/issues/802) for details.
+
+- Fix panic on very large timeout. (#798)
+
# Version 0.5.2
-- Fix stacked borrows violations. (#763, #764)
+**Note:** This release has been yanked. See [#802](https://github.com/crossbeam-rs/crossbeam/issues/802) for details.
+
+- Fix stacked borrows violations when `-Zmiri-tag-raw-pointers` is enabled. (#763, #764)
# Version 0.5.1
@@ -22,6 +38,8 @@
# Version 0.4.3
+**Note:** This release has been yanked. See [GHSA-v5m7-53cv-f3hx](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-v5m7-53cv-f3hx) for details.
+
- Change license to "MIT OR Apache-2.0".
# Version 0.4.2
diff --git a/Cargo.lock b/Cargo.lock
index feb6e75..cd6eb05 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10,7 +10,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
-version = "0.5.2"
+version = "0.5.5"
dependencies = [
"cfg-if",
"crossbeam-utils",
@@ -21,19 +21,19 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
-version = "0.8.6"
+version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
+checksum = "8ff1f980957787286a554052d03c7aee98d99cc32e09f6d45f0a814133c87978"
dependencies = [
"cfg-if",
- "lazy_static",
+ "once_cell",
]
[[package]]
name = "getrandom"
-version = "0.2.3"
+version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
+checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6"
dependencies = [
"cfg-if",
"libc",
@@ -50,16 +50,10 @@ dependencies = [
]
[[package]]
-name = "lazy_static"
-version = "1.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
-
-[[package]]
name = "libc"
-version = "0.2.112"
+version = "0.2.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
+checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836"
[[package]]
name = "num_cpus"
@@ -72,6 +66,12 @@ dependencies = [
]
[[package]]
+name = "once_cell"
+version = "1.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225"
+
+[[package]]
name = "ppv-lite86"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -79,14 +79,13 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "rand"
-version = "0.8.4"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
+checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
- "rand_hc",
]
[[package]]
@@ -109,19 +108,10 @@ dependencies = [
]
[[package]]
-name = "rand_hc"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
-dependencies = [
- "rand_core",
-]
-
-[[package]]
name = "signal-hook"
-version = "0.3.13"
+version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d"
+checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d"
dependencies = [
"libc",
"signal-hook-registry",
@@ -138,6 +128,6 @@ dependencies = [
[[package]]
name = "wasi"
-version = "0.10.2+wasi-snapshot-preview1"
+version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
diff --git a/Cargo.toml b/Cargo.toml
index 87a889f..c02f859 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,13 +13,25 @@
edition = "2018"
rust-version = "1.36"
name = "crossbeam-channel"
-version = "0.5.2"
+version = "0.5.5"
description = "Multi-producer multi-consumer channels for message passing"
homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel"
-keywords = ["channel", "mpmc", "select", "golang", "message"]
-categories = ["algorithms", "concurrency", "data-structures"]
+readme = "README.md"
+keywords = [
+ "channel",
+ "mpmc",
+ "select",
+ "golang",
+ "message",
+]
+categories = [
+ "algorithms",
+ "concurrency",
+ "data-structures",
+]
license = "MIT OR Apache-2.0"
repository = "https://github.com/crossbeam-rs/crossbeam"
+
[dependencies.cfg-if]
version = "1"
@@ -27,6 +39,7 @@ version = "1"
version = "0.8"
optional = true
default-features = false
+
[dev-dependencies.num_cpus]
version = "1.13.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 640a808..344092c 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -4,7 +4,7 @@ name = "crossbeam-channel"
# - Update CHANGELOG.md
# - Update README.md
# - Create "crossbeam-channel-X.Y.Z" git tag
-version = "0.5.2"
+version = "0.5.5"
edition = "2018"
rust-version = "1.36"
license = "MIT OR Apache-2.0"
diff --git a/METADATA b/METADATA
index 6cbb05f..8146f87 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.2.crate"
+ value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.5.crate"
}
- version: "0.5.2"
+ version: "0.5.5"
license_type: NOTICE
last_upgrade_date {
year: 2022
- month: 3
- day: 1
+ month: 6
+ day: 28
}
}
diff --git a/src/channel.rs b/src/channel.rs
index 8988235..800fe63 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -14,6 +14,7 @@ use crate::err::{
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};
+use crate::utils;
/// Creates a channel of unbounded capacity.
///
@@ -232,6 +233,8 @@ pub fn at(when: Instant) -> Receiver<Instant> {
///
/// Using a `never` channel to optionally add a timeout to [`select!`]:
///
+/// [`select!`]: crate::select!
+///
/// ```
/// use std::thread;
/// use std::time::Duration;
@@ -297,7 +300,7 @@ pub fn never<T>() -> Receiver<T> {
/// let ms = |ms| Duration::from_millis(ms);
///
/// // Returns `true` if `a` and `b` are very close `Instant`s.
-/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
+/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
///
/// let start = Instant::now();
/// let r = tick(ms(100));
@@ -471,7 +474,7 @@ impl<T> Sender<T> {
/// );
/// ```
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
- self.send_deadline(msg, Instant::now() + timeout)
+ self.send_deadline(msg, utils::convert_timeout_to_deadline(timeout))
}
/// Waits for a message to be sent into the channel, but only until a given deadline.
@@ -861,7 +864,7 @@ impl<T> Receiver<T> {
/// );
/// ```
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
- self.recv_deadline(Instant::now() + timeout)
+ self.recv_deadline(utils::convert_timeout_to_deadline(timeout))
}
/// Waits for a message to be received from the channel, but only before a given deadline.
diff --git a/src/err.rs b/src/err.rs
index 578acc6..18cb830 100644
--- a/src/err.rs
+++ b/src/err.rs
@@ -308,7 +308,6 @@ impl From<RecvError> for TryRecvError {
impl TryRecvError {
/// Returns `true` if the receive operation failed because the channel is empty.
- #[allow(clippy::trivially_copy_pass_by_ref)]
pub fn is_empty(&self) -> bool {
match self {
TryRecvError::Empty => true,
@@ -317,7 +316,6 @@ impl TryRecvError {
}
/// Returns `true` if the receive operation failed because the channel is disconnected.
- #[allow(clippy::trivially_copy_pass_by_ref)]
pub fn is_disconnected(&self) -> bool {
match self {
TryRecvError::Disconnected => true,
@@ -347,7 +345,6 @@ impl From<RecvError> for RecvTimeoutError {
impl RecvTimeoutError {
/// Returns `true` if the receive operation timed out.
- #[allow(clippy::trivially_copy_pass_by_ref)]
pub fn is_timeout(&self) -> bool {
match self {
RecvTimeoutError::Timeout => true,
@@ -356,7 +353,6 @@ impl RecvTimeoutError {
}
/// Returns `true` if the receive operation failed because the channel is disconnected.
- #[allow(clippy::trivially_copy_pass_by_ref)]
pub fn is_disconnected(&self) -> bool {
match self {
RecvTimeoutError::Disconnected => true,
diff --git a/src/flavors/array.rs b/src/flavors/array.rs
index 871768c..73557d3 100644
--- a/src/flavors/array.rs
+++ b/src/flavors/array.rs
@@ -9,7 +9,6 @@
//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
use std::cell::UnsafeCell;
-use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
@@ -33,7 +32,7 @@ struct Slot<T> {
/// The token type for the array flavor.
#[derive(Debug)]
-pub struct ArrayToken {
+pub(crate) struct ArrayToken {
/// Slot to read from or write to.
slot: *const u8,
@@ -72,7 +71,7 @@ pub(crate) struct Channel<T> {
tail: CachePadded<AtomicUsize>,
/// The buffer holding slots.
- buffer: *mut Slot<T>,
+ buffer: Box<[Slot<T>]>,
/// The channel capacity.
cap: usize,
@@ -88,9 +87,6 @@ pub(crate) struct Channel<T> {
/// Receivers waiting while the channel is empty and not disconnected.
receivers: SyncWaker,
-
- /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
- _marker: PhantomData<T>,
}
impl<T> Channel<T> {
@@ -109,18 +105,15 @@ impl<T> Channel<T> {
// Allocate a buffer of `cap` slots initialized
// with stamps.
- let buffer = {
- let boxed: Box<[Slot<T>]> = (0..cap)
- .map(|i| {
- // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
- Slot {
- stamp: AtomicUsize::new(i),
- msg: UnsafeCell::new(MaybeUninit::uninit()),
- }
- })
- .collect();
- Box::into_raw(boxed) as *mut Slot<T>
- };
+ let buffer: Box<[Slot<T>]> = (0..cap)
+ .map(|i| {
+ // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
+ Slot {
+ stamp: AtomicUsize::new(i),
+ msg: UnsafeCell::new(MaybeUninit::uninit()),
+ }
+ })
+ .collect();
Channel {
buffer,
@@ -131,7 +124,6 @@ impl<T> Channel<T> {
tail: CachePadded::new(AtomicUsize::new(tail)),
senders: SyncWaker::new(),
receivers: SyncWaker::new(),
- _marker: PhantomData,
}
}
@@ -163,7 +155,8 @@ impl<T> Channel<T> {
let lap = tail & !(self.one_lap - 1);
// Inspect the corresponding slot.
- let slot = unsafe { &*self.buffer.add(index) };
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the tail and the stamp match, we may attempt to push.
@@ -245,7 +238,8 @@ impl<T> Channel<T> {
let lap = head & !(self.one_lap - 1);
// Inspect the corresponding slot.
- let slot = unsafe { &*self.buffer.add(index) };
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the the stamp is ahead of the head by 1, we may attempt to pop.
@@ -475,7 +469,6 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
pub(crate) fn capacity(&self) -> Option<usize> {
Some(self.cap)
}
@@ -528,10 +521,24 @@ impl<T> Channel<T> {
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
// Get the index of the head.
- let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
+ let head = *self.head.get_mut();
+ let tail = *self.tail.get_mut();
+
+ let hix = head & (self.mark_bit - 1);
+ let tix = tail & (self.mark_bit - 1);
+
+ let len = if hix < tix {
+ tix - hix
+ } else if hix > tix {
+ self.cap - hix + tix
+ } else if (tail & !self.mark_bit) == head {
+ 0
+ } else {
+ self.cap
+ };
// Loop over all slots that hold a message and drop them.
- for i in 0..self.len() {
+ for i in 0..len {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap {
hix + i
@@ -540,23 +547,12 @@ impl<T> Drop for Channel<T> {
};
unsafe {
- let p = {
- let slot = &mut *self.buffer.add(index);
- let msg = &mut *slot.msg.get();
- msg.as_mut_ptr()
- };
- p.drop_in_place();
+ debug_assert!(index < self.buffer.len());
+ let slot = self.buffer.get_unchecked_mut(index);
+ let msg = &mut *slot.msg.get();
+ msg.as_mut_ptr().drop_in_place();
}
}
-
- // Finally, deallocate the buffer, but don't run any destructors.
- unsafe {
- // Create a slice from the buffer to make
- // a fat pointer. Then, use Box::from_raw
- // to deallocate it.
- let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>];
- Box::from_raw(ptr);
- }
}
}
diff --git a/src/flavors/at.rs b/src/flavors/at.rs
index 4581edb..ca5ee60 100644
--- a/src/flavors/at.rs
+++ b/src/flavors/at.rs
@@ -35,7 +35,7 @@ impl Channel {
/// Creates a channel that delivers a message after a certain duration of time.
#[inline]
pub(crate) fn new_timeout(dur: Duration) -> Self {
- Self::new_deadline(Instant::now() + dur)
+ Self::new_deadline(utils::convert_timeout_to_deadline(dur))
}
/// Attempts to receive a message without blocking.
@@ -142,7 +142,6 @@ impl Channel {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(1)
diff --git a/src/flavors/list.rs b/src/flavors/list.rs
index 5056aa4..9bda6d1 100644
--- a/src/flavors/list.rs
+++ b/src/flavors/list.rs
@@ -126,7 +126,7 @@ struct Position<T> {
/// The token type for the list flavor.
#[derive(Debug)]
-pub struct ListToken {
+pub(crate) struct ListToken {
/// The block of slots.
block: *const u8,
@@ -634,9 +634,9 @@ impl<T> Channel<T> {
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
- let mut head = self.head.index.load(Ordering::Relaxed);
- let mut tail = self.tail.index.load(Ordering::Relaxed);
- let mut block = self.head.block.load(Ordering::Relaxed);
+ let mut head = *self.head.index.get_mut();
+ let mut tail = *self.tail.index.get_mut();
+ let mut block = *self.head.block.get_mut();
// Erase the lower bits.
head &= !((1 << SHIFT) - 1);
@@ -654,7 +654,7 @@ impl<T> Drop for Channel<T> {
p.as_mut_ptr().drop_in_place();
} else {
// Deallocate the block and move to the next one.
- let next = (*block).next.load(Ordering::Relaxed);
+ let next = *(*block).next.get_mut();
drop(Box::from_raw(block));
block = next;
}
diff --git a/src/flavors/never.rs b/src/flavors/never.rs
index 1951e96..277a61d 100644
--- a/src/flavors/never.rs
+++ b/src/flavors/never.rs
@@ -65,7 +65,6 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
diff --git a/src/flavors/tick.rs b/src/flavors/tick.rs
index d4b1f6c..4201b6e 100644
--- a/src/flavors/tick.rs
+++ b/src/flavors/tick.rs
@@ -10,6 +10,7 @@ use crossbeam_utils::atomic::AtomicCell;
use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
+use crate::utils;
/// Result of a receive operation.
pub(crate) type TickToken = Option<Instant>;
@@ -28,7 +29,7 @@ impl Channel {
#[inline]
pub(crate) fn new(dur: Duration) -> Self {
Channel {
- delivery_time: AtomicCell::new(Instant::now() + dur),
+ delivery_time: AtomicCell::new(utils::convert_timeout_to_deadline(dur)),
duration: dur,
}
}
@@ -112,7 +113,6 @@ impl Channel {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(1)
diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs
index 4afbd8f..31e62af 100644
--- a/src/flavors/zero.rs
+++ b/src/flavors/zero.rs
@@ -5,6 +5,7 @@
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
use std::time::Instant;
use std::{fmt, ptr};
@@ -13,11 +14,10 @@ use crossbeam_utils::Backoff;
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
-use crate::utils::Spinlock;
use crate::waker::Waker;
/// A pointer to a packet.
-pub struct ZeroToken(*mut ());
+pub(crate) struct ZeroToken(*mut ());
impl Default for ZeroToken {
fn default() -> Self {
@@ -95,7 +95,7 @@ struct Inner {
/// Zero-capacity channel.
pub(crate) struct Channel<T> {
/// Inner representation of the channel.
- inner: Spinlock<Inner>,
+ inner: Mutex<Inner>,
/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
_marker: PhantomData<T>,
@@ -105,7 +105,7 @@ impl<T> Channel<T> {
/// Constructs a new zero-capacity channel.
pub(crate) fn new() -> Self {
Channel {
- inner: Spinlock::new(Inner {
+ inner: Mutex::new(Inner {
senders: Waker::new(),
receivers: Waker::new(),
is_disconnected: false,
@@ -126,7 +126,7 @@ impl<T> Channel<T> {
/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
@@ -155,7 +155,7 @@ impl<T> Channel<T> {
/// Attempts to pair up with a sender.
fn start_recv(&self, token: &mut Token) -> bool {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
@@ -198,7 +198,7 @@ impl<T> Channel<T> {
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
@@ -222,7 +222,7 @@ impl<T> Channel<T> {
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
@@ -254,12 +254,12 @@ impl<T> Channel<T> {
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
- self.inner.lock().senders.unregister(oper).unwrap();
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Timeout(msg))
}
Selected::Disconnected => {
- self.inner.lock().senders.unregister(oper).unwrap();
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Disconnected(msg))
}
@@ -275,7 +275,7 @@ impl<T> Channel<T> {
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
@@ -292,7 +292,7 @@ impl<T> Channel<T> {
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
@@ -325,11 +325,21 @@ impl<T> Channel<T> {
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
- self.inner.lock().receivers.unregister(oper).unwrap();
+ self.inner
+ .lock()
+ .unwrap()
+ .receivers
+ .unregister(oper)
+ .unwrap();
Err(RecvTimeoutError::Timeout)
}
Selected::Disconnected => {
- self.inner.lock().receivers.unregister(oper).unwrap();
+ self.inner
+ .lock()
+ .unwrap()
+ .receivers
+ .unregister(oper)
+ .unwrap();
Err(RecvTimeoutError::Disconnected)
}
Selected::Operation(_) => {
@@ -345,7 +355,7 @@ impl<T> Channel<T> {
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
if !inner.is_disconnected {
inner.is_disconnected = true;
@@ -363,7 +373,6 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- #[allow(clippy::unnecessary_wraps)] // This is intentional.
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
@@ -397,7 +406,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
fn register(&self, oper: Operation, cx: &Context) -> bool {
let packet = Box::into_raw(Packet::<T>::empty_on_heap());
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner
.receivers
.register_with_packet(oper, packet as *mut (), cx);
@@ -406,7 +415,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
}
fn unregister(&self, oper: Operation) {
- if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) {
+ if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
unsafe {
drop(Box::from_raw(operation.packet as *mut Packet<T>));
}
@@ -419,18 +428,18 @@ impl<T> SelectHandle for Receiver<'_, T> {
}
fn is_ready(&self) -> bool {
- let inner = self.0.inner.lock();
+ let inner = self.0.inner.lock().unwrap();
inner.senders.can_select() || inner.is_disconnected
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner.receivers.watch(oper, cx);
inner.senders.can_select() || inner.is_disconnected
}
fn unwatch(&self, oper: Operation) {
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner.receivers.unwatch(oper);
}
}
@@ -447,7 +456,7 @@ impl<T> SelectHandle for Sender<'_, T> {
fn register(&self, oper: Operation, cx: &Context) -> bool {
let packet = Box::into_raw(Packet::<T>::empty_on_heap());
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner
.senders
.register_with_packet(oper, packet as *mut (), cx);
@@ -456,7 +465,7 @@ impl<T> SelectHandle for Sender<'_, T> {
}
fn unregister(&self, oper: Operation) {
- if let Some(operation) = self.0.inner.lock().senders.unregister(oper) {
+ if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
unsafe {
drop(Box::from_raw(operation.packet as *mut Packet<T>));
}
@@ -469,18 +478,18 @@ impl<T> SelectHandle for Sender<'_, T> {
}
fn is_ready(&self) -> bool {
- let inner = self.0.inner.lock();
+ let inner = self.0.inner.lock().unwrap();
inner.receivers.can_select() || inner.is_disconnected
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner.senders.watch(oper, cx);
inner.receivers.can_select() || inner.is_disconnected
}
fn unwatch(&self, oper: Operation) {
- let mut inner = self.0.inner.lock();
+ let mut inner = self.0.inner.lock().unwrap();
inner.senders.unwatch(oper);
}
}
diff --git a/src/select.rs b/src/select.rs
index 6103ef4..57d67a3 100644
--- a/src/select.rs
+++ b/src/select.rs
@@ -22,12 +22,13 @@ use crate::utils;
// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
- pub at: flavors::at::AtToken,
- pub array: flavors::array::ArrayToken,
- pub list: flavors::list::ListToken,
- pub never: flavors::never::NeverToken,
- pub tick: flavors::tick::TickToken,
- pub zero: flavors::zero::ZeroToken,
+ pub(crate) at: flavors::at::AtToken,
+ pub(crate) array: flavors::array::ArrayToken,
+ pub(crate) list: flavors::list::ListToken,
+ #[allow(dead_code)]
+ pub(crate) never: flavors::never::NeverToken,
+ pub(crate) tick: flavors::tick::TickToken,
+ pub(crate) zero: flavors::zero::ZeroToken,
}
/// Identifier associated with an operation by a specific thread on a specific channel.
@@ -486,7 +487,7 @@ pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
timeout: Duration,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
- select_deadline(handles, Instant::now() + timeout)
+ select_deadline(handles, utils::convert_timeout_to_deadline(timeout))
}
/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
@@ -518,6 +519,8 @@ pub(crate) fn select_deadline<'a>(
/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
/// dynamically created list of channel operations.
///
+/// [`select!`]: crate::select!
+///
/// Once a list of operations has been built with `Select`, there are two different ways of
/// proceeding:
///
@@ -1042,7 +1045,7 @@ impl<'a> Select<'a> {
/// }
/// ```
pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
- self.ready_deadline(Instant::now() + timeout)
+ self.ready_deadline(utils::convert_timeout_to_deadline(timeout))
}
/// Blocks until a given deadline, or until one of the operations becomes ready.
diff --git a/src/select_macro.rs b/src/select_macro.rs
index f8b247e..efe0ae4 100644
--- a/src/select_macro.rs
+++ b/src/select_macro.rs
@@ -121,18 +121,7 @@ macro_rules! crossbeam_channel_internal {
};
// Only one case remains.
(@list
- ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr)
- ($($head:tt)*)
- ) => {
- $crate::crossbeam_channel_internal!(
- @list
- ()
- ($($head)* $case ($($args)*) $(-> $res)* => { $body },)
- )
- };
- // Accept a trailing comma at the end of the list.
- (@list
- ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr,)
+ ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr $(,)?)
($($head:tt)*)
) => {
$crate::crossbeam_channel_internal!(
@@ -373,20 +362,7 @@ macro_rules! crossbeam_channel_internal {
// Check the format of a recv case.
(@case
- (recv($r:expr) -> $res:pat => $body:tt, $($tail:tt)*)
- ($($cases:tt)*)
- $default:tt
- ) => {
- $crate::crossbeam_channel_internal!(
- @case
- ($($tail)*)
- ($($cases)* recv($r) -> $res => $body,)
- $default
- )
- };
- // Allow trailing comma...
- (@case
- (recv($r:expr,) -> $res:pat => $body:tt, $($tail:tt)*)
+ (recv($r:expr $(,)?) -> $res:pat => $body:tt, $($tail:tt)*)
($($cases:tt)*)
$default:tt
) => {
@@ -428,20 +404,7 @@ macro_rules! crossbeam_channel_internal {
// Check the format of a send case.
(@case
- (send($s:expr, $m:expr) -> $res:pat => $body:tt, $($tail:tt)*)
- ($($cases:tt)*)
- $default:tt
- ) => {
- $crate::crossbeam_channel_internal!(
- @case
- ($($tail)*)
- ($($cases)* send($s, $m) -> $res => $body,)
- $default
- )
- };
- // Allow trailing comma...
- (@case
- (send($s:expr, $m:expr,) -> $res:pat => $body:tt, $($tail:tt)*)
+ (send($s:expr, $m:expr $(,)?) -> $res:pat => $body:tt, $($tail:tt)*)
($($cases:tt)*)
$default:tt
) => {
@@ -496,20 +459,7 @@ macro_rules! crossbeam_channel_internal {
};
// Check the format of a default case with timeout.
(@case
- (default($timeout:expr) => $body:tt, $($tail:tt)*)
- $cases:tt
- ()
- ) => {
- $crate::crossbeam_channel_internal!(
- @case
- ($($tail)*)
- $cases
- (default($timeout) => $body,)
- )
- };
- // Allow trailing comma...
- (@case
- (default($timeout:expr,) => $body:tt, $($tail:tt)*)
+ (default($timeout:expr $(,)?) => $body:tt, $($tail:tt)*)
$cases:tt
()
) => {
@@ -1043,7 +993,7 @@ macro_rules! crossbeam_channel_internal {
/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
/// when it will simply return an error because the channel is disconnected.
///
-/// The `select` macro is a convenience wrapper around [`Select`]. However, it cannot select over a
+/// The `select!` macro is a convenience wrapper around [`Select`]. However, it cannot select over a
/// dynamically created list of channel operations.
///
/// [`Select`]: super::Select
diff --git a/src/utils.rs b/src/utils.rs
index 557b6a0..9f14c8e 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,14 +1,10 @@
//! Miscellaneous utilities.
-use std::cell::{Cell, UnsafeCell};
+use std::cell::Cell;
use std::num::Wrapping;
-use std::ops::{Deref, DerefMut};
-use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};
-use crossbeam_utils::Backoff;
-
/// Randomly shuffles a slice.
pub(crate) fn shuffle<T>(v: &mut [T]) {
let len = v.len();
@@ -61,52 +57,10 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) {
}
}
-/// A simple spinlock.
-pub(crate) struct Spinlock<T> {
- flag: AtomicBool,
- value: UnsafeCell<T>,
-}
-
-impl<T> Spinlock<T> {
- /// Returns a new spinlock initialized with `value`.
- pub(crate) fn new(value: T) -> Spinlock<T> {
- Spinlock {
- flag: AtomicBool::new(false),
- value: UnsafeCell::new(value),
- }
- }
-
- /// Locks the spinlock.
- pub(crate) fn lock(&self) -> SpinlockGuard<'_, T> {
- let backoff = Backoff::new();
- while self.flag.swap(true, Ordering::Acquire) {
- backoff.snooze();
- }
- SpinlockGuard { parent: self }
- }
-}
-
-/// A guard holding a spinlock locked.
-pub(crate) struct SpinlockGuard<'a, T> {
- parent: &'a Spinlock<T>,
-}
-
-impl<T> Drop for SpinlockGuard<'_, T> {
- fn drop(&mut self) {
- self.parent.flag.store(false, Ordering::Release);
- }
-}
-
-impl<T> Deref for SpinlockGuard<'_, T> {
- type Target = T;
-
- fn deref(&self) -> &T {
- unsafe { &*self.parent.value.get() }
- }
-}
-
-impl<T> DerefMut for SpinlockGuard<'_, T> {
- fn deref_mut(&mut self) -> &mut T {
- unsafe { &mut *self.parent.value.get() }
+// https://github.com/crossbeam-rs/crossbeam/issues/795
+pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant {
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => deadline,
+ None => Instant::now() + Duration::from_secs(86400 * 365 * 30),
}
}
diff --git a/src/waker.rs b/src/waker.rs
index dec73a9..7eb58ba 100644
--- a/src/waker.rs
+++ b/src/waker.rs
@@ -2,11 +2,11 @@
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
use std::thread::{self, ThreadId};
use crate::context::Context;
use crate::select::{Operation, Selected};
-use crate::utils::Spinlock;
/// Represents a thread blocked on a specific channel operation.
pub(crate) struct Entry {
@@ -77,26 +77,32 @@ impl Waker {
/// Attempts to find another thread's entry, select the operation, and wake it up.
#[inline]
pub(crate) fn try_select(&mut self) -> Option<Entry> {
- self.selectors
- .iter()
- .position(|selector| {
- // Does the entry belong to a different thread?
- selector.cx.thread_id() != current_thread_id()
- && selector // Try selecting this operation.
- .cx
- .try_select(Selected::Operation(selector.oper))
- .is_ok()
- && {
- // Provide the packet.
- selector.cx.store_packet(selector.packet);
- // Wake the thread up.
- selector.cx.unpark();
- true
- }
- })
- // Remove the entry from the queue to keep it clean and improve
- // performance.
- .map(|pos| self.selectors.remove(pos))
+ if self.selectors.is_empty() {
+ None
+ } else {
+ let thread_id = current_thread_id();
+
+ self.selectors
+ .iter()
+ .position(|selector| {
+ // Does the entry belong to a different thread?
+ selector.cx.thread_id() != thread_id
+ && selector // Try selecting this operation.
+ .cx
+ .try_select(Selected::Operation(selector.oper))
+ .is_ok()
+ && {
+ // Provide the packet.
+ selector.cx.store_packet(selector.packet);
+ // Wake the thread up.
+ selector.cx.unpark();
+ true
+ }
+ })
+ // Remove the entry from the queue to keep it clean and improve
+ // performance.
+ .map(|pos| self.selectors.remove(pos))
+ }
}
/// Returns `true` if there is an entry which can be selected by the current thread.
@@ -170,7 +176,7 @@ impl Drop for Waker {
/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
pub(crate) struct SyncWaker {
/// The inner `Waker`.
- inner: Spinlock<Waker>,
+ inner: Mutex<Waker>,
/// `true` if the waker is empty.
is_empty: AtomicBool,
@@ -181,7 +187,7 @@ impl SyncWaker {
#[inline]
pub(crate) fn new() -> Self {
SyncWaker {
- inner: Spinlock::new(Waker::new()),
+ inner: Mutex::new(Waker::new()),
is_empty: AtomicBool::new(true),
}
}
@@ -189,7 +195,7 @@ impl SyncWaker {
/// Registers the current thread with an operation.
#[inline]
pub(crate) fn register(&self, oper: Operation, cx: &Context) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
inner.register(oper, cx);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
@@ -200,7 +206,7 @@ impl SyncWaker {
/// Unregisters an operation previously registered by the current thread.
#[inline]
pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
let entry = inner.unregister(oper);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
@@ -213,7 +219,7 @@ impl SyncWaker {
#[inline]
pub(crate) fn notify(&self) {
if !self.is_empty.load(Ordering::SeqCst) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
if !self.is_empty.load(Ordering::SeqCst) {
inner.try_select();
inner.notify();
@@ -228,7 +234,7 @@ impl SyncWaker {
/// Registers an operation waiting to be ready.
#[inline]
pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
inner.watch(oper, cx);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
@@ -239,7 +245,7 @@ impl SyncWaker {
/// Unregisters an operation waiting to be ready.
#[inline]
pub(crate) fn unwatch(&self, oper: Operation) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
inner.unwatch(oper);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
@@ -250,7 +256,7 @@ impl SyncWaker {
/// Notifies all threads that the channel is disconnected.
#[inline]
pub(crate) fn disconnect(&self) {
- let mut inner = self.inner.lock();
+ let mut inner = self.inner.lock().unwrap();
inner.disconnect();
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
diff --git a/tests/array.rs b/tests/array.rs
index bb2cebe..de843cd 100644
--- a/tests/array.rs
+++ b/tests/array.rs
@@ -1,7 +1,5 @@
//! Tests for the array channel flavor.
-#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
-
use std::any::Any;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -254,7 +252,13 @@ fn recv_after_disconnect() {
#[test]
fn len() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
+ #[cfg(miri)]
+ const CAP: usize = 50;
+ #[cfg(not(miri))]
const CAP: usize = 1000;
let (s, r) = bounded(CAP);
@@ -347,6 +351,9 @@ fn disconnect_wakes_receiver() {
#[test]
fn spsc() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
let (s, r) = bounded(3);
@@ -369,6 +376,9 @@ fn spsc() {
#[test]
fn mpmc() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -401,6 +411,9 @@ fn mpmc() {
#[test]
fn stress_oneshot() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
for _ in 0..COUNT {
@@ -416,6 +429,9 @@ fn stress_oneshot() {
#[test]
fn stress_iter() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 100_000;
let (request_s, request_r) = bounded(1);
@@ -483,7 +499,14 @@ fn stress_timeout_two_threads() {
#[test]
fn drops() {
+ #[cfg(miri)]
+ const RUNS: usize = 10;
+ #[cfg(not(miri))]
const RUNS: usize = 100;
+ #[cfg(miri)]
+ const STEPS: usize = 100;
+ #[cfg(not(miri))]
+ const STEPS: usize = 10_000;
static DROPS: AtomicUsize = AtomicUsize::new(0);
@@ -499,7 +522,7 @@ fn drops() {
let mut rng = thread_rng();
for _ in 0..RUNS {
- let steps = rng.gen_range(0..10_000);
+ let steps = rng.gen_range(0..STEPS);
let additional = rng.gen_range(0..50);
DROPS.store(0, Ordering::SeqCst);
@@ -509,12 +532,16 @@ fn drops() {
scope.spawn(|_| {
for _ in 0..steps {
r.recv().unwrap();
+ #[cfg(miri)]
+ std::thread::yield_now(); // https://github.com/rust-lang/miri/issues/1388
}
});
scope.spawn(|_| {
for _ in 0..steps {
s.send(DropCounter).unwrap();
+ #[cfg(miri)]
+ std::thread::yield_now(); // https://github.com/rust-lang/miri/issues/1388
}
});
})
@@ -533,6 +560,9 @@ fn drops() {
#[test]
fn linearizable() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -553,6 +583,9 @@ fn linearizable() {
#[test]
fn fairness() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s1, r1) = bounded::<()>(COUNT);
@@ -575,6 +608,9 @@ fn fairness() {
#[test]
fn fairness_duplicates() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 10_000;
let (s, r) = bounded::<()>(COUNT);
@@ -619,6 +655,9 @@ fn recv_in_send() {
#[test]
fn channel_through_channel() {
+ #[cfg(miri)]
+ const COUNT: usize = 100;
+ #[cfg(not(miri))]
const COUNT: usize = 1000;
type T = Box<dyn Any + Send>;
@@ -654,3 +693,56 @@ fn channel_through_channel() {
})
.unwrap();
}
+
+#[test]
+fn panic_on_drop() {
+ struct Msg1<'a>(&'a mut bool);
+ impl Drop for Msg1<'_> {
+ fn drop(&mut self) {
+ if *self.0 && !std::thread::panicking() {
+ panic!("double drop");
+ } else {
+ *self.0 = true;
+ }
+ }
+ }
+
+ struct Msg2<'a>(&'a mut bool);
+ impl Drop for Msg2<'_> {
+ fn drop(&mut self) {
+ if *self.0 {
+ panic!("double drop");
+ } else {
+ *self.0 = true;
+ panic!("first drop");
+ }
+ }
+ }
+
+ // normal
+ let (s, r) = bounded(2);
+ let (mut a, mut b) = (false, false);
+ s.send(Msg1(&mut a)).unwrap();
+ s.send(Msg1(&mut b)).unwrap();
+ drop(s);
+ drop(r);
+ assert!(a);
+ assert!(b);
+
+ // panic on drop
+ let (s, r) = bounded(2);
+ let (mut a, mut b) = (false, false);
+ s.send(Msg2(&mut a)).unwrap();
+ s.send(Msg2(&mut b)).unwrap();
+ drop(s);
+ let res = std::panic::catch_unwind(move || {
+ drop(r);
+ });
+ assert_eq!(
+ *res.unwrap_err().downcast_ref::<&str>().unwrap(),
+ "first drop"
+ );
+ assert!(a);
+ // Elements after the panicked element will leak.
+ assert!(!b);
+}
diff --git a/tests/golang.rs b/tests/golang.rs
index 05d67f6..6a46c03 100644
--- a/tests/golang.rs
+++ b/tests/golang.rs
@@ -15,12 +15,12 @@ use std::alloc::{GlobalAlloc, Layout, System};
use std::any::Any;
use std::cell::Cell;
use std::collections::HashMap;
-use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
+use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
-use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender};
+use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender};
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
@@ -32,7 +32,13 @@ struct Chan<T> {
struct ChanInner<T> {
s: Option<Sender<T>>,
- r: Receiver<T>,
+ r: Option<Receiver<T>>,
+ // Receiver to use when r is None (Go blocks on receiving from nil)
+ nil_r: Receiver<T>,
+ // Sender to use when s is None (Go blocks on sending to nil)
+ nil_s: Sender<T>,
+ // Hold this receiver to prevent nil sender channel from disconnection
+ _nil_sr: Receiver<T>,
}
impl<T> Clone for Chan<T> {
@@ -57,35 +63,53 @@ impl<T> Chan<T> {
}
fn try_recv(&self) -> Option<T> {
- let r = self.inner.lock().unwrap().r.clone();
+ let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
r.try_recv().ok()
}
fn recv(&self) -> Option<T> {
- let r = self.inner.lock().unwrap().r.clone();
+ let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
r.recv().ok()
}
- fn close(&self) {
+ fn close_s(&self) {
self.inner
.lock()
.unwrap()
.s
.take()
- .expect("channel already closed");
+ .expect("channel sender already closed");
+ }
+
+ fn close_r(&self) {
+ self.inner
+ .lock()
+ .unwrap()
+ .r
+ .take()
+ .expect("channel receiver already closed");
+ }
+
+ fn has_rx(&self) -> bool {
+ self.inner.lock().unwrap().r.is_some()
+ }
+
+ fn has_tx(&self) -> bool {
+ self.inner.lock().unwrap().s.is_some()
}
fn rx(&self) -> Receiver<T> {
- self.inner.lock().unwrap().r.clone()
+ let inner = self.inner.lock().unwrap();
+ match inner.r.as_ref() {
+ None => inner.nil_r.clone(),
+ Some(r) => r.clone(),
+ }
}
fn tx(&self) -> Sender<T> {
- match self.inner.lock().unwrap().s.as_ref() {
- None => {
- let (s, r) = bounded(0);
- std::mem::forget(r);
- s
- }
+ let inner = self.inner.lock().unwrap();
+ match inner.s.as_ref() {
+ None => inner.nil_s.clone(),
Some(s) => s.clone(),
}
}
@@ -110,17 +134,32 @@ impl<'a, T> IntoIterator for &'a Chan<T> {
fn make<T>(cap: usize) -> Chan<T> {
let (s, r) = bounded(cap);
+ let (nil_s, _nil_sr) = bounded(0);
Chan {
- inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
+ inner: Arc::new(Mutex::new(ChanInner {
+ s: Some(s),
+ r: Some(r),
+ nil_r: never(),
+ nil_s,
+ _nil_sr,
+ })),
}
}
fn make_unbounded<T>() -> Chan<T> {
let (s, r) = unbounded();
+ let (nil_s, _nil_sr) = bounded(0);
Chan {
- inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
+ inner: Arc::new(Mutex::new(ChanInner {
+ s: Some(s),
+ r: Some(r),
+ nil_r: never(),
+ nil_s,
+ _nil_sr,
+ })),
}
}
+
#[derive(Clone)]
struct WaitGroup(Arc<WaitGroupInner>);
@@ -199,14 +238,6 @@ macro_rules! defer {
}
macro_rules! go {
- (@parse ref $v:ident, $($tail:tt)*) => {{
- let ref $v = $v;
- go!(@parse $($tail)*)
- }};
- (@parse move $v:ident, $($tail:tt)*) => {{
- let $v = $v;
- go!(@parse $($tail)*)
- }};
(@parse $v:ident, $($tail:tt)*) => {{
let $v = $v.clone();
go!(@parse $($tail)*)
@@ -240,10 +271,10 @@ mod doubleselect {
const ITERATIONS: i32 = 10_000;
fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) {
- defer! { c1.close() }
- defer! { c2.close() }
- defer! { c3.close() }
- defer! { c4.close() }
+ defer! { c1.close_s() }
+ defer! { c2.close_s() }
+ defer! { c3.close_s() }
+ defer! { c4.close_s() }
for i in 0..n {
select! {
@@ -292,7 +323,7 @@ mod doubleselect {
done.recv();
done.recv();
done.recv();
- cmux.close();
+ cmux.close_s();
});
recver(cmux);
}
@@ -697,7 +728,7 @@ mod select2 {
use super::*;
#[cfg(miri)]
- const N: i32 = 1000;
+ const N: i32 = 200;
#[cfg(not(miri))]
const N: i32 = 100000;
@@ -892,6 +923,9 @@ mod sieve1 {
2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83,
89, 97,
];
+ #[cfg(miri)]
+ let a = &a[..10];
+
for item in a.iter() {
let x = primes.recv().unwrap();
if x != *item {
@@ -929,6 +963,11 @@ mod chan_test {
#[cfg(not(miri))]
const N: i32 = 200;
+ #[cfg(miri)]
+ const MESSAGES_COUNT: i32 = 20;
+ #[cfg(not(miri))]
+ const MESSAGES_COUNT: i32 = 100;
+
for cap in 0..N {
{
// Ensure that receive from empty chan blocks.
@@ -999,7 +1038,7 @@ mod chan_test {
for i in 0..cap {
c.send(i);
}
- c.close();
+ c.close_s();
for i in 0..cap {
let v = c.recv();
@@ -1027,7 +1066,7 @@ mod chan_test {
});
thread::sleep(ms(1));
- c.close();
+ c.close_s();
if !done.recv().unwrap() {
panic!();
@@ -1035,15 +1074,15 @@ mod chan_test {
}
{
- // Send 100 integers,
+ // Send many integers,
// ensure that we receive them non-corrupted in FIFO order.
let c = make::<i32>(cap as usize);
go!(c, {
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
c.send(i);
}
});
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
if c.recv() != Some(i) {
panic!();
}
@@ -1051,11 +1090,11 @@ mod chan_test {
// Same, but using recv2.
go!(c, {
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
c.send(i);
}
});
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
if c.recv() != Some(i) {
panic!();
}
@@ -1082,7 +1121,7 @@ mod chan_test {
}
});
- c.close();
+ c.close_s();
c.recv();
t.join().unwrap();
}
@@ -1149,7 +1188,7 @@ mod chan_test {
done.send(true);
});
- c2.close();
+ c2.close_s();
select! {
recv(c1.rx()) -> _ => {}
default => {}
@@ -1378,7 +1417,7 @@ mod chan_test {
);
}
- done.close();
+ done.close_s();
wg.wait();
}
@@ -1481,9 +1520,9 @@ mod chan_test {
*expect.lock().unwrap() += v;
q.send(v);
}
- q.close();
+ q.close_s();
wg.wait();
- r.close();
+ r.close_s();
});
let mut n = 0;
@@ -1541,8 +1580,504 @@ mod race_chan_test {
}
// https://github.com/golang/go/blob/master/test/ken/chan.go
+#[cfg(not(miri))] // Miri is too slow
mod chan {
- // TODO
+
+ use super::*;
+
+ const MESSAGES_PER_CHANEL: u32 = 76;
+ const MESSAGES_RANGE_LEN: u32 = 100;
+ const END: i32 = 10000;
+
+ struct ChanWithVals {
+ chan: Chan<i32>,
+ /// Next value to send
+ sv: Arc<AtomicI32>,
+ /// Next value to receive
+ rv: Arc<AtomicI32>,
+ }
+
+ struct Totals {
+ /// Total sent messages
+ tots: u32,
+ /// Total received messages
+ totr: u32,
+ }
+
+ struct Context {
+ nproc: Arc<Mutex<i32>>,
+ cval: Arc<Mutex<i32>>,
+ tot: Arc<Mutex<Totals>>,
+ nc: ChanWithVals,
+ randx: Arc<Mutex<i32>>,
+ }
+
+ impl ChanWithVals {
+ fn with_capacity(capacity: usize) -> Self {
+ ChanWithVals {
+ chan: make(capacity),
+ sv: Arc::new(AtomicI32::new(0)),
+ rv: Arc::new(AtomicI32::new(0)),
+ }
+ }
+
+ fn closed() -> Self {
+ let ch = ChanWithVals::with_capacity(0);
+ ch.chan.close_r();
+ ch.chan.close_s();
+ ch
+ }
+
+ fn rv(&self) -> i32 {
+ self.rv.load(SeqCst)
+ }
+
+ fn sv(&self) -> i32 {
+ self.sv.load(SeqCst)
+ }
+
+ fn send(&mut self, tot: &Mutex<Totals>) -> bool {
+ {
+ let mut tot = tot.lock().unwrap();
+ tot.tots += 1
+ }
+ let esv = expect(self.sv(), self.sv());
+ self.sv.store(esv, SeqCst);
+ if self.sv() == END {
+ self.chan.close_s();
+ return true;
+ }
+ false
+ }
+
+ fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool {
+ {
+ let mut tot = tot.lock().unwrap();
+ tot.totr += 1
+ }
+ let erv = expect(self.rv(), v);
+ self.rv.store(erv, SeqCst);
+ if self.rv() == END {
+ self.chan.close_r();
+ return true;
+ }
+ false
+ }
+ }
+
+ impl Clone for ChanWithVals {
+ fn clone(&self) -> Self {
+ ChanWithVals {
+ chan: self.chan.clone(),
+ sv: self.sv.clone(),
+ rv: self.rv.clone(),
+ }
+ }
+ }
+
+ impl Context {
+ fn nproc(&self) -> &Mutex<i32> {
+ self.nproc.as_ref()
+ }
+
+ fn cval(&self) -> &Mutex<i32> {
+ self.cval.as_ref()
+ }
+
+ fn tot(&self) -> &Mutex<Totals> {
+ self.tot.as_ref()
+ }
+
+ fn randx(&self) -> &Mutex<i32> {
+ self.randx.as_ref()
+ }
+ }
+
+ impl Clone for Context {
+ fn clone(&self) -> Self {
+ Context {
+ nproc: self.nproc.clone(),
+ cval: self.cval.clone(),
+ tot: self.tot.clone(),
+ nc: self.nc.clone(),
+ randx: self.randx.clone(),
+ }
+ }
+ }
+
+ fn nrand(n: i32, randx: &Mutex<i32>) -> i32 {
+ let mut randx = randx.lock().unwrap();
+ *randx += 10007;
+ if *randx >= 1000000 {
+ *randx -= 1000000
+ }
+ *randx % n
+ }
+
+ fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 {
+ let mut nproc = nproc.lock().unwrap();
+ *nproc += adjust;
+ *nproc
+ }
+
+ fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> {
+ let mut ca = Vec::<ChanWithVals>::with_capacity(n);
+ let mut cval = cval.lock().unwrap();
+ for _ in 0..n {
+ *cval += MESSAGES_RANGE_LEN as i32;
+ let chl = ChanWithVals::with_capacity(c);
+ chl.sv.store(*cval, SeqCst);
+ chl.rv.store(*cval, SeqCst);
+ ca.push(chl);
+ }
+ ca
+ }
+
+ fn expect(v: i32, v0: i32) -> i32 {
+ if v == v0 {
+ return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 {
+ END
+ } else {
+ v + 1
+ };
+ }
+ panic!("got {}, expected {}", v, v0 + 1);
+ }
+
+ fn send(mut c: ChanWithVals, ctx: Context) {
+ loop {
+ for _ in 0..=nrand(10, ctx.randx()) {
+ thread::yield_now();
+ }
+ c.chan.tx().send(c.sv()).unwrap();
+ if c.send(ctx.tot()) {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ fn recv(mut c: ChanWithVals, ctx: Context) {
+ loop {
+ for _ in (0..nrand(10, ctx.randx())).rev() {
+ thread::yield_now();
+ }
+ let v = c.chan.rx().recv().unwrap();
+ if c.recv(v, ctx.tot()) {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ fn sel(
+ mut r0: ChanWithVals,
+ mut r1: ChanWithVals,
+ mut r2: ChanWithVals,
+ mut r3: ChanWithVals,
+ mut s0: ChanWithVals,
+ mut s1: ChanWithVals,
+ mut s2: ChanWithVals,
+ mut s3: ChanWithVals,
+ ctx: Context,
+ ) {
+ let mut a = 0; // local chans running
+
+ if r0.chan.has_rx() {
+ a += 1;
+ }
+ if r1.chan.has_rx() {
+ a += 1;
+ }
+ if r2.chan.has_rx() {
+ a += 1;
+ }
+ if r3.chan.has_rx() {
+ a += 1;
+ }
+ if s0.chan.has_tx() {
+ a += 1;
+ }
+ if s1.chan.has_tx() {
+ a += 1;
+ }
+ if s2.chan.has_tx() {
+ a += 1;
+ }
+ if s3.chan.has_tx() {
+ a += 1;
+ }
+
+ loop {
+ for _ in 0..=nrand(5, ctx.randx()) {
+ thread::yield_now();
+ }
+ select! {
+ recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 },
+ send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 },
+ send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 },
+ send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 },
+ }
+ if a == 0 {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals {
+ vec.get(idx).unwrap().clone()
+ }
+
+ /// Direct send to direct recv
+ fn test1(c: ChanWithVals, ctx: &mut Context) {
+ change_nproc(2, ctx.nproc());
+ go!(c, ctx, send(c, ctx));
+ go!(c, ctx, recv(c, ctx));
+ }
+
+ /// Direct send to select recv
+ fn test2(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, send(get(&ca, 0), ctx));
+ go!(ca, ctx, send(get(&ca, 1), ctx));
+ go!(ca, ctx, send(get(&ca, 2), ctx));
+ go!(ca, ctx, send(get(&ca, 3), ctx));
+
+ change_nproc(1, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to direct recv
+ fn test3(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, recv(get(&ca, 0), ctx));
+ go!(ca, ctx, recv(get(&ca, 1), ctx));
+ go!(ca, ctx, recv(get(&ca, 2), ctx));
+ go!(ca, ctx, recv(get(&ca, 3), ctx));
+
+ change_nproc(1, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to select recv, 4 channels
+ fn test4(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to select recv, 8 channels
+ fn test5(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 8, ctx.cval());
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ ctx,
+ )
+ );
+ }
+
+ // Direct and select send to direct and select recv
+ fn test6(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 12, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, send(get(&ca, 4), ctx));
+ go!(ca, ctx, send(get(&ca, 5), ctx));
+ go!(ca, ctx, send(get(&ca, 6), ctx));
+ go!(ca, ctx, send(get(&ca, 7), ctx));
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, recv(get(&ca, 8), ctx));
+ go!(ca, ctx, recv(get(&ca, 9), ctx));
+ go!(ca, ctx, recv(get(&ca, 10), ctx));
+ go!(ca, ctx, recv(get(&ca, 11), ctx));
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ get(&ca, 8),
+ get(&ca, 9),
+ get(&ca, 10),
+ get(&ca, 11),
+ ctx,
+ )
+ );
+ }
+
+ fn wait(ctx: &mut Context) {
+ thread::yield_now();
+ while change_nproc(0, ctx.nproc()) != 0 {
+ thread::yield_now();
+ }
+ }
+
+ fn tests(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+ test1(get(&ca, 0), ctx);
+ test1(get(&ca, 1), ctx);
+ test1(get(&ca, 2), ctx);
+ test1(get(&ca, 3), ctx);
+ wait(ctx);
+
+ test2(c, ctx);
+ wait(ctx);
+
+ test3(c, ctx);
+ wait(ctx);
+
+ test4(c, ctx);
+ wait(ctx);
+
+ test5(c, ctx);
+ wait(ctx);
+
+ test6(c, ctx);
+ wait(ctx);
+ }
+
+ #[test]
+ fn main() {
+ let mut ctx = Context {
+ nproc: Arc::new(Mutex::new(0)),
+ cval: Arc::new(Mutex::new(0)),
+ tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })),
+ nc: ChanWithVals::closed(),
+ randx: Arc::new(Mutex::new(0)),
+ };
+
+ tests(0, &mut ctx);
+ tests(1, &mut ctx);
+ tests(10, &mut ctx);
+ tests(100, &mut ctx);
+
+ #[rustfmt::skip]
+ let t = 4 * // buffer sizes
+ (4*4 + // tests 1,2,3,4 channels
+ 8 + // test 5 channels
+ 12) * // test 6 channels
+ MESSAGES_PER_CHANEL; // sends/recvs on a channel
+
+ let tot = ctx.tot.lock().unwrap();
+ if tot.tots != t || tot.totr != t {
+ panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t);
+ }
+ }
}
// https://github.com/golang/go/blob/master/test/ken/chan1.go
@@ -1551,7 +2086,7 @@ mod chan1 {
// sent messages
#[cfg(miri)]
- const N: usize = 100;
+ const N: usize = 20;
#[cfg(not(miri))]
const N: usize = 1000;
// receiving "goroutines"
diff --git a/tests/list.rs b/tests/list.rs
index 619e1fc..a0b9087 100644
--- a/tests/list.rs
+++ b/tests/list.rs
@@ -132,8 +132,13 @@ fn recv_timeout() {
#[test]
fn try_send() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
+ const COUNT: usize = 1000;
+
let (s, r) = unbounded();
- for i in 0..1000 {
+ for i in 0..COUNT {
assert_eq!(s.try_send(i), Ok(()));
}
@@ -143,8 +148,13 @@ fn try_send() {
#[test]
fn send() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
+ const COUNT: usize = 1000;
+
let (s, r) = unbounded();
- for i in 0..1000 {
+ for i in 0..COUNT {
assert_eq!(s.send(i), Ok(()));
}
@@ -154,8 +164,13 @@ fn send() {
#[test]
fn send_timeout() {
+ #[cfg(miri)]
+ const COUNT: usize = 50;
+ #[cfg(not(miri))]
+ const COUNT: usize = 1000;
+
let (s, r) = unbounded();
- for i in 0..1000 {
+ for i in 0..COUNT {
assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(()));
}
@@ -383,10 +398,16 @@ fn stress_timeout_two_threads() {
.unwrap();
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
+ #[cfg(miri)]
+ const RUNS: usize = 20;
+ #[cfg(not(miri))]
const RUNS: usize = 100;
+ #[cfg(miri)]
+ const STEPS: usize = 100;
+ #[cfg(not(miri))]
+ const STEPS: usize = 10_000;
static DROPS: AtomicUsize = AtomicUsize::new(0);
@@ -402,8 +423,8 @@ fn drops() {
let mut rng = thread_rng();
for _ in 0..RUNS {
- let steps = rng.gen_range(0..10_000);
- let additional = rng.gen_range(0..1000);
+ let steps = rng.gen_range(0..STEPS);
+ let additional = rng.gen_range(0..STEPS / 10);
DROPS.store(0, Ordering::SeqCst);
let (s, r) = unbounded::<DropCounter>();
@@ -412,6 +433,8 @@ fn drops() {
scope.spawn(|_| {
for _ in 0..steps {
r.recv().unwrap();
+ #[cfg(miri)]
+ std::thread::yield_now(); // https://github.com/rust-lang/miri/issues/1388
}
});
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
index 4d6e179..3db4812 100644
--- a/tests/mpsc.rs
+++ b/tests/mpsc.rs
@@ -321,7 +321,7 @@ mod channel_tests {
#[test]
fn stress() {
#[cfg(miri)]
- const COUNT: usize = 500;
+ const COUNT: usize = 100;
#[cfg(not(miri))]
const COUNT: usize = 10000;
@@ -340,7 +340,7 @@ mod channel_tests {
#[test]
fn stress_shared() {
#[cfg(miri)]
- const AMT: u32 = 500;
+ const AMT: u32 = 100;
#[cfg(not(miri))]
const AMT: u32 = 10000;
const NTHREADS: u32 = 8;
@@ -747,7 +747,7 @@ mod channel_tests {
#[test]
fn recv_a_lot() {
#[cfg(miri)]
- const N: usize = 100;
+ const N: usize = 50;
#[cfg(not(miri))]
const N: usize = 10000;
diff --git a/tests/select.rs b/tests/select.rs
index f24aed8..e7691f5 100644
--- a/tests/select.rs
+++ b/tests/select.rs
@@ -694,7 +694,7 @@ fn nesting() {
#[test]
fn stress_recv() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -735,7 +735,7 @@ fn stress_recv() {
#[test]
fn stress_send() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -953,7 +953,7 @@ fn matching_with_leftover() {
#[test]
fn channel_through_channel() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 1000;
@@ -1014,7 +1014,7 @@ fn channel_through_channel() {
#[test]
fn linearizable_try() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
@@ -1069,7 +1069,7 @@ fn linearizable_try() {
#[test]
fn linearizable_timeout() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
@@ -1124,7 +1124,7 @@ fn linearizable_timeout() {
#[test]
fn fairness1() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -1173,7 +1173,7 @@ fn fairness1() {
#[test]
fn fairness2() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -1292,7 +1292,7 @@ fn send_and_clone() {
#[test]
fn reuse() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
diff --git a/tests/select_macro.rs b/tests/select_macro.rs
index 0b9a21a..91c04e1 100644
--- a/tests/select_macro.rs
+++ b/tests/select_macro.rs
@@ -488,7 +488,7 @@ fn panic_receiver() {
#[test]
fn stress_recv() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -1468,3 +1468,14 @@ fn disconnect_wakes_receiver() {
})
.unwrap();
}
+
+#[test]
+fn trailing_comma() {
+ let (s, r) = unbounded::<usize>();
+
+ select! {
+ send(s, 1,) -> _ => {},
+ recv(r,) -> _ => {},
+ default(ms(1000),) => {},
+ }
+}
diff --git a/tests/zero.rs b/tests/zero.rs
index ba41b1a..c90d741 100644
--- a/tests/zero.rs
+++ b/tests/zero.rs
@@ -188,7 +188,7 @@ fn send_timeout() {
#[test]
fn len() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 25_000;
@@ -253,7 +253,7 @@ fn disconnect_wakes_receiver() {
#[test]
fn spsc() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
@@ -278,7 +278,7 @@ fn spsc() {
#[test]
fn mpmc() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 25_000;
const THREADS: usize = 4;
@@ -313,7 +313,7 @@ fn mpmc() {
#[test]
fn stress_oneshot() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -396,10 +396,16 @@ fn stress_timeout_two_threads() {
.unwrap();
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
+ #[cfg(miri)]
+ const RUNS: usize = 20;
+ #[cfg(not(miri))]
const RUNS: usize = 100;
+ #[cfg(miri)]
+ const STEPS: usize = 500;
+ #[cfg(not(miri))]
+ const STEPS: usize = 10_000;
static DROPS: AtomicUsize = AtomicUsize::new(0);
@@ -415,7 +421,7 @@ fn drops() {
let mut rng = thread_rng();
for _ in 0..RUNS {
- let steps = rng.gen_range(0..3_000);
+ let steps = rng.gen_range(0..STEPS);
DROPS.store(0, Ordering::SeqCst);
let (s, r) = bounded::<DropCounter>(0);
@@ -445,7 +451,7 @@ fn drops() {
#[test]
fn fairness() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -479,7 +485,7 @@ fn fairness() {
#[test]
fn fairness_duplicates() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 10_000;
@@ -540,7 +546,7 @@ fn recv_in_send() {
#[test]
fn channel_through_channel() {
#[cfg(miri)]
- const COUNT: usize = 100;
+ const COUNT: usize = 50;
#[cfg(not(miri))]
const COUNT: usize = 1000;