diff options
author | David LeGare <legare@google.com> | 2022-06-29 17:11:05 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2022-06-29 17:11:05 +0000 |
commit | b113b5566a510090f58421f427ed5ebbfa27ce93 (patch) | |
tree | 75a4e401ccfe098595fc5f85f17b162d1ea55998 | |
parent | 134994aa018e129c4bca68272a1cb9f6144de116 (diff) | |
parent | 45db964b0c7f909b89ab97243a5aa4e973914faa (diff) | |
download | crossbeam-channel-b113b5566a510090f58421f427ed5ebbfa27ce93.tar.gz |
Upgrade rust/crates/crossbeam-channel to 0.5.5 am: 45db964b0cmain-16k
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/crossbeam-channel/+/2137795
Change-Id: Ie70d25054e4d9f1943fc9bc9ae84e1ff0b2e1ee6
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 6 | ||||
-rw-r--r-- | CHANGELOG.md | 20 | ||||
-rw-r--r-- | Cargo.lock | 50 | ||||
-rw-r--r-- | Cargo.toml | 19 | ||||
-rw-r--r-- | Cargo.toml.orig | 2 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | src/channel.rs | 9 | ||||
-rw-r--r-- | src/err.rs | 4 | ||||
-rw-r--r-- | src/flavors/array.rs | 74 | ||||
-rw-r--r-- | src/flavors/at.rs | 3 | ||||
-rw-r--r-- | src/flavors/list.rs | 10 | ||||
-rw-r--r-- | src/flavors/never.rs | 1 | ||||
-rw-r--r-- | src/flavors/tick.rs | 4 | ||||
-rw-r--r-- | src/flavors/zero.rs | 61 | ||||
-rw-r--r-- | src/select.rs | 19 | ||||
-rw-r--r-- | src/select_macro.rs | 60 | ||||
-rw-r--r-- | src/utils.rs | 58 | ||||
-rw-r--r-- | src/waker.rs | 64 | ||||
-rw-r--r-- | tests/array.rs | 98 | ||||
-rw-r--r-- | tests/golang.rs | 623 | ||||
-rw-r--r-- | tests/list.rs | 35 | ||||
-rw-r--r-- | tests/mpsc.rs | 6 | ||||
-rw-r--r-- | tests/select.rs | 16 | ||||
-rw-r--r-- | tests/select_macro.rs | 13 | ||||
-rw-r--r-- | tests/zero.rs | 24 |
26 files changed, 948 insertions, 341 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f15a046..1ea95d0 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "f9cec068fa94bced547a66289cd288dca58c2e83" + "sha1": "16769892dc5e1ba14c8ea14e92e85f442e25f4aa" }, "path_in_vcs": "crossbeam-channel" }
\ No newline at end of file @@ -47,7 +47,7 @@ rust_library { host_supported: true, crate_name: "crossbeam_channel", cargo_env_compat: true, - cargo_pkg_version: "0.5.2", + cargo_pkg_version: "0.5.5", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -59,4 +59,8 @@ rust_library { "libcfg_if", "libcrossbeam_utils", ], + apex_available: [ + "//apex_available:platform", + "//apex_available:anyapex", + ], } diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bfd923..2f627cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,22 @@ +# Version 0.5.5 + +- Replace Spinlock with Mutex. (#835) + +# Version 0.5.4 + +- Workaround a bug in upstream related to TLS access on AArch64 Linux. (#802) + +# Version 0.5.3 + +**Note:** This release has been yanked. See [#802](https://github.com/crossbeam-rs/crossbeam/issues/802) for details. + +- Fix panic on very large timeout. (#798) + # Version 0.5.2 -- Fix stacked borrows violations. (#763, #764) +**Note:** This release has been yanked. See [#802](https://github.com/crossbeam-rs/crossbeam/issues/802) for details. + +- Fix stacked borrows violations when `-Zmiri-tag-raw-pointers` is enabled. (#763, #764) # Version 0.5.1 @@ -22,6 +38,8 @@ # Version 0.4.3 +**Note:** This release has been yanked. See [GHSA-v5m7-53cv-f3hx](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-v5m7-53cv-f3hx) for details. + - Change license to "MIT OR Apache-2.0". # Version 0.4.2 @@ -10,7 +10,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "crossbeam-channel" -version = "0.5.2" +version = "0.5.5" dependencies = [ "cfg-if", "crossbeam-utils", @@ -21,19 +21,19 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.6" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120" +checksum = "8ff1f980957787286a554052d03c7aee98d99cc32e09f6d45f0a814133c87978" dependencies = [ "cfg-if", - "lazy_static", + "once_cell", ] [[package]] name = "getrandom" -version = "0.2.3" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ "cfg-if", "libc", @@ -50,16 +50,10 @@ dependencies = [ ] [[package]] -name = "lazy_static" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" - -[[package]] name = "libc" -version = "0.2.112" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125" +checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" [[package]] name = "num_cpus" @@ -72,6 +66,12 @@ dependencies = [ ] [[package]] +name = "once_cell" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" + +[[package]] name = "ppv-lite86" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -79,14 +79,13 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "rand" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", "rand_core", - "rand_hc", ] [[package]] @@ -109,19 +108,10 @@ dependencies = [ ] [[package]] -name = "rand_hc" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" -dependencies = [ - "rand_core", -] - -[[package]] name = "signal-hook" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d" +checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" dependencies = [ "libc", "signal-hook-registry", @@ -138,6 +128,6 @@ dependencies = [ [[package]] name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" +version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" @@ -13,13 +13,25 @@ edition = "2018" rust-version = "1.36" name = "crossbeam-channel" -version = "0.5.2" +version = "0.5.5" description = "Multi-producer multi-consumer channels for message passing" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel" -keywords = ["channel", "mpmc", "select", "golang", "message"] -categories = ["algorithms", "concurrency", "data-structures"] +readme = "README.md" +keywords = [ + "channel", + "mpmc", + "select", + "golang", + "message", +] +categories = [ + "algorithms", + "concurrency", + "data-structures", +] license = "MIT OR Apache-2.0" repository = "https://github.com/crossbeam-rs/crossbeam" + [dependencies.cfg-if] version = "1" @@ -27,6 +39,7 @@ version = "1" version = "0.8" optional = true default-features = false + [dev-dependencies.num_cpus] version = "1.13.0" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 640a808..344092c 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,7 +4,7 @@ name = "crossbeam-channel" # - Update CHANGELOG.md # - Update README.md # - Create "crossbeam-channel-X.Y.Z" git tag -version = "0.5.2" +version = "0.5.5" edition = "2018" rust-version = "1.36" license = "MIT OR Apache-2.0" @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.2.crate" + value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.5.crate" } - version: "0.5.2" + version: "0.5.5" license_type: NOTICE last_upgrade_date { year: 2022 - month: 3 - day: 1 + month: 6 + day: 28 } } diff --git a/src/channel.rs b/src/channel.rs index 8988235..800fe63 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -14,6 +14,7 @@ use crate::err::{ }; use crate::flavors; use crate::select::{Operation, SelectHandle, Token}; +use crate::utils; /// Creates a channel of unbounded capacity. /// @@ -232,6 +233,8 @@ pub fn at(when: Instant) -> Receiver<Instant> { /// /// Using a `never` channel to optionally add a timeout to [`select!`]: /// +/// [`select!`]: crate::select! +/// /// ``` /// use std::thread; /// use std::time::Duration; @@ -297,7 +300,7 @@ pub fn never<T>() -> Receiver<T> { /// let ms = |ms| Duration::from_millis(ms); /// /// // Returns `true` if `a` and `b` are very close `Instant`s. -/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a; +/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a; /// /// let start = Instant::now(); /// let r = tick(ms(100)); @@ -471,7 +474,7 @@ impl<T> Sender<T> { /// ); /// ``` pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { - self.send_deadline(msg, Instant::now() + timeout) + self.send_deadline(msg, utils::convert_timeout_to_deadline(timeout)) } /// Waits for a message to be sent into the channel, but only until a given deadline. @@ -861,7 +864,7 @@ impl<T> Receiver<T> { /// ); /// ``` pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { - self.recv_deadline(Instant::now() + timeout) + self.recv_deadline(utils::convert_timeout_to_deadline(timeout)) } /// Waits for a message to be received from the channel, but only before a given deadline. @@ -308,7 +308,6 @@ impl From<RecvError> for TryRecvError { impl TryRecvError { /// Returns `true` if the receive operation failed because the channel is empty. - #[allow(clippy::trivially_copy_pass_by_ref)] pub fn is_empty(&self) -> bool { match self { TryRecvError::Empty => true, @@ -317,7 +316,6 @@ impl TryRecvError { } /// Returns `true` if the receive operation failed because the channel is disconnected. - #[allow(clippy::trivially_copy_pass_by_ref)] pub fn is_disconnected(&self) -> bool { match self { TryRecvError::Disconnected => true, @@ -347,7 +345,6 @@ impl From<RecvError> for RecvTimeoutError { impl RecvTimeoutError { /// Returns `true` if the receive operation timed out. - #[allow(clippy::trivially_copy_pass_by_ref)] pub fn is_timeout(&self) -> bool { match self { RecvTimeoutError::Timeout => true, @@ -356,7 +353,6 @@ impl RecvTimeoutError { } /// Returns `true` if the receive operation failed because the channel is disconnected. - #[allow(clippy::trivially_copy_pass_by_ref)] pub fn is_disconnected(&self) -> bool { match self { RecvTimeoutError::Disconnected => true, diff --git a/src/flavors/array.rs b/src/flavors/array.rs index 871768c..73557d3 100644 --- a/src/flavors/array.rs +++ b/src/flavors/array.rs @@ -9,7 +9,6 @@ //! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub> use std::cell::UnsafeCell; -use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ptr; use std::sync::atomic::{self, AtomicUsize, Ordering}; @@ -33,7 +32,7 @@ struct Slot<T> { /// The token type for the array flavor. #[derive(Debug)] -pub struct ArrayToken { +pub(crate) struct ArrayToken { /// Slot to read from or write to. slot: *const u8, @@ -72,7 +71,7 @@ pub(crate) struct Channel<T> { tail: CachePadded<AtomicUsize>, /// The buffer holding slots. - buffer: *mut Slot<T>, + buffer: Box<[Slot<T>]>, /// The channel capacity. cap: usize, @@ -88,9 +87,6 @@ pub(crate) struct Channel<T> { /// Receivers waiting while the channel is empty and not disconnected. receivers: SyncWaker, - - /// Indicates that dropping a `Channel<T>` may drop values of type `T`. - _marker: PhantomData<T>, } impl<T> Channel<T> { @@ -109,18 +105,15 @@ impl<T> Channel<T> { // Allocate a buffer of `cap` slots initialized // with stamps. - let buffer = { - let boxed: Box<[Slot<T>]> = (0..cap) - .map(|i| { - // Set the stamp to `{ lap: 0, mark: 0, index: i }`. - Slot { - stamp: AtomicUsize::new(i), - msg: UnsafeCell::new(MaybeUninit::uninit()), - } - }) - .collect(); - Box::into_raw(boxed) as *mut Slot<T> - }; + let buffer: Box<[Slot<T>]> = (0..cap) + .map(|i| { + // Set the stamp to `{ lap: 0, mark: 0, index: i }`. + Slot { + stamp: AtomicUsize::new(i), + msg: UnsafeCell::new(MaybeUninit::uninit()), + } + }) + .collect(); Channel { buffer, @@ -131,7 +124,6 @@ impl<T> Channel<T> { tail: CachePadded::new(AtomicUsize::new(tail)), senders: SyncWaker::new(), receivers: SyncWaker::new(), - _marker: PhantomData, } } @@ -163,7 +155,8 @@ impl<T> Channel<T> { let lap = tail & !(self.one_lap - 1); // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the tail and the stamp match, we may attempt to push. @@ -245,7 +238,8 @@ impl<T> Channel<T> { let lap = head & !(self.one_lap - 1); // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the the stamp is ahead of the head by 1, we may attempt to pop. @@ -475,7 +469,6 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. pub(crate) fn capacity(&self) -> Option<usize> { Some(self.cap) } @@ -528,10 +521,24 @@ impl<T> Channel<T> { impl<T> Drop for Channel<T> { fn drop(&mut self) { // Get the index of the head. - let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); + let head = *self.head.get_mut(); + let tail = *self.tail.get_mut(); + + let hix = head & (self.mark_bit - 1); + let tix = tail & (self.mark_bit - 1); + + let len = if hix < tix { + tix - hix + } else if hix > tix { + self.cap - hix + tix + } else if (tail & !self.mark_bit) == head { + 0 + } else { + self.cap + }; // Loop over all slots that hold a message and drop them. - for i in 0..self.len() { + for i in 0..len { // Compute the index of the next slot holding a message. let index = if hix + i < self.cap { hix + i @@ -540,23 +547,12 @@ impl<T> Drop for Channel<T> { }; unsafe { - let p = { - let slot = &mut *self.buffer.add(index); - let msg = &mut *slot.msg.get(); - msg.as_mut_ptr() - }; - p.drop_in_place(); + debug_assert!(index < self.buffer.len()); + let slot = self.buffer.get_unchecked_mut(index); + let msg = &mut *slot.msg.get(); + msg.as_mut_ptr().drop_in_place(); } } - - // Finally, deallocate the buffer, but don't run any destructors. - unsafe { - // Create a slice from the buffer to make - // a fat pointer. Then, use Box::from_raw - // to deallocate it. - let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; - Box::from_raw(ptr); - } } } diff --git a/src/flavors/at.rs b/src/flavors/at.rs index 4581edb..ca5ee60 100644 --- a/src/flavors/at.rs +++ b/src/flavors/at.rs @@ -35,7 +35,7 @@ impl Channel { /// Creates a channel that delivers a message after a certain duration of time. #[inline] pub(crate) fn new_timeout(dur: Duration) -> Self { - Self::new_deadline(Instant::now() + dur) + Self::new_deadline(utils::convert_timeout_to_deadline(dur)) } /// Attempts to receive a message without blocking. @@ -142,7 +142,6 @@ impl Channel { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] pub(crate) fn capacity(&self) -> Option<usize> { Some(1) diff --git a/src/flavors/list.rs b/src/flavors/list.rs index 5056aa4..9bda6d1 100644 --- a/src/flavors/list.rs +++ b/src/flavors/list.rs @@ -126,7 +126,7 @@ struct Position<T> { /// The token type for the list flavor. #[derive(Debug)] -pub struct ListToken { +pub(crate) struct ListToken { /// The block of slots. block: *const u8, @@ -634,9 +634,9 @@ impl<T> Channel<T> { impl<T> Drop for Channel<T> { fn drop(&mut self) { - let mut head = self.head.index.load(Ordering::Relaxed); - let mut tail = self.tail.index.load(Ordering::Relaxed); - let mut block = self.head.block.load(Ordering::Relaxed); + let mut head = *self.head.index.get_mut(); + let mut tail = *self.tail.index.get_mut(); + let mut block = *self.head.block.get_mut(); // Erase the lower bits. head &= !((1 << SHIFT) - 1); @@ -654,7 +654,7 @@ impl<T> Drop for Channel<T> { p.as_mut_ptr().drop_in_place(); } else { // Deallocate the block and move to the next one. - let next = (*block).next.load(Ordering::Relaxed); + let next = *(*block).next.get_mut(); drop(Box::from_raw(block)); block = next; } diff --git a/src/flavors/never.rs b/src/flavors/never.rs index 1951e96..277a61d 100644 --- a/src/flavors/never.rs +++ b/src/flavors/never.rs @@ -65,7 +65,6 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] pub(crate) fn capacity(&self) -> Option<usize> { Some(0) diff --git a/src/flavors/tick.rs b/src/flavors/tick.rs index d4b1f6c..4201b6e 100644 --- a/src/flavors/tick.rs +++ b/src/flavors/tick.rs @@ -10,6 +10,7 @@ use crossbeam_utils::atomic::AtomicCell; use crate::context::Context; use crate::err::{RecvTimeoutError, TryRecvError}; use crate::select::{Operation, SelectHandle, Token}; +use crate::utils; /// Result of a receive operation. pub(crate) type TickToken = Option<Instant>; @@ -28,7 +29,7 @@ impl Channel { #[inline] pub(crate) fn new(dur: Duration) -> Self { Channel { - delivery_time: AtomicCell::new(Instant::now() + dur), + delivery_time: AtomicCell::new(utils::convert_timeout_to_deadline(dur)), duration: dur, } } @@ -112,7 +113,6 @@ impl Channel { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] pub(crate) fn capacity(&self) -> Option<usize> { Some(1) diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs index 4afbd8f..31e62af 100644 --- a/src/flavors/zero.rs +++ b/src/flavors/zero.rs @@ -5,6 +5,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; use std::time::Instant; use std::{fmt, ptr}; @@ -13,11 +14,10 @@ use crossbeam_utils::Backoff; use crate::context::Context; use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; -use crate::utils::Spinlock; use crate::waker::Waker; /// A pointer to a packet. -pub struct ZeroToken(*mut ()); +pub(crate) struct ZeroToken(*mut ()); impl Default for ZeroToken { fn default() -> Self { @@ -95,7 +95,7 @@ struct Inner { /// Zero-capacity channel. pub(crate) struct Channel<T> { /// Inner representation of the channel. - inner: Spinlock<Inner>, + inner: Mutex<Inner>, /// Indicates that dropping a `Channel<T>` may drop values of type `T`. _marker: PhantomData<T>, @@ -105,7 +105,7 @@ impl<T> Channel<T> { /// Constructs a new zero-capacity channel. pub(crate) fn new() -> Self { Channel { - inner: Spinlock::new(Inner { + inner: Mutex::new(Inner { senders: Waker::new(), receivers: Waker::new(), is_disconnected: false, @@ -126,7 +126,7 @@ impl<T> Channel<T> { /// Attempts to reserve a slot for sending a message. fn start_send(&self, token: &mut Token) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -155,7 +155,7 @@ impl<T> Channel<T> { /// Attempts to pair up with a sender. fn start_recv(&self, token: &mut Token) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -198,7 +198,7 @@ impl<T> Channel<T> { /// Attempts to send a message into the channel. pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -222,7 +222,7 @@ impl<T> Channel<T> { deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -254,12 +254,12 @@ impl<T> Channel<T> { match sel { Selected::Waiting => unreachable!(), Selected::Aborted => { - self.inner.lock().senders.unregister(oper).unwrap(); + self.inner.lock().unwrap().senders.unregister(oper).unwrap(); let msg = unsafe { packet.msg.get().replace(None).unwrap() }; Err(SendTimeoutError::Timeout(msg)) } Selected::Disconnected => { - self.inner.lock().senders.unregister(oper).unwrap(); + self.inner.lock().unwrap().senders.unregister(oper).unwrap(); let msg = unsafe { packet.msg.get().replace(None).unwrap() }; Err(SendTimeoutError::Disconnected(msg)) } @@ -275,7 +275,7 @@ impl<T> Channel<T> { /// Attempts to receive a message without blocking. pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -292,7 +292,7 @@ impl<T> Channel<T> { /// Receives a message from the channel. pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -325,11 +325,21 @@ impl<T> Channel<T> { match sel { Selected::Waiting => unreachable!(), Selected::Aborted => { - self.inner.lock().receivers.unregister(oper).unwrap(); + self.inner + .lock() + .unwrap() + .receivers + .unregister(oper) + .unwrap(); Err(RecvTimeoutError::Timeout) } Selected::Disconnected => { - self.inner.lock().receivers.unregister(oper).unwrap(); + self.inner + .lock() + .unwrap() + .receivers + .unregister(oper) + .unwrap(); Err(RecvTimeoutError::Disconnected) } Selected::Operation(_) => { @@ -345,7 +355,7 @@ impl<T> Channel<T> { /// /// Returns `true` if this call disconnected the channel. pub(crate) fn disconnect(&self) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); if !inner.is_disconnected { inner.is_disconnected = true; @@ -363,7 +373,6 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. pub(crate) fn capacity(&self) -> Option<usize> { Some(0) } @@ -397,7 +406,7 @@ impl<T> SelectHandle for Receiver<'_, T> { fn register(&self, oper: Operation, cx: &Context) -> bool { let packet = Box::into_raw(Packet::<T>::empty_on_heap()); - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner .receivers .register_with_packet(oper, packet as *mut (), cx); @@ -406,7 +415,7 @@ impl<T> SelectHandle for Receiver<'_, T> { } fn unregister(&self, oper: Operation) { - if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) { + if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) { unsafe { drop(Box::from_raw(operation.packet as *mut Packet<T>)); } @@ -419,18 +428,18 @@ impl<T> SelectHandle for Receiver<'_, T> { } fn is_ready(&self) -> bool { - let inner = self.0.inner.lock(); + let inner = self.0.inner.lock().unwrap(); inner.senders.can_select() || inner.is_disconnected } fn watch(&self, oper: Operation, cx: &Context) -> bool { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.receivers.watch(oper, cx); inner.senders.can_select() || inner.is_disconnected } fn unwatch(&self, oper: Operation) { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.receivers.unwatch(oper); } } @@ -447,7 +456,7 @@ impl<T> SelectHandle for Sender<'_, T> { fn register(&self, oper: Operation, cx: &Context) -> bool { let packet = Box::into_raw(Packet::<T>::empty_on_heap()); - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner .senders .register_with_packet(oper, packet as *mut (), cx); @@ -456,7 +465,7 @@ impl<T> SelectHandle for Sender<'_, T> { } fn unregister(&self, oper: Operation) { - if let Some(operation) = self.0.inner.lock().senders.unregister(oper) { + if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) { unsafe { drop(Box::from_raw(operation.packet as *mut Packet<T>)); } @@ -469,18 +478,18 @@ impl<T> SelectHandle for Sender<'_, T> { } fn is_ready(&self) -> bool { - let inner = self.0.inner.lock(); + let inner = self.0.inner.lock().unwrap(); inner.receivers.can_select() || inner.is_disconnected } fn watch(&self, oper: Operation, cx: &Context) -> bool { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.senders.watch(oper, cx); inner.receivers.can_select() || inner.is_disconnected } fn unwatch(&self, oper: Operation) { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.senders.unwatch(oper); } } diff --git a/src/select.rs b/src/select.rs index 6103ef4..57d67a3 100644 --- a/src/select.rs +++ b/src/select.rs @@ -22,12 +22,13 @@ use crate::utils; // This is a private API that is used by the select macro. #[derive(Debug, Default)] pub struct Token { - pub at: flavors::at::AtToken, - pub array: flavors::array::ArrayToken, - pub list: flavors::list::ListToken, - pub never: flavors::never::NeverToken, - pub tick: flavors::tick::TickToken, - pub zero: flavors::zero::ZeroToken, + pub(crate) at: flavors::at::AtToken, + pub(crate) array: flavors::array::ArrayToken, + pub(crate) list: flavors::list::ListToken, + #[allow(dead_code)] + pub(crate) never: flavors::never::NeverToken, + pub(crate) tick: flavors::tick::TickToken, + pub(crate) zero: flavors::zero::ZeroToken, } /// Identifier associated with an operation by a specific thread on a specific channel. @@ -486,7 +487,7 @@ pub fn select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { - select_deadline(handles, Instant::now() + timeout) + select_deadline(handles, utils::convert_timeout_to_deadline(timeout)) } /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. @@ -518,6 +519,8 @@ pub(crate) fn select_deadline<'a>( /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a /// dynamically created list of channel operations. /// +/// [`select!`]: crate::select! +/// /// Once a list of operations has been built with `Select`, there are two different ways of /// proceeding: /// @@ -1042,7 +1045,7 @@ impl<'a> Select<'a> { /// } /// ``` pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> { - self.ready_deadline(Instant::now() + timeout) + self.ready_deadline(utils::convert_timeout_to_deadline(timeout)) } /// Blocks until a given deadline, or until one of the operations becomes ready. diff --git a/src/select_macro.rs b/src/select_macro.rs index f8b247e..efe0ae4 100644 --- a/src/select_macro.rs +++ b/src/select_macro.rs @@ -121,18 +121,7 @@ macro_rules! crossbeam_channel_internal { }; // Only one case remains. (@list - ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr) - ($($head:tt)*) - ) => { - $crate::crossbeam_channel_internal!( - @list - () - ($($head)* $case ($($args)*) $(-> $res)* => { $body },) - ) - }; - // Accept a trailing comma at the end of the list. - (@list - ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr,) + ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr $(,)?) ($($head:tt)*) ) => { $crate::crossbeam_channel_internal!( @@ -373,20 +362,7 @@ macro_rules! crossbeam_channel_internal { // Check the format of a recv case. (@case - (recv($r:expr) -> $res:pat => $body:tt, $($tail:tt)*) - ($($cases:tt)*) - $default:tt - ) => { - $crate::crossbeam_channel_internal!( - @case - ($($tail)*) - ($($cases)* recv($r) -> $res => $body,) - $default - ) - }; - // Allow trailing comma... - (@case - (recv($r:expr,) -> $res:pat => $body:tt, $($tail:tt)*) + (recv($r:expr $(,)?) -> $res:pat => $body:tt, $($tail:tt)*) ($($cases:tt)*) $default:tt ) => { @@ -428,20 +404,7 @@ macro_rules! crossbeam_channel_internal { // Check the format of a send case. (@case - (send($s:expr, $m:expr) -> $res:pat => $body:tt, $($tail:tt)*) - ($($cases:tt)*) - $default:tt - ) => { - $crate::crossbeam_channel_internal!( - @case - ($($tail)*) - ($($cases)* send($s, $m) -> $res => $body,) - $default - ) - }; - // Allow trailing comma... - (@case - (send($s:expr, $m:expr,) -> $res:pat => $body:tt, $($tail:tt)*) + (send($s:expr, $m:expr $(,)?) -> $res:pat => $body:tt, $($tail:tt)*) ($($cases:tt)*) $default:tt ) => { @@ -496,20 +459,7 @@ macro_rules! crossbeam_channel_internal { }; // Check the format of a default case with timeout. (@case - (default($timeout:expr) => $body:tt, $($tail:tt)*) - $cases:tt - () - ) => { - $crate::crossbeam_channel_internal!( - @case - ($($tail)*) - $cases - (default($timeout) => $body,) - ) - }; - // Allow trailing comma... - (@case - (default($timeout:expr,) => $body:tt, $($tail:tt)*) + (default($timeout:expr $(,)?) => $body:tt, $($tail:tt)*) $cases:tt () ) => { @@ -1043,7 +993,7 @@ macro_rules! crossbeam_channel_internal { /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even /// when it will simply return an error because the channel is disconnected. /// -/// The `select` macro is a convenience wrapper around [`Select`]. However, it cannot select over a +/// The `select!` macro is a convenience wrapper around [`Select`]. However, it cannot select over a /// dynamically created list of channel operations. /// /// [`Select`]: super::Select diff --git a/src/utils.rs b/src/utils.rs index 557b6a0..9f14c8e 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,14 +1,10 @@ //! Miscellaneous utilities. -use std::cell::{Cell, UnsafeCell}; +use std::cell::Cell; use std::num::Wrapping; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::{Duration, Instant}; -use crossbeam_utils::Backoff; - /// Randomly shuffles a slice. pub(crate) fn shuffle<T>(v: &mut [T]) { let len = v.len(); @@ -61,52 +57,10 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) { } } -/// A simple spinlock. -pub(crate) struct Spinlock<T> { - flag: AtomicBool, - value: UnsafeCell<T>, -} - -impl<T> Spinlock<T> { - /// Returns a new spinlock initialized with `value`. - pub(crate) fn new(value: T) -> Spinlock<T> { - Spinlock { - flag: AtomicBool::new(false), - value: UnsafeCell::new(value), - } - } - - /// Locks the spinlock. - pub(crate) fn lock(&self) -> SpinlockGuard<'_, T> { - let backoff = Backoff::new(); - while self.flag.swap(true, Ordering::Acquire) { - backoff.snooze(); - } - SpinlockGuard { parent: self } - } -} - -/// A guard holding a spinlock locked. -pub(crate) struct SpinlockGuard<'a, T> { - parent: &'a Spinlock<T>, -} - -impl<T> Drop for SpinlockGuard<'_, T> { - fn drop(&mut self) { - self.parent.flag.store(false, Ordering::Release); - } -} - -impl<T> Deref for SpinlockGuard<'_, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.parent.value.get() } - } -} - -impl<T> DerefMut for SpinlockGuard<'_, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.parent.value.get() } +// https://github.com/crossbeam-rs/crossbeam/issues/795 +pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant { + match Instant::now().checked_add(timeout) { + Some(deadline) => deadline, + None => Instant::now() + Duration::from_secs(86400 * 365 * 30), } } diff --git a/src/waker.rs b/src/waker.rs index dec73a9..7eb58ba 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -2,11 +2,11 @@ use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; use std::thread::{self, ThreadId}; use crate::context::Context; use crate::select::{Operation, Selected}; -use crate::utils::Spinlock; /// Represents a thread blocked on a specific channel operation. pub(crate) struct Entry { @@ -77,26 +77,32 @@ impl Waker { /// Attempts to find another thread's entry, select the operation, and wake it up. #[inline] pub(crate) fn try_select(&mut self) -> Option<Entry> { - self.selectors - .iter() - .position(|selector| { - // Does the entry belong to a different thread? - selector.cx.thread_id() != current_thread_id() - && selector // Try selecting this operation. - .cx - .try_select(Selected::Operation(selector.oper)) - .is_ok() - && { - // Provide the packet. - selector.cx.store_packet(selector.packet); - // Wake the thread up. - selector.cx.unpark(); - true - } - }) - // Remove the entry from the queue to keep it clean and improve - // performance. - .map(|pos| self.selectors.remove(pos)) + if self.selectors.is_empty() { + None + } else { + let thread_id = current_thread_id(); + + self.selectors + .iter() + .position(|selector| { + // Does the entry belong to a different thread? + selector.cx.thread_id() != thread_id + && selector // Try selecting this operation. + .cx + .try_select(Selected::Operation(selector.oper)) + .is_ok() + && { + // Provide the packet. + selector.cx.store_packet(selector.packet); + // Wake the thread up. + selector.cx.unpark(); + true + } + }) + // Remove the entry from the queue to keep it clean and improve + // performance. + .map(|pos| self.selectors.remove(pos)) + } } /// Returns `true` if there is an entry which can be selected by the current thread. @@ -170,7 +176,7 @@ impl Drop for Waker { /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. pub(crate) struct SyncWaker { /// The inner `Waker`. - inner: Spinlock<Waker>, + inner: Mutex<Waker>, /// `true` if the waker is empty. is_empty: AtomicBool, @@ -181,7 +187,7 @@ impl SyncWaker { #[inline] pub(crate) fn new() -> Self { SyncWaker { - inner: Spinlock::new(Waker::new()), + inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true), } } @@ -189,7 +195,7 @@ impl SyncWaker { /// Registers the current thread with an operation. #[inline] pub(crate) fn register(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.register(oper, cx); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -200,7 +206,7 @@ impl SyncWaker { /// Unregisters an operation previously registered by the current thread. #[inline] pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); let entry = inner.unregister(oper); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -213,7 +219,7 @@ impl SyncWaker { #[inline] pub(crate) fn notify(&self) { if !self.is_empty.load(Ordering::SeqCst) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); if !self.is_empty.load(Ordering::SeqCst) { inner.try_select(); inner.notify(); @@ -228,7 +234,7 @@ impl SyncWaker { /// Registers an operation waiting to be ready. #[inline] pub(crate) fn watch(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.watch(oper, cx); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -239,7 +245,7 @@ impl SyncWaker { /// Unregisters an operation waiting to be ready. #[inline] pub(crate) fn unwatch(&self, oper: Operation) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.unwatch(oper); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -250,7 +256,7 @@ impl SyncWaker { /// Notifies all threads that the channel is disconnected. #[inline] pub(crate) fn disconnect(&self) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.disconnect(); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), diff --git a/tests/array.rs b/tests/array.rs index bb2cebe..de843cd 100644 --- a/tests/array.rs +++ b/tests/array.rs @@ -1,7 +1,5 @@ //! Tests for the array channel flavor. -#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow - use std::any::Any; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -254,7 +252,13 @@ fn recv_after_disconnect() { #[test] fn len() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] const COUNT: usize = 25_000; + #[cfg(miri)] + const CAP: usize = 50; + #[cfg(not(miri))] const CAP: usize = 1000; let (s, r) = bounded(CAP); @@ -347,6 +351,9 @@ fn disconnect_wakes_receiver() { #[test] fn spsc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; let (s, r) = bounded(3); @@ -369,6 +376,9 @@ fn spsc() { #[test] fn mpmc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -401,6 +411,9 @@ fn mpmc() { #[test] fn stress_oneshot() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; for _ in 0..COUNT { @@ -416,6 +429,9 @@ fn stress_oneshot() { #[test] fn stress_iter() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; let (request_s, request_r) = bounded(1); @@ -483,7 +499,14 @@ fn stress_timeout_two_threads() { #[test] fn drops() { + #[cfg(miri)] + const RUNS: usize = 10; + #[cfg(not(miri))] const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] + const STEPS: usize = 10_000; static DROPS: AtomicUsize = AtomicUsize::new(0); @@ -499,7 +522,7 @@ fn drops() { let mut rng = thread_rng(); for _ in 0..RUNS { - let steps = rng.gen_range(0..10_000); + let steps = rng.gen_range(0..STEPS); let additional = rng.gen_range(0..50); DROPS.store(0, Ordering::SeqCst); @@ -509,12 +532,16 @@ fn drops() { scope.spawn(|_| { for _ in 0..steps { r.recv().unwrap(); + #[cfg(miri)] + std::thread::yield_now(); // https://github.com/rust-lang/miri/issues/1388 } }); scope.spawn(|_| { for _ in 0..steps { s.send(DropCounter).unwrap(); + #[cfg(miri)] + std::thread::yield_now(); // https://github.com/rust-lang/miri/issues/1388 } }); }) @@ -533,6 +560,9 @@ fn drops() { #[test] fn linearizable() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -553,6 +583,9 @@ fn linearizable() { #[test] fn fairness() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded::<()>(COUNT); @@ -575,6 +608,9 @@ fn fairness() { #[test] fn fairness_duplicates() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s, r) = bounded::<()>(COUNT); @@ -619,6 +655,9 @@ fn recv_in_send() { #[test] fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 1000; type T = Box<dyn Any + Send>; @@ -654,3 +693,56 @@ fn channel_through_channel() { }) .unwrap(); } + +#[test] +fn panic_on_drop() { + struct Msg1<'a>(&'a mut bool); + impl Drop for Msg1<'_> { + fn drop(&mut self) { + if *self.0 && !std::thread::panicking() { + panic!("double drop"); + } else { + *self.0 = true; + } + } + } + + struct Msg2<'a>(&'a mut bool); + impl Drop for Msg2<'_> { + fn drop(&mut self) { + if *self.0 { + panic!("double drop"); + } else { + *self.0 = true; + panic!("first drop"); + } + } + } + + // normal + let (s, r) = bounded(2); + let (mut a, mut b) = (false, false); + s.send(Msg1(&mut a)).unwrap(); + s.send(Msg1(&mut b)).unwrap(); + drop(s); + drop(r); + assert!(a); + assert!(b); + + // panic on drop + let (s, r) = bounded(2); + let (mut a, mut b) = (false, false); + s.send(Msg2(&mut a)).unwrap(); + s.send(Msg2(&mut b)).unwrap(); + drop(s); + let res = std::panic::catch_unwind(move || { + drop(r); + }); + assert_eq!( + *res.unwrap_err().downcast_ref::<&str>().unwrap(), + "first drop" + ); + assert!(a); + // Elements after the panicked element will leak. + assert!(!b); +} diff --git a/tests/golang.rs b/tests/golang.rs index 05d67f6..6a46c03 100644 --- a/tests/golang.rs +++ b/tests/golang.rs @@ -15,12 +15,12 @@ use std::alloc::{GlobalAlloc, Layout, System}; use std::any::Any; use std::cell::Cell; use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering::SeqCst}; +use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; -use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender}; +use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender}; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) @@ -32,7 +32,13 @@ struct Chan<T> { struct ChanInner<T> { s: Option<Sender<T>>, - r: Receiver<T>, + r: Option<Receiver<T>>, + // Receiver to use when r is None (Go blocks on receiving from nil) + nil_r: Receiver<T>, + // Sender to use when s is None (Go blocks on sending to nil) + nil_s: Sender<T>, + // Hold this receiver to prevent nil sender channel from disconnection + _nil_sr: Receiver<T>, } impl<T> Clone for Chan<T> { @@ -57,35 +63,53 @@ impl<T> Chan<T> { } fn try_recv(&self) -> Option<T> { - let r = self.inner.lock().unwrap().r.clone(); + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); r.try_recv().ok() } fn recv(&self) -> Option<T> { - let r = self.inner.lock().unwrap().r.clone(); + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); r.recv().ok() } - fn close(&self) { + fn close_s(&self) { self.inner .lock() .unwrap() .s .take() - .expect("channel already closed"); + .expect("channel sender already closed"); + } + + fn close_r(&self) { + self.inner + .lock() + .unwrap() + .r + .take() + .expect("channel receiver already closed"); + } + + fn has_rx(&self) -> bool { + self.inner.lock().unwrap().r.is_some() + } + + fn has_tx(&self) -> bool { + self.inner.lock().unwrap().s.is_some() } fn rx(&self) -> Receiver<T> { - self.inner.lock().unwrap().r.clone() + let inner = self.inner.lock().unwrap(); + match inner.r.as_ref() { + None => inner.nil_r.clone(), + Some(r) => r.clone(), + } } fn tx(&self) -> Sender<T> { - match self.inner.lock().unwrap().s.as_ref() { - None => { - let (s, r) = bounded(0); - std::mem::forget(r); - s - } + let inner = self.inner.lock().unwrap(); + match inner.s.as_ref() { + None => inner.nil_s.clone(), Some(s) => s.clone(), } } @@ -110,17 +134,32 @@ impl<'a, T> IntoIterator for &'a Chan<T> { fn make<T>(cap: usize) -> Chan<T> { let (s, r) = bounded(cap); + let (nil_s, _nil_sr) = bounded(0); Chan { - inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), } } fn make_unbounded<T>() -> Chan<T> { let (s, r) = unbounded(); + let (nil_s, _nil_sr) = bounded(0); Chan { - inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), } } + #[derive(Clone)] struct WaitGroup(Arc<WaitGroupInner>); @@ -199,14 +238,6 @@ macro_rules! defer { } macro_rules! go { - (@parse ref $v:ident, $($tail:tt)*) => {{ - let ref $v = $v; - go!(@parse $($tail)*) - }}; - (@parse move $v:ident, $($tail:tt)*) => {{ - let $v = $v; - go!(@parse $($tail)*) - }}; (@parse $v:ident, $($tail:tt)*) => {{ let $v = $v.clone(); go!(@parse $($tail)*) @@ -240,10 +271,10 @@ mod doubleselect { const ITERATIONS: i32 = 10_000; fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) { - defer! { c1.close() } - defer! { c2.close() } - defer! { c3.close() } - defer! { c4.close() } + defer! { c1.close_s() } + defer! { c2.close_s() } + defer! { c3.close_s() } + defer! { c4.close_s() } for i in 0..n { select! { @@ -292,7 +323,7 @@ mod doubleselect { done.recv(); done.recv(); done.recv(); - cmux.close(); + cmux.close_s(); }); recver(cmux); } @@ -697,7 +728,7 @@ mod select2 { use super::*; #[cfg(miri)] - const N: i32 = 1000; + const N: i32 = 200; #[cfg(not(miri))] const N: i32 = 100000; @@ -892,6 +923,9 @@ mod sieve1 { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, ]; + #[cfg(miri)] + let a = &a[..10]; + for item in a.iter() { let x = primes.recv().unwrap(); if x != *item { @@ -929,6 +963,11 @@ mod chan_test { #[cfg(not(miri))] const N: i32 = 200; + #[cfg(miri)] + const MESSAGES_COUNT: i32 = 20; + #[cfg(not(miri))] + const MESSAGES_COUNT: i32 = 100; + for cap in 0..N { { // Ensure that receive from empty chan blocks. @@ -999,7 +1038,7 @@ mod chan_test { for i in 0..cap { c.send(i); } - c.close(); + c.close_s(); for i in 0..cap { let v = c.recv(); @@ -1027,7 +1066,7 @@ mod chan_test { }); thread::sleep(ms(1)); - c.close(); + c.close_s(); if !done.recv().unwrap() { panic!(); @@ -1035,15 +1074,15 @@ mod chan_test { } { - // Send 100 integers, + // Send many integers, // ensure that we receive them non-corrupted in FIFO order. let c = make::<i32>(cap as usize); go!(c, { - for i in 0..100 { + for i in 0..MESSAGES_COUNT { c.send(i); } }); - for i in 0..100 { + for i in 0..MESSAGES_COUNT { if c.recv() != Some(i) { panic!(); } @@ -1051,11 +1090,11 @@ mod chan_test { // Same, but using recv2. go!(c, { - for i in 0..100 { + for i in 0..MESSAGES_COUNT { c.send(i); } }); - for i in 0..100 { + for i in 0..MESSAGES_COUNT { if c.recv() != Some(i) { panic!(); } @@ -1082,7 +1121,7 @@ mod chan_test { } }); - c.close(); + c.close_s(); c.recv(); t.join().unwrap(); } @@ -1149,7 +1188,7 @@ mod chan_test { done.send(true); }); - c2.close(); + c2.close_s(); select! { recv(c1.rx()) -> _ => {} default => {} @@ -1378,7 +1417,7 @@ mod chan_test { ); } - done.close(); + done.close_s(); wg.wait(); } @@ -1481,9 +1520,9 @@ mod chan_test { *expect.lock().unwrap() += v; q.send(v); } - q.close(); + q.close_s(); wg.wait(); - r.close(); + r.close_s(); }); let mut n = 0; @@ -1541,8 +1580,504 @@ mod race_chan_test { } // https://github.com/golang/go/blob/master/test/ken/chan.go +#[cfg(not(miri))] // Miri is too slow mod chan { - // TODO + + use super::*; + + const MESSAGES_PER_CHANEL: u32 = 76; + const MESSAGES_RANGE_LEN: u32 = 100; + const END: i32 = 10000; + + struct ChanWithVals { + chan: Chan<i32>, + /// Next value to send + sv: Arc<AtomicI32>, + /// Next value to receive + rv: Arc<AtomicI32>, + } + + struct Totals { + /// Total sent messages + tots: u32, + /// Total received messages + totr: u32, + } + + struct Context { + nproc: Arc<Mutex<i32>>, + cval: Arc<Mutex<i32>>, + tot: Arc<Mutex<Totals>>, + nc: ChanWithVals, + randx: Arc<Mutex<i32>>, + } + + impl ChanWithVals { + fn with_capacity(capacity: usize) -> Self { + ChanWithVals { + chan: make(capacity), + sv: Arc::new(AtomicI32::new(0)), + rv: Arc::new(AtomicI32::new(0)), + } + } + + fn closed() -> Self { + let ch = ChanWithVals::with_capacity(0); + ch.chan.close_r(); + ch.chan.close_s(); + ch + } + + fn rv(&self) -> i32 { + self.rv.load(SeqCst) + } + + fn sv(&self) -> i32 { + self.sv.load(SeqCst) + } + + fn send(&mut self, tot: &Mutex<Totals>) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.tots += 1 + } + let esv = expect(self.sv(), self.sv()); + self.sv.store(esv, SeqCst); + if self.sv() == END { + self.chan.close_s(); + return true; + } + false + } + + fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.totr += 1 + } + let erv = expect(self.rv(), v); + self.rv.store(erv, SeqCst); + if self.rv() == END { + self.chan.close_r(); + return true; + } + false + } + } + + impl Clone for ChanWithVals { + fn clone(&self) -> Self { + ChanWithVals { + chan: self.chan.clone(), + sv: self.sv.clone(), + rv: self.rv.clone(), + } + } + } + + impl Context { + fn nproc(&self) -> &Mutex<i32> { + self.nproc.as_ref() + } + + fn cval(&self) -> &Mutex<i32> { + self.cval.as_ref() + } + + fn tot(&self) -> &Mutex<Totals> { + self.tot.as_ref() + } + + fn randx(&self) -> &Mutex<i32> { + self.randx.as_ref() + } + } + + impl Clone for Context { + fn clone(&self) -> Self { + Context { + nproc: self.nproc.clone(), + cval: self.cval.clone(), + tot: self.tot.clone(), + nc: self.nc.clone(), + randx: self.randx.clone(), + } + } + } + + fn nrand(n: i32, randx: &Mutex<i32>) -> i32 { + let mut randx = randx.lock().unwrap(); + *randx += 10007; + if *randx >= 1000000 { + *randx -= 1000000 + } + *randx % n + } + + fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 { + let mut nproc = nproc.lock().unwrap(); + *nproc += adjust; + *nproc + } + + fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> { + let mut ca = Vec::<ChanWithVals>::with_capacity(n); + let mut cval = cval.lock().unwrap(); + for _ in 0..n { + *cval += MESSAGES_RANGE_LEN as i32; + let chl = ChanWithVals::with_capacity(c); + chl.sv.store(*cval, SeqCst); + chl.rv.store(*cval, SeqCst); + ca.push(chl); + } + ca + } + + fn expect(v: i32, v0: i32) -> i32 { + if v == v0 { + return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 { + END + } else { + v + 1 + }; + } + panic!("got {}, expected {}", v, v0 + 1); + } + + fn send(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in 0..=nrand(10, ctx.randx()) { + thread::yield_now(); + } + c.chan.tx().send(c.sv()).unwrap(); + if c.send(ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn recv(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in (0..nrand(10, ctx.randx())).rev() { + thread::yield_now(); + } + let v = c.chan.rx().recv().unwrap(); + if c.recv(v, ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + #[allow(clippy::too_many_arguments)] + fn sel( + mut r0: ChanWithVals, + mut r1: ChanWithVals, + mut r2: ChanWithVals, + mut r3: ChanWithVals, + mut s0: ChanWithVals, + mut s1: ChanWithVals, + mut s2: ChanWithVals, + mut s3: ChanWithVals, + ctx: Context, + ) { + let mut a = 0; // local chans running + + if r0.chan.has_rx() { + a += 1; + } + if r1.chan.has_rx() { + a += 1; + } + if r2.chan.has_rx() { + a += 1; + } + if r3.chan.has_rx() { + a += 1; + } + if s0.chan.has_tx() { + a += 1; + } + if s1.chan.has_tx() { + a += 1; + } + if s2.chan.has_tx() { + a += 1; + } + if s3.chan.has_tx() { + a += 1; + } + + loop { + for _ in 0..=nrand(5, ctx.randx()) { + thread::yield_now(); + } + select! { + recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 }, + send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 }, + send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 }, + send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 }, + } + if a == 0 { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals { + vec.get(idx).unwrap().clone() + } + + /// Direct send to direct recv + fn test1(c: ChanWithVals, ctx: &mut Context) { + change_nproc(2, ctx.nproc()); + go!(c, ctx, send(c, ctx)); + go!(c, ctx, recv(c, ctx)); + } + + /// Direct send to select recv + fn test2(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 0), ctx)); + go!(ca, ctx, send(get(&ca, 1), ctx)); + go!(ca, ctx, send(get(&ca, 2), ctx)); + go!(ca, ctx, send(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to direct recv + fn test3(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 0), ctx)); + go!(ca, ctx, recv(get(&ca, 1), ctx)); + go!(ca, ctx, recv(get(&ca, 2), ctx)); + go!(ca, ctx, recv(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + } + + /// Select send to select recv, 4 channels + fn test4(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to select recv, 8 channels + fn test5(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 8, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + ctx, + ) + ); + } + + // Direct and select send to direct and select recv + fn test6(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 12, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 4), ctx)); + go!(ca, ctx, send(get(&ca, 5), ctx)); + go!(ca, ctx, send(get(&ca, 6), ctx)); + go!(ca, ctx, send(get(&ca, 7), ctx)); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 8), ctx)); + go!(ca, ctx, recv(get(&ca, 9), ctx)); + go!(ca, ctx, recv(get(&ca, 10), ctx)); + go!(ca, ctx, recv(get(&ca, 11), ctx)); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 8), + get(&ca, 9), + get(&ca, 10), + get(&ca, 11), + ctx, + ) + ); + } + + fn wait(ctx: &mut Context) { + thread::yield_now(); + while change_nproc(0, ctx.nproc()) != 0 { + thread::yield_now(); + } + } + + fn tests(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + test1(get(&ca, 0), ctx); + test1(get(&ca, 1), ctx); + test1(get(&ca, 2), ctx); + test1(get(&ca, 3), ctx); + wait(ctx); + + test2(c, ctx); + wait(ctx); + + test3(c, ctx); + wait(ctx); + + test4(c, ctx); + wait(ctx); + + test5(c, ctx); + wait(ctx); + + test6(c, ctx); + wait(ctx); + } + + #[test] + fn main() { + let mut ctx = Context { + nproc: Arc::new(Mutex::new(0)), + cval: Arc::new(Mutex::new(0)), + tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })), + nc: ChanWithVals::closed(), + randx: Arc::new(Mutex::new(0)), + }; + + tests(0, &mut ctx); + tests(1, &mut ctx); + tests(10, &mut ctx); + tests(100, &mut ctx); + + #[rustfmt::skip] + let t = 4 * // buffer sizes + (4*4 + // tests 1,2,3,4 channels + 8 + // test 5 channels + 12) * // test 6 channels + MESSAGES_PER_CHANEL; // sends/recvs on a channel + + let tot = ctx.tot.lock().unwrap(); + if tot.tots != t || tot.totr != t { + panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t); + } + } } // https://github.com/golang/go/blob/master/test/ken/chan1.go @@ -1551,7 +2086,7 @@ mod chan1 { // sent messages #[cfg(miri)] - const N: usize = 100; + const N: usize = 20; #[cfg(not(miri))] const N: usize = 1000; // receiving "goroutines" diff --git a/tests/list.rs b/tests/list.rs index 619e1fc..a0b9087 100644 --- a/tests/list.rs +++ b/tests/list.rs @@ -132,8 +132,13 @@ fn recv_timeout() { #[test] fn try_send() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + let (s, r) = unbounded(); - for i in 0..1000 { + for i in 0..COUNT { assert_eq!(s.try_send(i), Ok(())); } @@ -143,8 +148,13 @@ fn try_send() { #[test] fn send() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + let (s, r) = unbounded(); - for i in 0..1000 { + for i in 0..COUNT { assert_eq!(s.send(i), Ok(())); } @@ -154,8 +164,13 @@ fn send() { #[test] fn send_timeout() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + let (s, r) = unbounded(); - for i in 0..1000 { + for i in 0..COUNT { assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(())); } @@ -383,10 +398,16 @@ fn stress_timeout_two_threads() { .unwrap(); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { + #[cfg(miri)] + const RUNS: usize = 20; + #[cfg(not(miri))] const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] + const STEPS: usize = 10_000; static DROPS: AtomicUsize = AtomicUsize::new(0); @@ -402,8 +423,8 @@ fn drops() { let mut rng = thread_rng(); for _ in 0..RUNS { - let steps = rng.gen_range(0..10_000); - let additional = rng.gen_range(0..1000); + let steps = rng.gen_range(0..STEPS); + let additional = rng.gen_range(0..STEPS / 10); DROPS.store(0, Ordering::SeqCst); let (s, r) = unbounded::<DropCounter>(); @@ -412,6 +433,8 @@ fn drops() { scope.spawn(|_| { for _ in 0..steps { r.recv().unwrap(); + #[cfg(miri)] + std::thread::yield_now(); // https://github.com/rust-lang/miri/issues/1388 } }); diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 4d6e179..3db4812 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -321,7 +321,7 @@ mod channel_tests { #[test] fn stress() { #[cfg(miri)] - const COUNT: usize = 500; + const COUNT: usize = 100; #[cfg(not(miri))] const COUNT: usize = 10000; @@ -340,7 +340,7 @@ mod channel_tests { #[test] fn stress_shared() { #[cfg(miri)] - const AMT: u32 = 500; + const AMT: u32 = 100; #[cfg(not(miri))] const AMT: u32 = 10000; const NTHREADS: u32 = 8; @@ -747,7 +747,7 @@ mod channel_tests { #[test] fn recv_a_lot() { #[cfg(miri)] - const N: usize = 100; + const N: usize = 50; #[cfg(not(miri))] const N: usize = 10000; diff --git a/tests/select.rs b/tests/select.rs index f24aed8..e7691f5 100644 --- a/tests/select.rs +++ b/tests/select.rs @@ -694,7 +694,7 @@ fn nesting() { #[test] fn stress_recv() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -735,7 +735,7 @@ fn stress_recv() { #[test] fn stress_send() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -953,7 +953,7 @@ fn matching_with_leftover() { #[test] fn channel_through_channel() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 1000; @@ -1014,7 +1014,7 @@ fn channel_through_channel() { #[test] fn linearizable_try() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 100_000; @@ -1069,7 +1069,7 @@ fn linearizable_try() { #[test] fn linearizable_timeout() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 100_000; @@ -1124,7 +1124,7 @@ fn linearizable_timeout() { #[test] fn fairness1() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -1173,7 +1173,7 @@ fn fairness1() { #[test] fn fairness2() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -1292,7 +1292,7 @@ fn send_and_clone() { #[test] fn reuse() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; diff --git a/tests/select_macro.rs b/tests/select_macro.rs index 0b9a21a..91c04e1 100644 --- a/tests/select_macro.rs +++ b/tests/select_macro.rs @@ -488,7 +488,7 @@ fn panic_receiver() { #[test] fn stress_recv() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -1468,3 +1468,14 @@ fn disconnect_wakes_receiver() { }) .unwrap(); } + +#[test] +fn trailing_comma() { + let (s, r) = unbounded::<usize>(); + + select! { + send(s, 1,) -> _ => {}, + recv(r,) -> _ => {}, + default(ms(1000),) => {}, + } +} diff --git a/tests/zero.rs b/tests/zero.rs index ba41b1a..c90d741 100644 --- a/tests/zero.rs +++ b/tests/zero.rs @@ -188,7 +188,7 @@ fn send_timeout() { #[test] fn len() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 25_000; @@ -253,7 +253,7 @@ fn disconnect_wakes_receiver() { #[test] fn spsc() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 100_000; @@ -278,7 +278,7 @@ fn spsc() { #[test] fn mpmc() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -313,7 +313,7 @@ fn mpmc() { #[test] fn stress_oneshot() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -396,10 +396,16 @@ fn stress_timeout_two_threads() { .unwrap(); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { + #[cfg(miri)] + const RUNS: usize = 20; + #[cfg(not(miri))] const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 500; + #[cfg(not(miri))] + const STEPS: usize = 10_000; static DROPS: AtomicUsize = AtomicUsize::new(0); @@ -415,7 +421,7 @@ fn drops() { let mut rng = thread_rng(); for _ in 0..RUNS { - let steps = rng.gen_range(0..3_000); + let steps = rng.gen_range(0..STEPS); DROPS.store(0, Ordering::SeqCst); let (s, r) = bounded::<DropCounter>(0); @@ -445,7 +451,7 @@ fn drops() { #[test] fn fairness() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -479,7 +485,7 @@ fn fairness() { #[test] fn fairness_duplicates() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -540,7 +546,7 @@ fn recv_in_send() { #[test] fn channel_through_channel() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 1000; |