diff options
author | Joel Galenson <jgalenson@google.com> | 2021-04-01 15:39:07 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-04-01 15:39:07 -0700 |
commit | 2e9962829e53f12a4abaf17fe77715bd9858ae7c (patch) | |
tree | b6220d6f0ac2cbe211e4475c51b066fcebe7c5cb | |
parent | 3122e0b475fe3ff279abad8aff320e6e0c705b00 (diff) | |
download | crossbeam-utils-2e9962829e53f12a4abaf17fe77715bd9858ae7c.tar.gz |
Upgrade rust/crates/crossbeam-utils to 0.8.3
Test: make
Change-Id: Iaf06fddaa1968c93ac55bec419208e679e5c449f
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | CHANGELOG.md | 13 | ||||
-rw-r--r-- | Cargo.toml | 8 | ||||
-rw-r--r-- | Cargo.toml.orig | 12 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | TEST_MAPPING | 8 | ||||
-rw-r--r-- | benches/atomic_cell.rs | 8 | ||||
-rw-r--r-- | src/atomic/atomic_cell.rs | 27 | ||||
-rw-r--r-- | src/atomic/consume.rs | 14 | ||||
-rw-r--r-- | src/atomic/mod.rs | 5 | ||||
-rw-r--r-- | src/atomic/seq_lock.rs | 14 | ||||
-rw-r--r-- | src/atomic/seq_lock_wide.rs | 14 | ||||
-rw-r--r-- | src/backoff.rs | 15 | ||||
-rw-r--r-- | src/cache_padded.rs | 60 | ||||
-rw-r--r-- | src/lib.rs | 65 | ||||
-rw-r--r-- | src/sync/mod.rs | 2 | ||||
-rw-r--r-- | src/sync/parker.rs | 110 | ||||
-rw-r--r-- | src/sync/wait_group.rs | 2 | ||||
-rw-r--r-- | src/thread.rs | 3 | ||||
-rw-r--r-- | tests/cache_padded.rs | 4 | ||||
-rw-r--r-- | tests/parker.rs | 4 | ||||
-rw-r--r-- | tests/sharded_lock.rs | 7 |
23 files changed, 298 insertions, 111 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 3dcef5c..1d9c34d 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "99c3230b263202aca56497b1f8e418a7b3647a23" + "sha1": "d841a2028dc72b4e09739116f07e865db60f3690" } } diff --git a/CHANGELOG.md b/CHANGELOG.md index a17c6e6..c4a92bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +# Version 0.8.3 + +- Make `loom` dependency optional. (#666) + +# Version 0.8.2 + +- Deprecate `AtomicCell::compare_and_swap`. Use `AtomicCell::compare_exchange` instead. (#619) +- Add `Parker::park_deadline`. (#563) +- Improve implementation of `CachePadded`. (#636) +- Add unstable support for `loom`. (#487) + # Version 0.8.1 - Make `AtomicCell::is_lock_free` always const fn. (#600) @@ -8,7 +19,7 @@ # Version 0.8.0 - Bump the minimum supported Rust version to 1.36. -- Remove deprecated `AtomicCell::get_mut()` and `Backoff::is_complete()` methods +- Remove deprecated `AtomicCell::get_mut()` and `Backoff::is_complete()` methods. - Remove `alloc` feature. - Make `CachePadded::new()` const function. - Make `AtomicCell::is_lock_free()` const function at 1.46+. @@ -13,12 +13,11 @@ [package] edition = "2018" name = "crossbeam-utils" -version = "0.8.1" +version = "0.8.3" authors = ["The Crossbeam Project Developers"] description = "Utilities for concurrent programming" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-utils" documentation = "https://docs.rs/crossbeam-utils" -readme = "README.md" keywords = ["scoped", "thread", "atomic", "cache"] categories = ["algorithms", "concurrency", "data-structures", "no-std"] license = "MIT OR Apache-2.0" @@ -30,7 +29,7 @@ version = "1" version = "1.4.0" optional = true [dev-dependencies.rand] -version = "0.7.3" +version = "0.8" [build-dependencies.autocfg] version = "1.0.0" @@ -38,3 +37,6 @@ version = "1.0.0" default = ["std"] nightly = [] std = ["lazy_static"] +[target."cfg(crossbeam_loom)".dependencies.loom] +version = "0.4" +optional = true diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 6011c75..30697d2 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,11 +4,10 @@ name = "crossbeam-utils" # - Update CHANGELOG.md # - Update README.md # - Create "crossbeam-utils-X.Y.Z" git tag -version = "0.8.1" +version = "0.8.3" authors = ["The Crossbeam Project Developers"] edition = "2018" license = "MIT OR Apache-2.0" -readme = "README.md" repository = "https://github.com/crossbeam-rs/crossbeam" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-utils" documentation = "https://docs.rs/crossbeam-utils" @@ -33,8 +32,15 @@ nightly = [] cfg-if = "1" lazy_static = { version = "1.4.0", optional = true } +# Enable the use of loom for concurrency testing. +# +# This configuration option is outside of the normal semver guarantees: minor +# versions of crossbeam may make breaking changes to it at any time. +[target.'cfg(crossbeam_loom)'.dependencies] +loom = { version = "0.4", optional = true } + [build-dependencies] autocfg = "1.0.0" [dev-dependencies] -rand = "0.7.3" +rand = "0.8" @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/crossbeam-utils/crossbeam-utils-0.8.1.crate" + value: "https://static.crates.io/crates/crossbeam-utils/crossbeam-utils-0.8.3.crate" } - version: "0.8.1" + version: "0.8.3" license_type: NOTICE last_upgrade_date { - year: 2020 - month: 12 - day: 21 + year: 2021 + month: 4 + day: 1 } } @@ -2,7 +2,7 @@ [![Build Status](https://github.com/crossbeam-rs/crossbeam/workflows/CI/badge.svg)]( https://github.com/crossbeam-rs/crossbeam/actions) -[![License](https://img.shields.io/badge/license-MIT%20OR%20Apache--2.0-blue.svg)]( +[![License](https://img.shields.io/badge/license-MIT_OR_Apache--2.0-blue.svg)]( https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-utils#license) [![Cargo](https://img.shields.io/crates/v/crossbeam-utils.svg)]( https://crates.io/crates/crossbeam-utils) diff --git a/TEST_MAPPING b/TEST_MAPPING new file mode 100644 index 0000000..25a9cdc --- /dev/null +++ b/TEST_MAPPING @@ -0,0 +1,8 @@ +// Generated by update_crate_tests.py for tests that depend on this crate. +{ + "presubmit": [ + { + "name": "crossbeam-epoch_device_test_src_lib" + } + ] +} diff --git a/benches/atomic_cell.rs b/benches/atomic_cell.rs index 938a8fe..844f7c0 100644 --- a/benches/atomic_cell.rs +++ b/benches/atomic_cell.rs @@ -28,11 +28,11 @@ fn fetch_add_u8(b: &mut test::Bencher) { } #[bench] -fn compare_and_swap_u8(b: &mut test::Bencher) { +fn compare_exchange_u8(b: &mut test::Bencher) { let a = AtomicCell::new(0u8); let mut i = 0; b.iter(|| { - a.compare_and_swap(i, i.wrapping_add(1)); + let _ = a.compare_exchange(i, i.wrapping_add(1)); i = i.wrapping_add(1); }); } @@ -102,11 +102,11 @@ fn fetch_add_usize(b: &mut test::Bencher) { } #[bench] -fn compare_and_swap_usize(b: &mut test::Bencher) { +fn compare_exchange_usize(b: &mut test::Bencher) { let a = AtomicCell::new(0usize); let mut i = 0; b.iter(|| { - a.compare_and_swap(i, i.wrapping_add(1)); + let _ = a.compare_exchange(i, i.wrapping_add(1)); i = i.wrapping_add(1); }); } diff --git a/src/atomic/atomic_cell.rs b/src/atomic/atomic_cell.rs index e8f6804..ad094b2 100644 --- a/src/atomic/atomic_cell.rs +++ b/src/atomic/atomic_cell.rs @@ -2,15 +2,19 @@ #![allow(clippy::unit_arg)] #![allow(clippy::let_unit_value)] +use crate::primitive::sync::atomic::{self, AtomicBool}; use core::cell::UnsafeCell; use core::fmt; use core::mem; +use core::sync::atomic::Ordering; + +#[cfg(not(crossbeam_loom))] use core::ptr; -use core::sync::atomic::{self, AtomicBool, Ordering}; #[cfg(feature = "std")] use std::panic::{RefUnwindSafe, UnwindSafe}; +#[cfg(not(crossbeam_loom))] use super::seq_lock::SeqLock; /// A thread-safe mutable memory location. @@ -213,6 +217,7 @@ impl<T: Copy + Eq> AtomicCell<T> { /// # Examples /// /// ``` + /// # #![allow(deprecated)] /// use crossbeam_utils::atomic::AtomicCell; /// /// let a = AtomicCell::new(1); @@ -223,6 +228,8 @@ impl<T: Copy + Eq> AtomicCell<T> { /// assert_eq!(a.compare_and_swap(1, 2), 1); /// assert_eq!(a.load(), 2); /// ``` + // TODO: remove in the next major version. + #[deprecated(note = "Use `compare_exchange` instead")] pub fn compare_and_swap(&self, current: T, new: T) -> T { match self.compare_exchange(current, new) { Ok(v) => v, @@ -492,23 +499,23 @@ macro_rules! impl_arithmetic { #[cfg(has_atomic_u8)] impl_arithmetic!(u8, atomic::AtomicU8, "let a = AtomicCell::new(7u8);"); -#[cfg(has_atomic_u8)] +#[cfg(all(has_atomic_u8, not(crossbeam_loom)))] impl_arithmetic!(i8, atomic::AtomicI8, "let a = AtomicCell::new(7i8);"); #[cfg(has_atomic_u16)] impl_arithmetic!(u16, atomic::AtomicU16, "let a = AtomicCell::new(7u16);"); -#[cfg(has_atomic_u16)] +#[cfg(all(has_atomic_u16, not(crossbeam_loom)))] impl_arithmetic!(i16, atomic::AtomicI16, "let a = AtomicCell::new(7i16);"); #[cfg(has_atomic_u32)] impl_arithmetic!(u32, atomic::AtomicU32, "let a = AtomicCell::new(7u32);"); -#[cfg(has_atomic_u32)] +#[cfg(all(has_atomic_u32, not(crossbeam_loom)))] impl_arithmetic!(i32, atomic::AtomicI32, "let a = AtomicCell::new(7i32);"); #[cfg(has_atomic_u64)] impl_arithmetic!(u64, atomic::AtomicU64, "let a = AtomicCell::new(7u64);"); -#[cfg(has_atomic_u64)] +#[cfg(all(has_atomic_u64, not(crossbeam_loom)))] impl_arithmetic!(i64, atomic::AtomicI64, "let a = AtomicCell::new(7i64);"); -#[cfg(has_atomic_u128)] +#[cfg(all(has_atomic_u128, not(crossbeam_loom)))] impl_arithmetic!(u128, atomic::AtomicU128, "let a = AtomicCell::new(7u128);"); -#[cfg(has_atomic_u128)] +#[cfg(all(has_atomic_u128, not(crossbeam_loom)))] impl_arithmetic!(i128, atomic::AtomicI128, "let a = AtomicCell::new(7i128);"); impl_arithmetic!( @@ -516,6 +523,7 @@ impl_arithmetic!( atomic::AtomicUsize, "let a = AtomicCell::new(7usize);" ); +#[cfg(not(crossbeam_loom))] impl_arithmetic!( isize, atomic::AtomicIsize, @@ -624,6 +632,7 @@ const fn can_transmute<A, B>() -> bool { /// scalability. #[inline] #[must_use] +#[cfg(not(crossbeam_loom))] fn lock(addr: usize) -> &'static SeqLock { // The number of locks is a prime number because we want to make sure `addr % LEN` gets // dispersed across all locks. @@ -769,6 +778,7 @@ impl AtomicUnit { #[inline] fn swap(&self, _val: (), _order: Ordering) {} + #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] fn compare_exchange_weak( &self, @@ -810,6 +820,9 @@ macro_rules! atomic { #[cfg(has_atomic_u128)] atomic!(@check, $t, atomic::AtomicU128, $a, $atomic_op); + #[cfg(crossbeam_loom)] + unimplemented!("loom does not support non-atomic atomic ops"); + #[cfg(not(crossbeam_loom))] break $fallback_op; } }; diff --git a/src/atomic/consume.rs b/src/atomic/consume.rs index 584fc34..0fbd93e 100644 --- a/src/atomic/consume.rs +++ b/src/atomic/consume.rs @@ -1,5 +1,5 @@ #[cfg(any(target_arch = "arm", target_arch = "aarch64"))] -use core::sync::atomic::compiler_fence; +use crate::primitive::sync::atomic::compiler_fence; use core::sync::atomic::Ordering; /// Trait which allows reading from primitive atomic types with "consume" ordering. @@ -53,11 +53,17 @@ macro_rules! impl_atomic { type Val = $val; impl_consume!(); } + #[cfg(crossbeam_loom)] + impl AtomicConsume for ::loom::sync::atomic::$atomic { + type Val = $val; + impl_consume!(); + } }; } impl_atomic!(AtomicBool, bool); impl_atomic!(AtomicUsize, usize); +#[cfg(not(crossbeam_loom))] impl_atomic!(AtomicIsize, isize); #[cfg(has_atomic_u8)] impl_atomic!(AtomicU8, u8); @@ -80,3 +86,9 @@ impl<T> AtomicConsume for ::core::sync::atomic::AtomicPtr<T> { type Val = *mut T; impl_consume!(); } + +#[cfg(crossbeam_loom)] +impl<T> AtomicConsume for ::loom::sync::atomic::AtomicPtr<T> { + type Val = *mut T; + impl_consume!(); +} diff --git a/src/atomic/mod.rs b/src/atomic/mod.rs index 7309c16..874eaf2 100644 --- a/src/atomic/mod.rs +++ b/src/atomic/mod.rs @@ -1,7 +1,12 @@ //! Atomic types. +//! +//! * [`AtomicCell`], a thread-safe mutable memory location. +//! * [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering. +#[cfg(not(crossbeam_loom))] use cfg_if::cfg_if; +#[cfg(not(crossbeam_loom))] cfg_if! { // Use "wide" sequence lock if the pointer width <= 32 for preventing its counter against wrap // around. diff --git a/src/atomic/seq_lock.rs b/src/atomic/seq_lock.rs index a423bc0..ff8defd 100644 --- a/src/atomic/seq_lock.rs +++ b/src/atomic/seq_lock.rs @@ -4,7 +4,7 @@ use core::sync::atomic::{self, AtomicUsize, Ordering}; use crate::Backoff; /// A simple stamped lock. -pub struct SeqLock { +pub(crate) struct SeqLock { /// The current state of the lock. /// /// All bits except the least significant one hold the current stamp. When locked, the state @@ -13,7 +13,7 @@ pub struct SeqLock { } impl SeqLock { - pub const fn new() -> Self { + pub(crate) const fn new() -> Self { Self { state: AtomicUsize::new(0), } @@ -23,7 +23,7 @@ impl SeqLock { /// /// This method should be called before optimistic reads. #[inline] - pub fn optimistic_read(&self) -> Option<usize> { + pub(crate) fn optimistic_read(&self) -> Option<usize> { let state = self.state.load(Ordering::Acquire); if state == 1 { None @@ -37,14 +37,14 @@ impl SeqLock { /// This method should be called after optimistic reads to check whether they are valid. The /// argument `stamp` should correspond to the one returned by method `optimistic_read`. #[inline] - pub fn validate_read(&self, stamp: usize) -> bool { + pub(crate) fn validate_read(&self, stamp: usize) -> bool { atomic::fence(Ordering::Acquire); self.state.load(Ordering::Relaxed) == stamp } /// Grabs the lock for writing. #[inline] - pub fn write(&'static self) -> SeqLockWriteGuard { + pub(crate) fn write(&'static self) -> SeqLockWriteGuard { let backoff = Backoff::new(); loop { let previous = self.state.swap(1, Ordering::Acquire); @@ -64,7 +64,7 @@ impl SeqLock { } /// An RAII guard that releases the lock and increments the stamp when dropped. -pub struct SeqLockWriteGuard { +pub(crate) struct SeqLockWriteGuard { /// The parent lock. lock: &'static SeqLock, @@ -75,7 +75,7 @@ pub struct SeqLockWriteGuard { impl SeqLockWriteGuard { /// Releases the lock without incrementing the stamp. #[inline] - pub fn abort(self) { + pub(crate) fn abort(self) { self.lock.state.store(self.state, Ordering::Release); // We specifically don't want to call drop(), since that's diff --git a/src/atomic/seq_lock_wide.rs b/src/atomic/seq_lock_wide.rs index 871a93d..ef5d94a 100644 --- a/src/atomic/seq_lock_wide.rs +++ b/src/atomic/seq_lock_wide.rs @@ -7,7 +7,7 @@ use crate::Backoff; /// /// The state is represented as two `AtomicUsize`: `state_hi` for high bits and `state_lo` for low /// bits. -pub struct SeqLock { +pub(crate) struct SeqLock { /// The high bits of the current state of the lock. state_hi: AtomicUsize, @@ -19,7 +19,7 @@ pub struct SeqLock { } impl SeqLock { - pub const fn new() -> Self { + pub(crate) const fn new() -> Self { Self { state_hi: AtomicUsize::new(0), state_lo: AtomicUsize::new(0), @@ -30,7 +30,7 @@ impl SeqLock { /// /// This method should be called before optimistic reads. #[inline] - pub fn optimistic_read(&self) -> Option<(usize, usize)> { + pub(crate) fn optimistic_read(&self) -> Option<(usize, usize)> { // The acquire loads from `state_hi` and `state_lo` synchronize with the release stores in // `SeqLockWriteGuard::drop`. // @@ -51,7 +51,7 @@ impl SeqLock { /// This method should be called after optimistic reads to check whether they are valid. The /// argument `stamp` should correspond to the one returned by method `optimistic_read`. #[inline] - pub fn validate_read(&self, stamp: (usize, usize)) -> bool { + pub(crate) fn validate_read(&self, stamp: (usize, usize)) -> bool { // Thanks to the fence, if we're noticing any modification to the data at the critical // section of `(a, b)`, then the critical section's write of 1 to state_lo should be // visible. @@ -76,7 +76,7 @@ impl SeqLock { /// Grabs the lock for writing. #[inline] - pub fn write(&'static self) -> SeqLockWriteGuard { + pub(crate) fn write(&'static self) -> SeqLockWriteGuard { let backoff = Backoff::new(); loop { let previous = self.state_lo.swap(1, Ordering::Acquire); @@ -98,7 +98,7 @@ impl SeqLock { } /// An RAII guard that releases the lock and increments the stamp when dropped. -pub struct SeqLockWriteGuard { +pub(crate) struct SeqLockWriteGuard { /// The parent lock. lock: &'static SeqLock, @@ -109,7 +109,7 @@ pub struct SeqLockWriteGuard { impl SeqLockWriteGuard { /// Releases the lock without incrementing the stamp. #[inline] - pub fn abort(self) { + pub(crate) fn abort(self) { self.lock.state_lo.store(self.state_lo, Ordering::Release); mem::forget(self); } diff --git a/src/backoff.rs b/src/backoff.rs index 2391dd1..1012f06 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -1,6 +1,6 @@ +use crate::primitive::sync::atomic; use core::cell::Cell; use core::fmt; -use core::sync::atomic; const SPIN_LIMIT: u32 = 6; const YIELD_LIMIT: u32 = 10; @@ -27,7 +27,7 @@ const YIELD_LIMIT: u32 = 10; /// let backoff = Backoff::new(); /// loop { /// let val = a.load(SeqCst); -/// if a.compare_and_swap(val, val.wrapping_mul(b), SeqCst) == val { +/// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() { /// return val; /// } /// backoff.spin(); @@ -131,7 +131,7 @@ impl Backoff { /// let backoff = Backoff::new(); /// loop { /// let val = a.load(SeqCst); - /// if a.compare_and_swap(val, val.wrapping_mul(b), SeqCst) == val { + /// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() { /// return val; /// } /// backoff.spin(); @@ -145,6 +145,9 @@ impl Backoff { #[inline] pub fn spin(&self) { for _ in 0..1 << self.step.get().min(SPIN_LIMIT) { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] atomic::spin_loop_hint(); } @@ -205,11 +208,17 @@ impl Backoff { pub fn snooze(&self) { if self.step.get() <= SPIN_LIMIT { for _ in 0..1 << self.step.get() { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] atomic::spin_loop_hint(); } } else { #[cfg(not(feature = "std"))] for _ in 0..1 << self.step.get() { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] atomic::spin_loop_hint(); } diff --git a/src/cache_padded.rs b/src/cache_padded.rs index 62c686b..822e831 100644 --- a/src/cache_padded.rs +++ b/src/cache_padded.rs @@ -13,7 +13,9 @@ use core::ops::{Deref, DerefMut}; /// /// Cache lines are assumed to be N bytes long, depending on the architecture: /// -/// * On x86-64 and aarch64, N = 128. +/// * On x86-64, aarch64, and powerpc64, N = 128. +/// * On arm, mips, mips64, and riscv64, N = 32. +/// * On s390x, N = 256. /// * On all others, N = 64. /// /// Note that N is just a reasonable guess and is not guaranteed to match the actual cache line @@ -64,13 +66,63 @@ use core::ops::{Deref, DerefMut}; // - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf // - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 // -// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128 byte cache line size +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// // Sources: // - https://www.mono-project.com/news/2016/09/12/arm64-icache/ // -#[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))] +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + ), + repr(align(128)) +)] +// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + ), + repr(align(32)) +)] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86 and wasm have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// +// All others are assumed to have 64-byte cache line size. #[cfg_attr( - not(any(target_arch = "x86_64", target_arch = "aarch64")), + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + target_arch = "s390x", + )), repr(align(64)) )] pub struct CachePadded<T> { @@ -31,11 +31,68 @@ allow(dead_code, unused_assignments, unused_variables) ) ))] -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![warn( + missing_docs, + missing_debug_implementations, + rust_2018_idioms, + unreachable_pub +)] #![cfg_attr(not(feature = "std"), no_std)] #![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))] -// matches! requires Rust 1.42 -#![allow(clippy::match_like_matches_macro)] + +#[cfg(crossbeam_loom)] +#[allow(unused_imports)] +mod primitive { + pub(crate) mod sync { + pub(crate) mod atomic { + pub(crate) use loom::sync::atomic::spin_loop_hint; + pub(crate) use loom::sync::atomic::{ + AtomicBool, AtomicU16, AtomicU32, AtomicU64, AtomicU8, AtomicUsize, + }; + + // FIXME: loom does not support compiler_fence at the moment. + // https://github.com/tokio-rs/loom/issues/117 + // we use fence as a stand-in for compiler_fence for the time being. + // this may miss some races since fence is stronger than compiler_fence, + // but it's the best we can do for the time being. + pub(crate) use loom::sync::atomic::fence as compiler_fence; + } + pub(crate) use loom::sync::{Arc, Condvar, Mutex}; + } +} +#[cfg(not(crossbeam_loom))] +#[allow(unused_imports)] +mod primitive { + pub(crate) mod sync { + pub(crate) mod atomic { + pub(crate) use core::sync::atomic::compiler_fence; + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + pub(crate) use core::sync::atomic::spin_loop_hint; + pub(crate) use core::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize}; + #[cfg(has_atomic_u16)] + pub(crate) use core::sync::atomic::{AtomicI16, AtomicU16}; + #[cfg(has_atomic_u32)] + pub(crate) use core::sync::atomic::{AtomicI32, AtomicU32}; + #[cfg(has_atomic_u64)] + pub(crate) use core::sync::atomic::{AtomicI64, AtomicU64}; + #[cfg(has_atomic_u8)] + pub(crate) use core::sync::atomic::{AtomicI8, AtomicU8}; + } + + #[cfg(feature = "std")] + pub(crate) use std::sync::{Arc, Condvar, Mutex}; + } +} + +cfg_if! { + if #[cfg(feature = "alloc")] { + extern crate alloc; + } else if #[cfg(feature = "std")] { + extern crate std as alloc; + } +} #[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] pub mod atomic; @@ -51,6 +108,8 @@ use cfg_if::cfg_if; cfg_if! { if #[cfg(feature = "std")] { pub mod sync; + + #[cfg(not(crossbeam_loom))] pub mod thread; } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index fd400d7..eeb740c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -5,9 +5,11 @@ //! * [`WaitGroup`], for synchronizing the beginning or end of some computation. mod parker; +#[cfg(not(crossbeam_loom))] mod sharded_lock; mod wait_group; pub use self::parker::{Parker, Unparker}; +#[cfg(not(crossbeam_loom))] pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; pub use self::wait_group::WaitGroup; diff --git a/src/sync/parker.rs b/src/sync/parker.rs index fc13d2e..aefa515 100644 --- a/src/sync/parker.rs +++ b/src/sync/parker.rs @@ -1,20 +1,19 @@ +use crate::primitive::sync::atomic::AtomicUsize; +use crate::primitive::sync::{Arc, Condvar, Mutex}; +use core::sync::atomic::Ordering::SeqCst; use std::fmt; use std::marker::PhantomData; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::{Arc, Condvar, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; /// A thread parking primitive. /// /// Conceptually, each `Parker` has an associated token which is initially not present: /// /// * The [`park`] method blocks the current thread unless or until the token is available, at -/// which point it automatically consumes the token. It may also return *spuriously*, without -/// consuming the token. +/// which point it automatically consumes the token. /// -/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum -/// time. +/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for +/// a specified maximum time. /// /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the /// token is initially absent, [`unpark`] followed by [`park`] will result in the second call @@ -43,13 +42,13 @@ use std::time::Duration; /// u.unpark(); /// }); /// -/// // Wakes up when `u.unpark()` provides the token, but may also wake up -/// // spuriously before that without consuming the token. +/// // Wakes up when `u.unpark()` provides the token. /// p.park(); /// ``` /// /// [`park`]: Parker::park /// [`park_timeout`]: Parker::park_timeout +/// [`park_deadline`]: Parker::park_deadline /// [`unpark`]: Unparker::unpark pub struct Parker { unparker: Unparker, @@ -90,9 +89,6 @@ impl Parker { /// Blocks the current thread until the token is made available. /// - /// A call to `park` may wake up spuriously without consuming the token, and callers should be - /// prepared for this possibility. - /// /// # Examples /// /// ``` @@ -113,9 +109,6 @@ impl Parker { /// Blocks the current thread until the token is made available, but only for a limited time. /// - /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers - /// should be prepared for this possibility. - /// /// # Examples /// /// ``` @@ -128,7 +121,25 @@ impl Parker { /// p.park_timeout(Duration::from_millis(500)); /// ``` pub fn park_timeout(&self, timeout: Duration) { - self.unparker.inner.park(Some(timeout)); + self.park_deadline(Instant::now() + timeout) + } + + /// Blocks the current thread until the token is made available, or until a certain deadline. + /// + /// # Examples + /// + /// ``` + /// use std::time::{Duration, Instant}; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_deadline(deadline); + /// ``` + pub fn park_deadline(&self, deadline: Instant) { + self.unparker.inner.park(Some(deadline)) } /// Returns a reference to an associated [`Unparker`]. @@ -227,8 +238,7 @@ impl Unparker { /// u.unpark(); /// }); /// - /// // Wakes up when `u.unpark()` provides the token, but may also wake up - /// // spuriously before that without consuming the token. + /// // Wakes up when `u.unpark()` provides the token. /// p.park(); /// ``` /// @@ -302,7 +312,7 @@ struct Inner { } impl Inner { - fn park(&self, timeout: Option<Duration>) { + fn park(&self, deadline: Option<Instant>) { // If we were previously notified then we consume this notification and return quickly. if self .state @@ -313,8 +323,8 @@ impl Inner { } // If the timeout is zero, then there is no need to actually block. - if let Some(ref dur) = timeout { - if *dur == Duration::from_millis(0) { + if let Some(deadline) = deadline { + if deadline <= Instant::now() { return; } } @@ -338,40 +348,42 @@ impl Inner { Err(n) => panic!("inconsistent park_timeout state: {}", n), } - match timeout { - None => { - loop { - // Block the current thread on the conditional variable. - m = self.cvar.wait(m).unwrap(); - - if self - .state - .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) - .is_ok() - { - // got a notification - return; + loop { + // Block the current thread on the conditional variable. + m = match deadline { + None => self.cvar.wait(m).unwrap(), + Some(deadline) => { + let now = Instant::now(); + if now < deadline { + // We could check for a timeout here, in the return value of wait_timeout, + // but in the case that a timeout and an unpark arrive simultaneously, we + // prefer to report the former. + self.cvar.wait_timeout(m, deadline - now).unwrap().0 + } else { + // We've timed out; swap out the state back to empty on our way out + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED | PARKED => return, + n => panic!("inconsistent park_timeout state: {}", n), + }; } - - // spurious wakeup, go back to sleep } - } - Some(timeout) => { - // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a - // notification we just want to unconditionally set `state` back to `EMPTY`, either - // consuming a notification or un-flagging ourselves as parked. - let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap(); + }; - match self.state.swap(EMPTY, SeqCst) { - NOTIFIED => {} // got a notification - PARKED => {} // no notification - n => panic!("inconsistent park_timeout state: {}", n), - } + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; } + + // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught + // in the branch above, when we discover the deadline is in the past } } - pub fn unpark(&self) { + pub(crate) fn unpark(&self) { // To ensure the unparked thread will observe any writes we made before this call, we must // perform a release operation that `park` can synchronize with. To do that we must write // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather diff --git a/src/sync/wait_group.rs b/src/sync/wait_group.rs index cd7af12..4206ee4 100644 --- a/src/sync/wait_group.rs +++ b/src/sync/wait_group.rs @@ -1,8 +1,8 @@ // Necessary for using `Mutex<usize>` for conditional variables #![allow(clippy::mutex_atomic)] +use crate::primitive::sync::{Arc, Condvar, Mutex}; use std::fmt; -use std::sync::{Arc, Condvar, Mutex}; /// Enables threads to synchronize the beginning or end of some computation. /// diff --git a/src/thread.rs b/src/thread.rs index ab91be7..c57d076 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -110,8 +110,6 @@ //! }); //! }).unwrap(); //! ``` -//! -//! [`std::thread::spawn`]: std::thread::spawn use std::fmt; use std::io; @@ -572,7 +570,6 @@ cfg_if! { } } - #[cfg(windows)] impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> { fn into_raw_handle(self) -> RawHandle { self.as_raw_handle() diff --git a/tests/cache_padded.rs b/tests/cache_padded.rs index be90cbe..c9e7687 100644 --- a/tests/cache_padded.rs +++ b/tests/cache_padded.rs @@ -27,7 +27,9 @@ fn distance() { let arr = [CachePadded::new(17u8), CachePadded::new(37u8)]; let a = &*arr[0] as *const u8; let b = &*arr[1] as *const u8; - assert!(unsafe { a.offset(64) } <= b); + let align = mem::align_of::<CachePadded<()>>(); + assert!(align >= 32); + assert_eq!(unsafe { a.add(align) }, b); } #[test] diff --git a/tests/parker.rs b/tests/parker.rs index f657eb1..2bf9c37 100644 --- a/tests/parker.rs +++ b/tests/parker.rs @@ -18,7 +18,7 @@ fn park_timeout_unpark_before() { fn park_timeout_unpark_not_called() { let p = Parker::new(); for _ in 0..10 { - p.park_timeout(Duration::from_millis(10)); + p.park_timeout(Duration::from_millis(10)) } } @@ -34,7 +34,7 @@ fn park_timeout_unpark_called_other_thread() { u.unpark(); }); - p.park_timeout(Duration::from_millis(u32::MAX as u64)); + p.park_timeout(Duration::from_millis(u32::MAX as u64)) }) .unwrap(); } diff --git a/tests/sharded_lock.rs b/tests/sharded_lock.rs index 73bd489..c362154 100644 --- a/tests/sharded_lock.rs +++ b/tests/sharded_lock.rs @@ -176,11 +176,8 @@ fn try_write() { let write_result = lock.try_write(); match write_result { Err(TryLockError::WouldBlock) => (), - Ok(_) => assert!( - false, - "try_write should not succeed while read_guard is in scope" - ), - Err(_) => assert!(false, "unexpected error"), + Ok(_) => panic!("try_write should not succeed while read_guard is in scope"), + Err(_) => panic!("unexpected error"), } drop(read_guard); |