Diffstat (limited to 'src')
-rw-r--r--  src/atomic/atomic_cell.rs  | 373
-rw-r--r--  src/atomic/consume.rs      |  31
-rw-r--r--  src/atomic/mod.rs          |  43
-rw-r--r--  src/backoff.rs             |  17
-rw-r--r--  src/cache_padded.rs        |  30
-rw-r--r--  src/lib.rs                 |  41
-rw-r--r--  src/sync/once_lock.rs      |  27
-rw-r--r--  src/sync/parker.rs         |   3
-rw-r--r--  src/sync/sharded_lock.rs   |   4
-rw-r--r--  src/thread.rs              |  79
10 files changed, 369 insertions, 279 deletions
diff --git a/src/atomic/atomic_cell.rs b/src/atomic/atomic_cell.rs
index 7941c5c..06ccf2e 100644
--- a/src/atomic/atomic_cell.rs
+++ b/src/atomic/atomic_cell.rs
@@ -1,18 +1,15 @@
// Necessary for implementing atomic methods for `AtomicUnit`
#![allow(clippy::unit_arg)]
-use crate::primitive::sync::atomic::{self, AtomicBool};
+use crate::primitive::sync::atomic::{self, Ordering};
+use crate::CachePadded;
use core::cell::UnsafeCell;
use core::cmp;
use core::fmt;
use core::mem::{self, ManuallyDrop, MaybeUninit};
-use core::sync::atomic::Ordering;
-
+use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
-#[cfg(feature = "std")]
-use std::panic::{RefUnwindSafe, UnwindSafe};
-
use super::seq_lock::SeqLock;
/// A thread-safe mutable memory location.
@@ -49,9 +46,7 @@ pub struct AtomicCell<T> {
unsafe impl<T: Send> Send for AtomicCell<T> {}
unsafe impl<T: Send> Sync for AtomicCell<T> {}
-#[cfg(feature = "std")]
impl<T> UnwindSafe for AtomicCell<T> {}
-#[cfg(feature = "std")]
impl<T> RefUnwindSafe for AtomicCell<T> {}
impl<T> AtomicCell<T> {
@@ -322,6 +317,36 @@ impl<T> Drop for AtomicCell<T> {
}
}
+macro_rules! atomic {
+ // If values of type `$t` can be transmuted into values of the primitive atomic type `$atomic`,
+ // declares variable `$a` of type `$atomic` and executes `$atomic_op`, breaking out of the loop.
+ (@check, $t:ty, $atomic:ty, $a:ident, $atomic_op:expr) => {
+ if can_transmute::<$t, $atomic>() {
+ let $a: &$atomic;
+ break $atomic_op;
+ }
+ };
+
+ // If values of type `$t` can be transmuted into values of a primitive atomic type, declares
+ // variable `$a` of that type and executes `$atomic_op`. Otherwise, just executes
+ // `$fallback_op`.
+ ($t:ty, $a:ident, $atomic_op:expr, $fallback_op:expr) => {
+ loop {
+ atomic!(@check, $t, AtomicUnit, $a, $atomic_op);
+
+ atomic!(@check, $t, atomic::AtomicU8, $a, $atomic_op);
+ atomic!(@check, $t, atomic::AtomicU16, $a, $atomic_op);
+ atomic!(@check, $t, atomic::AtomicU32, $a, $atomic_op);
+ #[cfg(target_has_atomic = "64")]
+ atomic!(@check, $t, atomic::AtomicU64, $a, $atomic_op);
+ // TODO: AtomicU128 is unstable
+ // atomic!(@check, $t, atomic::AtomicU128, $a, $atomic_op);
+
+ break $fallback_op;
+ }
+ };
+}
+
macro_rules! impl_arithmetic {
($t:ty, fallback, $example:tt) => {
impl AtomicCell<$t> {
@@ -500,7 +525,7 @@ macro_rules! impl_arithmetic {
}
}
};
- ($t:ty, $atomic:ty, $example:tt) => {
+ ($t:ty, $atomic:ident, $example:tt) => {
impl AtomicCell<$t> {
/// Increments the current value by `val` and returns the previous value.
///
@@ -518,15 +543,19 @@ macro_rules! impl_arithmetic {
/// ```
#[inline]
pub fn fetch_add(&self, val: $t) -> $t {
- if can_transmute::<$t, $atomic>() {
- let a = unsafe { &*(self.as_ptr() as *const $atomic) };
- a.fetch_add(val, Ordering::AcqRel)
- } else {
- let _guard = lock(self.as_ptr() as usize).write();
- let value = unsafe { &mut *(self.as_ptr()) };
- let old = *value;
- *value = value.wrapping_add(val);
- old
+ atomic! {
+ $t, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::$atomic) };
+ a.fetch_add(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value = value.wrapping_add(val);
+ old
+ }
}
}
@@ -546,15 +575,19 @@ macro_rules! impl_arithmetic {
/// ```
#[inline]
pub fn fetch_sub(&self, val: $t) -> $t {
- if can_transmute::<$t, $atomic>() {
- let a = unsafe { &*(self.as_ptr() as *const $atomic) };
- a.fetch_sub(val, Ordering::AcqRel)
- } else {
- let _guard = lock(self.as_ptr() as usize).write();
- let value = unsafe { &mut *(self.as_ptr()) };
- let old = *value;
- *value = value.wrapping_sub(val);
- old
+ atomic! {
+ $t, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::$atomic) };
+ a.fetch_sub(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value = value.wrapping_sub(val);
+ old
+ }
}
}
@@ -572,15 +605,19 @@ macro_rules! impl_arithmetic {
/// ```
#[inline]
pub fn fetch_and(&self, val: $t) -> $t {
- if can_transmute::<$t, $atomic>() {
- let a = unsafe { &*(self.as_ptr() as *const $atomic) };
- a.fetch_and(val, Ordering::AcqRel)
- } else {
- let _guard = lock(self.as_ptr() as usize).write();
- let value = unsafe { &mut *(self.as_ptr()) };
- let old = *value;
- *value &= val;
- old
+ atomic! {
+ $t, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::$atomic) };
+ a.fetch_and(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value &= val;
+ old
+ }
}
}
@@ -598,15 +635,19 @@ macro_rules! impl_arithmetic {
/// ```
#[inline]
pub fn fetch_nand(&self, val: $t) -> $t {
- if can_transmute::<$t, $atomic>() {
- let a = unsafe { &*(self.as_ptr() as *const $atomic) };
- a.fetch_nand(val, Ordering::AcqRel)
- } else {
- let _guard = lock(self.as_ptr() as usize).write();
- let value = unsafe { &mut *(self.as_ptr()) };
- let old = *value;
- *value = !(old & val);
- old
+ atomic! {
+ $t, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::$atomic) };
+ a.fetch_nand(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value = !(old & val);
+ old
+ }
}
}
@@ -624,15 +665,19 @@ macro_rules! impl_arithmetic {
/// ```
#[inline]
pub fn fetch_or(&self, val: $t) -> $t {
- if can_transmute::<$t, $atomic>() {
- let a = unsafe { &*(self.as_ptr() as *const $atomic) };
- a.fetch_or(val, Ordering::AcqRel)
- } else {
- let _guard = lock(self.as_ptr() as usize).write();
- let value = unsafe { &mut *(self.as_ptr()) };
- let old = *value;
- *value |= val;
- old
+ atomic! {
+ $t, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::$atomic) };
+ a.fetch_or(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value |= val;
+ old
+ }
}
}
@@ -650,15 +695,19 @@ macro_rules! impl_arithmetic {
/// ```
#[inline]
pub fn fetch_xor(&self, val: $t) -> $t {
- if can_transmute::<$t, $atomic>() {
- let a = unsafe { &*(self.as_ptr() as *const $atomic) };
- a.fetch_xor(val, Ordering::AcqRel)
- } else {
- let _guard = lock(self.as_ptr() as usize).write();
- let value = unsafe { &mut *(self.as_ptr()) };
- let old = *value;
- *value ^= val;
- old
+ atomic! {
+ $t, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::$atomic) };
+ a.fetch_xor(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value ^= val;
+ old
+ }
}
}
@@ -677,15 +726,19 @@ macro_rules! impl_arithmetic {
/// ```
#[inline]
pub fn fetch_max(&self, val: $t) -> $t {
- if can_transmute::<$t, $atomic>() {
- // TODO: Atomic*::fetch_max requires Rust 1.45.
- self.fetch_update(|old| Some(cmp::max(old, val))).unwrap()
- } else {
- let _guard = lock(self.as_ptr() as usize).write();
- let value = unsafe { &mut *(self.as_ptr()) };
- let old = *value;
- *value = cmp::max(old, val);
- old
+ atomic! {
+ $t, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::$atomic) };
+ a.fetch_max(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value = cmp::max(old, val);
+ old
+ }
}
}
@@ -704,51 +757,50 @@ macro_rules! impl_arithmetic {
/// ```
#[inline]
pub fn fetch_min(&self, val: $t) -> $t {
- if can_transmute::<$t, $atomic>() {
- // TODO: Atomic*::fetch_min requires Rust 1.45.
- self.fetch_update(|old| Some(cmp::min(old, val))).unwrap()
- } else {
- let _guard = lock(self.as_ptr() as usize).write();
- let value = unsafe { &mut *(self.as_ptr()) };
- let old = *value;
- *value = cmp::min(old, val);
- old
+ atomic! {
+ $t, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::$atomic) };
+ a.fetch_min(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value = cmp::min(old, val);
+ old
+ }
}
}
}
};
}
-impl_arithmetic!(u8, atomic::AtomicU8, "let a = AtomicCell::new(7u8);");
-impl_arithmetic!(i8, atomic::AtomicI8, "let a = AtomicCell::new(7i8);");
-impl_arithmetic!(u16, atomic::AtomicU16, "let a = AtomicCell::new(7u16);");
-impl_arithmetic!(i16, atomic::AtomicI16, "let a = AtomicCell::new(7i16);");
-impl_arithmetic!(u32, atomic::AtomicU32, "let a = AtomicCell::new(7u32);");
-impl_arithmetic!(i32, atomic::AtomicI32, "let a = AtomicCell::new(7i32);");
-#[cfg(not(crossbeam_no_atomic_64))]
-impl_arithmetic!(u64, atomic::AtomicU64, "let a = AtomicCell::new(7u64);");
-#[cfg(not(crossbeam_no_atomic_64))]
-impl_arithmetic!(i64, atomic::AtomicI64, "let a = AtomicCell::new(7i64);");
-#[cfg(crossbeam_no_atomic_64)]
+impl_arithmetic!(u8, AtomicU8, "let a = AtomicCell::new(7u8);");
+impl_arithmetic!(i8, AtomicI8, "let a = AtomicCell::new(7i8);");
+impl_arithmetic!(u16, AtomicU16, "let a = AtomicCell::new(7u16);");
+impl_arithmetic!(i16, AtomicI16, "let a = AtomicCell::new(7i16);");
+
+impl_arithmetic!(u32, AtomicU32, "let a = AtomicCell::new(7u32);");
+impl_arithmetic!(i32, AtomicI32, "let a = AtomicCell::new(7i32);");
+
+#[cfg(target_has_atomic = "64")]
+impl_arithmetic!(u64, AtomicU64, "let a = AtomicCell::new(7u64);");
+#[cfg(target_has_atomic = "64")]
+impl_arithmetic!(i64, AtomicI64, "let a = AtomicCell::new(7i64);");
+#[cfg(not(target_has_atomic = "64"))]
impl_arithmetic!(u64, fallback, "let a = AtomicCell::new(7u64);");
-#[cfg(crossbeam_no_atomic_64)]
+#[cfg(not(target_has_atomic = "64"))]
impl_arithmetic!(i64, fallback, "let a = AtomicCell::new(7i64);");
+
// TODO: AtomicU128 is unstable
-// impl_arithmetic!(u128, atomic::AtomicU128, "let a = AtomicCell::new(7u128);");
-// impl_arithmetic!(i128, atomic::AtomicI128, "let a = AtomicCell::new(7i128);");
+// impl_arithmetic!(u128, AtomicU128, "let a = AtomicCell::new(7u128);");
+// impl_arithmetic!(i128, AtomicI128, "let a = AtomicCell::new(7i128);");
impl_arithmetic!(u128, fallback, "let a = AtomicCell::new(7u128);");
impl_arithmetic!(i128, fallback, "let a = AtomicCell::new(7i128);");
-impl_arithmetic!(
- usize,
- atomic::AtomicUsize,
- "let a = AtomicCell::new(7usize);"
-);
-impl_arithmetic!(
- isize,
- atomic::AtomicIsize,
- "let a = AtomicCell::new(7isize);"
-);
+impl_arithmetic!(usize, AtomicUsize, "let a = AtomicCell::new(7usize);");
+impl_arithmetic!(isize, AtomicIsize, "let a = AtomicCell::new(7isize);");
impl AtomicCell<bool> {
/// Applies logical "and" to the current value and returns the previous value.
@@ -768,8 +820,20 @@ impl AtomicCell<bool> {
/// ```
#[inline]
pub fn fetch_and(&self, val: bool) -> bool {
- let a = unsafe { &*(self.as_ptr() as *const AtomicBool) };
- a.fetch_and(val, Ordering::AcqRel)
+ atomic! {
+ bool, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::AtomicBool) };
+ a.fetch_and(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value &= val;
+ old
+ }
+ }
}
/// Applies logical "nand" to the current value and returns the previous value.
@@ -792,8 +856,20 @@ impl AtomicCell<bool> {
/// ```
#[inline]
pub fn fetch_nand(&self, val: bool) -> bool {
- let a = unsafe { &*(self.as_ptr() as *const AtomicBool) };
- a.fetch_nand(val, Ordering::AcqRel)
+ atomic! {
+ bool, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::AtomicBool) };
+ a.fetch_nand(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value = !(old & val);
+ old
+ }
+ }
}
/// Applies logical "or" to the current value and returns the previous value.
@@ -813,8 +889,20 @@ impl AtomicCell<bool> {
/// ```
#[inline]
pub fn fetch_or(&self, val: bool) -> bool {
- let a = unsafe { &*(self.as_ptr() as *const AtomicBool) };
- a.fetch_or(val, Ordering::AcqRel)
+ atomic! {
+ bool, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::AtomicBool) };
+ a.fetch_or(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value |= val;
+ old
+ }
+ }
}
/// Applies logical "xor" to the current value and returns the previous value.
@@ -834,8 +922,20 @@ impl AtomicCell<bool> {
/// ```
#[inline]
pub fn fetch_xor(&self, val: bool) -> bool {
- let a = unsafe { &*(self.as_ptr() as *const AtomicBool) };
- a.fetch_xor(val, Ordering::AcqRel)
+ atomic! {
+ bool, _a,
+ {
+ let a = unsafe { &*(self.as_ptr() as *const atomic::AtomicBool) };
+ a.fetch_xor(val, Ordering::AcqRel)
+ },
+ {
+ let _guard = lock(self.as_ptr() as usize).write();
+ let value = unsafe { &mut *(self.as_ptr()) };
+ let old = *value;
+ *value ^= val;
+ old
+ }
+ }
}
}
@@ -899,10 +999,10 @@ fn lock(addr: usize) -> &'static SeqLock {
// Now, if we have a slice of type `&[Foo]`, it is possible that field `a` in all items gets
// stored at addresses that are multiples of 3. It'd be too bad if `LEN` was divisible by 3.
// In order to protect from such cases, we simply choose a large prime number for `LEN`.
- const LEN: usize = 97;
+ const LEN: usize = 67;
#[allow(clippy::declare_interior_mutable_const)]
- const L: SeqLock = SeqLock::new();
- static LOCKS: [SeqLock; LEN] = [L; LEN];
+ const L: CachePadded<SeqLock> = CachePadded::new(SeqLock::new());
+ static LOCKS: [CachePadded<SeqLock>; LEN] = [L; LEN];
// If the modulus is a constant number, the compiler will use crazy math to transform this into
// a sequence of cheap arithmetic operations rather than using the slow modulo instruction.
@@ -936,48 +1036,9 @@ impl AtomicUnit {
}
}
-macro_rules! atomic {
- // If values of type `$t` can be transmuted into values of the primitive atomic type `$atomic`,
- // declares variable `$a` of type `$atomic` and executes `$atomic_op`, breaking out of the loop.
- (@check, $t:ty, $atomic:ty, $a:ident, $atomic_op:expr) => {
- if can_transmute::<$t, $atomic>() {
- let $a: &$atomic;
- break $atomic_op;
- }
- };
-
- // If values of type `$t` can be transmuted into values of a primitive atomic type, declares
- // variable `$a` of that type and executes `$atomic_op`. Otherwise, just executes
- // `$fallback_op`.
- ($t:ty, $a:ident, $atomic_op:expr, $fallback_op:expr) => {
- loop {
- atomic!(@check, $t, AtomicUnit, $a, $atomic_op);
-
- atomic!(@check, $t, atomic::AtomicU8, $a, $atomic_op);
- atomic!(@check, $t, atomic::AtomicU16, $a, $atomic_op);
- atomic!(@check, $t, atomic::AtomicU32, $a, $atomic_op);
- #[cfg(not(crossbeam_no_atomic_64))]
- atomic!(@check, $t, atomic::AtomicU64, $a, $atomic_op);
- // TODO: AtomicU128 is unstable
- // atomic!(@check, $t, atomic::AtomicU128, $a, $atomic_op);
-
- break $fallback_op;
- }
- };
-}
-
/// Returns `true` if operations on `AtomicCell<T>` are lock-free.
const fn atomic_is_lock_free<T>() -> bool {
- // HACK(taiki-e): This is equivalent to `atomic! { T, _a, true, false }`, but can be used in const fn even in our MSRV (Rust 1.38).
- let is_lock_free = can_transmute::<T, AtomicUnit>()
- | can_transmute::<T, atomic::AtomicU8>()
- | can_transmute::<T, atomic::AtomicU16>()
- | can_transmute::<T, atomic::AtomicU32>();
- #[cfg(not(crossbeam_no_atomic_64))]
- let is_lock_free = is_lock_free | can_transmute::<T, atomic::AtomicU64>();
- // TODO: AtomicU128 is unstable
- // let is_lock_free = is_lock_free | can_transmute::<T, atomic::AtomicU128>();
- is_lock_free
+ atomic! { T, _a, true, false }
}
/// Atomically reads data from `src`.
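
A minimal standalone sketch of the dispatch idea the relocated `atomic!` macro relies on: a `loop` whose branches `break` with a value lets the first matching check yield an expression while the rest is skipped, and it works in `const fn` (which is how `atomic_is_lock_free` can now simply expand the macro). The `can_transmute` helper below is a stand-in for the crate-private one, which compares size and alignment.

use core::mem;
use core::sync::atomic::{AtomicU16, AtomicU32, AtomicU8};

// Stand-in for the crate-private helper: `A` fits into the atomic type `B`
// if the sizes match and the alignment is compatible.
const fn can_transmute<A, B>() -> bool {
    mem::size_of::<A>() == mem::size_of::<B>() && mem::align_of::<A>() >= mem::align_of::<B>()
}

// Same shape as the macro expansion: try each primitive atomic in turn and
// fall through to the fallback branch if none fits.
const fn is_lock_free<T>() -> bool {
    loop {
        if can_transmute::<T, AtomicU8>() {
            break true;
        }
        if can_transmute::<T, AtomicU16>() {
            break true;
        }
        if can_transmute::<T, AtomicU32>() {
            break true;
        }
        break false; // fallback: the value is protected by a striped seqlock
    }
}

fn main() {
    assert!(is_lock_free::<u32>());
    assert!(!is_lock_free::<[u8; 64]>());
}
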
diff --git a/src/atomic/consume.rs b/src/atomic/consume.rs
index 277b370..ff8e316 100644
--- a/src/atomic/consume.rs
+++ b/src/atomic/consume.rs
@@ -1,5 +1,3 @@
-#[cfg(any(target_arch = "arm", target_arch = "aarch64"))]
-use crate::primitive::sync::atomic::compiler_fence;
#[cfg(not(crossbeam_no_atomic))]
use core::sync::atomic::Ordering;
@@ -27,11 +25,21 @@ pub trait AtomicConsume {
}
#[cfg(not(crossbeam_no_atomic))]
-#[cfg(any(target_arch = "arm", target_arch = "aarch64"))]
+// Miri and Loom don't support "consume" ordering and ThreadSanitizer doesn't treat
+// load(Relaxed) + compiler_fence(Acquire) as "consume" load.
+// LLVM generates machine code equivalent to fence(Acquire) in compiler_fence(Acquire)
+// on PowerPC, MIPS, etc. (https://godbolt.org/z/hffvjvW7h), so for now the fence
+// can be actually avoided here only on ARM and AArch64. See also
+// https://github.com/rust-lang/rust/issues/62256.
+#[cfg(all(
+ any(target_arch = "arm", target_arch = "aarch64"),
+ not(any(miri, crossbeam_loom, crossbeam_sanitize_thread)),
+))]
macro_rules! impl_consume {
() => {
#[inline]
fn load_consume(&self) -> Self::Val {
+ use crate::primitive::sync::atomic::compiler_fence;
let result = self.load(Ordering::Relaxed);
compiler_fence(Ordering::Acquire);
result
@@ -40,7 +48,10 @@ macro_rules! impl_consume {
}
#[cfg(not(crossbeam_no_atomic))]
-#[cfg(not(any(target_arch = "arm", target_arch = "aarch64")))]
+#[cfg(not(all(
+ any(target_arch = "arm", target_arch = "aarch64"),
+ not(any(miri, crossbeam_loom, crossbeam_sanitize_thread)),
+)))]
macro_rules! impl_consume {
() => {
#[inline]
@@ -72,11 +83,19 @@ impl_atomic!(AtomicU8, u8);
impl_atomic!(AtomicI8, i8);
impl_atomic!(AtomicU16, u16);
impl_atomic!(AtomicI16, i16);
+#[cfg(any(target_has_atomic = "32", not(target_pointer_width = "16")))]
impl_atomic!(AtomicU32, u32);
+#[cfg(any(target_has_atomic = "32", not(target_pointer_width = "16")))]
impl_atomic!(AtomicI32, i32);
-#[cfg(not(crossbeam_no_atomic_64))]
+#[cfg(any(
+ target_has_atomic = "64",
+ not(any(target_pointer_width = "16", target_pointer_width = "32")),
+))]
impl_atomic!(AtomicU64, u64);
-#[cfg(not(crossbeam_no_atomic_64))]
+#[cfg(any(
+ target_has_atomic = "64",
+ not(any(target_pointer_width = "16", target_pointer_width = "32")),
+))]
impl_atomic!(AtomicI64, i64);
#[cfg(not(crossbeam_no_atomic))]
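
The ARM/AArch64 branch above emulates a "consume" load as a relaxed load followed by a compiler-only acquire fence; a standalone sketch of that pattern (not the trait impl itself):

use std::sync::atomic::{compiler_fence, AtomicUsize, Ordering};

// No fence instruction is emitted on ARM/AArch64; the compiler fence only
// stops the compiler from reordering dependent reads before the load.
fn load_consume(a: &AtomicUsize) -> usize {
    let value = a.load(Ordering::Relaxed);
    compiler_fence(Ordering::Acquire);
    value
}

fn main() {
    let a = AtomicUsize::new(42);
    assert_eq!(load_consume(&a), 42);
}
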
diff --git a/src/atomic/mod.rs b/src/atomic/mod.rs
index 3896785..7b39fe4 100644
--- a/src/atomic/mod.rs
+++ b/src/atomic/mod.rs
@@ -3,35 +3,30 @@
//! * [`AtomicCell`], a thread-safe mutable memory location.
//! * [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering.
-#[cfg(not(crossbeam_no_atomic_cas))]
+#[cfg(target_has_atomic = "ptr")]
#[cfg(not(crossbeam_loom))]
-cfg_if::cfg_if! {
- // Use "wide" sequence lock if the pointer width <= 32 for preventing its counter against wrap
- // around.
- //
- // We are ignoring too wide architectures (pointer width >= 256), since such a system will not
- // appear in a conceivable future.
- //
- // In narrow architectures (pointer width <= 16), the counter is still <= 32-bit and may be
- // vulnerable to wrap around. But it's mostly okay, since in such a primitive hardware, the
- // counter will not be increased that fast.
- if #[cfg(any(target_pointer_width = "64", target_pointer_width = "128"))] {
- mod seq_lock;
- } else {
- #[path = "seq_lock_wide.rs"]
- mod seq_lock;
- }
-}
+// Use "wide" sequence lock if the pointer width <= 32 for preventing its counter against wrap
+// around.
+//
+// In narrow architectures (pointer width <= 16), the counter is still <= 32-bit and may be
+// vulnerable to wrap around. But it's mostly okay, since in such a primitive hardware, the
+// counter will not be increased that fast.
+// Note that Rust (and C99) pointers must be at least 16-bits: https://github.com/rust-lang/rust/pull/49305
+#[cfg_attr(
+ any(target_pointer_width = "16", target_pointer_width = "32"),
+ path = "seq_lock_wide.rs"
+)]
+mod seq_lock;
-#[cfg(not(crossbeam_no_atomic_cas))]
+#[cfg(target_has_atomic = "ptr")]
// We cannot provide AtomicCell under cfg(crossbeam_loom) because loom's atomic
// types have a different in-memory representation than the underlying type.
// TODO: The latest loom supports fences, so fallback using seqlock may be available.
#[cfg(not(crossbeam_loom))]
mod atomic_cell;
-mod consume;
-
-#[cfg(not(crossbeam_no_atomic_cas))]
+#[cfg(target_has_atomic = "ptr")]
#[cfg(not(crossbeam_loom))]
-pub use self::atomic_cell::AtomicCell;
-pub use self::consume::AtomicConsume;
+pub use atomic_cell::AtomicCell;
+
+mod consume;
+pub use consume::AtomicConsume;
diff --git a/src/backoff.rs b/src/backoff.rs
index 9e256aa..7a505ed 100644
--- a/src/backoff.rs
+++ b/src/backoff.rs
@@ -1,4 +1,4 @@
-use crate::primitive::sync::atomic;
+use crate::primitive::hint;
use core::cell::Cell;
use core::fmt;
@@ -145,10 +145,7 @@ impl Backoff {
#[inline]
pub fn spin(&self) {
for _ in 0..1 << self.step.get().min(SPIN_LIMIT) {
- // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
- // use [`core::hint::spin_loop`] instead.
- #[allow(deprecated)]
- atomic::spin_loop_hint();
+ hint::spin_loop();
}
if self.step.get() <= SPIN_LIMIT {
@@ -209,18 +206,12 @@ impl Backoff {
pub fn snooze(&self) {
if self.step.get() <= SPIN_LIMIT {
for _ in 0..1 << self.step.get() {
- // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
- // use [`core::hint::spin_loop`] instead.
- #[allow(deprecated)]
- atomic::spin_loop_hint();
+ hint::spin_loop();
}
} else {
#[cfg(not(feature = "std"))]
for _ in 0..1 << self.step.get() {
- // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
- // use [`core::hint::spin_loop`] instead.
- #[allow(deprecated)]
- atomic::spin_loop_hint();
+ hint::spin_loop();
}
#[cfg(feature = "std")]
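
The `hint::spin_loop()` calls above sit inside `Backoff`'s spin/snooze loops; a typical caller (assuming `crossbeam-utils` as a dependency) looks roughly like this:

use crossbeam_utils::Backoff;
use std::sync::atomic::{AtomicBool, Ordering};

// Spin-wait on a flag with exponential backoff, eventually yielding to the
// OS scheduler once snoozing passes the spin limit.
fn wait_until_set(flag: &AtomicBool) {
    let backoff = Backoff::new();
    while !flag.load(Ordering::Acquire) {
        backoff.snooze();
    }
}

fn main() {
    let flag = AtomicBool::new(true);
    wait_until_set(&flag); // returns immediately: the flag is already set
}
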
diff --git a/src/cache_padded.rs b/src/cache_padded.rs
index b5d5d33..f44f2d7 100644
--- a/src/cache_padded.rs
+++ b/src/cache_padded.rs
@@ -14,7 +14,8 @@ use core::ops::{Deref, DerefMut};
/// Cache lines are assumed to be N bytes long, depending on the architecture:
///
/// * On x86-64, aarch64, and powerpc64, N = 128.
-/// * On arm, mips, mips64, and riscv64, N = 32.
+/// * On arm, mips, mips64, sparc, and hexagon, N = 32.
+/// * On m68k, N = 16.
/// * On s390x, N = 256.
/// * On all others, N = 64.
///
@@ -75,6 +76,7 @@ use core::ops::{Deref, DerefMut};
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/powerpc/include/asm/cache.h#L26
#[cfg_attr(
any(
target_arch = "x86_64",
@@ -83,33 +85,45 @@ use core::ops::{Deref, DerefMut};
),
repr(align(128))
)]
-// arm, mips, mips64, and riscv64 have 32-byte cache line size.
+// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
any(
target_arch = "arm",
target_arch = "mips",
+ target_arch = "mips32r6",
target_arch = "mips64",
- target_arch = "riscv64",
+ target_arch = "mips64r6",
+ target_arch = "sparc",
+ target_arch = "hexagon",
),
repr(align(32))
)]
+// m68k has 16-byte cache line size.
+//
+// Sources:
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
+#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
-// x86 and wasm have 64-byte cache line size.
+// x86, wasm, riscv, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
@@ -119,8 +133,12 @@ use core::ops::{Deref, DerefMut};
target_arch = "powerpc64",
target_arch = "arm",
target_arch = "mips",
+ target_arch = "mips32r6",
target_arch = "mips64",
- target_arch = "riscv64",
+ target_arch = "mips64r6",
+ target_arch = "sparc",
+ target_arch = "hexagon",
+ target_arch = "m68k",
target_arch = "s390x",
)),
repr(align(64))
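
The `repr(align(N))` attributes above are observable through `align_of`/`size_of`; a quick check (output depends on the target: 128 on x86_64, 64 on most others, per the table in the doc comment):

use crossbeam_utils::CachePadded;
use std::mem;

fn main() {
    // CachePadded<T> is aligned to the assumed cache line size and padded to a
    // multiple of it, so even a one-byte payload occupies a full line.
    println!("align = {}", mem::align_of::<CachePadded<u8>>());
    println!("size  = {}", mem::size_of::<CachePadded<u8>>());
}
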
diff --git a/src/lib.rs b/src/lib.rs
index 191c5a1..7206c1e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -42,12 +42,14 @@
#[cfg(crossbeam_loom)]
#[allow(unused_imports)]
mod primitive {
+ pub(crate) mod hint {
+ pub(crate) use loom::hint::spin_loop;
+ }
pub(crate) mod sync {
pub(crate) mod atomic {
- pub(crate) use loom::sync::atomic::spin_loop_hint;
pub(crate) use loom::sync::atomic::{
AtomicBool, AtomicI16, AtomicI32, AtomicI64, AtomicI8, AtomicIsize, AtomicU16,
- AtomicU32, AtomicU64, AtomicU8, AtomicUsize,
+ AtomicU32, AtomicU64, AtomicU8, AtomicUsize, Ordering,
};
// FIXME: loom does not support compiler_fence at the moment.
@@ -63,19 +65,24 @@ mod primitive {
#[cfg(not(crossbeam_loom))]
#[allow(unused_imports)]
mod primitive {
+ pub(crate) mod hint {
+ pub(crate) use core::hint::spin_loop;
+ }
pub(crate) mod sync {
pub(crate) mod atomic {
- pub(crate) use core::sync::atomic::compiler_fence;
- // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
- // use [`core::hint::spin_loop`] instead.
- #[allow(deprecated)]
- pub(crate) use core::sync::atomic::spin_loop_hint;
+ pub(crate) use core::sync::atomic::{compiler_fence, Ordering};
#[cfg(not(crossbeam_no_atomic))]
pub(crate) use core::sync::atomic::{
- AtomicBool, AtomicI16, AtomicI32, AtomicI8, AtomicIsize, AtomicU16, AtomicU32,
- AtomicU8, AtomicUsize,
+ AtomicBool, AtomicI16, AtomicI8, AtomicIsize, AtomicU16, AtomicU8, AtomicUsize,
};
- #[cfg(not(crossbeam_no_atomic_64))]
+ #[cfg(not(crossbeam_no_atomic))]
+ #[cfg(any(target_has_atomic = "32", not(target_pointer_width = "16")))]
+ pub(crate) use core::sync::atomic::{AtomicI32, AtomicU32};
+ #[cfg(not(crossbeam_no_atomic))]
+ #[cfg(any(
+ target_has_atomic = "64",
+ not(any(target_pointer_width = "16", target_pointer_width = "32")),
+ ))]
pub(crate) use core::sync::atomic::{AtomicI64, AtomicU64};
}
@@ -92,13 +99,9 @@ pub use crate::cache_padded::CachePadded;
mod backoff;
pub use crate::backoff::Backoff;
-use cfg_if::cfg_if;
+#[cfg(feature = "std")]
+pub mod sync;
-cfg_if! {
- if #[cfg(feature = "std")] {
- pub mod sync;
-
- #[cfg(not(crossbeam_loom))]
- pub mod thread;
- }
-}
+#[cfg(feature = "std")]
+#[cfg(not(crossbeam_loom))]
+pub mod thread;
diff --git a/src/sync/once_lock.rs b/src/sync/once_lock.rs
index c1fefc9..e057aca 100644
--- a/src/sync/once_lock.rs
+++ b/src/sync/once_lock.rs
@@ -4,13 +4,10 @@
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
-use core::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
pub(crate) struct OnceLock<T> {
once: Once,
- // Once::is_completed requires Rust 1.43, so use this to track of whether they have been initialized.
- is_initialized: AtomicBool,
value: UnsafeCell<MaybeUninit<T>>,
// Unlike std::sync::OnceLock, we don't need PhantomData here because
// we don't use #[may_dangle].
@@ -25,7 +22,6 @@ impl<T> OnceLock<T> {
pub(crate) const fn new() -> Self {
Self {
once: Once::new(),
- is_initialized: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
}
@@ -50,37 +46,26 @@ impl<T> OnceLock<T> {
F: FnOnce() -> T,
{
// Fast path check
- if self.is_initialized() {
+ if self.once.is_completed() {
// SAFETY: The inner value has been initialized
return unsafe { self.get_unchecked() };
}
self.initialize(f);
- debug_assert!(self.is_initialized());
-
// SAFETY: The inner value has been initialized
unsafe { self.get_unchecked() }
}
- #[inline]
- fn is_initialized(&self) -> bool {
- self.is_initialized.load(Ordering::Acquire)
- }
-
#[cold]
fn initialize<F>(&self, f: F)
where
F: FnOnce() -> T,
{
- let slot = self.value.get().cast::<T>();
- let is_initialized = &self.is_initialized;
+ let slot = self.value.get();
self.once.call_once(|| {
let value = f();
- unsafe {
- slot.write(value);
- }
- is_initialized.store(true, Ordering::Release);
+ unsafe { slot.write(MaybeUninit::new(value)) }
});
}
@@ -88,16 +73,16 @@ impl<T> OnceLock<T> {
///
/// The value must be initialized
unsafe fn get_unchecked(&self) -> &T {
- debug_assert!(self.is_initialized());
+ debug_assert!(self.once.is_completed());
&*self.value.get().cast::<T>()
}
}
impl<T> Drop for OnceLock<T> {
fn drop(&mut self) {
- if self.is_initialized() {
+ if self.once.is_completed() {
// SAFETY: The inner value has been initialized
- unsafe { self.value.get().cast::<T>().drop_in_place() };
+ unsafe { (*self.value.get()).assume_init_drop() };
}
}
}
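
The crate-private `OnceLock` above now relies solely on `Once::is_completed`; its shape matches `std::sync::OnceLock` (stable since Rust 1.70), which is used like this:

use std::sync::OnceLock;

static GREETING: OnceLock<String> = OnceLock::new();

fn greeting() -> &'static str {
    // The closure runs at most once; subsequent callers hit the fast path and
    // read the already-initialized value.
    GREETING.get_or_init(|| "hello".to_string())
}

fn main() {
    assert_eq!(greeting(), "hello");
    assert_eq!(greeting(), "hello");
}
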
diff --git a/src/sync/parker.rs b/src/sync/parker.rs
index 9cb3a26..971981d 100644
--- a/src/sync/parker.rs
+++ b/src/sync/parker.rs
@@ -1,6 +1,5 @@
-use crate::primitive::sync::atomic::AtomicUsize;
+use crate::primitive::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use crate::primitive::sync::{Arc, Condvar, Mutex};
-use core::sync::atomic::Ordering::SeqCst;
use std::fmt;
use std::marker::PhantomData;
use std::time::{Duration, Instant};
diff --git a/src/sync/sharded_lock.rs b/src/sync/sharded_lock.rs
index a8f4584..5aee56f 100644
--- a/src/sync/sharded_lock.rs
+++ b/src/sync/sharded_lock.rs
@@ -356,7 +356,7 @@ impl<T: ?Sized> ShardedLock<T> {
for shard in self.shards[0..i].iter().rev() {
unsafe {
let dest: *mut _ = shard.write_guard.get();
- let guard = mem::replace(&mut *dest, None);
+ let guard = (*dest).take();
drop(guard);
}
}
@@ -526,7 +526,7 @@ impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> {
for shard in self.lock.shards.iter().rev() {
unsafe {
let dest: *mut _ = shard.write_guard.get();
- let guard = mem::replace(&mut *dest, None);
+ let guard = (*dest).take();
drop(guard);
}
}
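
The two hunks above are a pure cleanup: `Option::take()` is equivalent to `mem::replace(&mut option, None)`, so behavior is unchanged.

fn main() {
    let mut slot = Some("guard");
    // Same effect as `std::mem::replace(&mut slot, None)`.
    let taken = slot.take();
    assert_eq!(taken, Some("guard"));
    assert_eq!(slot, None);
}
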
diff --git a/src/thread.rs b/src/thread.rs
index 7446454..b2e063a 100644
--- a/src/thread.rs
+++ b/src/thread.rs
@@ -84,7 +84,7 @@
//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such
//! cannot be borrowed by scoped threads:
//!
-//! ```compile_fail,E0373,E0521
+//! ```compile_fail,E0521
//! use crossbeam_utils::thread;
//!
//! thread::scope(|s| {
@@ -120,7 +120,6 @@ use std::sync::{Arc, Mutex};
use std::thread;
use crate::sync::WaitGroup;
-use cfg_if::cfg_if;
type SharedVec<T> = Arc<Mutex<Vec<T>>>;
type SharedOption<T> = Arc<Mutex<Option<T>>>;
@@ -152,6 +151,15 @@ pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
where
F: FnOnce(&Scope<'env>) -> R,
{
+ struct AbortOnPanic;
+ impl Drop for AbortOnPanic {
+ fn drop(&mut self) {
+ if thread::panicking() {
+ std::process::abort();
+ }
+ }
+ }
+
let wg = WaitGroup::new();
let scope = Scope::<'env> {
handles: SharedVec::default(),
@@ -162,6 +170,10 @@ where
// Execute the scoped function, but catch any panics.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
+ // If an unwinding panic occurs before all threads are joined
+ // promote it to an aborting panic to prevent any threads from escaping the scope.
+ let guard = AbortOnPanic;
+
// Wait until all nested scopes are dropped.
drop(scope.wait_group);
wg.wait();
@@ -177,6 +189,8 @@ where
.filter_map(|handle| handle.join().err())
.collect();
+ mem::forget(guard);
+
// If `f` has panicked, resume unwinding.
// If any of the child threads have panicked, return the panic errors.
// Otherwise, everything is OK and return the result of `f`.
@@ -547,37 +561,42 @@ impl<T> ScopedJoinHandle<'_, T> {
}
}
-cfg_if! {
- if #[cfg(unix)] {
- use std::os::unix::thread::{JoinHandleExt, RawPthread};
-
- impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
- fn as_pthread_t(&self) -> RawPthread {
- // Borrow the handle. The handle will surely be available because the root scope waits
- // for nested scopes before joining remaining threads.
- let handle = self.handle.lock().unwrap();
- handle.as_ref().unwrap().as_pthread_t()
- }
- fn into_pthread_t(self) -> RawPthread {
- self.as_pthread_t()
- }
+/// Unix-specific extensions.
+#[cfg(unix)]
+mod unix {
+ use super::ScopedJoinHandle;
+ use std::os::unix::thread::{JoinHandleExt, RawPthread};
+
+ impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
+ fn as_pthread_t(&self) -> RawPthread {
+ // Borrow the handle. The handle will surely be available because the root scope waits
+ // for nested scopes before joining remaining threads.
+ let handle = self.handle.lock().unwrap();
+ handle.as_ref().unwrap().as_pthread_t()
}
- } else if #[cfg(windows)] {
- use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};
-
- impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
- fn as_raw_handle(&self) -> RawHandle {
- // Borrow the handle. The handle will surely be available because the root scope waits
- // for nested scopes before joining remaining threads.
- let handle = self.handle.lock().unwrap();
- handle.as_ref().unwrap().as_raw_handle()
- }
+ fn into_pthread_t(self) -> RawPthread {
+ self.as_pthread_t()
}
+ }
+}
+/// Windows-specific extensions.
+#[cfg(windows)]
+mod windows {
+ use super::ScopedJoinHandle;
+ use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};
+
+ impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
+ fn as_raw_handle(&self) -> RawHandle {
+ // Borrow the handle. The handle will surely be available because the root scope waits
+ // for nested scopes before joining remaining threads.
+ let handle = self.handle.lock().unwrap();
+ handle.as_ref().unwrap().as_raw_handle()
+ }
+ }
- impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
- fn into_raw_handle(self) -> RawHandle {
- self.as_raw_handle()
- }
+ impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
+ fn into_raw_handle(self) -> RawHandle {
+ self.as_raw_handle()
}
}
}
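
A standalone sketch of the `AbortOnPanic` guard introduced in `scope()` above: if unwinding starts while the guard is alive, its drop handler aborts the process so scoped threads can never outlive their borrows, and the success path defuses the guard with `mem::forget`. The `critical` wrapper is hypothetical, for illustration only.

use std::{mem, process, thread};

struct AbortOnPanic;

impl Drop for AbortOnPanic {
    fn drop(&mut self) {
        // The guard is only dropped if the protected code unwinds (the success
        // path forgets it), so escalate the panic to an abort.
        if thread::panicking() {
            process::abort();
        }
    }
}

fn critical(f: impl FnOnce()) {
    let guard = AbortOnPanic;
    f(); // if `f` unwinds, dropping `guard` aborts instead of unwinding further
    mem::forget(guard);
}

fn main() {
    critical(|| println!("ran without panicking"));
}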