author     Joel Galenson <jgalenson@google.com>  2021-04-12 23:05:42 +0000
committer  Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>  2021-04-12 23:05:42 +0000
commit     11b36bd7423e5fd1be722e4011e13de621b7a2b9 (patch)
tree       b6220d6f0ac2cbe211e4475c51b066fcebe7c5cb
parent     fa0aaabd3ee245fdb844f68d1a709d85356fe7d0 (diff)
parent     cc5d71ed34bb38374074714930103e1649c61858 (diff)
download   crossbeam-utils-11b36bd7423e5fd1be722e4011e13de621b7a2b9.tar.gz
Upgrade rust/crates/crossbeam-utils to 0.8.3 am: 2e9962829e am: cab5ae797c am: cc5d71ed34
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/crossbeam-utils/+/1662781

Change-Id: I6c438ee16ab62dbdc8f5e45f0a095c4c23eaa85f
-rw-r--r--  .cargo_vcs_info.json          2
-rw-r--r--  CHANGELOG.md                 13
-rw-r--r--  Cargo.toml                    8
-rw-r--r--  Cargo.toml.orig              12
-rw-r--r--  METADATA                     10
-rw-r--r--  README.md                     2
-rw-r--r--  TEST_MAPPING                  8
-rw-r--r--  benches/atomic_cell.rs        8
-rw-r--r--  src/atomic/atomic_cell.rs    27
-rw-r--r--  src/atomic/consume.rs        14
-rw-r--r--  src/atomic/mod.rs             5
-rw-r--r--  src/atomic/seq_lock.rs       14
-rw-r--r--  src/atomic/seq_lock_wide.rs  14
-rw-r--r--  src/backoff.rs               15
-rw-r--r--  src/cache_padded.rs          60
-rw-r--r--  src/lib.rs                   65
-rw-r--r--  src/sync/mod.rs               2
-rw-r--r--  src/sync/parker.rs          110
-rw-r--r--  src/sync/wait_group.rs        2
-rw-r--r--  src/thread.rs                 3
-rw-r--r--  tests/cache_padded.rs         4
-rw-r--r--  tests/parker.rs               4
-rw-r--r--  tests/sharded_lock.rs         7
23 files changed, 298 insertions, 111 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 3dcef5c..1d9c34d 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "99c3230b263202aca56497b1f8e418a7b3647a23"
+ "sha1": "d841a2028dc72b4e09739116f07e865db60f3690"
}
}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a17c6e6..c4a92bf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,14 @@
+# Version 0.8.3
+
+- Make `loom` dependency optional. (#666)
+
+# Version 0.8.2
+
+- Deprecate `AtomicCell::compare_and_swap`. Use `AtomicCell::compare_exchange` instead. (#619)
+- Add `Parker::park_deadline`. (#563)
+- Improve implementation of `CachePadded`. (#636)
+- Add unstable support for `loom`. (#487)
+
# Version 0.8.1
- Make `AtomicCell::is_lock_free` always const fn. (#600)
@@ -8,7 +19,7 @@
# Version 0.8.0
- Bump the minimum supported Rust version to 1.36.
-- Remove deprecated `AtomicCell::get_mut()` and `Backoff::is_complete()` methods
+- Remove deprecated `AtomicCell::get_mut()` and `Backoff::is_complete()` methods.
- Remove `alloc` feature.
- Make `CachePadded::new()` const function.
- Make `AtomicCell::is_lock_free()` const function at 1.46+.
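For reference only (not part of the patch): a minimal sketch of the migration away from the now-deprecated `AtomicCell::compare_and_swap` toward `compare_exchange`, as described in the 0.8.2 notes above. The `bump` helper is hypothetical.

    use crossbeam_utils::atomic::AtomicCell;

    fn bump(a: &AtomicCell<usize>) -> usize {
        let mut cur = a.load();
        loop {
            // Previously: if a.compare_and_swap(cur, cur + 1) == cur { return cur; }
            // compare_exchange reports success/failure as a Result, and the Err
            // arm hands back the value actually observed, avoiding a reload.
            match a.compare_exchange(cur, cur + 1) {
                Ok(prev) => return prev,
                Err(actual) => cur = actual,
            }
        }
    }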
diff --git a/Cargo.toml b/Cargo.toml
index edb08f9..a7d694c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,12 +13,11 @@
[package]
edition = "2018"
name = "crossbeam-utils"
-version = "0.8.1"
+version = "0.8.3"
authors = ["The Crossbeam Project Developers"]
description = "Utilities for concurrent programming"
homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-utils"
documentation = "https://docs.rs/crossbeam-utils"
-readme = "README.md"
keywords = ["scoped", "thread", "atomic", "cache"]
categories = ["algorithms", "concurrency", "data-structures", "no-std"]
license = "MIT OR Apache-2.0"
@@ -30,7 +29,7 @@ version = "1"
version = "1.4.0"
optional = true
[dev-dependencies.rand]
-version = "0.7.3"
+version = "0.8"
[build-dependencies.autocfg]
version = "1.0.0"
@@ -38,3 +37,6 @@ version = "1.0.0"
default = ["std"]
nightly = []
std = ["lazy_static"]
+[target."cfg(crossbeam_loom)".dependencies.loom]
+version = "0.4"
+optional = true
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 6011c75..30697d2 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -4,11 +4,10 @@ name = "crossbeam-utils"
# - Update CHANGELOG.md
# - Update README.md
# - Create "crossbeam-utils-X.Y.Z" git tag
-version = "0.8.1"
+version = "0.8.3"
authors = ["The Crossbeam Project Developers"]
edition = "2018"
license = "MIT OR Apache-2.0"
-readme = "README.md"
repository = "https://github.com/crossbeam-rs/crossbeam"
homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-utils"
documentation = "https://docs.rs/crossbeam-utils"
@@ -33,8 +32,15 @@ nightly = []
cfg-if = "1"
lazy_static = { version = "1.4.0", optional = true }
+# Enable the use of loom for concurrency testing.
+#
+# This configuration option is outside of the normal semver guarantees: minor
+# versions of crossbeam may make breaking changes to it at any time.
+[target.'cfg(crossbeam_loom)'.dependencies]
+loom = { version = "0.4", optional = true }
+
[build-dependencies]
autocfg = "1.0.0"
[dev-dependencies]
-rand = "0.7.3"
+rand = "0.8"
diff --git a/METADATA b/METADATA
index 863be3f..e66f8d0 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/crossbeam-utils/crossbeam-utils-0.8.1.crate"
+ value: "https://static.crates.io/crates/crossbeam-utils/crossbeam-utils-0.8.3.crate"
}
- version: "0.8.1"
+ version: "0.8.3"
license_type: NOTICE
last_upgrade_date {
- year: 2020
- month: 12
- day: 21
+ year: 2021
+ month: 4
+ day: 1
}
}
diff --git a/README.md b/README.md
index 7e95829..fd0943b 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[![Build Status](https://github.com/crossbeam-rs/crossbeam/workflows/CI/badge.svg)](
https://github.com/crossbeam-rs/crossbeam/actions)
-[![License](https://img.shields.io/badge/license-MIT%20OR%20Apache--2.0-blue.svg)](
+[![License](https://img.shields.io/badge/license-MIT_OR_Apache--2.0-blue.svg)](
https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-utils#license)
[![Cargo](https://img.shields.io/crates/v/crossbeam-utils.svg)](
https://crates.io/crates/crossbeam-utils)
diff --git a/TEST_MAPPING b/TEST_MAPPING
new file mode 100644
index 0000000..25a9cdc
--- /dev/null
+++ b/TEST_MAPPING
@@ -0,0 +1,8 @@
+// Generated by update_crate_tests.py for tests that depend on this crate.
+{
+ "presubmit": [
+ {
+ "name": "crossbeam-epoch_device_test_src_lib"
+ }
+ ]
+}
diff --git a/benches/atomic_cell.rs b/benches/atomic_cell.rs
index 938a8fe..844f7c0 100644
--- a/benches/atomic_cell.rs
+++ b/benches/atomic_cell.rs
@@ -28,11 +28,11 @@ fn fetch_add_u8(b: &mut test::Bencher) {
}
#[bench]
-fn compare_and_swap_u8(b: &mut test::Bencher) {
+fn compare_exchange_u8(b: &mut test::Bencher) {
let a = AtomicCell::new(0u8);
let mut i = 0;
b.iter(|| {
- a.compare_and_swap(i, i.wrapping_add(1));
+ let _ = a.compare_exchange(i, i.wrapping_add(1));
i = i.wrapping_add(1);
});
}
@@ -102,11 +102,11 @@ fn fetch_add_usize(b: &mut test::Bencher) {
}
#[bench]
-fn compare_and_swap_usize(b: &mut test::Bencher) {
+fn compare_exchange_usize(b: &mut test::Bencher) {
let a = AtomicCell::new(0usize);
let mut i = 0;
b.iter(|| {
- a.compare_and_swap(i, i.wrapping_add(1));
+ let _ = a.compare_exchange(i, i.wrapping_add(1));
i = i.wrapping_add(1);
});
}
diff --git a/src/atomic/atomic_cell.rs b/src/atomic/atomic_cell.rs
index e8f6804..ad094b2 100644
--- a/src/atomic/atomic_cell.rs
+++ b/src/atomic/atomic_cell.rs
@@ -2,15 +2,19 @@
#![allow(clippy::unit_arg)]
#![allow(clippy::let_unit_value)]
+use crate::primitive::sync::atomic::{self, AtomicBool};
use core::cell::UnsafeCell;
use core::fmt;
use core::mem;
+use core::sync::atomic::Ordering;
+
+#[cfg(not(crossbeam_loom))]
use core::ptr;
-use core::sync::atomic::{self, AtomicBool, Ordering};
#[cfg(feature = "std")]
use std::panic::{RefUnwindSafe, UnwindSafe};
+#[cfg(not(crossbeam_loom))]
use super::seq_lock::SeqLock;
/// A thread-safe mutable memory location.
@@ -213,6 +217,7 @@ impl<T: Copy + Eq> AtomicCell<T> {
/// # Examples
///
/// ```
+ /// # #![allow(deprecated)]
/// use crossbeam_utils::atomic::AtomicCell;
///
/// let a = AtomicCell::new(1);
@@ -223,6 +228,8 @@ impl<T: Copy + Eq> AtomicCell<T> {
/// assert_eq!(a.compare_and_swap(1, 2), 1);
/// assert_eq!(a.load(), 2);
/// ```
+ // TODO: remove in the next major version.
+ #[deprecated(note = "Use `compare_exchange` instead")]
pub fn compare_and_swap(&self, current: T, new: T) -> T {
match self.compare_exchange(current, new) {
Ok(v) => v,
@@ -492,23 +499,23 @@ macro_rules! impl_arithmetic {
#[cfg(has_atomic_u8)]
impl_arithmetic!(u8, atomic::AtomicU8, "let a = AtomicCell::new(7u8);");
-#[cfg(has_atomic_u8)]
+#[cfg(all(has_atomic_u8, not(crossbeam_loom)))]
impl_arithmetic!(i8, atomic::AtomicI8, "let a = AtomicCell::new(7i8);");
#[cfg(has_atomic_u16)]
impl_arithmetic!(u16, atomic::AtomicU16, "let a = AtomicCell::new(7u16);");
-#[cfg(has_atomic_u16)]
+#[cfg(all(has_atomic_u16, not(crossbeam_loom)))]
impl_arithmetic!(i16, atomic::AtomicI16, "let a = AtomicCell::new(7i16);");
#[cfg(has_atomic_u32)]
impl_arithmetic!(u32, atomic::AtomicU32, "let a = AtomicCell::new(7u32);");
-#[cfg(has_atomic_u32)]
+#[cfg(all(has_atomic_u32, not(crossbeam_loom)))]
impl_arithmetic!(i32, atomic::AtomicI32, "let a = AtomicCell::new(7i32);");
#[cfg(has_atomic_u64)]
impl_arithmetic!(u64, atomic::AtomicU64, "let a = AtomicCell::new(7u64);");
-#[cfg(has_atomic_u64)]
+#[cfg(all(has_atomic_u64, not(crossbeam_loom)))]
impl_arithmetic!(i64, atomic::AtomicI64, "let a = AtomicCell::new(7i64);");
-#[cfg(has_atomic_u128)]
+#[cfg(all(has_atomic_u128, not(crossbeam_loom)))]
impl_arithmetic!(u128, atomic::AtomicU128, "let a = AtomicCell::new(7u128);");
-#[cfg(has_atomic_u128)]
+#[cfg(all(has_atomic_u128, not(crossbeam_loom)))]
impl_arithmetic!(i128, atomic::AtomicI128, "let a = AtomicCell::new(7i128);");
impl_arithmetic!(
@@ -516,6 +523,7 @@ impl_arithmetic!(
atomic::AtomicUsize,
"let a = AtomicCell::new(7usize);"
);
+#[cfg(not(crossbeam_loom))]
impl_arithmetic!(
isize,
atomic::AtomicIsize,
@@ -624,6 +632,7 @@ const fn can_transmute<A, B>() -> bool {
/// scalability.
#[inline]
#[must_use]
+#[cfg(not(crossbeam_loom))]
fn lock(addr: usize) -> &'static SeqLock {
// The number of locks is a prime number because we want to make sure `addr % LEN` gets
// dispersed across all locks.
@@ -769,6 +778,7 @@ impl AtomicUnit {
#[inline]
fn swap(&self, _val: (), _order: Ordering) {}
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
#[inline]
fn compare_exchange_weak(
&self,
@@ -810,6 +820,9 @@ macro_rules! atomic {
#[cfg(has_atomic_u128)]
atomic!(@check, $t, atomic::AtomicU128, $a, $atomic_op);
+ #[cfg(crossbeam_loom)]
+ unimplemented!("loom does not support non-atomic atomic ops");
+ #[cfg(not(crossbeam_loom))]
break $fallback_op;
}
};
diff --git a/src/atomic/consume.rs b/src/atomic/consume.rs
index 584fc34..0fbd93e 100644
--- a/src/atomic/consume.rs
+++ b/src/atomic/consume.rs
@@ -1,5 +1,5 @@
#[cfg(any(target_arch = "arm", target_arch = "aarch64"))]
-use core::sync::atomic::compiler_fence;
+use crate::primitive::sync::atomic::compiler_fence;
use core::sync::atomic::Ordering;
/// Trait which allows reading from primitive atomic types with "consume" ordering.
@@ -53,11 +53,17 @@ macro_rules! impl_atomic {
type Val = $val;
impl_consume!();
}
+ #[cfg(crossbeam_loom)]
+ impl AtomicConsume for ::loom::sync::atomic::$atomic {
+ type Val = $val;
+ impl_consume!();
+ }
};
}
impl_atomic!(AtomicBool, bool);
impl_atomic!(AtomicUsize, usize);
+#[cfg(not(crossbeam_loom))]
impl_atomic!(AtomicIsize, isize);
#[cfg(has_atomic_u8)]
impl_atomic!(AtomicU8, u8);
@@ -80,3 +86,9 @@ impl<T> AtomicConsume for ::core::sync::atomic::AtomicPtr<T> {
type Val = *mut T;
impl_consume!();
}
+
+#[cfg(crossbeam_loom)]
+impl<T> AtomicConsume for ::loom::sync::atomic::AtomicPtr<T> {
+ type Val = *mut T;
+ impl_consume!();
+}
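An aside, not part of the patch: the `AtomicConsume` trait being extended here exposes `load_consume`, which on arm/aarch64 lowers to a relaxed load plus a compiler fence and elsewhere falls back to an acquire load. A minimal usage sketch:

    use crossbeam_utils::atomic::AtomicConsume;
    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        let data = AtomicUsize::new(0);
        data.store(42, Ordering::Release);
        // Reads with "consume" semantics; on most targets this is simply an
        // acquire load, on arm/aarch64 a relaxed load plus a compiler fence.
        assert_eq!(data.load_consume(), 42);
    }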
diff --git a/src/atomic/mod.rs b/src/atomic/mod.rs
index 7309c16..874eaf2 100644
--- a/src/atomic/mod.rs
+++ b/src/atomic/mod.rs
@@ -1,7 +1,12 @@
//! Atomic types.
+//!
+//! * [`AtomicCell`], a thread-safe mutable memory location.
+//! * [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering.
+#[cfg(not(crossbeam_loom))]
use cfg_if::cfg_if;
+#[cfg(not(crossbeam_loom))]
cfg_if! {
// Use "wide" sequence lock if the pointer width <= 32 for preventing its counter against wrap
// around.
diff --git a/src/atomic/seq_lock.rs b/src/atomic/seq_lock.rs
index a423bc0..ff8defd 100644
--- a/src/atomic/seq_lock.rs
+++ b/src/atomic/seq_lock.rs
@@ -4,7 +4,7 @@ use core::sync::atomic::{self, AtomicUsize, Ordering};
use crate::Backoff;
/// A simple stamped lock.
-pub struct SeqLock {
+pub(crate) struct SeqLock {
/// The current state of the lock.
///
/// All bits except the least significant one hold the current stamp. When locked, the state
@@ -13,7 +13,7 @@ pub struct SeqLock {
}
impl SeqLock {
- pub const fn new() -> Self {
+ pub(crate) const fn new() -> Self {
Self {
state: AtomicUsize::new(0),
}
@@ -23,7 +23,7 @@ impl SeqLock {
///
/// This method should be called before optimistic reads.
#[inline]
- pub fn optimistic_read(&self) -> Option<usize> {
+ pub(crate) fn optimistic_read(&self) -> Option<usize> {
let state = self.state.load(Ordering::Acquire);
if state == 1 {
None
@@ -37,14 +37,14 @@ impl SeqLock {
/// This method should be called after optimistic reads to check whether they are valid. The
/// argument `stamp` should correspond to the one returned by method `optimistic_read`.
#[inline]
- pub fn validate_read(&self, stamp: usize) -> bool {
+ pub(crate) fn validate_read(&self, stamp: usize) -> bool {
atomic::fence(Ordering::Acquire);
self.state.load(Ordering::Relaxed) == stamp
}
/// Grabs the lock for writing.
#[inline]
- pub fn write(&'static self) -> SeqLockWriteGuard {
+ pub(crate) fn write(&'static self) -> SeqLockWriteGuard {
let backoff = Backoff::new();
loop {
let previous = self.state.swap(1, Ordering::Acquire);
@@ -64,7 +64,7 @@ impl SeqLock {
}
/// An RAII guard that releases the lock and increments the stamp when dropped.
-pub struct SeqLockWriteGuard {
+pub(crate) struct SeqLockWriteGuard {
/// The parent lock.
lock: &'static SeqLock,
@@ -75,7 +75,7 @@ pub struct SeqLockWriteGuard {
impl SeqLockWriteGuard {
/// Releases the lock without incrementing the stamp.
#[inline]
- pub fn abort(self) {
+ pub(crate) fn abort(self) {
self.lock.state.store(self.state, Ordering::Release);
// We specifically don't want to call drop(), since that's
diff --git a/src/atomic/seq_lock_wide.rs b/src/atomic/seq_lock_wide.rs
index 871a93d..ef5d94a 100644
--- a/src/atomic/seq_lock_wide.rs
+++ b/src/atomic/seq_lock_wide.rs
@@ -7,7 +7,7 @@ use crate::Backoff;
///
/// The state is represented as two `AtomicUsize`: `state_hi` for high bits and `state_lo` for low
/// bits.
-pub struct SeqLock {
+pub(crate) struct SeqLock {
/// The high bits of the current state of the lock.
state_hi: AtomicUsize,
@@ -19,7 +19,7 @@ pub struct SeqLock {
}
impl SeqLock {
- pub const fn new() -> Self {
+ pub(crate) const fn new() -> Self {
Self {
state_hi: AtomicUsize::new(0),
state_lo: AtomicUsize::new(0),
@@ -30,7 +30,7 @@ impl SeqLock {
///
/// This method should be called before optimistic reads.
#[inline]
- pub fn optimistic_read(&self) -> Option<(usize, usize)> {
+ pub(crate) fn optimistic_read(&self) -> Option<(usize, usize)> {
// The acquire loads from `state_hi` and `state_lo` synchronize with the release stores in
// `SeqLockWriteGuard::drop`.
//
@@ -51,7 +51,7 @@ impl SeqLock {
/// This method should be called after optimistic reads to check whether they are valid. The
/// argument `stamp` should correspond to the one returned by method `optimistic_read`.
#[inline]
- pub fn validate_read(&self, stamp: (usize, usize)) -> bool {
+ pub(crate) fn validate_read(&self, stamp: (usize, usize)) -> bool {
// Thanks to the fence, if we're noticing any modification to the data at the critical
// section of `(a, b)`, then the critical section's write of 1 to state_lo should be
// visible.
@@ -76,7 +76,7 @@ impl SeqLock {
/// Grabs the lock for writing.
#[inline]
- pub fn write(&'static self) -> SeqLockWriteGuard {
+ pub(crate) fn write(&'static self) -> SeqLockWriteGuard {
let backoff = Backoff::new();
loop {
let previous = self.state_lo.swap(1, Ordering::Acquire);
@@ -98,7 +98,7 @@ impl SeqLock {
}
/// An RAII guard that releases the lock and increments the stamp when dropped.
-pub struct SeqLockWriteGuard {
+pub(crate) struct SeqLockWriteGuard {
/// The parent lock.
lock: &'static SeqLock,
@@ -109,7 +109,7 @@ pub struct SeqLockWriteGuard {
impl SeqLockWriteGuard {
/// Releases the lock without incrementing the stamp.
#[inline]
- pub fn abort(self) {
+ pub(crate) fn abort(self) {
self.lock.state_lo.store(self.state_lo, Ordering::Release);
mem::forget(self);
}
diff --git a/src/backoff.rs b/src/backoff.rs
index 2391dd1..1012f06 100644
--- a/src/backoff.rs
+++ b/src/backoff.rs
@@ -1,6 +1,6 @@
+use crate::primitive::sync::atomic;
use core::cell::Cell;
use core::fmt;
-use core::sync::atomic;
const SPIN_LIMIT: u32 = 6;
const YIELD_LIMIT: u32 = 10;
@@ -27,7 +27,7 @@ const YIELD_LIMIT: u32 = 10;
/// let backoff = Backoff::new();
/// loop {
/// let val = a.load(SeqCst);
-/// if a.compare_and_swap(val, val.wrapping_mul(b), SeqCst) == val {
+/// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() {
/// return val;
/// }
/// backoff.spin();
@@ -131,7 +131,7 @@ impl Backoff {
/// let backoff = Backoff::new();
/// loop {
/// let val = a.load(SeqCst);
- /// if a.compare_and_swap(val, val.wrapping_mul(b), SeqCst) == val {
+ /// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() {
/// return val;
/// }
/// backoff.spin();
@@ -145,6 +145,9 @@ impl Backoff {
#[inline]
pub fn spin(&self) {
for _ in 0..1 << self.step.get().min(SPIN_LIMIT) {
+ // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
+ // use [`core::hint::spin_loop`] instead.
+ #[allow(deprecated)]
atomic::spin_loop_hint();
}
@@ -205,11 +208,17 @@ impl Backoff {
pub fn snooze(&self) {
if self.step.get() <= SPIN_LIMIT {
for _ in 0..1 << self.step.get() {
+ // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
+ // use [`core::hint::spin_loop`] instead.
+ #[allow(deprecated)]
atomic::spin_loop_hint();
}
} else {
#[cfg(not(feature = "std"))]
for _ in 0..1 << self.step.get() {
+ // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
+ // use [`core::hint::spin_loop`] instead.
+ #[allow(deprecated)]
atomic::spin_loop_hint();
}
diff --git a/src/cache_padded.rs b/src/cache_padded.rs
index 62c686b..822e831 100644
--- a/src/cache_padded.rs
+++ b/src/cache_padded.rs
@@ -13,7 +13,9 @@ use core::ops::{Deref, DerefMut};
///
/// Cache lines are assumed to be N bytes long, depending on the architecture:
///
-/// * On x86-64 and aarch64, N = 128.
+/// * On x86-64, aarch64, and powerpc64, N = 128.
+/// * On arm, mips, mips64, and riscv64, N = 32.
+/// * On s390x, N = 256.
/// * On all others, N = 64.
///
/// Note that N is just a reasonable guess and is not guaranteed to match the actual cache line
@@ -64,13 +66,63 @@ use core::ops::{Deref, DerefMut};
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
-// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128 byte cache line size
+// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
+//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
-#[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))]
+// powerpc64 has 128-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
+#[cfg_attr(
+ any(
+ target_arch = "x86_64",
+ target_arch = "aarch64",
+ target_arch = "powerpc64",
+ ),
+ repr(align(128))
+)]
+// arm, mips, mips64, and riscv64 have 32-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
+#[cfg_attr(
+ any(
+ target_arch = "arm",
+ target_arch = "mips",
+ target_arch = "mips64",
+ target_arch = "riscv64",
+ ),
+ repr(align(32))
+)]
+// s390x has 256-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
+#[cfg_attr(target_arch = "s390x", repr(align(256)))]
+// x86 and wasm have 64-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
+//
+// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
- not(any(target_arch = "x86_64", target_arch = "aarch64")),
+ not(any(
+ target_arch = "x86_64",
+ target_arch = "aarch64",
+ target_arch = "powerpc64",
+ target_arch = "arm",
+ target_arch = "mips",
+ target_arch = "mips64",
+ target_arch = "riscv64",
+ target_arch = "s390x",
+ )),
repr(align(64))
)]
pub struct CachePadded<T> {
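As an aside (not part of the patch), the practical effect of the per-architecture alignments added above can be checked as follows; with this change the alignment is at least 32 bytes on every listed target:

    use crossbeam_utils::CachePadded;
    use std::mem;

    fn main() {
        // Two counters that might otherwise share a cache line are padded
        // apart, so writers on different threads stop false-sharing.
        let counters = [CachePadded::new(0u64), CachePadded::new(0u64)];
        assert!(mem::align_of_val(&counters[0]) >= 32);
        assert_eq!(
            mem::size_of_val(&counters),
            2 * mem::size_of::<CachePadded<u64>>()
        );
    }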
diff --git a/src/lib.rs b/src/lib.rs
index f2bd460..880d37e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -31,11 +31,68 @@
allow(dead_code, unused_assignments, unused_variables)
)
))]
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
+#![warn(
+ missing_docs,
+ missing_debug_implementations,
+ rust_2018_idioms,
+ unreachable_pub
+)]
#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))]
-// matches! requires Rust 1.42
-#![allow(clippy::match_like_matches_macro)]
+
+#[cfg(crossbeam_loom)]
+#[allow(unused_imports)]
+mod primitive {
+ pub(crate) mod sync {
+ pub(crate) mod atomic {
+ pub(crate) use loom::sync::atomic::spin_loop_hint;
+ pub(crate) use loom::sync::atomic::{
+ AtomicBool, AtomicU16, AtomicU32, AtomicU64, AtomicU8, AtomicUsize,
+ };
+
+ // FIXME: loom does not support compiler_fence at the moment.
+ // https://github.com/tokio-rs/loom/issues/117
+ // we use fence as a stand-in for compiler_fence for the time being.
+ // this may miss some races since fence is stronger than compiler_fence,
+ // but it's the best we can do for the time being.
+ pub(crate) use loom::sync::atomic::fence as compiler_fence;
+ }
+ pub(crate) use loom::sync::{Arc, Condvar, Mutex};
+ }
+}
+#[cfg(not(crossbeam_loom))]
+#[allow(unused_imports)]
+mod primitive {
+ pub(crate) mod sync {
+ pub(crate) mod atomic {
+ pub(crate) use core::sync::atomic::compiler_fence;
+ // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
+ // use [`core::hint::spin_loop`] instead.
+ #[allow(deprecated)]
+ pub(crate) use core::sync::atomic::spin_loop_hint;
+ pub(crate) use core::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize};
+ #[cfg(has_atomic_u16)]
+ pub(crate) use core::sync::atomic::{AtomicI16, AtomicU16};
+ #[cfg(has_atomic_u32)]
+ pub(crate) use core::sync::atomic::{AtomicI32, AtomicU32};
+ #[cfg(has_atomic_u64)]
+ pub(crate) use core::sync::atomic::{AtomicI64, AtomicU64};
+ #[cfg(has_atomic_u8)]
+ pub(crate) use core::sync::atomic::{AtomicI8, AtomicU8};
+ }
+
+ #[cfg(feature = "std")]
+ pub(crate) use std::sync::{Arc, Condvar, Mutex};
+ }
+}
+
+cfg_if! {
+ if #[cfg(feature = "alloc")] {
+ extern crate alloc;
+ } else if #[cfg(feature = "std")] {
+ extern crate std as alloc;
+ }
+}
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
pub mod atomic;
@@ -51,6 +108,8 @@ use cfg_if::cfg_if;
cfg_if! {
if #[cfg(feature = "std")] {
pub mod sync;
+
+ #[cfg(not(crossbeam_loom))]
pub mod thread;
}
}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index fd400d7..eeb740c 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -5,9 +5,11 @@
//! * [`WaitGroup`], for synchronizing the beginning or end of some computation.
mod parker;
+#[cfg(not(crossbeam_loom))]
mod sharded_lock;
mod wait_group;
pub use self::parker::{Parker, Unparker};
+#[cfg(not(crossbeam_loom))]
pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
pub use self::wait_group::WaitGroup;
diff --git a/src/sync/parker.rs b/src/sync/parker.rs
index fc13d2e..aefa515 100644
--- a/src/sync/parker.rs
+++ b/src/sync/parker.rs
@@ -1,20 +1,19 @@
+use crate::primitive::sync::atomic::AtomicUsize;
+use crate::primitive::sync::{Arc, Condvar, Mutex};
+use core::sync::atomic::Ordering::SeqCst;
use std::fmt;
use std::marker::PhantomData;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::SeqCst;
-use std::sync::{Arc, Condvar, Mutex};
-use std::time::Duration;
+use std::time::{Duration, Instant};
/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
-/// which point it automatically consumes the token. It may also return *spuriously*, without
-/// consuming the token.
+/// which point it automatically consumes the token.
///
-/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
-/// time.
+/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
+/// a specified maximum time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
@@ -43,13 +42,13 @@ use std::time::Duration;
/// u.unpark();
/// });
///
-/// // Wakes up when `u.unpark()` provides the token, but may also wake up
-/// // spuriously before that without consuming the token.
+/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
+/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
unparker: Unparker,
@@ -90,9 +89,6 @@ impl Parker {
/// Blocks the current thread until the token is made available.
///
- /// A call to `park` may wake up spuriously without consuming the token, and callers should be
- /// prepared for this possibility.
- ///
/// # Examples
///
/// ```
@@ -113,9 +109,6 @@ impl Parker {
/// Blocks the current thread until the token is made available, but only for a limited time.
///
- /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
- /// should be prepared for this possibility.
- ///
/// # Examples
///
/// ```
@@ -128,7 +121,25 @@ impl Parker {
/// p.park_timeout(Duration::from_millis(500));
/// ```
pub fn park_timeout(&self, timeout: Duration) {
- self.unparker.inner.park(Some(timeout));
+ self.park_deadline(Instant::now() + timeout)
+ }
+
+ /// Blocks the current thread until the token is made available, or until a certain deadline.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::time::{Duration, Instant};
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let deadline = Instant::now() + Duration::from_millis(500);
+ ///
+ /// // Waits for the token to become available, but will not wait longer than 500 ms.
+ /// p.park_deadline(deadline);
+ /// ```
+ pub fn park_deadline(&self, deadline: Instant) {
+ self.unparker.inner.park(Some(deadline))
}
/// Returns a reference to an associated [`Unparker`].
@@ -227,8 +238,7 @@ impl Unparker {
/// u.unpark();
/// });
///
- /// // Wakes up when `u.unpark()` provides the token, but may also wake up
- /// // spuriously before that without consuming the token.
+ /// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// ```
///
@@ -302,7 +312,7 @@ struct Inner {
}
impl Inner {
- fn park(&self, timeout: Option<Duration>) {
+ fn park(&self, deadline: Option<Instant>) {
// If we were previously notified then we consume this notification and return quickly.
if self
.state
@@ -313,8 +323,8 @@ impl Inner {
}
// If the timeout is zero, then there is no need to actually block.
- if let Some(ref dur) = timeout {
- if *dur == Duration::from_millis(0) {
+ if let Some(deadline) = deadline {
+ if deadline <= Instant::now() {
return;
}
}
@@ -338,40 +348,42 @@ impl Inner {
Err(n) => panic!("inconsistent park_timeout state: {}", n),
}
- match timeout {
- None => {
- loop {
- // Block the current thread on the conditional variable.
- m = self.cvar.wait(m).unwrap();
-
- if self
- .state
- .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
- .is_ok()
- {
- // got a notification
- return;
+ loop {
+ // Block the current thread on the conditional variable.
+ m = match deadline {
+ None => self.cvar.wait(m).unwrap(),
+ Some(deadline) => {
+ let now = Instant::now();
+ if now < deadline {
+ // We could check for a timeout here, in the return value of wait_timeout,
+ // but in the case that a timeout and an unpark arrive simultaneously, we
+ // prefer to report the former.
+ self.cvar.wait_timeout(m, deadline - now).unwrap().0
+ } else {
+ // We've timed out; swap out the state back to empty on our way out
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED | PARKED => return,
+ n => panic!("inconsistent park_timeout state: {}", n),
+ };
}
-
- // spurious wakeup, go back to sleep
}
- }
- Some(timeout) => {
- // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
- // notification we just want to unconditionally set `state` back to `EMPTY`, either
- // consuming a notification or un-flagging ourselves as parked.
- let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
+ };
- match self.state.swap(EMPTY, SeqCst) {
- NOTIFIED => {} // got a notification
- PARKED => {} // no notification
- n => panic!("inconsistent park_timeout state: {}", n),
- }
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ // got a notification
+ return;
}
+
+ // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
+ // in the branch above, when we discover the deadline is in the past
}
}
- pub fn unpark(&self) {
+ pub(crate) fn unpark(&self) {
// To ensure the unparked thread will observe any writes we made before this call, we must
// perform a release operation that `park` can synchronize with. To do that we must write
// `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
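Not part of the patch, just for orientation: `park_deadline` takes an absolute `Instant`, which is what the rewritten `Inner::park` loop above works against, and `park_timeout` is now a thin wrapper over it. A small hypothetical usage sketch:

    use crossbeam_utils::sync::Parker;
    use std::thread;
    use std::time::{Duration, Instant};

    fn main() {
        let p = Parker::new();
        let u = p.unparker().clone();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            u.unpark();
        });

        // Blocks until unparked or until the absolute deadline passes,
        // whichever comes first.
        p.park_deadline(Instant::now() + Duration::from_millis(500));
    }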
diff --git a/src/sync/wait_group.rs b/src/sync/wait_group.rs
index cd7af12..4206ee4 100644
--- a/src/sync/wait_group.rs
+++ b/src/sync/wait_group.rs
@@ -1,8 +1,8 @@
// Necessary for using `Mutex<usize>` for conditional variables
#![allow(clippy::mutex_atomic)]
+use crate::primitive::sync::{Arc, Condvar, Mutex};
use std::fmt;
-use std::sync::{Arc, Condvar, Mutex};
/// Enables threads to synchronize the beginning or end of some computation.
///
diff --git a/src/thread.rs b/src/thread.rs
index ab91be7..c57d076 100644
--- a/src/thread.rs
+++ b/src/thread.rs
@@ -110,8 +110,6 @@
//! });
//! }).unwrap();
//! ```
-//!
-//! [`std::thread::spawn`]: std::thread::spawn
use std::fmt;
use std::io;
@@ -572,7 +570,6 @@ cfg_if! {
}
}
- #[cfg(windows)]
impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
fn into_raw_handle(self) -> RawHandle {
self.as_raw_handle()
diff --git a/tests/cache_padded.rs b/tests/cache_padded.rs
index be90cbe..c9e7687 100644
--- a/tests/cache_padded.rs
+++ b/tests/cache_padded.rs
@@ -27,7 +27,9 @@ fn distance() {
let arr = [CachePadded::new(17u8), CachePadded::new(37u8)];
let a = &*arr[0] as *const u8;
let b = &*arr[1] as *const u8;
- assert!(unsafe { a.offset(64) } <= b);
+ let align = mem::align_of::<CachePadded<()>>();
+ assert!(align >= 32);
+ assert_eq!(unsafe { a.add(align) }, b);
}
#[test]
diff --git a/tests/parker.rs b/tests/parker.rs
index f657eb1..2bf9c37 100644
--- a/tests/parker.rs
+++ b/tests/parker.rs
@@ -18,7 +18,7 @@ fn park_timeout_unpark_before() {
fn park_timeout_unpark_not_called() {
let p = Parker::new();
for _ in 0..10 {
- p.park_timeout(Duration::from_millis(10));
+ p.park_timeout(Duration::from_millis(10))
}
}
@@ -34,7 +34,7 @@ fn park_timeout_unpark_called_other_thread() {
u.unpark();
});
- p.park_timeout(Duration::from_millis(u32::MAX as u64));
+ p.park_timeout(Duration::from_millis(u32::MAX as u64))
})
.unwrap();
}
diff --git a/tests/sharded_lock.rs b/tests/sharded_lock.rs
index 73bd489..c362154 100644
--- a/tests/sharded_lock.rs
+++ b/tests/sharded_lock.rs
@@ -176,11 +176,8 @@ fn try_write() {
let write_result = lock.try_write();
match write_result {
Err(TryLockError::WouldBlock) => (),
- Ok(_) => assert!(
- false,
- "try_write should not succeed while read_guard is in scope"
- ),
- Err(_) => assert!(false, "unexpected error"),
+ Ok(_) => panic!("try_write should not succeed while read_guard is in scope"),
+ Err(_) => panic!("unexpected error"),
}
drop(read_guard);