diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 05:00:20 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 05:00:20 +0000 |
commit | 43f25f13aef75f0e1ab59eafb33d8f99f63831cb (patch) | |
tree | 30fab8e942b35661e92e0f597cb0da429f72430d | |
parent | 1241a8cb4015f7d625909134ef9e60622cb8afef (diff) | |
parent | b0f9c5ad609b8a73c2c9f86fd63086a05ece90e9 (diff) | |
download | spin-43f25f13aef75f0e1ab59eafb33d8f99f63831cb.tar.gz |
Snap for 10453563 from b0f9c5ad609b8a73c2c9f86fd63086a05ece90e9 to mainline-mediaprovider-releaseaml_mpr_341713020aml_mpr_341614010aml_mpr_341511070aml_mpr_341411070aml_mpr_341313030aml_mpr_341111030aml_mpr_341111020aml_mpr_341015090aml_mpr_341015030aml_mpr_340919000android14-mainline-mediaprovider-release
Change-Id: Ib272e6d70be4c53f823dfd57496d9eaf203f52f6
-rw-r--r-- | .cargo_vcs_info.json | 7 | ||||
-rw-r--r-- | .github/workflows/rust.yml | 48 | ||||
-rw-r--r-- | Android.bp | 16 | ||||
-rw-r--r-- | CHANGELOG.md | 47 | ||||
-rw-r--r-- | Cargo.toml | 62 | ||||
-rw-r--r-- | Cargo.toml.orig | 31 | ||||
-rw-r--r-- | METADATA | 14 | ||||
-rw-r--r-- | README.md | 22 | ||||
-rw-r--r-- | TEST_MAPPING | 51 | ||||
-rw-r--r-- | benches/mutex.rs | 128 | ||||
-rw-r--r-- | cargo2android.json | 2 | ||||
-rw-r--r-- | src/barrier.rs | 16 | ||||
-rw-r--r-- | src/lazy.rs | 12 | ||||
-rw-r--r-- | src/lib.rs | 45 | ||||
-rw-r--r-- | src/mutex.rs | 20 | ||||
-rw-r--r-- | src/mutex/fair.rs | 735 | ||||
-rw-r--r-- | src/mutex/spin.rs | 70 | ||||
-rw-r--r-- | src/mutex/ticket.rs | 18 | ||||
-rw-r--r-- | src/once.rs | 257 | ||||
-rw-r--r-- | src/relax.rs | 5 | ||||
-rw-r--r-- | src/rwlock.rs | 123 |
21 files changed, 1496 insertions, 233 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 56f48b2..fd12933 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "95e2993afe52104d6d585173ddedb3da6afba533" - } -} + "sha1": "a080ab5a952290e32bc455213631ffddb4d794e4" + }, + "path_in_vcs": "" +}
\ No newline at end of file diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3c13d1b..ed2b6ce 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -9,14 +9,50 @@ on: env: CARGO_TERM_COLOR: always -jobs: - build: +permissions: read-all +jobs: + test: runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + rust: [stable, beta, nightly] steps: - - uses: actions/checkout@v2 - - name: Build - run: cargo build --verbose - - name: Run tests + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} + - name: Run Tests run: cargo test --verbose + - run: cargo build --all --all-features --all-targets + - name: Catch missing feature flags + if: startsWith(matrix.rust, 'nightly') + run: cargo check -Z features=dev_dep + - name: Install cargo-hack + uses: taiki-e/install-action@cargo-hack + - run: rustup target add thumbv7m-none-eabi + - name: Ensure we don't depend on libstd + run: cargo hack build --target thumbv7m-none-eabi --no-dev-deps --no-default-features + + msrv: + runs-on: ubuntu-latest + strategy: + matrix: + version: [1.38.0] + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup update ${{ matrix.version }} && rustup default ${{ matrix.version }} + - run: cargo build --all --all-features --all-targets + + miri: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup toolchain install nightly --component miri && rustup default nightly + - run: cargo miri test + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout @@ -36,7 +36,7 @@ rust_library { host_supported: true, crate_name: "spin", cargo_env_compat: true, - cargo_pkg_version: "0.9.2", + cargo_pkg_version: "0.9.7", srcs: ["src/lib.rs"], edition: "2015", features: [ @@ -49,6 +49,8 @@ rust_library { "com.android.resolv", "com.android.virt", ], + product_available: true, + vendor_available: true, min_sdk_version: "29", } @@ -57,7 +59,7 @@ rust_test { host_supported: true, crate_name: "spin", cargo_env_compat: true, - cargo_pkg_version: "0.9.2", + cargo_pkg_version: "0.9.7", srcs: ["src/lib.rs"], test_suites: ["general-tests"], auto_gen_config: true, @@ -69,6 +71,9 @@ rust_test { "once", "std", ], + rustlibs: [ + "libcriterion", + ], } rust_library_rlib { @@ -89,10 +94,3 @@ rust_library_rlib { ], min_sdk_version: "29", } - - -// Errors when listing tests: -// error[E0433]: failed to resolve: could not find `Mutex` in `spin` -// error[E0433]: failed to resolve: could not find `RwLock` in `spin` -// error: could not compile `spin` due to 2 previous errors -// error: build failed diff --git a/CHANGELOG.md b/CHANGELOG.md index abbeee1..e62adfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,53 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +# [0.9.7] - 2023-03-27 + +### Fixed + +- Relaxed accidentally restricted `Send`/`Sync` bounds for `Mutex` guards + +# [0.9.6] - 2023-03-13 + +### Fixed + +- Relaxed accidentally restricted `Send`/`Sync` bounds for `RwLock` guards + +# [0.9.5] - 2023-02-07 + +### Added + +- `FairMutex`, a new mutex implementation that reduces writer starvation. +- A MSRV policy: Rust 1.38 is currently required + +### Changed + +- The crate's CI now has full MIRI integration, further improving the confidence you can have in the implementation. + +### Fixed + +- Ensured that the crate's abstractions comply with stacked borrows rules. +- Unsoundness in the `RwLock` that could be triggered via a reader overflow +- Relaxed various `Send`/`Sync` bound requirements to make the crate more flexible + +# [0.9.4] - 2022-07-14 + +### Fixed + +- Fixed unsoundness in `RwLock` on reader overflow +- Relaxed `Send`/`Sync` bounds for `SpinMutex` and `TicketMutex` (doesn't affect `Mutex` itself) + +# [0.9.3] - 2022-04-17 + +### Added + +- Implemented `Default` for `Once` +- `Once::try_call_once` + +### Fixed + +- Fixed bug that caused `Once::call_once` to incorrectly fail + # [0.9.2] - 2021-07-09 ### Changed @@ -3,38 +3,78 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] +rust-version = "1.38" name = "spin" -version = "0.9.2" -authors = ["Mathijs van de Nes <git@mathijs.vd-nes.nl>", "John Ericson <git@JohnEricson.me>", "Joshua Barretto <joshua.s.barretto@gmail.com>"] +version = "0.9.7" +authors = [ + "Mathijs van de Nes <git@mathijs.vd-nes.nl>", + "John Ericson <git@JohnEricson.me>", + "Joshua Barretto <joshua.s.barretto@gmail.com>", +] description = "Spin-based synchronization primitives" -keywords = ["spinlock", "mutex", "rwlock"] +readme = "README.md" +keywords = [ + "spinlock", + "mutex", + "rwlock", +] license = "MIT" repository = "https://github.com/mvdnes/spin-rs.git" + [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] + +[[bench]] +name = "mutex" +harness = false +required-features = ["ticket_mutex"] + [dependencies.lock_api_crate] version = "0.4" optional = true package = "lock_api" +[dependencies.portable-atomic] +version = "1" +optional = true +default-features = false + +[dev-dependencies.criterion] +version = "0.4" + [features] barrier = ["mutex"] -default = ["lock_api", "mutex", "spin_mutex", "rwlock", "once", "lazy", "barrier"] +default = [ + "lock_api", + "mutex", + "spin_mutex", + "rwlock", + "once", + "lazy", + "barrier", +] +fair_mutex = ["mutex"] lazy = ["once"] lock_api = ["lock_api_crate"] mutex = [] once = [] +portable_atomic = ["portable-atomic"] rwlock = [] spin_mutex = ["mutex"] std = [] ticket_mutex = ["mutex"] -use_ticket_mutex = ["mutex", "ticket_mutex"] +use_ticket_mutex = [ + "mutex", + "ticket_mutex", +] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index ee6fb09..ca2fdc3 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,18 +1,20 @@ [package] name = "spin" -version = "0.9.2" +version = "0.9.7" authors = [ - "Mathijs van de Nes <git@mathijs.vd-nes.nl>", - "John Ericson <git@JohnEricson.me>", - "Joshua Barretto <joshua.s.barretto@gmail.com>", + "Mathijs van de Nes <git@mathijs.vd-nes.nl>", + "John Ericson <git@JohnEricson.me>", + "Joshua Barretto <joshua.s.barretto@gmail.com>", ] license = "MIT" repository = "https://github.com/mvdnes/spin-rs.git" keywords = ["spinlock", "mutex", "rwlock"] description = "Spin-based synchronization primitives" +rust-version = "1.38" [dependencies] lock_api_crate = { package = "lock_api", version = "0.4", optional = true } +portable-atomic = { version = "1", optional = true, default-features = false } [features] default = ["lock_api", "mutex", "spin_mutex", "rwlock", "once", "lazy", "barrier"] @@ -26,6 +28,9 @@ spin_mutex = ["mutex"] # Enables `TicketMutex`. ticket_mutex = ["mutex"] +# Enables `FairMutex`. +fair_mutex = ["mutex"] + # Enables the non-default ticket mutex implementation for `Mutex`. use_ticket_mutex = ["mutex", "ticket_mutex"] @@ -47,6 +52,24 @@ lock_api = ["lock_api_crate"] # Enables std-only features such as yield-relaxing. std = [] +# Use the portable_atomic crate to support platforms without native atomic operations. +# The `portable_atomic_unsafe_assume_single_core` cfg or `critical-section` feature +# of `portable-atomic` crate must also be set by the final binary crate. +# When using the cfg, note that it is unsafe and enabling it for multicore systems is unsound. +# When using the `critical-section` feature, you need to implement the critical-section +# implementation that sound for your system by implementing an unsafe trait. +# See the documentation for the `portable-atomic` crate for more information: +# https://docs.rs/portable-atomic/latest/portable_atomic/#optional-cfg +portable_atomic = ["portable-atomic"] + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] + +[dev-dependencies] +criterion = "0.4" + +[[bench]] +name = "mutex" +harness = false +required-features = ["ticket_mutex"] @@ -1,3 +1,7 @@ +# This project was upgraded with external_updater. +# Usage: tools/external_updater/updater.sh update rust/crates/spin +# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md + name: "spin" description: "Spin-based synchronization primitives" third_party { @@ -7,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/spin/spin-0.9.2.crate" + value: "https://static.crates.io/crates/spin/spin-0.9.7.crate" } - version: "0.9.2" + version: "0.9.7" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 8 - day: 9 + year: 2023 + month: 4 + day: 3 } } @@ -92,6 +92,23 @@ The crate comes with a few feature flags that you may wish to use. - `std` enables support for thread yielding instead of spinning. +- `portable_atomic` enables usage of the `portable-atomic` crate + to support platforms without native atomic operations (Cortex-M0, etc.). + The `portable_atomic_unsafe_assume_single_core` cfg or `critical-section` feature + of `portable-atomic` crate must also be set by the final binary crate. + + When using the cfg, this can be done by adapting the following snippet to the `.cargo/config` file: + ``` + [target.<target>] + rustflags = [ "--cfg", "portable_atomic_unsafe_assume_single_core" ] + ``` + Note that this cfg is unsafe by nature, and enabling it for multicore systems is unsound. + + When using the `critical-section` feature, you need to implement the critical-section + implementation that sound for your system by implementing an unsafe trait. + See [the documentation for the `portable-atomic` crate](https://docs.rs/portable-atomic/latest/portable_atomic/#optional-cfg) + for more information. + ## Remarks It is often desirable to have a lock shared between threads. Wrapping the lock in an @@ -116,6 +133,11 @@ time for your crate's users. You can do this like so: spin = { version = "x.y", default-features = false, features = [...] } ``` +## Minimum Safe Rust Version (MSRV) + +This crate is guaranteed to compile on a Minimum Safe Rust Version (MSRV) of 1.38.0 and above. +This version will not be changed without a minor version bump. + ## License `spin` is distributed under the MIT License, (See `LICENSE`). diff --git a/TEST_MAPPING b/TEST_MAPPING index 777e539..c028b97 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -9,64 +9,19 @@ }, { "path": "external/rust/crates/webpki" + }, + { + "path": "packages/modules/DnsResolver" } ], "presubmit": [ { - "name": "apkdmverity.test" - }, - { - "name": "authfs_device_test_src_lib" - }, - { - "name": "doh_unit_test" - }, - { - "name": "libapkverify.integration_test" - }, - { - "name": "libapkverify.test" - }, - { - "name": "libidsig.test" - }, - { - "name": "microdroid_manager_test" - }, - { "name": "spin_test_src_lib" - }, - { - "name": "virtualizationservice_device_test" } ], "presubmit-rust": [ { - "name": "apkdmverity.test" - }, - { - "name": "authfs_device_test_src_lib" - }, - { - "name": "doh_unit_test" - }, - { - "name": "libapkverify.integration_test" - }, - { - "name": "libapkverify.test" - }, - { - "name": "libidsig.test" - }, - { - "name": "microdroid_manager_test" - }, - { "name": "spin_test_src_lib" - }, - { - "name": "virtualizationservice_device_test" } ] } diff --git a/benches/mutex.rs b/benches/mutex.rs new file mode 100644 index 0000000..5201145 --- /dev/null +++ b/benches/mutex.rs @@ -0,0 +1,128 @@ +#![feature(generic_associated_types)] + +#[macro_use] +extern crate criterion; + +use criterion::{Criterion, Bencher, black_box}; +use std::{ + ops::DerefMut, + sync::Arc, +}; + +trait Mutex<T>: Send + Sync + 'static { + type Guard<'a>: DerefMut<Target = T> where Self: 'a; + fn new(x: T) -> Self; + fn lock(&self) -> Self::Guard<'_>; +} + +impl<T: Send + 'static> Mutex<T> for spin::mutex::SpinMutex<T> { + type Guard<'a> = spin::mutex::SpinMutexGuard<'a, T> where Self: 'a; + fn new(x: T) -> Self { spin::mutex::SpinMutex::new(x) } + fn lock(&self) -> Self::Guard<'_> { self.lock() } +} + +impl<T: Send + 'static> Mutex<T> for spin::mutex::TicketMutex<T> { + type Guard<'a> = spin::mutex::TicketMutexGuard<'a, T> where Self: 'a; + fn new(x: T) -> Self { spin::mutex::TicketMutex::new(x) } + fn lock(&self) -> Self::Guard<'_> { self.lock() } +} + +impl<T: Send + 'static> Mutex<T> for std::sync::Mutex<T> { + type Guard<'a> = std::sync::MutexGuard<'a, T> where Self: 'a; + fn new(x: T) -> Self { std::sync::Mutex::new(x) } + fn lock(&self) -> Self::Guard<'_> { self.lock().unwrap() } +} + +fn gen_create<M: Mutex<u32>>(b: &mut Bencher) { + b.iter(|| { + let n = black_box(42); + M::new(n) + }); +} + +fn gen_lock_unlock<M: Mutex<u32>>(b: &mut Bencher) { + let m = M::new(0); + b.iter(|| { + let mut m = m.lock(); + *m = m.wrapping_add(1); + drop(m); + }); +} + +fn gen_lock_unlock_read_contention<M: Mutex<u32>>(b: &mut Bencher) { + let m = Arc::new(M::new(0)); + let thread = std::thread::spawn({ + let m = m.clone(); + move || { + while Arc::strong_count(&m) > 1 { + for _ in 0..1000 { + black_box(*m.lock()); + } + } + } + }); + b.iter(|| { + let mut m = m.lock(); + *m = m.wrapping_add(1); + drop(m); + }); + drop(m); + thread.join().unwrap(); +} + +fn gen_lock_unlock_write_contention<M: Mutex<u32>>(b: &mut Bencher) { + let m = Arc::new(M::new(0)); + let thread = std::thread::spawn({ + let m = m.clone(); + move || { + while Arc::strong_count(&m) > 1 { + for _ in 0..1000 { + let mut m = m.lock(); + *m = m.wrapping_add(1); + drop(m); + } + } + } + }); + b.iter(|| { + let mut m = m.lock(); + *m = m.wrapping_add(1); + drop(m); + }); + drop(m); + thread.join().unwrap(); +} + +fn create(b: &mut Criterion) { + b.bench_function("create-spin-spinmutex", |b| gen_create::<spin::mutex::SpinMutex<u32>>(b)); + b.bench_function("create-spin-ticketmutex", |b| gen_create::<spin::mutex::TicketMutex<u32>>(b)); + b.bench_function("create-std", |b| gen_create::<std::sync::Mutex<u32>>(b)); +} + +fn lock_unlock(b: &mut Criterion) { + b.bench_function("lock_unlock-spin-spinmutex", |b| gen_lock_unlock::<spin::mutex::SpinMutex<u32>>(b)); + b.bench_function("lock_unlock-spin-ticketmutex", |b| gen_lock_unlock::<spin::mutex::TicketMutex<u32>>(b)); + b.bench_function("lock_unlock-std", |b| gen_lock_unlock::<std::sync::Mutex<u32>>(b)); +} + +fn lock_unlock_read_contention(b: &mut Criterion) { + b.bench_function("lock_unlock_read_contention-spin-spinmutex", |b| gen_lock_unlock_read_contention::<spin::mutex::SpinMutex<u32>>(b)); + b.bench_function("lock_unlock_read_contention-spin-ticketmutex", |b| gen_lock_unlock_read_contention::<spin::mutex::TicketMutex<u32>>(b)); + b.bench_function("lock_unlock_read_contention-std", |b| gen_lock_unlock_read_contention::<std::sync::Mutex<u32>>(b)); +} + +fn lock_unlock_write_contention(b: &mut Criterion) { + b.bench_function("lock_unlock_write_contention-spin-spinmutex", |b| gen_lock_unlock_write_contention::<spin::mutex::SpinMutex<u32>>(b)); + b.bench_function("lock_unlock_write_contention-spin-ticketmutex", |b| gen_lock_unlock_write_contention::<spin::mutex::TicketMutex<u32>>(b)); + b.bench_function("lock_unlock_write_contention-std", |b| gen_lock_unlock_write_contention::<std::sync::Mutex<u32>>(b)); +} + +criterion_group!( + mutex, + create, + lock_unlock, + lock_unlock_read_contention, + lock_unlock_write_contention, +); + +criterion_main!(mutex); diff --git a/cargo2android.json b/cargo2android.json index 086d38a..0be577a 100644 --- a/cargo2android.json +++ b/cargo2android.json @@ -12,4 +12,4 @@ "min-sdk-version": "29", "run": true, "tests": true -}
\ No newline at end of file +} diff --git a/src/barrier.rs b/src/barrier.rs index 7a13890..c3a1c92 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -115,8 +115,7 @@ impl<R: RelaxStrategy> Barrier<R> { // not the leader let local_gen = lock.generation_id; - while local_gen == lock.generation_id && - lock.count < self.num_threads { + while local_gen == lock.generation_id && lock.count < self.num_threads { drop(lock); R::relax(); lock = self.lock.lock(); @@ -176,7 +175,9 @@ impl BarrierWaitResult { /// let barrier_wait_result = barrier.wait(); /// println!("{:?}", barrier_wait_result.is_leader()); /// ``` - pub fn is_leader(&self) -> bool { self.0 } + pub fn is_leader(&self) -> bool { + self.0 + } } #[cfg(test)] @@ -192,12 +193,13 @@ mod tests { fn use_barrier(n: usize, barrier: Arc<Barrier>) { let (tx, rx) = channel(); + let mut ts = Vec::new(); for _ in 0..n - 1 { let c = barrier.clone(); let tx = tx.clone(); - thread::spawn(move|| { + ts.push(thread::spawn(move || { tx.send(c.wait().is_leader()).unwrap(); - }); + })); } // At this point, all spawned threads should be blocked, @@ -217,6 +219,10 @@ mod tests { } } assert!(leader_found); + + for t in ts { + t.join().unwrap(); + } } #[test] diff --git a/src/lazy.rs b/src/lazy.rs index 1473db1..6e5efe4 100644 --- a/src/lazy.rs +++ b/src/lazy.rs @@ -3,8 +3,8 @@ //! Implementation adapted from the `SyncLazy` type of the standard library. See: //! <https://doc.rust-lang.org/std/lazy/struct.SyncLazy.html> -use core::{cell::Cell, fmt, ops::Deref}; use crate::{once::Once, RelaxStrategy, Spin}; +use core::{cell::Cell, fmt, ops::Deref}; /// A value which is initialized on the first access. /// @@ -45,7 +45,10 @@ pub struct Lazy<T, F = fn() -> T, R = Spin> { impl<T: fmt::Debug, F, R> fmt::Debug for Lazy<T, F, R> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Lazy").field("cell", &self.cell).field("init", &"..").finish() + f.debug_struct("Lazy") + .field("cell", &self.cell) + .field("init", &"..") + .finish() } } @@ -61,7 +64,10 @@ impl<T, F, R> Lazy<T, F, R> { /// Creates a new lazy value with the given initializing /// function. pub const fn new(f: F) -> Self { - Self { cell: Once::new(), init: Cell::new(Some(f)) } + Self { + cell: Once::new(), + init: Cell::new(Some(f)), + } } /// Retrieves a mutable pointer to the inner data. /// @@ -55,11 +55,22 @@ //! //! - `ticket_mutex` uses a ticket lock for the implementation of `Mutex` //! +//! - `fair_mutex` enables a fairer implementation of `Mutex` that uses eventual fairness to avoid +//! starvation +//! //! - `std` enables support for thread yielding instead of spinning #[cfg(any(test, feature = "std"))] extern crate core; +#[cfg(feature = "portable_atomic")] +extern crate portable_atomic; + +#[cfg(not(feature = "portable_atomic"))] +use core::sync::atomic; +#[cfg(feature = "portable_atomic")] +use portable_atomic as atomic; + #[cfg(feature = "barrier")] #[cfg_attr(docsrs, doc(cfg(feature = "barrier")))] pub mod barrier; @@ -72,21 +83,21 @@ pub mod mutex; #[cfg(feature = "once")] #[cfg_attr(docsrs, doc(cfg(feature = "once")))] pub mod once; +pub mod relax; #[cfg(feature = "rwlock")] #[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] pub mod rwlock; -pub mod relax; #[cfg(feature = "mutex")] #[cfg_attr(docsrs, doc(cfg(feature = "mutex")))] pub use mutex::MutexGuard; -#[cfg(feature = "rwlock")] -#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] -pub use rwlock::RwLockReadGuard; -pub use relax::{Spin, RelaxStrategy}; #[cfg(feature = "std")] #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub use relax::Yield; +pub use relax::{RelaxStrategy, Spin}; +#[cfg(feature = "rwlock")] +#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] +pub use rwlock::RwLockReadGuard; // Avoid confusing inference errors by aliasing away the relax strategy parameter. Users that need to use a different // relax strategy can do so by accessing the types through their fully-qualified path. This is a little bit horrible @@ -184,3 +195,27 @@ pub mod lock_api { pub type RwLockUpgradableReadGuard<'a, T> = lock_api_crate::RwLockUpgradableReadGuard<'a, crate::RwLock<()>, T>; } + +/// In the event of an invalid operation, it's best to abort the current process. +#[cfg(feature = "fair_mutex")] +fn abort() -> ! { + #[cfg(not(feature = "std"))] + { + // Panicking while panicking is defined by Rust to result in an abort. + struct Panic; + + impl Drop for Panic { + fn drop(&mut self) { + panic!("aborting due to invalid operation"); + } + } + + let _panic = Panic; + panic!("aborting due to invalid operation"); + } + + #[cfg(feature = "std")] + { + std::process::abort(); + } +} diff --git a/src/mutex.rs b/src/mutex.rs index 2335051..e333d8a 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -27,11 +27,18 @@ pub mod ticket; #[cfg_attr(docsrs, doc(cfg(feature = "ticket_mutex")))] pub use self::ticket::{TicketMutex, TicketMutexGuard}; +#[cfg(feature = "fair_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "fair_mutex")))] +pub mod fair; +#[cfg(feature = "fair_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "fair_mutex")))] +pub use self::fair::{FairMutex, FairMutexGuard, Starvation}; + +use crate::{RelaxStrategy, Spin}; use core::{ fmt, ops::{Deref, DerefMut}, }; -use crate::{RelaxStrategy, Spin}; #[cfg(all(not(feature = "spin_mutex"), not(feature = "use_ticket_mutex")))] compile_error!("The `mutex` feature flag was used (perhaps through another feature?) without either `spin_mutex` or `use_ticket_mutex`. One of these is required."); @@ -78,9 +85,11 @@ type InnerMutexGuard<'a, T> = self::ticket::TicketMutexGuard<'a, T>; /// // We use a barrier to ensure the readout happens after all writing /// let barrier = Arc::new(Barrier::new(thread_count + 1)); /// +/// # let mut ts = Vec::new(); /// for _ in (0..thread_count) { /// let my_barrier = barrier.clone(); /// let my_lock = spin_mutex.clone(); +/// # let t = /// std::thread::spawn(move || { /// let mut guard = my_lock.lock(); /// *guard += 1; @@ -89,12 +98,17 @@ type InnerMutexGuard<'a, T> = self::ticket::TicketMutexGuard<'a, T>; /// drop(guard); /// my_barrier.wait(); /// }); +/// # ts.push(t); /// } /// /// barrier.wait(); /// /// let answer = { *spin_mutex.lock() }; /// assert_eq!(answer, thread_count); +/// +/// # for t in ts { +/// # t.join().unwrap(); +/// # } /// ``` pub struct Mutex<T: ?Sized, R = Spin> { inner: InnerMutex<T, R>, @@ -132,7 +146,9 @@ impl<T, R> Mutex<T, R> { /// ``` #[inline(always)] pub const fn new(value: T) -> Self { - Self { inner: InnerMutex::new(value) } + Self { + inner: InnerMutex::new(value), + } } /// Consumes this [`Mutex`] and unwraps the underlying data. diff --git a/src/mutex/fair.rs b/src/mutex/fair.rs new file mode 100644 index 0000000..db07ad6 --- /dev/null +++ b/src/mutex/fair.rs @@ -0,0 +1,735 @@ +//! A spinning mutex with a fairer unlock algorithm. +//! +//! This mutex is similar to the `SpinMutex` in that it uses spinning to avoid +//! context switches. However, it uses a fairer unlock algorithm that avoids +//! starvation of threads that are waiting for the lock. + +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; +use core::{ + cell::UnsafeCell, + fmt, + marker::PhantomData, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, +}; + +// The lowest bit of `lock` is used to indicate whether the mutex is locked or not. The rest of the bits are used to +// store the number of starving threads. +const LOCKED: usize = 1; +const STARVED: usize = 2; + +/// Number chosen by fair roll of the dice, adjust as needed. +const STARVATION_SPINS: usize = 1024; + +/// A [spin lock](https://en.m.wikipedia.org/wiki/Spinlock) providing mutually exclusive access to data, but with a fairer +/// algorithm. +/// +/// # Example +/// +/// ``` +/// use spin; +/// +/// let lock = spin::mutex::FairMutex::<_>::new(0); +/// +/// // Modify the data +/// *lock.lock() = 2; +/// +/// // Read the data +/// let answer = *lock.lock(); +/// assert_eq!(answer, 2); +/// ``` +/// +/// # Thread safety example +/// +/// ``` +/// use spin; +/// use std::sync::{Arc, Barrier}; +/// +/// let thread_count = 1000; +/// let spin_mutex = Arc::new(spin::mutex::FairMutex::<_>::new(0)); +/// +/// // We use a barrier to ensure the readout happens after all writing +/// let barrier = Arc::new(Barrier::new(thread_count + 1)); +/// +/// for _ in (0..thread_count) { +/// let my_barrier = barrier.clone(); +/// let my_lock = spin_mutex.clone(); +/// std::thread::spawn(move || { +/// let mut guard = my_lock.lock(); +/// *guard += 1; +/// +/// // Release the lock to prevent a deadlock +/// drop(guard); +/// my_barrier.wait(); +/// }); +/// } +/// +/// barrier.wait(); +/// +/// let answer = { *spin_mutex.lock() }; +/// assert_eq!(answer, thread_count); +/// ``` +pub struct FairMutex<T: ?Sized, R = Spin> { + phantom: PhantomData<R>, + pub(crate) lock: AtomicUsize, + data: UnsafeCell<T>, +} + +/// A guard that provides mutable data access. +/// +/// When the guard falls out of scope it will release the lock. +pub struct FairMutexGuard<'a, T: ?Sized + 'a> { + lock: &'a AtomicUsize, + data: *mut T, +} + +/// A handle that indicates that we have been trying to acquire the lock for a while. +/// +/// This handle is used to prevent starvation. +pub struct Starvation<'a, T: ?Sized + 'a, R> { + lock: &'a FairMutex<T, R>, +} + +/// Indicates whether a lock was rejected due to the lock being held by another thread or due to starvation. +#[derive(Debug)] +pub enum LockRejectReason { + /// The lock was rejected due to the lock being held by another thread. + Locked, + + /// The lock was rejected due to starvation. + Starved, +} + +// Same unsafe impls as `std::sync::Mutex` +unsafe impl<T: ?Sized + Send, R> Sync for FairMutex<T, R> {} +unsafe impl<T: ?Sized + Send, R> Send for FairMutex<T, R> {} + +unsafe impl<T: ?Sized + Sync> Sync for FairMutexGuard<'_, T> {} +unsafe impl<T: ?Sized + Send> Send for FairMutexGuard<'_, T> {} + +impl<T, R> FairMutex<T, R> { + /// Creates a new [`FairMutex`] wrapping the supplied data. + /// + /// # Example + /// + /// ``` + /// use spin::mutex::FairMutex; + /// + /// static MUTEX: FairMutex<()> = FairMutex::<_>::new(()); + /// + /// fn demo() { + /// let lock = MUTEX.lock(); + /// // do something with lock + /// drop(lock); + /// } + /// ``` + #[inline(always)] + pub const fn new(data: T) -> Self { + FairMutex { + lock: AtomicUsize::new(0), + data: UnsafeCell::new(data), + phantom: PhantomData, + } + } + + /// Consumes this [`FairMutex`] and unwraps the underlying data. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// assert_eq!(42, lock.into_inner()); + /// ``` + #[inline(always)] + pub fn into_inner(self) -> T { + // We know statically that there are no outstanding references to + // `self` so there's no need to lock. + let FairMutex { data, .. } = self; + data.into_inner() + } + + /// Returns a mutable pointer to the underlying data. + /// + /// This is mostly meant to be used for applications which require manual unlocking, but where + /// storing both the lock and the pointer to the inner data gets inefficient. + /// + /// # Example + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// unsafe { + /// core::mem::forget(lock.lock()); + /// + /// assert_eq!(lock.as_mut_ptr().read(), 42); + /// lock.as_mut_ptr().write(58); + /// + /// lock.force_unlock(); + /// } + /// + /// assert_eq!(*lock.lock(), 58); + /// + /// ``` + #[inline(always)] + pub fn as_mut_ptr(&self) -> *mut T { + self.data.get() + } +} + +impl<T: ?Sized, R: RelaxStrategy> FairMutex<T, R> { + /// Locks the [`FairMutex`] and returns a guard that permits access to the inner data. + /// + /// The returned value may be dereferenced for data access + /// and the lock will be dropped when the guard falls out of scope. + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(0); + /// { + /// let mut data = lock.lock(); + /// // The lock is now locked and the data can be accessed + /// *data += 1; + /// // The lock is implicitly dropped at the end of the scope + /// } + /// ``` + #[inline(always)] + pub fn lock(&self) -> FairMutexGuard<T> { + // Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock` + // when called in a loop. + let mut spins = 0; + while self + .lock + .compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + // Wait until the lock looks unlocked before retrying + while self.is_locked() { + R::relax(); + + // If we've been spinning for a while, switch to a fairer strategy that will prevent + // newer users from stealing our lock from us. + if spins > STARVATION_SPINS { + return self.starve().lock(); + } + spins += 1; + } + } + + FairMutexGuard { + lock: &self.lock, + data: unsafe { &mut *self.data.get() }, + } + } +} + +impl<T: ?Sized, R> FairMutex<T, R> { + /// Returns `true` if the lock is currently held. + /// + /// # Safety + /// + /// This function provides no synchronization guarantees and so its result should be considered 'out of date' + /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. + #[inline(always)] + pub fn is_locked(&self) -> bool { + self.lock.load(Ordering::Relaxed) & LOCKED != 0 + } + + /// Force unlock this [`FairMutex`]. + /// + /// # Safety + /// + /// This is *extremely* unsafe if the lock is not held by the current + /// thread. However, this can be useful in some instances for exposing the + /// lock to FFI that doesn't know how to deal with RAII. + #[inline(always)] + pub unsafe fn force_unlock(&self) { + self.lock.fetch_and(!LOCKED, Ordering::Release); + } + + /// Try to lock this [`FairMutex`], returning a lock guard if successful. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// let maybe_guard = lock.try_lock(); + /// assert!(maybe_guard.is_some()); + /// + /// // `maybe_guard` is still held, so the second call fails + /// let maybe_guard2 = lock.try_lock(); + /// assert!(maybe_guard2.is_none()); + /// ``` + #[inline(always)] + pub fn try_lock(&self) -> Option<FairMutexGuard<T>> { + self.try_lock_starver().ok() + } + + /// Tries to lock this [`FairMutex`] and returns a result that indicates whether the lock was + /// rejected due to a starver or not. + #[inline(always)] + pub fn try_lock_starver(&self) -> Result<FairMutexGuard<T>, LockRejectReason> { + match self + .lock + .compare_exchange(0, LOCKED, Ordering::Acquire, Ordering::Relaxed) + .unwrap_or_else(|x| x) + { + 0 => Ok(FairMutexGuard { + lock: &self.lock, + data: unsafe { &mut *self.data.get() }, + }), + LOCKED => Err(LockRejectReason::Locked), + _ => Err(LockRejectReason::Starved), + } + } + + /// Indicates that the current user has been waiting for the lock for a while + /// and that the lock should yield to this thread over a newly arriving thread. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// // Lock the mutex to simulate it being used by another user. + /// let guard1 = lock.lock(); + /// + /// // Try to lock the mutex. + /// let guard2 = lock.try_lock(); + /// assert!(guard2.is_none()); + /// + /// // Wait for a while. + /// wait_for_a_while(); + /// + /// // We are now starved, indicate as such. + /// let starve = lock.starve(); + /// + /// // Once the lock is released, another user trying to lock it will + /// // fail. + /// drop(guard1); + /// let guard3 = lock.try_lock(); + /// assert!(guard3.is_none()); + /// + /// // However, we will be able to lock it. + /// let guard4 = starve.try_lock(); + /// assert!(guard4.is_ok()); + /// + /// # fn wait_for_a_while() {} + /// ``` + pub fn starve(&self) -> Starvation<'_, T, R> { + // Add a new starver to the state. + if self.lock.fetch_add(STARVED, Ordering::Relaxed) > (core::isize::MAX - 1) as usize { + // In the event of a potential lock overflow, abort. + crate::abort(); + } + + Starvation { lock: self } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the [`FairMutex`] mutably, and a mutable reference is guaranteed to be exclusive in + /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As + /// such, this is a 'zero-cost' operation. + /// + /// # Example + /// + /// ``` + /// let mut lock = spin::mutex::FairMutex::<_>::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.lock(), 10); + /// ``` + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + // We know statically that there are no other references to `self`, so + // there's no need to lock the inner mutex. + unsafe { &mut *self.data.get() } + } +} + +impl<T: ?Sized + fmt::Debug, R> fmt::Debug for FairMutex<T, R> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + struct LockWrapper<'a, T: ?Sized + fmt::Debug>(Option<FairMutexGuard<'a, T>>); + + impl<T: ?Sized + fmt::Debug> fmt::Debug for LockWrapper<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self.0 { + Some(guard) => fmt::Debug::fmt(guard, f), + None => f.write_str("<locked>"), + } + } + } + + f.debug_struct("FairMutex") + .field("data", &LockWrapper(self.try_lock())) + .finish() + } +} + +impl<T: ?Sized + Default, R> Default for FairMutex<T, R> { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl<T, R> From<T> for FairMutex<T, R> { + fn from(data: T) -> Self { + Self::new(data) + } +} + +impl<'a, T: ?Sized> FairMutexGuard<'a, T> { + /// Leak the lock guard, yielding a mutable reference to the underlying data. + /// + /// Note that this function will permanently lock the original [`FairMutex`]. + /// + /// ``` + /// let mylock = spin::mutex::FairMutex::<_>::new(0); + /// + /// let data: &mut i32 = spin::mutex::FairMutexGuard::leak(mylock.lock()); + /// + /// *data = 1; + /// assert_eq!(*data, 1); + /// ``` + #[inline(always)] + pub fn leak(this: Self) -> &'a mut T { + // Use ManuallyDrop to avoid stacked-borrow invalidation + let mut this = ManuallyDrop::new(this); + // We know statically that only we are referencing data + unsafe { &mut *this.data } + } +} + +impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for FairMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for FairMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> Deref for FairMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + // We know statically that only we are referencing data + unsafe { &*self.data } + } +} + +impl<'a, T: ?Sized> DerefMut for FairMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + // We know statically that only we are referencing data + unsafe { &mut *self.data } + } +} + +impl<'a, T: ?Sized> Drop for FairMutexGuard<'a, T> { + /// The dropping of the MutexGuard will release the lock it was created from. + fn drop(&mut self) { + self.lock.fetch_and(!LOCKED, Ordering::Release); + } +} + +impl<'a, T: ?Sized, R> Starvation<'a, T, R> { + /// Attempts the lock the mutex if we are the only starving user. + /// + /// This allows another user to lock the mutex if they are starving as well. + pub fn try_lock_fair(self) -> Result<FairMutexGuard<'a, T>, Self> { + // Try to lock the mutex. + if self + .lock + .lock + .compare_exchange( + STARVED, + STARVED | LOCKED, + Ordering::Acquire, + Ordering::Relaxed, + ) + .is_ok() + { + // We are the only starving user, lock the mutex. + Ok(FairMutexGuard { + lock: &self.lock.lock, + data: self.lock.data.get(), + }) + } else { + // Another user is starving, fail. + Err(self) + } + } + + /// Attempts to lock the mutex. + /// + /// If the lock is currently held by another thread, this will return `None`. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// // Lock the mutex to simulate it being used by another user. + /// let guard1 = lock.lock(); + /// + /// // Try to lock the mutex. + /// let guard2 = lock.try_lock(); + /// assert!(guard2.is_none()); + /// + /// // Wait for a while. + /// wait_for_a_while(); + /// + /// // We are now starved, indicate as such. + /// let starve = lock.starve(); + /// + /// // Once the lock is released, another user trying to lock it will + /// // fail. + /// drop(guard1); + /// let guard3 = lock.try_lock(); + /// assert!(guard3.is_none()); + /// + /// // However, we will be able to lock it. + /// let guard4 = starve.try_lock(); + /// assert!(guard4.is_ok()); + /// + /// # fn wait_for_a_while() {} + /// ``` + pub fn try_lock(self) -> Result<FairMutexGuard<'a, T>, Self> { + // Try to lock the mutex. + if self.lock.lock.fetch_or(LOCKED, Ordering::Acquire) & LOCKED == 0 { + // We have successfully locked the mutex. + // By dropping `self` here, we decrement the starvation count. + Ok(FairMutexGuard { + lock: &self.lock.lock, + data: self.lock.data.get(), + }) + } else { + Err(self) + } + } +} + +impl<'a, T: ?Sized, R: RelaxStrategy> Starvation<'a, T, R> { + /// Locks the mutex. + pub fn lock(mut self) -> FairMutexGuard<'a, T> { + // Try to lock the mutex. + loop { + match self.try_lock() { + Ok(lock) => return lock, + Err(starve) => self = starve, + } + + // Relax until the lock is released. + while self.lock.is_locked() { + R::relax(); + } + } + } +} + +impl<'a, T: ?Sized, R> Drop for Starvation<'a, T, R> { + fn drop(&mut self) { + // As there is no longer a user being starved, we decrement the starver count. + self.lock.lock.fetch_sub(STARVED, Ordering::Release); + } +} + +impl fmt::Display for LockRejectReason { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + LockRejectReason::Locked => write!(f, "locked"), + LockRejectReason::Starved => write!(f, "starved"), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for LockRejectReason {} + +#[cfg(feature = "lock_api")] +unsafe impl<R: RelaxStrategy> lock_api_crate::RawMutex for FairMutex<(), R> { + type GuardMarker = lock_api_crate::GuardSend; + + const INIT: Self = Self::new(()); + + fn lock(&self) { + // Prevent guard destructor running + core::mem::forget(Self::lock(self)); + } + + fn try_lock(&self) -> bool { + // Prevent guard destructor running + Self::try_lock(self).map(core::mem::forget).is_some() + } + + unsafe fn unlock(&self) { + self.force_unlock(); + } + + fn is_locked(&self) -> bool { + Self::is_locked(self) + } +} + +#[cfg(test)] +mod tests { + use std::prelude::v1::*; + + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + + type FairMutex<T> = super::FairMutex<T>; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let m = FairMutex::<_>::new(()); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn lots_and_lots() { + static M: FairMutex<()> = FairMutex::<_>::new(()); + static mut CNT: u32 = 0; + const J: u32 = 1000; + const K: u32 = 3; + + fn inc() { + for _ in 0..J { + unsafe { + let _g = M.lock(); + CNT += 1; + } + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(unsafe { CNT }, J * K * 2); + } + + #[test] + fn try_lock() { + let mutex = FairMutex::<_>::new(42); + + // First lock succeeds + let a = mutex.try_lock(); + assert_eq!(a.as_ref().map(|r| **r), Some(42)); + + // Additional lock fails + let b = mutex.try_lock(); + assert!(b.is_none()); + + // After dropping lock, it succeeds again + ::core::mem::drop(a); + let c = mutex.try_lock(); + assert_eq!(c.as_ref().map(|r| **r), Some(42)); + } + + #[test] + fn test_into_inner() { + let m = FairMutex::<_>::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = FairMutex::<_>::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(FairMutex::<_>::new(1)); + let arc2 = Arc::new(FairMutex::<_>::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock(); + let lock2 = lock.lock(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(FairMutex::<_>::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc<FairMutex<i32>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &FairMutex<[i32]> = &FairMutex::<_>::new([1, 2, 3]); + { + let b = &mut *mutex.lock(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock(), comp); + } + + #[test] + fn test_mutex_force_lock() { + let lock = FairMutex::<_>::new(()); + ::std::mem::forget(lock.lock()); + unsafe { + lock.force_unlock(); + } + assert!(lock.try_lock().is_some()); + } +} diff --git a/src/mutex/spin.rs b/src/mutex/spin.rs index fce3eb9..561d765 100644 --- a/src/mutex/spin.rs +++ b/src/mutex/spin.rs @@ -3,14 +3,17 @@ //! Waiting threads hammer an atomic variable until it becomes available. Best-case latency is low, but worst-case //! latency is theoretically infinite. +use crate::{ + atomic::{AtomicBool, Ordering}, + RelaxStrategy, Spin, +}; use core::{ cell::UnsafeCell, fmt, - ops::{Deref, DerefMut}, - sync::atomic::{AtomicBool, Ordering}, marker::PhantomData, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, }; -use crate::{RelaxStrategy, Spin}; /// A [spin lock](https://en.m.wikipedia.org/wiki/Spinlock) providing mutually exclusive access to data. /// @@ -41,9 +44,11 @@ use crate::{RelaxStrategy, Spin}; /// // We use a barrier to ensure the readout happens after all writing /// let barrier = Arc::new(Barrier::new(thread_count + 1)); /// +/// # let mut ts = Vec::new(); /// for _ in (0..thread_count) { /// let my_barrier = barrier.clone(); /// let my_lock = spin_mutex.clone(); +/// # let t = /// std::thread::spawn(move || { /// let mut guard = my_lock.lock(); /// *guard += 1; @@ -52,12 +57,17 @@ use crate::{RelaxStrategy, Spin}; /// drop(guard); /// my_barrier.wait(); /// }); +/// # ts.push(t); /// } /// /// barrier.wait(); /// /// let answer = { *spin_mutex.lock() }; /// assert_eq!(answer, thread_count); +/// +/// # for t in ts { +/// # t.join().unwrap(); +/// # } /// ``` pub struct SpinMutex<T: ?Sized, R = Spin> { phantom: PhantomData<R>, @@ -70,12 +80,15 @@ pub struct SpinMutex<T: ?Sized, R = Spin> { /// When the guard falls out of scope it will release the lock. pub struct SpinMutexGuard<'a, T: ?Sized + 'a> { lock: &'a AtomicBool, - data: &'a mut T, + data: *mut T, } // Same unsafe impls as `std::sync::Mutex` -unsafe impl<T: ?Sized + Send> Sync for SpinMutex<T> {} -unsafe impl<T: ?Sized + Send> Send for SpinMutex<T> {} +unsafe impl<T: ?Sized + Send, R> Sync for SpinMutex<T, R> {} +unsafe impl<T: ?Sized + Send, R> Send for SpinMutex<T, R> {} + +unsafe impl<T: ?Sized + Sync> Sync for SpinMutexGuard<'_, T> {} +unsafe impl<T: ?Sized + Send> Send for SpinMutexGuard<'_, T> {} impl<T, R> SpinMutex<T, R> { /// Creates a new [`SpinMutex`] wrapping the supplied data. @@ -129,7 +142,7 @@ impl<T, R> SpinMutex<T, R> { /// /// unsafe { /// core::mem::forget(lock.lock()); - /// + /// /// assert_eq!(lock.as_mut_ptr().read(), 42); /// lock.as_mut_ptr().write(58); /// @@ -164,7 +177,11 @@ impl<T: ?Sized, R: RelaxStrategy> SpinMutex<T, R> { pub fn lock(&self) -> SpinMutexGuard<T> { // Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock` // when called in a loop. - while self.lock.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + while self + .lock + .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { // Wait until the lock looks unlocked before retrying while self.is_locked() { R::relax(); @@ -220,7 +237,11 @@ impl<T: ?Sized, R> SpinMutex<T, R> { pub fn try_lock(&self) -> Option<SpinMutexGuard<T>> { // The reason for using a strong compare_exchange is explained here: // https://github.com/Amanieu/parking_lot/pull/207#issuecomment-575869107 - if self.lock.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_ok() { + if self + .lock + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { Some(SpinMutexGuard { lock: &self.lock, data: unsafe { &mut *self.data.get() }, @@ -289,9 +310,10 @@ impl<'a, T: ?Sized> SpinMutexGuard<'a, T> { /// ``` #[inline(always)] pub fn leak(this: Self) -> &'a mut T { - let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing - core::mem::forget(this); - unsafe { &mut *data } + // Use ManuallyDrop to avoid stacked-borrow invalidation + let mut this = ManuallyDrop::new(this); + // We know statically that only we are referencing data + unsafe { &mut *this.data } } } @@ -310,13 +332,15 @@ impl<'a, T: ?Sized + fmt::Display> fmt::Display for SpinMutexGuard<'a, T> { impl<'a, T: ?Sized> Deref for SpinMutexGuard<'a, T> { type Target = T; fn deref(&self) -> &T { - self.data + // We know statically that only we are referencing data + unsafe { &*self.data } } } impl<'a, T: ?Sized> DerefMut for SpinMutexGuard<'a, T> { fn deref_mut(&mut self) -> &mut T { - self.data + // We know statically that only we are referencing data + unsafe { &mut *self.data } } } @@ -390,17 +414,18 @@ mod tests { } let (tx, rx) = channel(); + let mut ts = Vec::new(); for _ in 0..K { let tx2 = tx.clone(); - thread::spawn(move || { + ts.push(thread::spawn(move || { inc(); tx2.send(()).unwrap(); - }); + })); let tx2 = tx.clone(); - thread::spawn(move || { + ts.push(thread::spawn(move || { inc(); tx2.send(()).unwrap(); - }); + })); } drop(tx); @@ -408,6 +433,10 @@ mod tests { rx.recv().unwrap(); } assert_eq!(unsafe { CNT }, J * K * 2); + + for t in ts { + t.join().unwrap(); + } } #[test] @@ -418,7 +447,7 @@ mod tests { let a = mutex.try_lock(); assert_eq!(a.as_ref().map(|r| **r), Some(42)); - // Additional lock failes + // Additional lock fails let b = mutex.try_lock(); assert!(b.is_none()); @@ -459,13 +488,14 @@ mod tests { let arc = Arc::new(SpinMutex::<_>::new(1)); let arc2 = Arc::new(SpinMutex::<_>::new(arc)); let (tx, rx) = channel(); - let _t = thread::spawn(move || { + let t = thread::spawn(move || { let lock = arc2.lock(); let lock2 = lock.lock(); assert_eq!(*lock2, 1); tx.send(()).unwrap(); }); rx.recv().unwrap(); + t.join().unwrap(); } #[test] diff --git a/src/mutex/ticket.rs b/src/mutex/ticket.rs index 128b434..01b905e 100644 --- a/src/mutex/ticket.rs +++ b/src/mutex/ticket.rs @@ -5,18 +5,20 @@ //! latency is infinitely better. Waiting threads simply need to wait for all threads that come before them in the //! queue to finish. +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; use core::{ cell::UnsafeCell, fmt, - ops::{Deref, DerefMut}, - sync::atomic::{AtomicUsize, Ordering}, marker::PhantomData, + ops::{Deref, DerefMut}, }; -use crate::{RelaxStrategy, Spin}; /// A spin-based [ticket lock](https://en.wikipedia.org/wiki/Ticket_lock) providing mutually exclusive access to data. /// -/// A ticket lock is analagous to a queue management system for lock requests. When a thread tries to take a lock, it +/// A ticket lock is analogous to a queue management system for lock requests. When a thread tries to take a lock, it /// is assigned a 'ticket'. It then spins until its ticket becomes next in line. When the lock guard is released, the /// next ticket will be processed. /// @@ -84,8 +86,8 @@ pub struct TicketMutexGuard<'a, T: ?Sized + 'a> { data: &'a mut T, } -unsafe impl<T: ?Sized + Send> Sync for TicketMutex<T> {} -unsafe impl<T: ?Sized + Send> Send for TicketMutex<T> {} +unsafe impl<T: ?Sized + Send, R> Sync for TicketMutex<T, R> {} +unsafe impl<T: ?Sized + Send, R> Send for TicketMutex<T, R> {} impl<T, R> TicketMutex<T, R> { /// Creates a new [`TicketMutex`] wrapping the supplied data. @@ -136,7 +138,7 @@ impl<T, R> TicketMutex<T, R> { /// /// unsafe { /// core::mem::forget(lock.lock()); - /// + /// /// assert_eq!(lock.as_mut_ptr().read(), 42); /// lock.as_mut_ptr().write(58); /// @@ -440,7 +442,7 @@ mod tests { let a = mutex.try_lock(); assert_eq!(a.as_ref().map(|r| **r), Some(42)); - // Additional lock failes + // Additional lock fails let b = mutex.try_lock(); assert!(b.is_none()); diff --git a/src/once.rs b/src/once.rs index e4aadee..5f0186d 100644 --- a/src/once.rs +++ b/src/once.rs @@ -1,13 +1,10 @@ - //! Synchronization primitives for one-time evaluation. - -use core::{ - cell::UnsafeCell, - mem::MaybeUninit, - sync::atomic::{AtomicU8, Ordering}, - marker::PhantomData, - fmt, +//! Synchronization primitives for one-time evaluation. + +use crate::{ + atomic::{AtomicU8, Ordering}, + RelaxStrategy, Spin, }; -use crate::{RelaxStrategy, Spin}; +use core::{cell::UnsafeCell, fmt, marker::PhantomData, mem::MaybeUninit}; /// A primitive that provides lazy one-time initialization. /// @@ -34,13 +31,19 @@ pub struct Once<T = (), R = Spin> { data: UnsafeCell<MaybeUninit<T>>, } +impl<T, R> Default for Once<T, R> { + fn default() -> Self { + Self::new() + } +} + impl<T: fmt::Debug, R> fmt::Debug for Once<T, R> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.get() { Some(s) => write!(f, "Once {{ data: ") - .and_then(|()| s.fmt(f)) - .and_then(|()| write!(f, "}}")), - None => write!(f, "Once {{ <uninitialized> }}") + .and_then(|()| s.fmt(f)) + .and_then(|()| write!(f, "}}")), + None => write!(f, "Once {{ <uninitialized> }}"), } } } @@ -99,14 +102,22 @@ mod status { self.0.store(status as u8, ordering); } #[inline(always)] - pub fn compare_exchange(&self, old: Status, new: Status, success: Ordering, failure: Ordering) -> Result<Status, Status> { - match self.0.compare_exchange(old as u8, new as u8, success, failure) { + pub fn compare_exchange( + &self, + old: Status, + new: Status, + success: Ordering, + failure: Ordering, + ) -> Result<Status, Status> { + match self + .0 + .compare_exchange(old as u8, new as u8, success, failure) + { // SAFETY: A compare exchange will always return a value that was later stored into // the atomic u8, but due to the invariant that it must be a valid Status, we know // that both Ok(_) and Err(_) will be safely transmutable. - Ok(ok) => Ok(unsafe { Status::new_unchecked(ok) }), - Err(err) => Ok(unsafe { Status::new_unchecked(err) }), + Err(err) => Err(unsafe { Status::new_unchecked(err) }), } } #[inline(always)] @@ -117,7 +128,7 @@ mod status { } } } -use self::status::{Status, AtomicStatus}; +use self::status::{AtomicStatus, Status}; use core::hint::unreachable_unchecked as unreachable; @@ -157,6 +168,46 @@ impl<T, R: RelaxStrategy> Once<T, R> { /// } /// ``` pub fn call_once<F: FnOnce() -> T>(&self, f: F) -> &T { + match self.try_call_once(|| Ok::<T, core::convert::Infallible>(f())) { + Ok(x) => x, + Err(void) => match void {}, + } + } + + /// This method is similar to `call_once`, but allows the given closure to + /// fail, and lets the `Once` in a uninitialized state if it does. + /// + /// This method will block the calling thread if another initialization + /// routine is currently running. + /// + /// When this function returns without error, it is guaranteed that some + /// initialization has run and completed (it may not be the closure + /// specified). The returned reference will point to the result from the + /// closure that was run. + /// + /// # Panics + /// + /// This function will panic if the [`Once`] previously panicked while attempting + /// to initialize. This is similar to the poisoning behaviour of `std::sync`'s + /// primitives. + /// + /// # Examples + /// + /// ``` + /// use spin; + /// + /// static INIT: spin::Once<usize> = spin::Once::new(); + /// + /// fn get_cached_val() -> Result<usize, String> { + /// INIT.try_call_once(expensive_fallible_computation).map(|x| *x) + /// } + /// + /// fn expensive_fallible_computation() -> Result<usize, String> { + /// // ... + /// # Ok(2) + /// } + /// ``` + pub fn try_call_once<F: FnOnce() -> Result<T, E>, E>(&self, f: F) -> Result<&T, E> { // SAFETY: We perform an Acquire load because if this were to return COMPLETE, then we need // the preceding stores done while initializing, to become visible after this load. let mut status = self.status.load(Ordering::Acquire); @@ -181,16 +232,27 @@ impl<T, R: RelaxStrategy> Once<T, R> { Ordering::Acquire, ) { Ok(_must_be_state_incomplete) => { - // The compare-exchange suceeded, so we shall initialize it. + // The compare-exchange succeeded, so we shall initialize it. // We use a guard (Finish) to catch panics caused by builder - let finish = Finish { status: &self.status }; + let finish = Finish { + status: &self.status, + }; + let val = match f() { + Ok(val) => val, + Err(err) => { + // If an error occurs, clean up everything and leave. + core::mem::forget(finish); + self.status.store(Status::Incomplete, Ordering::Release); + return Err(err); + } + }; unsafe { // SAFETY: // `UnsafeCell`/deref: currently the only accessor, mutably // and immutably by cas exclusion. // `write`: pointer comes from `MaybeUninit`. - (*self.data.get()).as_mut_ptr().write(f()) + (*self.data.get()).as_mut_ptr().write(val); }; // If there were to be a panic with unwind enabled, the code would // short-circuit and never reach the point where it writes the inner data. @@ -214,7 +276,7 @@ impl<T, R: RelaxStrategy> Once<T, R> { self.status.store(Status::Complete, Ordering::Release); // This next line is mainly an optimization. - return unsafe { self.force_get() }; + return unsafe { Ok(self.force_get()) }; } // The compare-exchange failed, so we know for a fact that the status cannot be // INCOMPLETE, or it would have succeeded. @@ -222,29 +284,27 @@ impl<T, R: RelaxStrategy> Once<T, R> { } } - match status { + Ok(match status { // SAFETY: We have either checked with an Acquire load, that the status is COMPLETE, or // initialized it ourselves, in which case no additional synchronization is needed. Status::Complete => unsafe { self.force_get() }, Status::Panicked => panic!("Once panicked"), - Status::Running => self - .poll() - .unwrap_or_else(|| { - if cfg!(debug_assertions) { - unreachable!("Encountered INCOMPLETE when polling Once") - } else { - // SAFETY: This poll is guaranteed never to fail because the API of poll - // promises spinning if initialization is in progress. We've already - // checked that initialisation is in progress, and initialisation is - // monotonic: once done, it cannot be undone. We also fetched the status - // with Acquire semantics, thereby guaranteeing that the later-executed - // poll will also agree with us that initialization is in progress. Ergo, - // this poll cannot fail. - unsafe { - unreachable(); - } + Status::Running => self.poll().unwrap_or_else(|| { + if cfg!(debug_assertions) { + unreachable!("Encountered INCOMPLETE when polling Once") + } else { + // SAFETY: This poll is guaranteed never to fail because the API of poll + // promises spinning if initialization is in progress. We've already + // checked that initialisation is in progress, and initialisation is + // monotonic: once done, it cannot be undone. We also fetched the status + // with Acquire semantics, thereby guaranteeing that the later-executed + // poll will also agree with us that initialization is in progress. Ergo, + // this poll cannot fail. + unsafe { + unreachable(); } - }), + } + }), // SAFETY: The only invariant possible in addition to the aforementioned ones at the // moment, is INCOMPLETE. However, the only way for this match statement to be @@ -252,8 +312,7 @@ impl<T, R: RelaxStrategy> Once<T, R> { // which case we know for a fact that the state cannot be changed back to INCOMPLETE as // `Once`s are monotonic. Status::Incomplete => unsafe { unreachable() }, - } - + }) } /// Spins until the [`Once`] contains a value. @@ -309,7 +368,7 @@ impl<T, R> Once<T, R> { }; /// Creates a new [`Once`]. - pub const fn new() -> Self{ + pub const fn new() -> Self { Self::INIT } @@ -395,6 +454,23 @@ impl<T, R> Once<T, R> { } } + /// Returns a mutable reference to the inner value + /// + /// # Safety + /// + /// This is *extremely* unsafe if the `Once` has not already been initialized because a reference to uninitialized + /// memory will be returned, immediately triggering undefined behaviour (even if the reference goes unused). + /// However, this can be useful in some instances for exposing the `Once` to FFI or when the overhead of atomically + /// checking initialization is unacceptable and the `Once` has already been initialized. + pub unsafe fn get_mut_unchecked(&mut self) -> &mut T { + debug_assert_eq!( + self.status.load(Ordering::SeqCst), + Status::Complete, + "Attempted to access an unintialized Once. If this was to run without debug checks, this would be undefined behavior. This is a serious bug and you must fix it.", + ); + self.force_get_mut() + } + /// Returns a the inner value if the [`Once`] has been initialized. /// /// Because this method requires ownership of the [`Once`], no synchronization overhead @@ -406,6 +482,22 @@ impl<T, R> Once<T, R> { } } + /// Returns a the inner value if the [`Once`] has been initialized. + /// # Safety + /// + /// This is *extremely* unsafe if the `Once` has not already been initialized because a reference to uninitialized + /// memory will be returned, immediately triggering undefined behaviour (even if the reference goes unused) + /// This can be useful, if `Once` has already been initialized, and you want to bypass an + /// option check. + pub unsafe fn into_inner_unchecked(self) -> T { + debug_assert_eq!( + self.status.load(Ordering::SeqCst), + Status::Complete, + "Attempted to access an unintialized Once. If this was to run without debug checks, this would be undefined behavior. This is a serious bug and you must fix it.", + ); + self.force_into_inner() + } + /// Checks whether the value has been initialized. /// /// This is done using [`Acquire`](core::sync::atomic::Ordering::Acquire) ordering, and @@ -485,10 +577,13 @@ mod tests { static mut RUN: bool = false; let (tx, rx) = channel(); + let mut ts = Vec::new(); for _ in 0..10 { let tx = tx.clone(); - thread::spawn(move|| { - for _ in 0..4 { thread::yield_now() } + ts.push(thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } unsafe { O.call_once(|| { assert!(!RUN); @@ -497,7 +592,7 @@ mod tests { assert!(RUN); } tx.send(()).unwrap(); - }); + })); } unsafe { @@ -511,6 +606,10 @@ mod tests { for _ in 0..10 { rx.recv().unwrap(); } + + for t in ts { + t.join().unwrap(); + } } #[test] @@ -527,12 +626,16 @@ mod tests { static INIT: Once<usize> = Once::new(); assert!(INIT.get().is_none()); - thread::spawn(move|| { - INIT.call_once(|| loop { }); + let t = thread::spawn(move || { + INIT.call_once(|| { + thread::sleep(std::time::Duration::from_secs(3)); + 42 + }); }); assert!(INIT.get().is_none()); - } + t.join().unwrap(); + } #[test] fn poll() { @@ -543,26 +646,29 @@ mod tests { assert_eq!(INIT.poll().map(|r| *r), Some(3)); } - #[test] fn wait() { static INIT: Once<usize> = Once::new(); - std::thread::spawn(|| { + let t = std::thread::spawn(|| { assert_eq!(*INIT.wait(), 3); assert!(INIT.is_completed()); }); - for _ in 0..4 { thread::yield_now() } + for _ in 0..4 { + thread::yield_now() + } assert!(INIT.poll().is_none()); INIT.call_once(|| 3); + + t.join().unwrap(); } #[test] #[ignore = "Android uses panic_abort"] fn panic() { - use ::std::panic; + use std::panic; static INIT: Once = Once::new(); @@ -601,8 +707,11 @@ mod tests { } } + // This is sort of two test cases, but if we write them as separate test methods + // they can be executed concurrently and then fail some small fraction of the + // time. #[test] - fn drop_occurs() { + fn drop_occurs_and_skip_uninit_drop() { unsafe { CALLED = false; } @@ -612,13 +721,8 @@ mod tests { once.call_once(|| DropTest {}); } - assert!(unsafe { - CALLED - }); - } - - #[test] - fn skip_uninit_drop() { + assert!(unsafe { CALLED }); + // Now test that we skip drops for the uninitialized case. unsafe { CALLED = false; } @@ -626,8 +730,35 @@ mod tests { let once = Once::<DropTest>::new(); drop(once); - assert!(unsafe { - !CALLED - }); + assert!(unsafe { !CALLED }); + } + + #[test] + fn call_once_test() { + for _ in 0..20 { + use std::sync::atomic::AtomicUsize; + use std::sync::Arc; + use std::time::Duration; + let share = Arc::new(AtomicUsize::new(0)); + let once = Arc::new(Once::<_, Spin>::new()); + let mut hs = Vec::new(); + for _ in 0..8 { + let h = thread::spawn({ + let share = share.clone(); + let once = once.clone(); + move || { + thread::sleep(Duration::from_millis(10)); + once.call_once(|| { + share.fetch_add(1, Ordering::SeqCst); + }); + } + }); + hs.push(h); + } + for h in hs { + h.join().unwrap(); + } + assert_eq!(1, share.load(Ordering::SeqCst)); + } } } diff --git a/src/relax.rs b/src/relax.rs index 6d9a690..8842f80 100644 --- a/src/relax.rs +++ b/src/relax.rs @@ -23,7 +23,10 @@ pub struct Spin; impl RelaxStrategy for Spin { #[inline(always)] fn relax() { - core::hint::spin_loop(); + // Use the deprecated spin_loop_hint() to ensure that we don't get + // a higher MSRV than we need to. + #[allow(deprecated)] + core::sync::atomic::spin_loop_hint(); } } diff --git a/src/rwlock.rs b/src/rwlock.rs index 28602c9..beae5c1 100644 --- a/src/rwlock.rs +++ b/src/rwlock.rs @@ -1,14 +1,17 @@ //! A lock that provides data access to either one writer or many readers. +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; use core::{ cell::UnsafeCell, - ops::{Deref, DerefMut}, - sync::atomic::{AtomicUsize, Ordering}, - marker::PhantomData, fmt, + marker::PhantomData, mem, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, }; -use crate::{RelaxStrategy, Spin}; /// A lock that provides data access to either one writer or many readers. /// @@ -79,7 +82,7 @@ const WRITER: usize = 1; /// potentially releasing the lock. pub struct RwLockReadGuard<'a, T: 'a + ?Sized> { lock: &'a AtomicUsize, - data: &'a T, + data: *const T, } /// A guard that provides mutable data access. @@ -88,7 +91,7 @@ pub struct RwLockReadGuard<'a, T: 'a + ?Sized> { pub struct RwLockWriteGuard<'a, T: 'a + ?Sized, R = Spin> { phantom: PhantomData<R>, inner: &'a RwLock<T, R>, - data: &'a mut T, + data: *mut T, } /// A guard that provides immutable data access but can be upgraded to [`RwLockWriteGuard`]. @@ -101,13 +104,22 @@ pub struct RwLockWriteGuard<'a, T: 'a + ?Sized, R = Spin> { pub struct RwLockUpgradableGuard<'a, T: 'a + ?Sized, R = Spin> { phantom: PhantomData<R>, inner: &'a RwLock<T, R>, - data: &'a T, + data: *const T, } // Same unsafe impls as `std::sync::RwLock` unsafe impl<T: ?Sized + Send, R> Send for RwLock<T, R> {} unsafe impl<T: ?Sized + Send + Sync, R> Sync for RwLock<T, R> {} +unsafe impl<T: ?Sized + Send + Sync, R> Send for RwLockWriteGuard<'_, T, R> {} +unsafe impl<T: ?Sized + Send + Sync, R> Sync for RwLockWriteGuard<'_, T, R> {} + +unsafe impl<T: ?Sized + Sync> Send for RwLockReadGuard<'_, T> {} +unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {} + +unsafe impl<T: ?Sized + Send + Sync, R> Send for RwLockUpgradableGuard<'_, T, R> {} +unsafe impl<T: ?Sized + Send + Sync, R> Sync for RwLockUpgradableGuard<'_, T, R> {} + impl<T, R> RwLock<T, R> { /// Creates a new spinlock wrapping the supplied data. /// @@ -155,7 +167,7 @@ impl<T, R> RwLock<T, R> { /// /// unsafe { /// core::mem::forget(lock.write()); - /// + /// /// assert_eq!(lock.as_mut_ptr().read(), 42); /// lock.as_mut_ptr().write(58); /// @@ -245,6 +257,21 @@ impl<T: ?Sized, R: RelaxStrategy> RwLock<T, R> { } impl<T: ?Sized, R> RwLock<T, R> { + // Acquire a read lock, returning the new lock value. + fn acquire_reader(&self) -> usize { + // An arbitrary cap that allows us to catch overflows long before they happen + const MAX_READERS: usize = core::usize::MAX / READER / 2; + + let value = self.lock.fetch_add(READER, Ordering::Acquire); + + if value > MAX_READERS * READER { + self.lock.fetch_sub(READER, Ordering::Relaxed); + panic!("Too many lock readers, cannot safely proceed"); + } else { + value + } + } + /// Attempt to acquire this lock with shared read access. /// /// This function will never block and will return immediately if `read` @@ -269,7 +296,7 @@ impl<T: ?Sized, R> RwLock<T, R> { /// ``` #[inline] pub fn try_read(&self) -> Option<RwLockReadGuard<T>> { - let value = self.lock.fetch_add(READER, Ordering::Acquire); + let value = self.acquire_reader(); // We check the UPGRADED bit here so that new readers are prevented when an UPGRADED lock is held. // This helps reduce writer starvation. @@ -398,18 +425,18 @@ impl<T: ?Sized, R> RwLock<T, R> { } } - /// Returns a mutable reference to the underlying data. - /// - /// Since this call borrows the `RwLock` mutably, no actual locking needs to - /// take place -- the mutable borrow statically guarantees no locks exist. - /// - /// # Examples - /// - /// ``` - /// let mut lock = spin::RwLock::new(0); - /// *lock.get_mut() = 10; - /// assert_eq!(*lock.read(), 10); - /// ``` + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `RwLock` mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + /// + /// # Examples + /// + /// ``` + /// let mut lock = spin::RwLock::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.read(), 10); + /// ``` pub fn get_mut(&mut self) -> &mut T { // We know statically that there are no other references to `self`, so // there's no need to lock the inner lock. @@ -454,8 +481,9 @@ impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> { /// ``` #[inline] pub fn leak(this: Self) -> &'rwlock T { - let Self { data, .. } = this; - data + let this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &*this.data } } } @@ -554,7 +582,7 @@ impl<'rwlock, T: ?Sized, R> RwLockUpgradableGuard<'rwlock, T, R> { /// ``` pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> { // Reserve the read guard for ourselves - self.inner.lock.fetch_add(READER, Ordering::Acquire); + self.inner.acquire_reader(); let inner = self.inner; @@ -580,8 +608,9 @@ impl<'rwlock, T: ?Sized, R> RwLockUpgradableGuard<'rwlock, T, R> { /// ``` #[inline] pub fn leak(this: Self) -> &'rwlock T { - let Self { data, .. } = this; - data + let this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &*this.data } } } @@ -613,7 +642,7 @@ impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> { #[inline] pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> { // Reserve the read guard for ourselves - self.inner.lock.fetch_add(READER, Ordering::Acquire); + self.inner.acquire_reader(); let inner = self.inner; @@ -639,7 +668,10 @@ impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> { /// ``` #[inline] pub fn downgrade_to_upgradeable(self) -> RwLockUpgradableGuard<'rwlock, T, R> { - debug_assert_eq!(self.inner.lock.load(Ordering::Acquire) & (WRITER | UPGRADED), WRITER); + debug_assert_eq!( + self.inner.lock.load(Ordering::Acquire) & (WRITER | UPGRADED), + WRITER + ); // Reserve the read guard for ourselves self.inner.lock.store(UPGRADED, Ordering::Release); @@ -670,9 +702,9 @@ impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> { /// ``` #[inline] pub fn leak(this: Self) -> &'rwlock mut T { - let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing - core::mem::forget(this); - unsafe { &mut *data } + let mut this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &mut *this.data } } } @@ -692,7 +724,8 @@ impl<'rwlock, T: ?Sized> Deref for RwLockReadGuard<'rwlock, T> { type Target = T; fn deref(&self) -> &T { - self.data + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } } } @@ -700,7 +733,8 @@ impl<'rwlock, T: ?Sized, R> Deref for RwLockUpgradableGuard<'rwlock, T, R> { type Target = T; fn deref(&self) -> &T { - self.data + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } } } @@ -708,13 +742,15 @@ impl<'rwlock, T: ?Sized, R> Deref for RwLockWriteGuard<'rwlock, T, R> { type Target = T; fn deref(&self) -> &T { - self.data + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } } } impl<'rwlock, T: ?Sized, R> DerefMut for RwLockWriteGuard<'rwlock, T, R> { fn deref_mut(&mut self) -> &mut T { - self.data + // Safety: We know statically that only we are referencing data + unsafe { &mut *self.data } } } @@ -741,7 +777,9 @@ impl<'rwlock, T: ?Sized, R> Drop for RwLockWriteGuard<'rwlock, T, R> { // Writer is responsible for clearing both WRITER and UPGRADED bits. // The UPGRADED bit may be set if an upgradeable lock attempts an upgrade while this lock is held. - self.inner.lock.fetch_and(!(WRITER | UPGRADED), Ordering::Release); + self.inner + .lock + .fetch_and(!(WRITER | UPGRADED), Ordering::Release); } } @@ -825,7 +863,9 @@ unsafe impl<R: RelaxStrategy> lock_api_crate::RawRwLockUpgrade for RwLock<(), R> #[inline(always)] fn try_lock_upgradable(&self) -> bool { // Prevent guard destructor running - self.try_upgradeable_read().map(|g| core::mem::forget(g)).is_some() + self.try_upgradeable_read() + .map(|g| core::mem::forget(g)) + .is_some() } #[inline(always)] @@ -854,7 +894,10 @@ unsafe impl<R: RelaxStrategy> lock_api_crate::RawRwLockUpgrade for RwLock<(), R> data: &(), phantom: PhantomData, }; - tmp_guard.try_upgrade().map(|g| core::mem::forget(g)).is_ok() + tmp_guard + .try_upgrade() + .map(|g| core::mem::forget(g)) + .is_ok() } } @@ -947,7 +990,7 @@ mod tests { let arc2 = arc.clone(); let (tx, rx) = channel(); - thread::spawn(move || { + let t = thread::spawn(move || { let mut lock = arc2.write(); for _ in 0..10 { let tmp = *lock; @@ -977,6 +1020,8 @@ mod tests { rx.recv().unwrap(); let lock = arc.read(); assert_eq!(*lock, 10); + + assert!(t.join().is_ok()); } #[test] |