| author | Jeongik Cha <jeongik@google.com> | 2023-09-27 12:20:56 +0000 |
|---|---|---|
| committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2023-09-27 12:20:56 +0000 |
| commit | f24af9efcb2f14f250d68d2389d773b8b64fc991 (patch) | |
| tree | f88ae15e69b2bbf985aeb8943dbe6a3a676b9d14 | |
| parent | 855eeef1fe8c65b150ea1bcda4408854411f0100 (diff) | |
| parent | a6dcbfd9d0ad672e35981ab69a69528338fbb4a8 (diff) | |
| download | arc-swap-f24af9efcb2f14f250d68d2389d773b8b64fc991.tar.gz | |
Import arc-swap am: 7d8ef139ea am: f7e04fc25b am: 85f996155f am: 90f369a663 am: a6dcbfd9d0
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/arc-swap/+/2753492
Change-Id: I5e49e9d6a4114fb8bfa7e0277e5a784a8a4c558a
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
49 files changed, 6870 insertions, 0 deletions
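For context, the crate imported below provides an atomically swappable `Arc`. The following sketch is illustrative only and is not part of this change; it assumes the arc-swap 1.x API as vendored in this import (`from_pointee`, `load`, `store`, `load_full` all appear in the vendored sources below).

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

fn main() {
    // Shared, atomically swappable Arc<usize>.
    let shared = ArcSwap::from_pointee(42usize);

    // Readers take a cheap guard; it derefs to the Arc and then to the value.
    let guard = shared.load();
    assert_eq!(**guard, 42);
    drop(guard);

    // Writers swap in a new Arc without blocking readers.
    shared.store(Arc::new(7));
    assert_eq!(*shared.load_full(), 7);
}
```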
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..ddde21d
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,6 @@
+{
+  "git": {
+    "sha1": "2a62e2bf6916c9cc8fb3832abac95b9e3287b9ec"
+  },
+  "path_in_vcs": ""
+}
\ No newline at end of file diff --git a/.github/codecov.yml b/.github/codecov.yml new file mode 100644 index 0000000..f2cf6e4 --- /dev/null +++ b/.github/codecov.yml @@ -0,0 +1,9 @@ +comment: + layout: "diff, flags, files" + require_changes: true + +coverage: + status: + project: + default: + informational: true diff --git a/.github/workflows/audit.yaml b/.github/workflows/audit.yaml new file mode 100644 index 0000000..82ff67f --- /dev/null +++ b/.github/workflows/audit.yaml @@ -0,0 +1,17 @@ +name: Security audit +on: + pull_request: + push: + branches: + - master + schedule: + - cron: '0 0 * * 0' + +jobs: + security_audit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/audit-check@35b7b53b1e25b55642157ac01b4adceb5b9ebef3 + with: + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/benchmarks.yaml b/.github/workflows/benchmarks.yaml new file mode 100644 index 0000000..bff4402 --- /dev/null +++ b/.github/workflows/benchmarks.yaml @@ -0,0 +1,67 @@ +on: + # For some reason, this fails due to some permissions. + # pull_request: + push: + branches: + - master + # Run once a week to preserve the cache + # (even though it still feels the cache gets lost sometimes?) + # FIXME: Doesn't seem to be working. Using the GH pages thing for now. + #schedule: + # - cron: '0 0 * * 0' + +name: benchmark pull requests + +jobs: + runBenchmark: + name: run benchmark + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + default: true + profile: minimal + + - name: Restore compile cache + uses: Swatinem/rust-cache@v1 + + - name: Restore previous benchmark data + uses: actions/cache@v2 + with: + path: ./bench-cache + key: ${{ runner.os }}-benchmark + + - name: Run benchmarks + # We choose just the tracking ones. There's a whole fleet that we check + # that compile, but they are too heavy both to run in CI and to show in + # the PRs. And they mostly compare us to other methods. + # + # Provide the bencher output, as the following tool knows how to read that. + run: cargo bench --bench track -- --output-format bencher | grep -v 'Gnuplot not found' | tee benches.out + + - name: Compare benchmarks + uses: rhysd/github-action-benchmark@4eed2c2f4cd0d374720c4b913f79faa8aafcfa6b + with: + name: Track benchmarks + tool: cargo + output-file-path: benches.out + github-token: ${{ secrets.GITHUB_TOKEN }} + auto-push: true + alert-threshold: '150%' + comment-on-alert: true + comment-always: true + # We don't want that to fail. Both our benchmarks and the CI are a + # bit noisy and we have quite a few measurements, so the chance of + # one failing at random is quite high. It's still nice to have it + # measured and available as a comment. 
+ fail-on-alert: false + #external-data-json-path: ./bench-cache/benchmark-data.json + # Because it doesn't put it into the PR, it puts it into the commit :-| + alert-comment-cc-users: '@vorner' diff --git a/.github/workflows/coverage.yaml b/.github/workflows/coverage.yaml new file mode 100644 index 0000000..8d8b714 --- /dev/null +++ b/.github/workflows/coverage.yaml @@ -0,0 +1,47 @@ +name: Test coverage + +on: + push: + branches: + - master + pull_request: + +env: + CARGO_TERM_COLOR: always + RUST_BACKTRACE: full + +jobs: + coverage: + name: Coverage + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + profile: minimal + default: true + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Run cargo-tarpaulin + uses: actions-rs/tarpaulin@v0.1 + with: + args: '--all-features --run-types Doctests,Tests' + timeout: 120 + + - name: Upload to codecov.io + uses: codecov/codecov-action@5a8bb4701eca7ba3673f21664b887f652c58d0a3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + + - name: Archive code coverage results + uses: actions/upload-artifact@v2 + with: + name: code-coverage-report + path: cobertura.xml + retention-days: 30 diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..bde4450 --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,268 @@ +name: test + +on: + push: + pull_request: + +env: + CARGO_TERM_COLOR: always + RUST_BACKTRACE: full + +jobs: + test: + name: Build & test + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + - macos-latest + - windows-latest + rust: + - stable + - beta + - nightly + # 1.45.0 # The weak-into-raw feature stabilized + # 1.31.0 is tested separately, because it is supposed to only build + + runs-on: ${{ matrix.os }} + + steps: + - name: checkout + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.rust }} + default: true + profile: minimal + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Build & test + env: + RUST_VERSION: ${{ matrix.rust }} + OS: ${{ matrix.os }} + RUSTFLAGS: -D warnings + run: cargo test --all-features + + big-tests: + name: Run the big ignored tests + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + default: true + profile: minimal + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Build & test + env: + RUSTFLAGS: -D warnings + run: cargo test --all-features --release -- --ignored + + bits32: + name: 32bit tests + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + default: true + profile: minimal + target: x86_64-unknown-linux-musl + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Build & test + env: + RUSTFLAGS: -D warnings + run: cargo test --all-features --target x86_64-unknown-linux-musl + + rustfmt: + name: Check formatting + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + default: true + components: rustfmt + + - run: cargo fmt --all -- --check + + links: + name: Check documentation links + runs-on: ubuntu-latest + steps: + - name: checkout + 
uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + default: true + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Check links + run: cargo rustdoc --all-features -- -D warnings + + clippy: + name: Clippy lints + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + default: true + components: clippy + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Run clippy linter + run: cargo clippy --all --all-features --tests -- -D clippy::all -D warnings + + bench: + name: Verify benchmarks compile + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + default: true + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Run clippy linter + run: cargo test --all --release --benches --all-features + + semi-ancient: + name: Check it compiles on old Rust (1.45.0) + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: 1.45.0 + profile: minimal + default: true + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Run check + run: rm Cargo.lock && cargo check --all-features + + ancient: + name: Check it compiles on old Rust (1.31.0) + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: 1.31.0 + profile: minimal + default: true + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Run check + run: rm Cargo.lock && cargo check + + miri: + name: Miri checks + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + profile: minimal + default: true + components: "miri" + + - name: Restore cache + uses: Swatinem/rust-cache@v1 + + - name: Run miri + env: + PROPTEST_CASES: "10" + MIRIFLAGS: "-Zmiri-disable-isolation -Zmiri-permissive-provenance" + run: cargo miri test --all-features + + thread_sanitizer-MacOS: + name: Thread Sanitizer checks MacOS + runs-on: macos-latest + + steps: + - uses: actions/checkout@v2 + - uses: actions/cache@v2 + with: + path: | + ~/.cargo/bin/ + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + target/ + key: ${{ github.event.repository.name }}-${{ runner.os }}-cargo-thread_sanitizer-v2 + - name: Install latest nightly + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + override: true + components: rust-src + - name: Run thread sanitizer + run: | + # Thread sanitizer isn't guaranteed to catch all data races, it can only catch it + # the data race if it happens when running the program. + # + # Running the tests multiple times increase the chances that data races are found + # by the thread sanitizer. 
+ for _ in $(seq 1 10); do cargo +nightly test -Z build-std --target $(uname -m)-apple-darwin; done + env: + RUSTDOCFLAGS: "-Zsanitizer=thread" + RUSTFLAGS: "-Zsanitizer=thread" diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b69fb7e --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +core +/target +*/target +**/*.rs.bk +tags diff --git a/Android.bp b/Android.bp new file mode 100644 index 0000000..3f9846d --- /dev/null +++ b/Android.bp @@ -0,0 +1,13 @@ +// This file is generated by cargo2android.py --run. +// Do not modify this file as changes will be overridden on upgrade. + + + +rust_library_host { + name: "libarc_swap", + crate_name: "arc_swap", + cargo_env_compat: true, + cargo_pkg_version: "1.6.0", + srcs: ["src/lib.rs"], + edition: "2018", +} diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d805fca --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,227 @@ +# 1.6.0 + +* Fix a data race reported by MIRI. +* Avoid violating stacked borrows (AFAIK these are still experimental and not + normative, but better safe than sorry). (#80). +* The `AccessConvert` wrapper is needed less often in practice (#77). + +# 1.5.1 + +* bug: Insufficient synchronization on weak platforms (#76). + + Never observed in practice (it's suspected practical weak platforms like ARM + are still stronger than the model), but still technically UB. +* docs: Mention triomphe's `ThinArc` around the fat-pointer limitations. + +# 1.5.0 + +* Support serde (by a feature). + +# 1.4.0 + +* Allow const-initializing ArcSwapOption (`const_empty` method). + +# 1.3.2 + +* More helpful description of the `AsRaw` trait (isn't implemented for owned + `Arc`/`Option<Arc>`). + +# 1.3.1 + +* Cache doc improvements. + +# 1.3.0 + +* Allow mapping of DynAccess. +* Fix some lints. +* Don't leave threads running in tests/doctests. It's a bad form and annoys + miri. + +# 1.2.0 + +* Miri and 32 bit tests in CI. +* Making the writers lock-free. Soft-removing the IndependentStrategy, as it is + no longer needed (hidden and the same as the DafultStrategy). + +# 1.1.0 + +* Fix soundness bug around access::Map. Technically a breaking change, but + unlikely to bite and breaking seems to be the least bad option. #45. + +# 1.0.0 + +* Remove Clone implementation. People are often confused by it and it is easy to + emulate by hand in the rare case it is actually needed. + +# 1.0.0-rc1 + +* Get rid of the `load_signal_safe`. It only complicates things and it is niche; + signal-hook-registry has its own simplified version. +* Avoid `from_ptr(as_ptr())`. Slight change in `RefCnt::inc` which technically + is API breaking change, but this one should not matter in practice. +* Extend documentation about clone behaviour. +* Few more traits for Guard (`From<T: RefCnt>`, `Default`). +* Get rid of `rcu_unwap`, the whole concept is a trap. +* Hide the whole gen lock thing. +* Introduce the `Strategy`, as a high level way to choose how exactly the + locking happens. + - Not possible to implement by downstream users just yet, or call them. + - The CaS is its own trait for flexibility. +* Adding the SimpleGenLock experimental strategy. + - Not part of stability guarantees. + +# 0.4.7 + +* Rename the `unstable-weak` to `weak` feature. The support is now available on + 1.45 (currently in beta). + +# 0.4.6 + +* Adjust to `Weak::as_ptr` from std (the weak pointer support, relying on + unstable features). +* Support running on miri (without some optimizations), so dependencies may run + miri tests. 
+* Little optimization when waiting out the contention on write operations. + +# 0.4.5 + +* Added `Guard::from_inner`. + +# 0.4.4 + +* Top-level docs rewrite (less rambling, hopefully more readable). + +# 0.4.3 + +* Fix the `Display` implementation on `Guard` to correctly delegate to the + underlying `Display` implementation. + +# 0.4.2 + +* The Access functionality ‒ ability to pass a handle to subpart of held data to + somewhere with the ability to update itself. +* Mapped cache can take `FnMut` as well as `Fn`. + +# 0.4.1 + +* Mapped caches ‒ to allow giving access to parts of config only. + +# 0.4.0 + +* Support for Weak pointers. +* RefCnt implemented for Rc. +* Breaking: Big API cleanups. + - Peek is gone. + - Terminology of getting the data unified to `load`. + - There's only one kind of `Guard` now. + - Guard derefs to the `Arc`/`Option<Arc>` or similar. + - `Cache` got moved to top level of the crate. + - Several now unneeded semi-internal traits and trait methods got removed. +* Splitting benchmarks into a separate sub-crate. +* Minor documentation improvements. + +# 0.3.11 + +* Prevention against UB due to dropping Guards and overflowing the guard + counter (aborting instead, such problem is very degenerate anyway and wouldn't + work in the first place). + +# 0.3.10 + +* Tweak slot allocation to take smaller performance hit if some leases are held. +* Increase the number of lease slots per thread to 8. +* Added a cache for faster access by keeping an already loaded instance around. + +# 0.3.9 + +* Fix Send/Sync for Guard and Lease (they were broken in the safe but + uncomfortable direction ‒ not implementing them even if they could). + +# 0.3.8 + +* `Lease<Option<_>>::unwrap()`, `expect()` and `into_option()` for convenient + use. + +# 0.3.7 + +* Use the correct `#[deprecated]` syntax. + +# 0.3.6 + +* Another locking store (`PrivateSharded`) to complement the global and private + unsharded ones. +* Comparison to other crates/approaches in the docs. + +# 0.3.5 + +* Updates to documentation, made it hopefully easier to digest. +* Added the ability to separate gen-locks of one ArcSwapAny from others. +* Some speed improvements by inlining. +* Simplified the `lease` method internally, making it faster in optimistic + cases. + +# 0.3.4 + +* Another potentially weak ordering discovered (with even less practical effect + than the previous). + +# 0.3.3 + +* Increased potentially weak ordering (probably without any practical effect). + +# 0.3.2 + +* Documentation link fix. + +# 0.3.1 + +* Few convenience constructors. +* More tests (some randomized property testing). + +# 0.3.0 + +* `compare_and_swap` no longer takes `&Guard` as current as that is a sure way + to create a deadlock. +* Introduced `Lease` for temporary storage, which doesn't suffer from contention + like `load`, but doesn't block writes like `Guard`. The downside is it slows + down with number of held by the current thread. +* `compare_and_swap` and `rcu` uses leases. +* Made the `ArcSwap` as small as the pointer itself, by making the + shards/counters and generation ID global. This comes at a theoretical cost of + more contention when different threads use different instances. + +# 0.2.0 + +* Added an `ArcSwapOption`, which allows storing NULL values (as None) as well + as a valid pointer. +* `compare_and_swap` accepts borrowed `Arc` as `current` and doesn't consume one + ref count. +* Sharding internal counters, to improve performance on read-mostly contented + scenarios. 
+* Providing `peek_signal_safe` as the only async signal safe method to use + inside signal handlers. This removes the footgun with dropping the `Arc` + returned from `load` inside a signal handler. + +# 0.1.4 + +* The `peek` method to use the `Arc` inside without incrementing the reference + count. +* Some more (and hopefully better) benchmarks. + +# 0.1.3 + +* Documentation fix (swap is *not* lock-free in current implementation). + +# 0.1.2 + +* More freedom in the `rcu` and `rcu_unwrap` return types. + +# 0.1.1 + +* `rcu` support. +* `compare_and_swap` support. +* Added some primitive benchmarks. + +# 0.1.0 + +* Initial implementation. diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..fe2ab62 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,90 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +name = "arc-swap" +version = "1.6.0" +authors = ["Michal 'vorner' Vaner <vorner@vorner.cz>"] +description = "Atomically swappable Arc" +documentation = "https://docs.rs/arc-swap" +readme = "README.md" +keywords = [ + "atomic", + "Arc", +] +categories = [ + "data-structures", + "memory-management", +] +license = "MIT OR Apache-2.0" +repository = "https://github.com/vorner/arc-swap" + +[package.metadata.docs.rs] +all-features = true + +[profile.bench] +debug = true + +[[bench]] +name = "background" +harness = false + +[[bench]] +name = "int-access" +harness = false + +[[bench]] +name = "track" +harness = false + +[dependencies.serde] +version = "1" +features = ["rc"] +optional = true + +[dev-dependencies.adaptive-barrier] +version = "~1" + +[dev-dependencies.criterion] +version = "~0.4" + +[dev-dependencies.crossbeam-utils] +version = "~0.8" + +[dev-dependencies.itertools] +version = "0.10" + +[dev-dependencies.num_cpus] +version = "~1" + +[dev-dependencies.once_cell] +version = "~1" + +[dev-dependencies.parking_lot] +version = "~0.12" + +[dev-dependencies.proptest] +version = "1" + +[dev-dependencies.serde_derive] +version = "1.0.130" + +[dev-dependencies.serde_test] +version = "1.0.130" + +[features] +experimental-strategies = [] +internal-test-strategies = [] +weak = [] + +[badges.maintenance] +status = "actively-developed" diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..2def6fd --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,56 @@ +[package] +name = "arc-swap" +version = "1.6.0" +authors = ["Michal 'vorner' Vaner <vorner@vorner.cz>"] +description = "Atomically swappable Arc" +documentation = "https://docs.rs/arc-swap" +repository = "https://github.com/vorner/arc-swap" +readme = "README.md" +keywords = ["atomic", "Arc"] +categories = ["data-structures", "memory-management"] +license = "MIT OR Apache-2.0" +edition = "2018" + +[badges] +maintenance = { status = "actively-developed" } + +[features] +# ArcSwapWeak (for std::sycn::Weak) support +weak = [] +# Some strategies used for testing few internal cornercases. *DO NOT USE* (no stability guarantees and their performance is likely very bad). +internal-test-strategies = [] +# Possibly some strategies we are experimenting with. Currently empty. 
No stability guarantees are included about them. +experimental-strategies = [] + +[dependencies] +serde = { version = "1", features = ["rc"], optional = true } + +[dev-dependencies] +adaptive-barrier = "~1" +criterion = "~0.4" +crossbeam-utils = "~0.8" +itertools = "0.10" +num_cpus = "~1" +once_cell = "~1" +parking_lot = "~0.12" +proptest = "1" +serde_derive = "1.0.130" +serde_test = "1.0.130" + +[profile.bench] +debug = true + +[package.metadata.docs.rs] +all-features = true + +[[bench]] +name = "background" +harness = false + +[[bench]] +name = "int-access" +harness = false + +[[bench]] +name = "track" +harness = false @@ -0,0 +1 @@ +LICENSE-APACHE
\ No newline at end of file diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..01083d1 --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,229 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--- + +Copyright (c) 2017 arc-swap developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..b4e2857 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2017 arc-swap developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/METADATA b/METADATA new file mode 100644 index 0000000..dae609e --- /dev/null +++ b/METADATA @@ -0,0 +1,20 @@ +name: "arc-swap" +description: "Atomically swappable Arc" +third_party { + identifier { + type: "crates.io" + value: "https://crates.io/crates/arc-swap" + } + identifier { + type: "Archive" + value: "https://static.crates.io/crates/arc-swap/arc-swap-1.6.0.crate" + } + version: "1.6.0" + # Dual-licensed, using the least restrictive per go/thirdpartylicenses#same. 
+ license_type: NOTICE + last_upgrade_date { + year: 2023 + month: 9 + day: 6 + } +} diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2 new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/MODULE_LICENSE_APACHE2 diff --git a/MODULE_LICENSE_MIT b/MODULE_LICENSE_MIT new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/MODULE_LICENSE_MIT @@ -0,0 +1 @@ +include platform/prebuilts/rust:master:/OWNERS diff --git a/README.md b/README.md new file mode 100644 index 0000000..5667add --- /dev/null +++ b/README.md @@ -0,0 +1,41 @@ +# ArcSwap + +[![Actions Status](https://github.com/vorner/arc-swap/workflows/test/badge.svg)](https://github.com/vorner/arc-swap/actions) +[![codecov](https://codecov.io/gh/vorner/arc-swap/branch/master/graph/badge.svg?token=3KA3R2D9fV)](https://codecov.io/gh/vorner/arc-swap) +[![docs](https://docs.rs/arc-swap/badge.svg)](https://docs.rs/arc-swap) + +This provides something similar to what `RwLock<Arc<T>>` is or what +`Atomic<Arc<T>>` would be if it existed, optimized for read-mostly write-seldom +scenarios, with consistent performance characteristics. + +Read [the documentation](https://docs.rs/arc-swap) before using. + +## Rust version policy + +The 1. version will build on any edition 2018 capable compiler. This does not +include: + +* Tests. Tests build and run on recent compilers, mostly because of + dependencies. +* Additional feature flags. Most feature flags are guaranteed to build since the + version they are introduced. Experimental features are without any guarantees. + +## License + +Licensed under either of + + * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally +submitted for inclusion in the work by you, as defined in the Apache-2.0 +license, shall be dual licensed as above, without any additional terms +or conditions. + +[`Arc`]: https://doc.rust-lang.org/std/sync/struct.Arc.html +[`AtomicPtr`]: https://doc.rust-lang.org/std/sync/atomic/struct.AtomicPtr.html +[`ArcSwap`]: https://docs.rs/arc-swap/*/arc_swap/type.ArcSwap.html @@ -0,0 +1 @@ +* A cache without the thing inside ‒ passed to load every time. Possibly with multiple cached values. diff --git a/benches/background.rs b/benches/background.rs new file mode 100644 index 0000000..be1c2e1 --- /dev/null +++ b/benches/background.rs @@ -0,0 +1,335 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +use arc_swap::{ArcSwap, ArcSwapOption, Cache}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use crossbeam_utils::thread; +use once_cell::sync::Lazy; + +// Mostly a leftover from earlier times, but it still allows one to tweak the number of ops per one +// iteration of the benchmark easily, so it's left in here. +const ITERS: usize = 1; + +macro_rules! method { + ($c: expr, $name:ident) => {{ + let mut g = $c.benchmark_group(&format!("{}_{}", NAME, stringify!($name))); + noise(&mut g, "r1", 1, 0, 0, $name); + noise(&mut g, "r3", 3, 0, 0, $name); + noise(&mut g, "l1", 0, 1, 0, $name); + noise(&mut g, "l3", 0, 3, 0, $name); + noise(&mut g, "rw", 1, 0, 1, $name); + noise(&mut g, "lw", 0, 1, 1, $name); + noise(&mut g, "w2", 0, 0, 2, $name); + g.bench_function("uncontended", |b| b.iter($name)); + g.finish(); + }}; +} + +macro_rules! 
noise { + () => { + use criterion::measurement::Measurement; + use criterion::BenchmarkGroup; + + use super::{thread, Arc, AtomicBool, Ordering, ITERS}; + + fn noise<M: Measurement, F: Fn()>( + g: &mut BenchmarkGroup<M>, + name: &str, + readers: usize, + leasers: usize, + writers: usize, + f: F, + ) { + let flag = Arc::new(AtomicBool::new(true)); + thread::scope(|s| { + for _ in 0..readers { + s.spawn(|_| { + while flag.load(Ordering::Relaxed) { + read(); + } + }); + } + for _ in 0..leasers { + s.spawn(|_| { + while flag.load(Ordering::Relaxed) { + lease(); + } + }); + } + for _ in 0..writers { + s.spawn(|_| { + while flag.load(Ordering::Relaxed) { + write(); + } + }); + } + g.bench_function(name, |b| b.iter(&f)); + flag.store(false, Ordering::Relaxed); + }) + .unwrap(); + } + }; +} + +macro_rules! strategy { + ($name: ident, $type: ty) => { + mod $name { + use super::*; + + static A: Lazy<$type> = Lazy::new(|| <$type>::from_pointee(0)); + const NAME: &str = stringify!($name); + + fn lease() { + for _ in 0..ITERS { + black_box(**A.load()); + } + } + + // Leases kind of degrade in performance if there are multiple on the same thread. + fn four_leases() { + for _ in 0..ITERS { + let l1 = A.load(); + let l2 = A.load(); + let l3 = A.load(); + let l4 = A.load(); + black_box((**l1, **l2, **l3, **l4)); + } + } + + fn read() { + for _ in 0..ITERS { + black_box(A.load_full()); + } + } + + fn write() { + for _ in 0..ITERS { + black_box(A.store(Arc::new(0))); + } + } + + noise!(); + + pub fn run_all(c: &mut Criterion) { + method!(c, read); + method!(c, write); + method!(c, lease); + method!(c, four_leases); + } + } + }; +} + +strategy!(arc_swap_b, ArcSwap::<usize>); + +mod arc_swap_option { + use super::{black_box, ArcSwapOption, Criterion, Lazy}; + + static A: Lazy<ArcSwapOption<usize>> = Lazy::new(|| ArcSwapOption::from(None)); + const NAME: &str = "arc_swap_option"; + + fn lease() { + for _ in 0..ITERS { + black_box(A.load().as_ref().map(|l| **l).unwrap_or(0)); + } + } + + fn read() { + for _ in 0..ITERS { + black_box(A.load_full().map(|a| -> usize { *a }).unwrap_or(0)); + } + } + + fn write() { + for _ in 0..ITERS { + black_box(A.store(Some(Arc::new(0)))); + } + } + + noise!(); + + pub fn run_all(c: &mut Criterion) { + method!(c, read); + method!(c, write); + method!(c, lease); + } +} + +mod arc_swap_cached { + use super::{black_box, ArcSwap, Cache, Criterion, Lazy}; + + static A: Lazy<ArcSwap<usize>> = Lazy::new(|| ArcSwap::from_pointee(0)); + const NAME: &str = "arc_swap_cached"; + + fn read() { + let mut cache = Cache::from(&A as &ArcSwap<usize>); + for _ in 0..ITERS { + black_box(Arc::clone(cache.load())); + } + } + + fn lease() { + for _ in 0..ITERS { + black_box(**A.load()); + } + } + + fn write() { + for _ in 0..ITERS { + black_box(A.store(Arc::new(0))); + } + } + + noise!(); + + pub fn run_all(c: &mut Criterion) { + method!(c, read); + method!(c, write); + } +} + +mod mutex { + use super::{black_box, Criterion, Lazy, Mutex}; + + static M: Lazy<Mutex<Arc<usize>>> = Lazy::new(|| Mutex::new(Arc::new(0))); + const NAME: &str = "mutex"; + + fn lease() { + for _ in 0..ITERS { + black_box(**M.lock().unwrap()); + } + } + + fn read() { + for _ in 0..ITERS { + black_box(Arc::clone(&*M.lock().unwrap())); + } + } + + fn write() { + for _ in 0..ITERS { + black_box(*M.lock().unwrap() = Arc::new(42)); + } + } + + noise!(); + + pub fn run_all(c: &mut Criterion) { + method!(c, read); + method!(c, write); + } +} + +mod parking_mutex { + use parking_lot::Mutex as ParkingMutex; + + use super::{black_box, 
Criterion, Lazy}; + + static M: Lazy<ParkingMutex<Arc<usize>>> = Lazy::new(|| ParkingMutex::new(Arc::new(0))); + const NAME: &str = "parking_mutex"; + + fn lease() { + for _ in 0..ITERS { + black_box(**M.lock()); + } + } + + fn read() { + for _ in 0..ITERS { + black_box(Arc::clone(&*M.lock())); + } + } + + fn write() { + for _ in 0..ITERS { + black_box(*M.lock() = Arc::new(42)); + } + } + + noise!(); + + pub fn run_all(c: &mut Criterion) { + method!(c, read); + method!(c, write); + } +} + +mod rwlock { + use std::sync::RwLock; + + use super::{black_box, Criterion, Lazy}; + + static L: Lazy<RwLock<Arc<usize>>> = Lazy::new(|| RwLock::new(Arc::new(0))); + const NAME: &str = "rwlock"; + + fn lease() { + for _ in 0..ITERS { + black_box(**L.read().unwrap()); + } + } + + fn read() { + for _ in 0..ITERS { + black_box(Arc::clone(&*L.read().unwrap())); + } + } + + fn write() { + for _ in 0..ITERS { + black_box(*L.write().unwrap() = Arc::new(42)); + } + } + + noise!(); + + pub fn run_all(c: &mut Criterion) { + method!(c, read); + method!(c, write); + } +} + +mod parking_rwlock { + use parking_lot::RwLock; + + use super::{black_box, Criterion, Lazy}; + + static L: Lazy<RwLock<Arc<usize>>> = Lazy::new(|| RwLock::new(Arc::new(0))); + const NAME: &str = "parking_rwlock"; + + fn lease() { + for _ in 0..ITERS { + black_box(**L.read()); + } + } + + fn read() { + for _ in 0..ITERS { + black_box(Arc::clone(&*L.read())); + } + } + + fn write() { + for _ in 0..ITERS { + black_box(*L.write() = Arc::new(42)); + } + } + + noise!(); + + pub fn run_all(c: &mut Criterion) { + method!(c, read); + method!(c, write); + } +} + +criterion_group!( + benches, + arc_swap_b::run_all, + arc_swap_option::run_all, + arc_swap_cached::run_all, + mutex::run_all, + parking_mutex::run_all, + rwlock::run_all, + parking_rwlock::run_all, +); +criterion_main!(benches); diff --git a/benches/int-access.rs b/benches/int-access.rs new file mode 100644 index 0000000..78ae83a --- /dev/null +++ b/benches/int-access.rs @@ -0,0 +1,130 @@ +//! These are very minimal benchmarks ‒ reading and writing an integer shared in +//! different ways. You can compare the times and see the characteristics. 
+ +use std::io::{self, Write}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Instant; + +use arc_swap::ArcSwap; +use criterion::black_box; +use crossbeam_utils::thread; + +fn test_run<R, W>( + name: &str, + read_threads: usize, + write_threads: usize, + iterations: usize, + r: R, + w: W, +) where + R: Fn() -> usize + Sync + Send, + W: Fn(usize) + Sync + Send, +{ + print!( + "{:20} ({} + {}) x {}: ", + name, read_threads, write_threads, iterations + ); + io::stdout().flush().unwrap(); + let before = Instant::now(); + thread::scope(|scope| { + for _ in 0..read_threads { + scope.spawn(|_| { + for _ in 0..iterations { + black_box(r()); + } + }); + } + for _ in 0..write_threads { + scope.spawn(|_| { + for i in 0..iterations { + black_box(w(i)); + } + }); + } + }) + .unwrap(); + let duration = Instant::now() - before; + println!( + "{:03}.{:03}s", + duration.as_secs(), + duration.subsec_nanos() / 100_000 + ); +} + +fn test_round<R, W>(name: &str, iterations: usize, r: R, w: W) +where + R: Fn() -> usize + Sync + Send, + W: Fn(usize) + Sync + Send, +{ + test_run(name, 1, 0, iterations, &r, &w); + test_run(name, 2, 0, iterations, &r, &w); + test_run(name, 4, 0, iterations, &r, &w); + test_run(name, 8, 0, iterations, &r, &w); + test_run(name, 1, 1, iterations, &r, &w); + test_run(name, 4, 1, iterations, &r, &w); + test_run(name, 4, 2, iterations, &r, &w); + test_run(name, 4, 4, iterations, &r, &w); + test_run(name, 8, 1, iterations, &r, &w); + test_run(name, 8, 2, iterations, &r, &w); + test_run(name, 8, 4, iterations, &r, &w); + test_run(name, 0, 1, iterations, &r, &w); + test_run(name, 0, 4, iterations, &r, &w); +} + +fn main() { + let mutex = Mutex::new(42); + test_round( + "mutex", + 100_000, + || *mutex.lock().unwrap(), + |i| *mutex.lock().unwrap() = i, + ); + let mutex = Mutex::new(Arc::new(42)); + test_round( + "mutex-arc", + 100_000, + || **mutex.lock().unwrap(), + |i| *mutex.lock().unwrap() = Arc::new(i), + ); + test_round( + "mutex-arc-clone", + 100_000, + || *Arc::clone(&*mutex.lock().unwrap()), + |i| *mutex.lock().unwrap() = Arc::new(i), + ); + let lock = RwLock::new(42); + test_round( + "rw", + 100_000, + || *lock.read().unwrap(), + |i| *lock.write().unwrap() = i, + ); + let lock = RwLock::new(Arc::new(42)); + test_round( + "rw-arc", + 100_000, + || **lock.read().unwrap(), + |i| *lock.write().unwrap() = Arc::new(i), + ); + test_round( + "rw-arc-clone", + 100_000, + || *Arc::clone(&*lock.read().unwrap()), + |i| *lock.write().unwrap() = Arc::new(i), + ); + let arc = ArcSwap::from(Arc::new(42)); + test_round( + "arc-load-store", + 100_000, + || **arc.load(), + |i| arc.store(Arc::new(i)), + ); + test_round( + "arc-rcu", + 100_000, + || *arc.load_full(), + |i| { + arc.rcu(|_| Arc::new(i)); + }, + ); +} diff --git a/benches/track.rs b/benches/track.rs new file mode 100644 index 0000000..987439a --- /dev/null +++ b/benches/track.rs @@ -0,0 +1,113 @@ +//! Benchmarks to track basic performance across changes. +//! +//! Slightly based on the <background.rs> benchmarks, but simplified and stripped down to run +//! reasonably fast. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use arc_swap::access::{Access, Map}; +use arc_swap::cache::Cache; +use arc_swap::ArcSwap; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use crossbeam_utils::thread; + +/// Execute a group of measurements +/// +/// It expects any kind of „environment“ is already in place for it. 
+fn batch(c: &mut Criterion, name: &str, shared_number: &ArcSwap<usize>) { + let mut g = c.benchmark_group(name); + + g.bench_function("load", |b| { + b.iter(|| { + black_box(shared_number.load()); + }) + }); + g.bench_function("load_full", |b| { + b.iter(|| { + black_box(shared_number.load_full()); + }) + }); + g.bench_function("load_many", |b| { + // Here we simulate running out of the debt slots scenario + const MANY: usize = 32; + let mut guards = Vec::with_capacity(MANY); + b.iter(|| { + guards.push(black_box(shared_number.load())); + if guards.len() == MANY { + guards.clear(); + } + }) + }); + g.bench_function("store", |b| { + b.iter(|| { + black_box(shared_number.store(Arc::new(42))); + }) + }); + g.bench_function("cache", |b| { + let mut cache = Cache::new(shared_number); + b.iter(|| { + black_box(cache.load()); + }) + }); + + g.finish(); +} + +fn with_background<F: Fn(&ArcSwap<usize>) + Sync>( + c: &mut Criterion, + name: &str, + cnt: usize, + noise: F, +) { + let stop = AtomicBool::new(false); + let shared_number = ArcSwap::from_pointee(42); + thread::scope(|s| { + // Start some background noise threads, to contend the arc swap. + for _ in 0..cnt { + s.spawn(|_| { + while !stop.load(Ordering::Relaxed) { + noise(&shared_number); + } + }); + } + + // Perform the benchmarks + batch(c, name, &shared_number); + + // Ask the threads to terminate, so they don't disturb any other banchmarks + stop.store(true, Ordering::Relaxed); + }) + .unwrap(); +} + +fn utilities(c: &mut Criterion) { + let mut g = c.benchmark_group("utilities"); + + struct Composed { + val: i32, + } + + g.bench_function("access-map", |b| { + let a = Arc::new(ArcSwap::from_pointee(Composed { val: 42 })); + let m = Map::new(Arc::clone(&a), |c: &Composed| &c.val); + b.iter(|| { + let g = black_box(m.load()); + assert_eq!(42, *g); + }); + }); +} + +fn benchmark(c: &mut Criterion) { + batch(c, "uncontended", &ArcSwap::from_pointee(42)); + with_background(c, "concurrent_loads", 2, |s| { + black_box(s.load()); + }); + with_background(c, "concurrent_store", 1, |s| { + black_box(s.store(Arc::new(42))); + }); + utilities(c); +} + +criterion_group!(benches, benchmark); +criterion_main!(benches); diff --git a/cargo.out b/cargo.out new file mode 100644 index 0000000..5ae7bd1 --- /dev/null +++ b/cargo.out @@ -0,0 +1,6 @@ +### Running: /usr/local/mp400/workspace/Android/aosp-master-with-phones/development/scripts/../../prebuilts/rust/linux-x86/1.72.0/bin/cargo -v clean --target-dir target.tmp >> ./cargo.out 2>&1 +### Running: /usr/local/mp400/workspace/Android/aosp-master-with-phones/development/scripts/../../prebuilts/rust/linux-x86/1.72.0/bin/cargo -v build --target x86_64-unknown-linux-gnu --target-dir target.tmp >> ./cargo.out 2>&1 + Updating crates.io index + Compiling arc-swap v1.6.0 (/usr/local/mp400/workspace/Android/aosp-master-with-phones/external/rust/crates/arc-swap) + Running `rustc --crate-name arc_swap --edition=2018 src/lib.rs --error-format=json --json=diagnostic-rendered-ansi,artifacts,future-incompat --crate-type lib --emit=dep-info,metadata,link -C embed-bitcode=no -C debuginfo=2 -C metadata=0a86af8a3b21ad51 -C extra-filename=-0a86af8a3b21ad51 --out-dir /usr/local/mp400/workspace/Android/aosp-master-with-phones/external/rust/crates/arc-swap/target.tmp/x86_64-unknown-linux-gnu/debug/deps --target x86_64-unknown-linux-gnu -C incremental=/usr/local/mp400/workspace/Android/aosp-master-with-phones/external/rust/crates/arc-swap/target.tmp/x86_64-unknown-linux-gnu/debug/incremental -L 
dependency=/usr/local/mp400/workspace/Android/aosp-master-with-phones/external/rust/crates/arc-swap/target.tmp/x86_64-unknown-linux-gnu/debug/deps -L dependency=/usr/local/mp400/workspace/Android/aosp-master-with-phones/external/rust/crates/arc-swap/target.tmp/debug/deps` + Finished dev [unoptimized + debuginfo] target(s) in 6.01s diff --git a/ci-check.sh b/ci-check.sh new file mode 100755 index 0000000..2a99dd1 --- /dev/null +++ b/ci-check.sh @@ -0,0 +1,16 @@ +#!/bin/sh + +set -ex + +rm -f Cargo.lock +cargo build + +if [ "$RUST_VERSION" = 1.31.0 ] ; then + exit +fi + +# Allow some warnings on the very old compiler. +export RUSTFLAGS="-D warnings" + +cargo test --release --all-features +cargo test --release --all-features -- --ignored diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/rustfmt.toml diff --git a/src/access.rs b/src/access.rs new file mode 100644 index 0000000..c50e20d --- /dev/null +++ b/src/access.rs @@ -0,0 +1,544 @@ +#![deny(unsafe_code)] + +//! Abstracting over accessing parts of stored value. +//! +//! Sometimes, there's a big globalish data structure (like a configuration for the whole program). +//! Then there are parts of the program that need access to up-to-date version of their *part* of +//! the configuration, but for reasons of code separation and reusability, it is not desirable to +//! pass the whole configuration to each of the parts. +//! +//! This module provides means to grant the parts access to the relevant subsets of such global +//! data structure while masking the fact it is part of the bigger whole from the component. +//! +//! Note that the [`cache`][crate::cache] module has its own [`Access`][crate::cache::Access] trait +//! that serves a similar purpose, but with cached access. The signatures are different, therefore +//! an incompatible trait. +//! +//! # The general idea +//! +//! Each part of the code accepts generic [`Access<T>`][Access] for the `T` of its interest. This +//! provides means to load current version of the structure behind the scenes and get only the +//! relevant part, without knowing what the big structure is. +//! +//! For technical reasons, the [`Access`] trait is not object safe. If type erasure is desired, it +//! is possible use the [`DynAccess`][crate::access::DynAccess] instead, which is object safe, but +//! slightly slower. +//! +//! For some cases, it is possible to use [`ArcSwapAny::map`]. If that is not flexible enough, the +//! [`Map`] type can be created directly. +//! +//! Note that the [`Access`] trait is also implemented for [`ArcSwapAny`] itself. Additionally, +//! there's the [`Constant`][crate::access::Constant] helper type, which is useful mostly for +//! testing (it doesn't allow reloading). +//! +//! # Performance +//! +//! In general, these utilities use [`ArcSwapAny::load`] internally and then apply the provided +//! transformation. This has several consequences: +//! +//! * Limitations of the [`load`][ArcSwapAny::load] apply ‒ including the recommendation to not +//! hold the returned guard object for too long, but long enough to get consistency. +//! * The transformation should be cheap ‒ optimally just borrowing into the structure. +//! +//! # Examples +//! +//! ```rust +//! use std::sync::Arc; +//! use std::thread::{self, JoinHandle}; +//! use std::time::Duration; +//! +//! use arc_swap::ArcSwap; +//! use arc_swap::access::{Access, Constant, Map}; +//! +//! fn work_with_usize<A: Access<usize> + Send + 'static>(a: A) -> JoinHandle<()> { +//! 
thread::spawn(move || { +//! let mut value = 0; +//! while value != 42 { +//! let guard = a.load(); +//! value = *guard; +//! println!("{}", value); +//! // Not strictly necessary, but dropping the guard can free some resources, like +//! // slots for tracking what values are still in use. We do it before the sleeping, +//! // not at the end of the scope. +//! drop(guard); +//! thread::sleep(Duration::from_millis(50)); +//! } +//! }) +//! } +//! +//! // Passing the whole thing directly +//! // (If we kept another Arc to it, we could change the value behind the scenes) +//! work_with_usize(Arc::new(ArcSwap::from_pointee(42))).join().unwrap(); +//! +//! // Passing a subset of a structure +//! struct Cfg { +//! value: usize, +//! } +//! +//! let cfg = Arc::new(ArcSwap::from_pointee(Cfg { value: 0 })); +//! let thread = work_with_usize(Map::new(Arc::clone(&cfg), |cfg: &Cfg| &cfg.value)); +//! cfg.store(Arc::new(Cfg { value: 42 })); +//! thread.join().unwrap(); +//! +//! // Passing a constant that can't change. Useful mostly for testing purposes. +//! work_with_usize(Constant(42)).join().unwrap(); +//! ``` + +use std::marker::PhantomData; +use std::ops::Deref; +use std::rc::Rc; +use std::sync::Arc; + +use super::ref_cnt::RefCnt; +use super::strategy::Strategy; +use super::{ArcSwapAny, Guard}; + +/// Abstracts over ways code can get access to a value of type `T`. +/// +/// This is the trait that parts of code will use when accessing a subpart of the big data +/// structure. See the [module documentation](index.html) for details. +pub trait Access<T> { + /// A guard object containing the value and keeping it alive. + /// + /// For technical reasons, the library doesn't allow direct access into the stored value. A + /// temporary guard object must be loaded, that keeps the actual value alive for the time of + /// use. + type Guard: Deref<Target = T>; + + /// The loading method. + /// + /// This returns the guard that holds the actual value. Should be called anew each time a fresh + /// value is needed. 
+ fn load(&self) -> Self::Guard; +} + +impl<T, A: Access<T> + ?Sized, P: Deref<Target = A>> Access<T> for P { + type Guard = A::Guard; + fn load(&self) -> Self::Guard { + self.deref().load() + } +} + +impl<T> Access<T> for dyn DynAccess<T> + '_ { + type Guard = DynGuard<T>; + + fn load(&self) -> Self::Guard { + self.load() + } +} + +impl<T> Access<T> for dyn DynAccess<T> + '_ + Send { + type Guard = DynGuard<T>; + + fn load(&self) -> Self::Guard { + self.load() + } +} + +impl<T> Access<T> for dyn DynAccess<T> + '_ + Sync + Send { + type Guard = DynGuard<T>; + + fn load(&self) -> Self::Guard { + self.load() + } +} + +impl<T: RefCnt, S: Strategy<T>> Access<T> for ArcSwapAny<T, S> { + type Guard = Guard<T, S>; + + fn load(&self) -> Self::Guard { + self.load() + } +} + +#[derive(Debug)] +#[doc(hidden)] +pub struct DirectDeref<T: RefCnt, S: Strategy<T>>(Guard<T, S>); + +impl<T, S: Strategy<Arc<T>>> Deref for DirectDeref<Arc<T>, S> { + type Target = T; + fn deref(&self) -> &T { + self.0.deref().deref() + } +} + +impl<T, S: Strategy<Arc<T>>> Access<T> for ArcSwapAny<Arc<T>, S> { + type Guard = DirectDeref<Arc<T>, S>; + fn load(&self) -> Self::Guard { + DirectDeref(self.load()) + } +} + +impl<T, S: Strategy<Rc<T>>> Deref for DirectDeref<Rc<T>, S> { + type Target = T; + fn deref(&self) -> &T { + self.0.deref().deref() + } +} + +impl<T, S: Strategy<Rc<T>>> Access<T> for ArcSwapAny<Rc<T>, S> { + type Guard = DirectDeref<Rc<T>, S>; + fn load(&self) -> Self::Guard { + DirectDeref(self.load()) + } +} + +#[doc(hidden)] +pub struct DynGuard<T: ?Sized>(Box<dyn Deref<Target = T>>); + +impl<T: ?Sized> Deref for DynGuard<T> { + type Target = T; + fn deref(&self) -> &T { + &self.0 + } +} + +/// An object-safe version of the [`Access`] trait. +/// +/// This can be used instead of the [`Access`] trait in case a type erasure is desired. This has +/// the effect of performance hit (due to boxing of the result and due to dynamic dispatch), but +/// makes certain code simpler and possibly makes the executable smaller. +/// +/// This is automatically implemented for everything that implements [`Access`]. +/// +/// # Examples +/// +/// ```rust +/// use arc_swap::access::{Constant, DynAccess}; +/// +/// fn do_something(value: Box<dyn DynAccess<usize> + Send>) { +/// let v = value.load(); +/// println!("{}", *v); +/// } +/// +/// do_something(Box::new(Constant(42))); +/// ``` +pub trait DynAccess<T> { + /// The equivalent of [`Access::load`]. + fn load(&self) -> DynGuard<T>; +} + +impl<T, A> DynAccess<T> for A +where + A: Access<T>, + A::Guard: 'static, +{ + fn load(&self) -> DynGuard<T> { + DynGuard(Box::new(Access::load(self))) + } +} + +/// [DynAccess] to [Access] wrapper. +/// +/// In previous versions, `Box<dyn DynAccess>` didn't implement [Access], to use inside [Map] one +/// could use this wrapper. Since then, a way was found to solve it. In most cases, this wrapper is +/// no longer necessary. +/// +/// This is left in place for two reasons: +/// * Backwards compatibility. +/// * Corner-cases not covered by the found solution. For example, trait inheritance in the form of +/// `Box<dyn SomeTrait>` where `SomeTrait: Access` doesn't work out of the box and still needs +/// this wrapper. +/// +/// # Examples +/// +/// The example is for the simple case (which is no longer needed, but may help as an inspiration). 
+/// +/// ```rust +/// use std::sync::Arc; +/// +/// use arc_swap::ArcSwap; +/// use arc_swap::access::{AccessConvert, DynAccess, Map}; +/// +/// struct Inner { +/// val: usize, +/// } +/// +/// struct Middle { +/// inner: Inner, +/// } +/// +/// struct Outer { +/// middle: Middle, +/// } +/// +/// let outer = Arc::new(ArcSwap::from_pointee(Outer { +/// middle: Middle { +/// inner: Inner { +/// val: 42, +/// } +/// } +/// })); +/// +/// let middle: Arc<dyn DynAccess<Middle>> = +/// Arc::new(Map::new(outer, |outer: &Outer| &outer.middle)); +/// let inner: Arc<dyn DynAccess<Inner>> = +/// Arc::new(Map::new(AccessConvert(middle), |middle: &Middle| &middle.inner)); +/// let guard = inner.load(); +/// assert_eq!(42, guard.val); +/// ``` +pub struct AccessConvert<D>(pub D); + +impl<T, D> Access<T> for AccessConvert<D> +where + D: Deref, + D::Target: DynAccess<T>, +{ + type Guard = DynGuard<T>; + + fn load(&self) -> Self::Guard { + self.0.load() + } +} + +#[doc(hidden)] +#[derive(Copy, Clone, Debug)] +pub struct MapGuard<G, F, T, R> { + guard: G, + projection: F, + _t: PhantomData<fn(&T) -> &R>, +} + +impl<G, F, T, R> Deref for MapGuard<G, F, T, R> +where + G: Deref<Target = T>, + F: Fn(&T) -> &R, +{ + type Target = R; + fn deref(&self) -> &R { + (self.projection)(&self.guard) + } +} + +/// An adaptor to provide access to a part of a larger structure. +/// +/// This is the *active* part of this module. See the [module documentation](index.html) for the +/// details. +#[derive(Copy, Clone, Debug)] +pub struct Map<A, T, F> { + access: A, + projection: F, + _t: PhantomData<fn() -> T>, +} + +impl<A, T, F> Map<A, T, F> { + /// Creates a new instance. + /// + /// # Parameters + /// + /// * `access`: Access to the bigger structure. This is usually something like `Arc<ArcSwap>` + /// or `&ArcSwap`. It is technically possible to use any other [`Access`] here, though, for + /// example to sub-delegate into an even smaller structure from a [`Map`] (or generic + /// [`Access`]). + /// * `projection`: A function (or closure) responsible for providing a reference into the + /// bigger structure, selecting just a subset of it. In general, it is expected to be + /// *cheap* (like only taking a reference). + pub fn new<R>(access: A, projection: F) -> Self + where + F: Fn(&T) -> &R + Clone, + { + Map { + access, + projection, + _t: PhantomData, + } + } +} + +impl<A, F, T, R> Access<R> for Map<A, T, F> +where + A: Access<T>, + F: Fn(&T) -> &R + Clone, +{ + type Guard = MapGuard<A::Guard, F, T, R>; + fn load(&self) -> Self::Guard { + let guard = self.access.load(); + MapGuard { + guard, + projection: self.projection.clone(), + _t: PhantomData, + } + } +} + +#[doc(hidden)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct ConstantDeref<T>(T); + +impl<T> Deref for ConstantDeref<T> { + type Target = T; + fn deref(&self) -> &T { + &self.0 + } +} + +/// Access to a constant. +/// +/// This wraps a constant value to provide [`Access`] to it. It is constant in the sense that, +/// unlike [`ArcSwapAny`] and [`Map`], the loaded value will always stay the same (there's no +/// remote `store`). +/// +/// The purpose is mostly testing and plugging a parameter that works generically from code that +/// doesn't need the updating functionality.
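///
/// A minimal sketch of plugging it into generic code (the value is illustrative only):
///
/// ```rust
/// use arc_swap::access::{Access, Constant};
///
/// fn print_it<A: Access<usize>>(a: A) {
///     println!("{}", *a.load());
/// }
///
/// // No ArcSwap involved; the value is fixed at construction time.
/// print_it(Constant(42));
/// ```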
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct Constant<T>(pub T); + +impl<T: Clone> Access<T> for Constant<T> { + type Guard = ConstantDeref<T>; + fn load(&self) -> Self::Guard { + ConstantDeref(self.0.clone()) + } +} + +#[cfg(test)] +mod tests { + use super::super::{ArcSwap, ArcSwapOption}; + + use super::*; + + fn check_static_dispatch_direct<A: Access<usize>>(a: A) { + assert_eq!(42, *a.load()); + } + + fn check_static_dispatch<A: Access<Arc<usize>>>(a: A) { + assert_eq!(42, **a.load()); + } + + /// Tests dispatching statically from arc-swap works + #[test] + fn static_dispatch() { + let a = ArcSwap::from_pointee(42); + check_static_dispatch_direct(&a); + check_static_dispatch(&a); + check_static_dispatch(a); + } + + fn check_dyn_dispatch_direct(a: &dyn DynAccess<usize>) { + assert_eq!(42, *a.load()); + } + + fn check_dyn_dispatch(a: &dyn DynAccess<Arc<usize>>) { + assert_eq!(42, **a.load()); + } + + /// Tests we can also do a dynamic dispatch of the companion trait + #[test] + fn dyn_dispatch() { + let a = ArcSwap::from_pointee(42); + check_dyn_dispatch_direct(&a); + check_dyn_dispatch(&a); + } + + fn check_transition<A>(a: A) + where + A: Access<usize>, + A::Guard: 'static, + { + check_dyn_dispatch_direct(&a) + } + + /// Tests we can easily transition from the static dispatch trait to the dynamic one + #[test] + fn transition() { + let a = ArcSwap::from_pointee(42); + check_transition(&a); + check_transition(a); + } + + /// Test we can dispatch from Arc<ArcSwap<_>> or similar. + #[test] + fn indirect() { + let a = Arc::new(ArcSwap::from_pointee(42)); + check_static_dispatch(&a); + check_dyn_dispatch(&a); + } + + struct Cfg { + value: usize, + } + + #[test] + fn map() { + let a = ArcSwap::from_pointee(Cfg { value: 42 }); + let map = a.map(|a: &Cfg| &a.value); + check_static_dispatch_direct(&map); + check_dyn_dispatch_direct(&map); + } + + #[test] + fn map_option_some() { + let a = ArcSwapOption::from_pointee(Cfg { value: 42 }); + let map = a.map(|a: &Option<Arc<Cfg>>| a.as_ref().map(|c| &c.value).unwrap()); + check_static_dispatch_direct(&map); + check_dyn_dispatch_direct(&map); + } + + #[test] + fn map_option_none() { + let a = ArcSwapOption::empty(); + let map = a.map(|a: &Option<Arc<Cfg>>| a.as_ref().map(|c| &c.value).unwrap_or(&42)); + check_static_dispatch_direct(&map); + check_dyn_dispatch_direct(&map); + } + + #[test] + fn constant() { + let c = Constant(42); + check_static_dispatch_direct(c); + check_dyn_dispatch_direct(&c); + check_static_dispatch_direct(c); + } + + #[test] + fn map_reload() { + let a = ArcSwap::from_pointee(Cfg { value: 0 }); + let map = a.map(|cfg: &Cfg| &cfg.value); + assert_eq!(0, *Access::load(&map)); + a.store(Arc::new(Cfg { value: 42 })); + assert_eq!(42, *Access::load(&map)); + } + + // Compile tests for dynamic access + fn _expect_access<T>(_: impl Access<T>) {} + + fn _dyn_access<T>(x: Box<dyn DynAccess<T> + '_>) { + _expect_access(x) + } + + fn _dyn_access_send<T>(x: Box<dyn DynAccess<T> + '_ + Send>) { + _expect_access(x) + } + + fn _dyn_access_send_sync<T>(x: Box<dyn DynAccess<T> + '_ + Send + Sync>) { + _expect_access(x) + } + + #[test] + fn double_dyn_access_complex() { + struct Inner { + val: usize, + } + + struct Middle { + inner: Inner, + } + + struct Outer { + middle: Middle, + } + + let outer = Arc::new(ArcSwap::from_pointee(Outer { + middle: Middle { + inner: Inner { val: 42 }, + }, + })); + + let middle: Arc<dyn DynAccess<Middle>> = + Arc::new(Map::new(outer, |outer: &Outer| &outer.middle)); + let inner: 
Arc<dyn DynAccess<Inner>> = + Arc::new(Map::new(middle, |middle: &Middle| &middle.inner)); + // Damn. We have the DynAccess wrapper in scope and need to disambiguate the inner.load() + let guard = Access::load(&inner); + assert_eq!(42, guard.val); + } +} diff --git a/src/as_raw.rs b/src/as_raw.rs new file mode 100644 index 0000000..f7bb169 --- /dev/null +++ b/src/as_raw.rs @@ -0,0 +1,72 @@ +use super::{Guard, RefCnt}; + +mod sealed { + pub trait Sealed {} +} + +use self::sealed::Sealed; + +/// A trait describing things that can be turned into a raw pointer. +/// +/// This is just an abstraction of things that can be passed to the +/// [`compare_and_swap`](struct.ArcSwapAny.html#method.compare_and_swap). +/// +/// # Examples +/// +/// ``` +/// use std::ptr; +/// use std::sync::Arc; +/// +/// use arc_swap::ArcSwapOption; +/// +/// let a = Arc::new(42); +/// let shared = ArcSwapOption::from(Some(Arc::clone(&a))); +/// +/// shared.compare_and_swap(&a, Some(Arc::clone(&a))); +/// shared.compare_and_swap(&None::<Arc<_>>, Some(Arc::clone(&a))); +/// shared.compare_and_swap(shared.load(), Some(Arc::clone(&a))); +/// shared.compare_and_swap(&shared.load(), Some(Arc::clone(&a))); +/// shared.compare_and_swap(ptr::null(), Some(Arc::clone(&a))); +/// ``` +/// +/// Due to technical limitation, this is not implemented for owned `Arc`/`Option<Arc<_>>`, they +/// need to be borrowed. +pub trait AsRaw<T>: Sealed { + /// Converts the value into a raw pointer. + fn as_raw(&self) -> *mut T; +} + +impl<'a, T: RefCnt> Sealed for &'a T {} +impl<'a, T: RefCnt> AsRaw<T::Base> for &'a T { + fn as_raw(&self) -> *mut T::Base { + T::as_ptr(self) + } +} + +impl<'a, T: RefCnt> Sealed for &'a Guard<T> {} +impl<'a, T: RefCnt> AsRaw<T::Base> for &'a Guard<T> { + fn as_raw(&self) -> *mut T::Base { + T::as_ptr(self) + } +} + +impl<T: RefCnt> Sealed for Guard<T> {} +impl<T: RefCnt> AsRaw<T::Base> for Guard<T> { + fn as_raw(&self) -> *mut T::Base { + T::as_ptr(self) + } +} + +impl<T> Sealed for *mut T {} +impl<T> AsRaw<T> for *mut T { + fn as_raw(&self) -> *mut T { + *self + } +} + +impl<T> Sealed for *const T {} +impl<T> AsRaw<T> for *const T { + fn as_raw(&self) -> *mut T { + *self as *mut T + } +} diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000..086bb62 --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,343 @@ +#![deny(unsafe_code)] + +//! Caching handle into the [ArcSwapAny]. +//! +//! The [Cache] keeps a copy of the internal [Arc] for faster access. +//! +//! [Arc]: std::sync::Arc + +use std::ops::Deref; +use std::sync::atomic::Ordering; + +use super::ref_cnt::RefCnt; +use super::strategy::Strategy; +use super::ArcSwapAny; + +/// Generalization of caches providing access to `T`. +/// +/// This abstracts over all kinds of caches that can provide a cheap access to values of type `T`. +/// This is useful in cases where some code doesn't care if the `T` is the whole structure or just +/// a part of it. +/// +/// See the example at [`Cache::map`]. +pub trait Access<T> { + /// Loads the value from cache. + /// + /// This revalidates the value in the cache, then provides the access to the cached value. + fn load(&mut self) -> &T; +} + +/// Caching handle for [`ArcSwapAny`][ArcSwapAny]. +/// +/// Instead of loading the [`Arc`][Arc] on every request from the shared storage, this keeps +/// another copy inside itself. Upon request it only cheaply revalidates it is up to +/// date. If it is, access is significantly faster. If it is stale, the [load_full] is done and the +/// cache value is replaced. 
Under read-heavy loads, the measured speedup is 10-25 times, +/// depending on the architecture. +/// +/// There are, however, downsides: +/// +/// * The handle needs to be kept around by the caller (usually, one per thread). This is fine if +/// there's one global `ArcSwapAny`, but starts being tricky with eg. data structures built from +/// them. +/// * As it keeps a copy of the [Arc] inside the cache, the old value may be kept alive for a longer +/// period of time ‒ it is replaced by the new value on [load][Cache::load]. You may not want to +/// use this if dropping the old value in a timely manner is important (possibly because of +/// releasing a large amount of RAM or because of closing file handles). +/// +/// # Examples +/// +/// ```rust +/// # fn do_something<V>(_v: V) { } +/// use std::sync::Arc; +/// use std::sync::atomic::{AtomicBool, Ordering}; +/// +/// use arc_swap::{ArcSwap, Cache}; +/// +/// let shared = Arc::new(ArcSwap::from_pointee(42)); +/// # let mut threads = Vec::new(); +/// let terminate = Arc::new(AtomicBool::new(false)); +/// // Start 10 worker threads... +/// for _ in 0..10 { +/// let mut cache = Cache::new(Arc::clone(&shared)); +/// let terminate = Arc::clone(&terminate); +/// # let thread = +/// std::thread::spawn(move || { +/// // Keep loading it like mad.. +/// while !terminate.load(Ordering::Relaxed) { +/// let value = cache.load(); +/// do_something(value); +/// } +/// }); +/// # threads.push(thread); +/// } +/// shared.store(Arc::new(12)); +/// # terminate.store(true, Ordering::Relaxed); +/// # for thread in threads { thread.join().unwrap() } +/// ``` +/// +/// Another one using thread local storage and explicit types: +/// +/// ```rust +/// # use std::sync::Arc; +/// # use std::ops::Deref; +/// # use std::cell::RefCell; +/// # +/// # use arc_swap::ArcSwap; +/// # use arc_swap::cache::Cache; +/// # use once_cell::sync::Lazy; +/// # +/// # #[derive(Debug, Default)] +/// # struct Config; +/// # +/// static CURRENT_CONFIG: Lazy<ArcSwap<Config>> = Lazy::new(|| ArcSwap::from_pointee(Config::default())); +/// +/// thread_local! { +/// static CACHE: RefCell<Cache<&'static ArcSwap<Config>, Arc<Config>>> = RefCell::new(Cache::from(CURRENT_CONFIG.deref())); +/// } +/// +/// CACHE.with(|c| { +/// // * RefCell needed, because load on cache is `&mut`. +/// // * You want to operate inside the `with` ‒ cloning the Arc is comparably expensive as +/// // ArcSwap::load itself and whatever you'd save by the cache would be lost on that. +/// println!("{:?}", c.borrow_mut().load()); +/// }); +/// ``` +/// +/// [Arc]: std::sync::Arc +/// [load_full]: ArcSwapAny::load_full +#[derive(Clone, Debug)] +pub struct Cache<A, T> { + arc_swap: A, + cached: T, +} + +impl<A, T, S> Cache<A, T> +where + A: Deref<Target = ArcSwapAny<T, S>>, + T: RefCnt, + S: Strategy<T>, +{ + /// Creates a new caching handle. + /// + /// The parameter is something dereferencing into an [`ArcSwapAny`] (eg. either to [`ArcSwap`] + /// or [`ArcSwapOption`]). That can be [`ArcSwapAny`] itself, but that's not very useful. But + /// it can also be a reference to it or an `Arc`, which makes it possible to share the + /// [`ArcSwapAny`] with multiple caches or access it in a non-cached way too. + /// + /// [`ArcSwapOption`]: crate::ArcSwapOption + /// [`ArcSwap`]: crate::ArcSwap + pub fn new(arc_swap: A) -> Self { + let cached = arc_swap.load_full(); + Self { arc_swap, cached } + } + + /// Gives access to the (possibly shared) cached [`ArcSwapAny`].
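///
/// A small sketch of reaching the shared storage back through the cache (values are
/// illustrative only):
///
/// ```rust
/// use std::sync::Arc;
///
/// use arc_swap::{ArcSwap, Cache};
///
/// let shared = ArcSwap::from_pointee(1);
/// let mut cache = Cache::new(&shared);
/// assert_eq!(1, **cache.load());
///
/// // The underlying ArcSwap stays reachable through the cache handle.
/// cache.arc_swap().store(Arc::new(2));
/// assert_eq!(2, **cache.load());
/// ```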
+ pub fn arc_swap(&self) -> &A::Target { + &self.arc_swap + } + + /// Loads the currently held value. + /// + /// This first checks if the cached value is up to date. This check is very cheap. + /// + /// If it is up to date, the cached value is simply returned without additional costs. If it is + /// outdated, a load is done on the underlying shared storage. The newly loaded value is then + /// stored in the cache and returned. + #[inline] + pub fn load(&mut self) -> &T { + self.revalidate(); + self.load_no_revalidate() + } + + #[inline] + fn load_no_revalidate(&self) -> &T { + &self.cached + } + + #[inline] + fn revalidate(&mut self) { + let cached_ptr = RefCnt::as_ptr(&self.cached); + // Node: Relaxed here is fine. We do not synchronize any data through this, we already have + // it synchronized in self.cache. We just want to check if it changed, if it did, the + // load_full will be responsible for any synchronization needed. + let shared_ptr = self.arc_swap.ptr.load(Ordering::Relaxed); + if cached_ptr != shared_ptr { + self.cached = self.arc_swap.load_full(); + } + } + + /// Turns this cache into a cache with a projection inside the cached value. + /// + /// You'd use this in case when some part of code needs access to fresh values of `U`, however + /// a bigger structure containing `U` is provided by this cache. The possibility of giving the + /// whole structure to the part of the code falls short in terms of reusability (the part of + /// the code could be used within multiple contexts, each with a bigger different structure + /// containing `U`) and code separation (the code shouldn't needs to know about the big + /// structure). + /// + /// # Warning + /// + /// As the provided `f` is called inside every [`load`][Access::load], this one should be + /// cheap. Most often it is expected to be just a closure taking reference of some inner field. + /// + /// For the same reasons, it should not have side effects and should never panic (these will + /// not break Rust's safety rules, but might produce behaviour you don't expect). + /// + /// # Examples + /// + /// ```rust + /// use arc_swap::ArcSwap; + /// use arc_swap::cache::{Access, Cache}; + /// + /// struct InnerCfg { + /// answer: usize, + /// } + /// + /// struct FullCfg { + /// inner: InnerCfg, + /// } + /// + /// fn use_inner<A: Access<InnerCfg>>(cache: &mut A) { + /// let value = cache.load(); + /// println!("The answer is: {}", value.answer); + /// } + /// + /// let full_cfg = ArcSwap::from_pointee(FullCfg { + /// inner: InnerCfg { + /// answer: 42, + /// } + /// }); + /// let cache = Cache::new(&full_cfg); + /// use_inner(&mut cache.map(|full| &full.inner)); + /// + /// let inner_cfg = ArcSwap::from_pointee(InnerCfg { answer: 24 }); + /// let mut inner_cache = Cache::new(&inner_cfg); + /// use_inner(&mut inner_cache); + /// ``` + pub fn map<F, U>(self, f: F) -> MapCache<A, T, F> + where + F: FnMut(&T) -> &U, + { + MapCache { + inner: self, + projection: f, + } + } +} + +impl<A, T, S> Access<T::Target> for Cache<A, T> +where + A: Deref<Target = ArcSwapAny<T, S>>, + T: Deref<Target = <T as RefCnt>::Base> + RefCnt, + S: Strategy<T>, +{ + fn load(&mut self) -> &T::Target { + self.load().deref() + } +} + +impl<A, T, S> From<A> for Cache<A, T> +where + A: Deref<Target = ArcSwapAny<T, S>>, + T: RefCnt, + S: Strategy<T>, +{ + fn from(arc_swap: A) -> Self { + Self::new(arc_swap) + } +} + +/// An implementation of a cache with a projection into the accessed value. +/// +/// This is the implementation structure for [`Cache::map`]. 
It can't be created directly and it +/// should be used through the [`Access`] trait. +#[derive(Clone, Debug)] +pub struct MapCache<A, T, F> { + inner: Cache<A, T>, + projection: F, +} + +impl<A, T, S, F, U> Access<U> for MapCache<A, T, F> +where + A: Deref<Target = ArcSwapAny<T, S>>, + T: RefCnt, + S: Strategy<T>, + F: FnMut(&T) -> &U, +{ + fn load(&mut self) -> &U { + (self.projection)(self.inner.load()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::{ArcSwap, ArcSwapOption}; + + #[test] + fn cached_value() { + let a = ArcSwap::from_pointee(42); + let mut c1 = Cache::new(&a); + let mut c2 = Cache::new(&a); + + assert_eq!(42, **c1.load()); + assert_eq!(42, **c2.load()); + + a.store(Arc::new(43)); + assert_eq!(42, **c1.load_no_revalidate()); + assert_eq!(43, **c1.load()); + } + + #[test] + fn cached_through_arc() { + let a = Arc::new(ArcSwap::from_pointee(42)); + let mut c = Cache::new(Arc::clone(&a)); + assert_eq!(42, **c.load()); + a.store(Arc::new(0)); + drop(a); // A is just one handle, the ArcSwap is kept alive by the cache. + } + + #[test] + fn cache_option() { + let a = ArcSwapOption::from_pointee(42); + let mut c = Cache::new(&a); + + assert_eq!(42, **c.load().as_ref().unwrap()); + a.store(None); + assert!(c.load().is_none()); + } + + struct Inner { + answer: usize, + } + + struct Outer { + inner: Inner, + } + + #[test] + fn map_cache() { + let a = ArcSwap::from_pointee(Outer { + inner: Inner { answer: 42 }, + }); + + let mut cache = Cache::new(&a); + let mut inner = cache.clone().map(|outer| &outer.inner); + let mut answer = cache.clone().map(|outer| &outer.inner.answer); + + assert_eq!(42, cache.load().inner.answer); + assert_eq!(42, inner.load().answer); + assert_eq!(42, *answer.load()); + + a.store(Arc::new(Outer { + inner: Inner { answer: 24 }, + })); + + assert_eq!(24, cache.load().inner.answer); + assert_eq!(24, inner.load().answer); + assert_eq!(24, *answer.load()); + } +} diff --git a/src/compile_fail_tests.rs b/src/compile_fail_tests.rs new file mode 100644 index 0000000..c56bbb7 --- /dev/null +++ b/src/compile_fail_tests.rs @@ -0,0 +1,93 @@ +// The doc tests allow us to do a compile_fail test, which is cool and what we want, but we don't +// want to expose this in the docs, so we use a private struct for that reason. +// +// Note we also bundle one that *does* compile with each, just to make sure they don't silently +// not-compile by some different reason. +//! ```rust,compile_fail +//! let shared = arc_swap::ArcSwap::from_pointee(std::cell::Cell::new(42)); +//! std::thread::spawn(|| { +//! drop(shared); +//! }); +//! ``` +//! +//! ```rust +//! let shared = arc_swap::ArcSwap::from_pointee(42); +//! std::thread::spawn(|| { +//! drop(shared); +//! }) +//! .join() +//! .unwrap(); +//! ``` +//! +//! ```rust,compile_fail +//! let shared = arc_swap::ArcSwap::from_pointee(std::cell::Cell::new(42)); +//! let guard = shared.load(); +//! std::thread::spawn(|| { +//! drop(guard); +//! }); +//! ``` +//! +//! ```rust +//! let shared = arc_swap::ArcSwap::from_pointee(42); +//! let guard = shared.load(); +//! std::thread::spawn(|| { +//! drop(guard); +//! }) +//! .join() +//! .unwrap(); +//! ``` +//! +//! ```rust,compile_fail +//! let shared = arc_swap::ArcSwap::from_pointee(std::cell::Cell::new(42)); +//! crossbeam_utils::thread::scope(|scope| { +//! scope.spawn(|_| { +//! let _ = &shared; +//! }); +//! }).unwrap(); +//! ``` +//! +//! ```rust +//! let shared = arc_swap::ArcSwap::from_pointee(42); +//! 
crossbeam_utils::thread::scope(|scope| { +//! scope.spawn(|_| { +//! let _ = &shared; +//! }); +//! }).unwrap(); +//! ``` +//! +//! ```rust,compile_fail +//! let shared = arc_swap::ArcSwap::from_pointee(std::cell::Cell::new(42)); +//! let guard = shared.load(); +//! crossbeam_utils::thread::scope(|scope| { +//! scope.spawn(|_| { +//! let _ = &guard; +//! }); +//! }).unwrap(); +//! ``` +//! +//! ```rust +//! let shared = arc_swap::ArcSwap::from_pointee(42); +//! let guard = shared.load(); +//! crossbeam_utils::thread::scope(|scope| { +//! scope.spawn(|_| { +//! let _ = &guard; +//! }); +//! }).unwrap(); +//! ``` +//! +//! See that `ArcSwapAny<Rc>` really isn't Send. +//! ```rust +//! use std::sync::Arc; +//! use arc_swap::ArcSwapAny; +//! +//! let a: ArcSwapAny<Arc<usize>> = ArcSwapAny::new(Arc::new(42)); +//! std::thread::spawn(move || drop(a)).join().unwrap(); +//! ``` +//! +//! ```rust,compile_fail +//! use std::rc::Rc; +//! use arc_swap::ArcSwapAny; +//! +//! let a: ArcSwapAny<Rc<usize>> = ArcSwapAny::new(Rc::new(42)); +//! std::thread::spawn(move || drop(a)); +//! ``` diff --git a/src/debt/fast.rs b/src/debt/fast.rs new file mode 100644 index 0000000..5500e24 --- /dev/null +++ b/src/debt/fast.rs @@ -0,0 +1,76 @@ +//! The fast slots for the primary strategy. +//! +//! They are faster, but fallible (in case the slots run out or if there's a collision with a +//! writer thread, this gives up and falls back to secondary strategy). +//! +//! They are based on hazard pointer ideas. To acquire one, the pointer is loaded, stored in the +//! slot and the debt is confirmed by loading it again and checking it is the same. +//! +//! # Orderings +//! +//! We ensure just one thing here. Since we do both the acquisition of the slot and the exchange of +//! the pointer in the writer with SeqCst, we are guaranteed to either see the change in case it +//! hits somewhere in between the two reads of the pointer, or to have successfully acquired it +//! before the change and before any cleanup of the old pointer happened (in which case we know the +//! writer will see our debt). + +use std::cell::Cell; +use std::slice::Iter; +use std::sync::atomic::Ordering::*; + +use super::Debt; + +const DEBT_SLOT_CNT: usize = 8; + +/// Thread-local information for the [`Slots`] +#[derive(Default)] +pub(super) struct Local { + // The next slot in round-robin rotation. Heuristically tries to balance the load across them + // instead of having all of them stuffed towards the start of the array which gets + // unsuccessfully iterated through every time. + offset: Cell<usize>, +} + +/// Bunch of fast debt slots. +#[derive(Default)] +pub(super) struct Slots([Debt; DEBT_SLOT_CNT]); + +impl Slots { + /// Try to allocate one slot and get the pointer in it. + /// + /// Fails if there are no free slots. + #[inline] + pub(super) fn get_debt(&self, ptr: usize, local: &Local) -> Option<&Debt> { + // Trick with offsets: we rotate through the slots (save the value from last time) + // so successive leases are likely to succeed on the first attempt (or soon after) + // instead of going through the list of already held ones. + let offset = local.offset.get(); + let len = self.0.len(); + for i in 0..len { + let i = (i + offset) % len; + // Note: the indexing check is almost certainly optimised out because the len + // is used above. And using .get_unchecked was actually *slower*. + let slot = &self.0[i]; + if slot.0.load(Relaxed) == Debt::NONE { + // We are allowed to split into the check and acquiring the debt. 
That's because we + // are the only ones allowed to change NONE to something else. But we still need a + // read-write operation wit SeqCst on it :-( + let old = slot.0.swap(ptr, SeqCst); + debug_assert_eq!(Debt::NONE, old); + local.offset.set(i + 1); + return Some(&self.0[i]); + } + } + None + } +} + +impl<'a> IntoIterator for &'a Slots { + type Item = &'a Debt; + + type IntoIter = Iter<'a, Debt>; + + fn into_iter(self) -> Self::IntoIter { + self.0.iter() + } +} diff --git a/src/debt/helping.rs b/src/debt/helping.rs new file mode 100644 index 0000000..1358bae --- /dev/null +++ b/src/debt/helping.rs @@ -0,0 +1,334 @@ +//! Slots and global/thread local data for the Helping strategy. +//! +//! This is inspired (but not an exact copy) of +//! <https://pvk.ca/Blog/2020/07/07/flatter-wait-free-hazard-pointers/>. The debts are mostly +//! copies of the ones used by the hybrid strategy, but modified a bit. Just like in the hybrid +//! strategy, in case the slots run out or when the writer updates the value, the debts are paid by +//! incrementing the ref count (which is a little slower, but still wait-free/lock-free and still +//! in order of nanoseconds). +//! +//! ## Reader, the fast path +//! +//! * Publish an active address ‒ the address we'll be loading stuff from. +//! * Puts a generation into the control. +//! * Loads the pointer and puts it to the debt slot. +//! * Confirms by CaS-replacing the generation back to idle state. +//! +//! * Later, we pay it back by CaS-replacing it with the NO_DEPT (like any other slot). +//! +//! ## Writer, the non-colliding path +//! +//! * Replaces the pointer in the storage. +//! * The writer walks over all debts. It pays each debt that it is concerned with by bumping the +//! reference and replacing the dept with NO_DEPT. The relevant reader will fail in the CaS +//! (either because it finds NO_DEPT or other pointer in there) and knows the reference was +//! bumped, so it needs to decrement it. Note that it is possible that someone also reuses the +//! slot for the _same_ pointer. In that case that reader will set it to NO_DEPT and the newer +//! reader will have a pre-paid debt, which is fine. +//! +//! ## The collision path +//! +//! The reservation of a slot is not atomic, therefore a writer can observe the reservation in +//! progress. But it doesn't want to wait for it to complete (it wants to be lock-free, which means +//! it needs to be able to resolve the situation on its own). +//! +//! The way it knows it is in progress of the reservation is by seeing a generation in there (it has +//! a distinct tag). In that case it'll try to: +//! +//! * First verify that the reservation is being done for the same address it modified, by reading +//! and re-confirming the active_addr slot corresponding to the currently handled node. If it is +//! for some other address, the writer doesn't have to be concerned and proceeds to the next slot. +//! * It does a full load. That is fine, because the writer must be on a different thread than the +//! reader and therefore there is at least one free slot. Full load means paying the debt right +//! away by incrementing the reference count. +//! * Then it tries to pass the already fully protected/paid pointer to the reader. It writes it to +//! an envelope and CaS-replaces it into the control, instead of the generation (if it fails, +//! someone has been faster and it rolls back). We need the envelope because the pointer itself +//! 
doesn't have to be aligned to 4 bytes and we need the space for tags to distinguish the types +//! of info in control; we can ensure the envelope is). +//! * The reader then finds the generation got replaced by a pointer to the envelope and uses that +//! pointer inside the envelope. It aborts its own debt. This effectively exchanges the envelopes +//! between the threads so each one has an envelope ready for future. +//! +//! ## ABA protection +//! +//! The generation as pre-reserving the slot allows the writer to make sure it is offering the +//! loaded pointer to the same reader and that the read value is new enough (and of the same type). +//! +//! This solves the general case, but there's also much less frequent but theoretical ABA problem +//! that could lead to UB, if left unsolved: +//! +//! * There is a collision on generation G. +//! * The writer loads a pointer, bumps it. +//! * In the meantime, all the 2^30 or 2^62 generations (depending on the usize width) generations +//! wrap around. +//! * The writer stores the outdated and possibly different-typed pointer in there and the reader +//! uses it. +//! +//! To mitigate that, every time the counter overflows we take the current node and un-assign it +//! from our current thread. We mark it as in "cooldown" and let it in there until there are no +//! writers messing with that node any more (if they are not on the node, they can't experience the +//! ABA problem on it). After that, we are allowed to use it again. +//! +//! This doesn't block the reader, it'll simply find *a* node next time ‒ this one, or possibly a +//! different (or new) one. +//! +//! # Orderings +//! +//! The linked lists/nodes are already provided for us. So we just need to make sure the debt +//! juggling is done right. We assume that the local node is ours to write to (others have only +//! limited right to write to certain fields under certain conditions) and that we are counted into +//! active writers while we dig through it on the writer end. +//! +//! We use SeqCst on a read-write operation both here at the very start of the sequence (storing +//! the generation into the control) and in the writer on the actual pointer. That establishes a +//! relation of what has happened first. +//! +//! After that we split the time into segments by read-write operations with AcqRel read-write +//! operations on the control. There's just one control in play for both threads so we don't need +//! SeqCst and the segments are understood by both the same way. The writer can sometimes use only +//! load-Acquire on that, because it needs to only read from data written by the reader. It'll +//! always see data from at least the segment before the observed control value and uses CaS to +//! send the results back, so it can't go into the past. +//! +//! There are two little gotchas: +//! +//! * When we read the address we should be loading from, we need to give up if the address does +//! not match (we can't simply load from there, because it can be dangling by that point and we +//! don't know its type, so we need to use our address for all loading ‒ and we just check they +//! match). If we give up, we don't do that CaS into control, therefore we could have given up on +//! newer address than the control we have read. For that reason, the address is also stored by +//! reader with Release and we read it with Acquire, which'll bring an up to date version of +//! control into our thread ‒ and we re-read that one to confirm the address is indeed between +//! 
two same values holding the generation, therefore corresponding to it. +//! * The destructor doesn't have a SeqCst in the writer, because there was no write in there. +//! That's OK. We need to ensure there are no new readers after the "change" we confirm in the +//! writer and that change is the destruction ‒ by that time, the destroying thread has exclusive +//! ownership and therefore there can be no new readers. + +use std::cell::Cell; +use std::ptr; +use std::sync::atomic::Ordering::*; +use std::sync::atomic::{AtomicPtr, AtomicUsize}; + +use super::Debt; +use crate::RefCnt; + +pub const REPLACEMENT_TAG: usize = 0b01; +pub const GEN_TAG: usize = 0b10; +pub const TAG_MASK: usize = 0b11; +pub const IDLE: usize = 0; + +/// Thread local data for the helping strategy. +#[derive(Default)] +pub(super) struct Local { + // The generation counter. + generation: Cell<usize>, +} + +// Make sure the pointers have 2 empty bits. Always. +#[derive(Default)] +#[repr(align(4))] +struct Handover(AtomicUsize); + +/// The slots for the helping strategy. +pub(super) struct Slots { + /// The control structure of the slot. + /// + /// Different threads signal what stage they are in in there. It can contain: + /// + /// * `IDLE` (nothing is happening, and there may or may not be an active debt). + /// * a generation, tagged with GEN_TAG. The reader is trying to acquire a slot right now and a + /// writer might try to help out. + /// * A replacement pointer, tagged with REPLACEMENT_TAG. This pointer points to an Handover, + /// containing an already protected value, provided by the writer for the benefit of the + /// reader. The reader should abort its own debt and use this instead. This indirection + /// (storing pointer to the envelope with the actual pointer) is to make sure there's a space + /// for the tag ‒ there is no guarantee the real pointer is aligned to at least 4 bytes, we + /// can however force that for the Handover type. + control: AtomicUsize, + /// A possibly active debt. + slot: Debt, + /// If there's a generation in control, this signifies what address the reader is trying to + /// load from. + active_addr: AtomicUsize, + /// A place where a writer can put a replacement value. + /// + /// Note that this is simply an allocation, and every participating slot contributes one, but + /// they may be passed around through the lifetime of the program. It is not accessed directly, + /// but through the space_offer thing. + /// + handover: Handover, + /// A pointer to a handover envelope this node currently owns. + /// + /// A writer makes a switch of its and readers handover when successfully storing a replacement + /// in the control. + space_offer: AtomicPtr<Handover>, +} + +impl Default for Slots { + fn default() -> Self { + Slots { + control: AtomicUsize::new(IDLE), + slot: Debt::default(), + // Doesn't matter yet + active_addr: AtomicUsize::new(0), + // Also doesn't matter + handover: Handover::default(), + // Here we would like it to point to our handover. But for that we need to be in place + // in RAM (effectively pinned, though we use older Rust than Pin, possibly?), so not + // yet. See init(). + space_offer: AtomicPtr::new(ptr::null_mut()), + } + } +} + +impl Slots { + pub(super) fn slot(&self) -> &Debt { + &self.slot + } + + pub(super) fn get_debt(&self, ptr: usize, local: &Local) -> (usize, bool) { + // Incrementing by 4 ensures we always have enough space for 2 bit of tags. 
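// (Sanity check, given the constants above: generations are multiples of 4, so their two low
// bits are clear; `gen | GEN_TAG` therefore ends in 0b10 and can never be mistaken for IDLE
// (0b00) or for a REPLACEMENT_TAG-tagged Handover pointer (0b01).)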
+ let gen = local.generation.get().wrapping_add(4); + debug_assert_eq!(gen & GEN_TAG, 0); + local.generation.set(gen); + // Signal the caller that the node should be sent to a cooldown. + let discard = gen == 0; + let gen = gen | GEN_TAG; + // We will sync by the write to the control. But we also sync the value of the previous + // generation/released slot. That way we may re-confirm in the writer that the reader is + // not in between here and the compare_exchange below with a stale gen (eg. if we are in + // here, the re-confirm there will load the NO_DEPT and we are fine). + self.active_addr.store(ptr, SeqCst); + + // We are the only ones allowed to do the IDLE -> * transition and we never leave it in + // anything else after an transaction, so this is OK. But we still need a load-store SeqCst + // operation here to form a relation between this and the store of the actual pointer in + // the writer thread :-(. + let prev = self.control.swap(gen, SeqCst); + debug_assert_eq!(IDLE, prev, "Left control in wrong state"); + + (gen, discard) + } + + pub(super) fn help<R, T>(&self, who: &Self, storage_addr: usize, replacement: &R) + where + T: RefCnt, + R: Fn() -> T, + { + debug_assert_eq!(IDLE, self.control.load(Relaxed)); + // Also acquires the auxiliary data in other variables. + let mut control = who.control.load(SeqCst); + loop { + match control & TAG_MASK { + // Nothing to help with + IDLE if control == IDLE => break, + // Someone has already helped out with that, so we have nothing to do here + REPLACEMENT_TAG => break, + // Something is going on, let's have a better look. + GEN_TAG => { + debug_assert!( + !ptr::eq(self, who), + "Refusing to help myself, makes no sense" + ); + // Get the address that other thread is trying to load from. By that acquire, + // we also sync the control into our thread once more and reconfirm that the + // value of the active_addr is in between two same instances, therefore up to + // date to it. + let active_addr = who.active_addr.load(SeqCst); + if active_addr != storage_addr { + // Acquire for the same reason as on the top. + let new_control = who.control.load(SeqCst); + if new_control == control { + // The other thread is doing something, but to some other ArcSwap, so + // we don't care. Cool, done. + break; + } else { + // The control just changed under our hands, we don't know what to + // trust, so retry. + control = new_control; + continue; + } + } + + // Now we know this work is for us. Try to create a replacement and offer it. + // This actually does a full-featured load under the hood, but we are currently + // idle and the load doesn't re-enter write, so that's all fine. + let replacement = replacement(); + let replace_addr = T::as_ptr(&replacement) as usize; + // If we succeed in helping the other thread, we take their empty space in + // return for us that we pass to them. It's already there, the value is synced + // to us by Acquire on control. + let their_space = who.space_offer.load(SeqCst); + // Relaxed is fine, our own thread and nobody but us writes in here. + let my_space = self.space_offer.load(SeqCst); + // Relaxed is fine, we'll sync by the next compare-exchange. If we don't, the + // value won't ever be read anyway. + unsafe { + (*my_space).0.store(replace_addr, SeqCst); + } + // Ensured by the align annotation at the type. + assert_eq!(my_space as usize & TAG_MASK, 0); + let space_addr = (my_space as usize) | REPLACEMENT_TAG; + // Acquire on failure -> same reason as at the top, reading the value. 
+ // Release on success -> we send data to that thread through here. Must be + // AcqRel, because success must be superset of failure. Also, load to get their + // space (it won't have changed, it does when the control is set to IDLE). + match who + .control + .compare_exchange(control, space_addr, SeqCst, SeqCst) + { + Ok(_) => { + // We have successfully sent our replacement out (Release) and got + // their space in return (Acquire on that load above). + self.space_offer.store(their_space, SeqCst); + // The ref count went with it, so forget about it here. + T::into_ptr(replacement); + // We have successfully helped out, so we are done. + break; + } + Err(new_control) => { + // Something has changed in between. Let's try again, nothing changed + // (the replacement will get dropped at the end of scope, we didn't do + // anything with the spaces, etc. + control = new_control; + } + } + } + _ => unreachable!("Invalid control value {:X}", control), + } + } + } + + pub(super) fn init(&mut self) { + *self.space_offer.get_mut() = &mut self.handover; + } + + pub(super) fn confirm(&self, gen: usize, ptr: usize) -> Result<(), usize> { + // Put the slot there and consider it acquire of a „lock“. For that we need swap, not store + // only (we need Acquire and Acquire works only on loads). Release is to make sure control + // is observable by the other thread (but that's probably not necessary anyway?) + let prev = self.slot.0.swap(ptr, SeqCst); + debug_assert_eq!(Debt::NONE, prev); + + // Confirm by writing to the control (or discover that we got helped). We stop anyone else + // from helping by setting it to IDLE. + let control = self.control.swap(IDLE, SeqCst); + if control == gen { + // Nobody interfered, we have our debt in place and can proceed. + Ok(()) + } else { + // Someone put a replacement in there. + debug_assert_eq!(control & TAG_MASK, REPLACEMENT_TAG); + let handover = (control & !TAG_MASK) as *mut Handover; + let replacement = unsafe { &*handover }.0.load(SeqCst); + // Make sure we advertise the right envelope when we set it to generation next time. + self.space_offer.store(handover, SeqCst); + // Note we've left the debt in place. The caller should pay it back (without ever + // taking advantage of it) to make sure any extra is actually dropped (it is possible + // someone provided the replacement *and* paid the debt and we need just one of them). + Err(replacement) + } + } +} diff --git a/src/debt/list.rs b/src/debt/list.rs new file mode 100644 index 0000000..3d17388 --- /dev/null +++ b/src/debt/list.rs @@ -0,0 +1,346 @@ +//! A linked list of debt nodes. +//! +//! A node may or may not be owned by a thread. Reader debts are allocated in its owned node, +//! writer walks everything (but may also use some owned values). +//! +//! The list is prepend-only ‒ if thread dies, the node lives on (and can be claimed by another +//! thread later on). This makes the implementation much simpler, since everything here is +//! `'static` and we don't have to care about knowing when to free stuff. +//! +//! The nodes contain both the fast primary slots and a secondary fallback ones. +//! +//! # Synchronization +//! +//! We synchronize several things here. +//! +//! The addition of nodes is synchronized through the head (Load on each read, AcqReal on each +//! attempt to add another node). Note that certain parts never change after that (they aren't even +//! atomic) and other things that do change take care of themselves (the debt slots have their own +//! synchronization, etc). +//! +//! 
The ownership is acquire-release lock pattern. +//! +//! Similar, the counting of active writers is an acquire-release lock pattern. +//! +//! We also do release-acquire "send" from the start-cooldown to check-cooldown to make sure we see +//! at least as up to date value of the writers as when the cooldown started. That we if we see 0, +//! we know it must have happened since then. + +use std::cell::Cell; +use std::ptr; +use std::slice::Iter; +use std::sync::atomic::Ordering::*; +use std::sync::atomic::{AtomicPtr, AtomicUsize}; + +use super::fast::{Local as FastLocal, Slots as FastSlots}; +use super::helping::{Local as HelpingLocal, Slots as HelpingSlots}; +use super::Debt; +use crate::RefCnt; + +const NODE_UNUSED: usize = 0; +const NODE_USED: usize = 1; +const NODE_COOLDOWN: usize = 2; + +/// The head of the debt linked list. +static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut()); + +pub struct NodeReservation<'a>(&'a Node); + +impl Drop for NodeReservation<'_> { + fn drop(&mut self) { + self.0.active_writers.fetch_sub(1, Release); + } +} + +/// One thread-local node for debts. +#[repr(C, align(64))] +pub(crate) struct Node { + fast: FastSlots, + helping: HelpingSlots, + in_use: AtomicUsize, + // Next node in the list. + // + // It is a pointer because we touch it before synchronization (we don't _dereference_ it before + // synchronization, only manipulate the pointer itself). That is illegal according to strict + // interpretation of the rules by MIRI on references. + next: *const Node, + active_writers: AtomicUsize, +} + +impl Default for Node { + fn default() -> Self { + Node { + fast: FastSlots::default(), + helping: HelpingSlots::default(), + in_use: AtomicUsize::new(NODE_USED), + next: ptr::null(), + active_writers: AtomicUsize::new(0), + } + } +} + +impl Node { + /// Goes through the debt linked list. + /// + /// This traverses the linked list, calling the closure on each node. If the closure returns + /// `Some`, it terminates with that value early, otherwise it runs to the end. + pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> { + // Acquire ‒ we want to make sure we read the correct version of data at the end of the + // pointer. Any write to the DEBT_HEAD is with Release. + // + // Furthermore, we need to see the newest version of the list in case we examine the debts + // - if a new one is added recently, we don't want a stale read -> SeqCst. + // + // Note that the other pointers in the chain never change and are *ordinary* pointers. The + // whole linked list is synchronized through the head. + let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() }; + while let Some(node) = current { + let result = f(node); + if result.is_some() { + return result; + } + current = unsafe { node.next.as_ref() }; + } + None + } + + /// Put the current thread node into cooldown + fn start_cooldown(&self) { + // Trick: Make sure we have an up to date value of the active_writers in this thread, so we + // can properly release it below. + let _reservation = self.reserve_writer(); + assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release)); + } + + /// Perform a cooldown if the node is ready. + /// + /// See the ABA protection at the [helping]. + fn check_cooldown(&self) { + // Check if the node is in cooldown, for two reasons: + // * Skip most of nodes fast, without dealing with them. + // * More importantly, sync the value of active_writers to be at least the value when the + // cooldown started. 
That way we know the 0 we observe happened some time after + // start_cooldown. + if self.in_use.load(Acquire) == NODE_COOLDOWN { + // The rest can be nicely relaxed ‒ no memory is being synchronized by these + // operations. We just see an up to date 0 and allow someone (possibly us) to claim the + // node later on. + if self.active_writers.load(Relaxed) == 0 { + let _ = self + .in_use + .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed); + } + } + } + + /// Mark this node that a writer is currently playing with it. + pub fn reserve_writer(&self) -> NodeReservation { + self.active_writers.fetch_add(1, Acquire); + NodeReservation(self) + } + + /// "Allocate" a node. + /// + /// Either a new one is created, or previous one is reused. The node is claimed to become + /// in_use. + fn get() -> &'static Self { + // Try to find an unused one in the chain and reuse it. + Self::traverse(|node| { + node.check_cooldown(); + if node + .in_use + // We claim a unique control over the generation and the right to write to slots if + // they are NO_DEPT + .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed) + .is_ok() + { + Some(node) + } else { + None + } + }) + // If that didn't work, create a new one and prepend to the list. + .unwrap_or_else(|| { + let node = Box::leak(Box::<Node>::default()); + node.helping.init(); + // We don't want to read any data in addition to the head, Relaxed is fine + // here. + // + // We do need to release the data to others, but for that, we acquire in the + // compare_exchange below. + let mut head = LIST_HEAD.load(Relaxed); + loop { + node.next = head; + if let Err(old) = LIST_HEAD.compare_exchange_weak( + head, node, + // We need to release *the whole chain* here. For that, we need to + // acquire it first. + // + // SeqCst because we need to make sure it is properly set "before" we do + // anything to the debts. + SeqCst, Relaxed, // Nothing changed, go next round of the loop. + ) { + head = old; + } else { + return node; + } + } + }) + } + + /// Iterate over the fast slots. + pub(crate) fn fast_slots(&self) -> Iter<Debt> { + self.fast.into_iter() + } + + /// Access the helping slot. + pub(crate) fn helping_slot(&self) -> &Debt { + self.helping.slot() + } +} + +/// A wrapper around a node pointer, to un-claim the node on thread shutdown. +pub(crate) struct LocalNode { + /// Node for this thread, if any. + /// + /// We don't necessarily have to own one, but if we don't, we'll get one before the first use. + node: Cell<Option<&'static Node>>, + + /// Thread-local data for the fast slots. + fast: FastLocal, + + /// Thread local data for the helping strategy. + helping: HelpingLocal, +} + +impl LocalNode { + pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R { + let f = Cell::new(Some(f)); + THREAD_HEAD + .try_with(|head| { + if head.node.get().is_none() { + head.node.set(Some(Node::get())); + } + let f = f.take().unwrap(); + f(head) + }) + // During the application shutdown, the thread local storage may be already + // deallocated. In that case, the above fails but we still need something. So we just + // find or allocate a node and use it just once. + // + // Note that the situation should be very very rare and not happen often, so the slower + // performance doesn't matter that much. 
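// (In the fallback below, a throw-away LocalNode is built for this single call; dropping it
// afterwards sends the claimed node into cooldown so another thread can claim it later.)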
+ .unwrap_or_else(|_| { + let tmp_node = LocalNode { + node: Cell::new(Some(Node::get())), + fast: FastLocal::default(), + helping: HelpingLocal::default(), + }; + let f = f.take().unwrap(); + f(&tmp_node) + // Drop of tmp_node -> sends the node we just used into cooldown. + }) + } + + /// Creates a new debt. + /// + /// This stores the debt of the given pointer (untyped, casted into an usize) and returns a + /// reference to that slot, or gives up with `None` if all the slots are currently full. + #[inline] + pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> { + let node = &self.node.get().expect("LocalNode::with ensures it is set"); + debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED); + node.fast.get_debt(ptr, &self.fast) + } + + /// Initializes a helping slot transaction. + /// + /// Returns the generation (with tag). + pub(crate) fn new_helping(&self, ptr: usize) -> usize { + let node = &self.node.get().expect("LocalNode::with ensures it is set"); + debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED); + let (gen, discard) = node.helping.get_debt(ptr, &self.helping); + if discard { + // Too many generations happened, make sure the writers give the poor node a break for + // a while so they don't observe the generation wrapping around. + node.start_cooldown(); + self.node.take(); + } + gen + } + + /// Confirm the helping transaction. + /// + /// The generation comes from previous new_helping. + /// + /// Will either return a debt with the pointer, or a debt to pay and a replacement (already + /// protected) address. + pub(crate) fn confirm_helping( + &self, + gen: usize, + ptr: usize, + ) -> Result<&'static Debt, (&'static Debt, usize)> { + let node = &self.node.get().expect("LocalNode::with ensures it is set"); + debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED); + let slot = node.helping_slot(); + node.helping + .confirm(gen, ptr) + .map(|()| slot) + .map_err(|repl| (slot, repl)) + } + + /// The writer side of a helping slot. + /// + /// This potentially helps the `who` node (uses self as the local node, which must be + /// different) by loading the address that one is trying to load. + pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R) + where + T: RefCnt, + R: Fn() -> T, + { + let node = &self.node.get().expect("LocalNode::with ensures it is set"); + debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED); + node.helping.help(&who.helping, storage_addr, replacement) + } +} + +impl Drop for LocalNode { + fn drop(&mut self) { + if let Some(node) = self.node.get() { + // Release - syncing writes/ownership of this Node + node.start_cooldown(); + } + } +} + +thread_local! { + /// A debt node assigned to this thread. + static THREAD_HEAD: LocalNode = LocalNode { + node: Cell::new(None), + fast: FastLocal::default(), + helping: HelpingLocal::default(), + }; +} + +#[cfg(test)] +mod tests { + use super::*; + + impl Node { + fn is_empty(&self) -> bool { + self.fast_slots() + .chain(std::iter::once(self.helping_slot())) + .all(|d| d.0.load(Relaxed) == Debt::NONE) + } + + fn get_thread() -> &'static Self { + LocalNode::with(|h| h.node.get().unwrap()) + } + } + + /// A freshly acquired thread local node is empty. + #[test] + fn new_empty() { + assert!(Node::get_thread().is_empty()); + } +} diff --git a/src/debt/mod.rs b/src/debt/mod.rs new file mode 100644 index 0000000..8a792a2 --- /dev/null +++ b/src/debt/mod.rs @@ -0,0 +1,137 @@ +//! Debt handling. +//! +//! A debt is a reference count of a smart pointer that is owed. 
This module provides a lock-free +//! storage for debts. +//! +//! Each thread has its own node with bunch of slots. Only that thread can allocate debts in there, +//! but others are allowed to inspect and pay them. The nodes form a linked list for the reason of +//! inspection. The nodes are never removed (even after the thread terminates), but if the thread +//! gives it up, another (new) thread can claim it. +//! +//! The writers walk the whole chain and pay the debts (by bumping the ref counts) of the just +//! removed pointer. +//! +//! Each node has some fast (but fallible) nodes and a fallback node, with different algorithms to +//! claim them (see the relevant submodules). + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::*; + +pub(crate) use self::list::{LocalNode, Node}; +use super::RefCnt; + +mod fast; +mod helping; +mod list; + +/// One debt slot. +/// +/// It may contain an „owed“ reference count. +#[derive(Debug)] +pub(crate) struct Debt(pub(crate) AtomicUsize); + +impl Debt { + /// The value of pointer `3` should be pretty safe, for two reasons: + /// + /// * It's an odd number, but the pointers we have are likely aligned at least to the word size, + /// because the data at the end of the `Arc` has the counters. + /// * It's in the very first page where NULL lives, so it's not mapped. + pub(crate) const NONE: usize = 0b11; +} + +impl Default for Debt { + fn default() -> Self { + Debt(AtomicUsize::new(Self::NONE)) + } +} + +impl Debt { + /// Tries to pay the given debt. + /// + /// If the debt is still there, for the given pointer, it is paid and `true` is returned. If it + /// is empty or if there's some other pointer, it is not paid and `false` is returned, meaning + /// the debt was paid previously by someone else. + /// + /// # Notes + /// + /// * It is possible that someone paid the debt and then someone else put a debt for the same + /// pointer in there. This is fine, as we'll just pay the debt for that someone else. + /// * This relies on the fact that the same pointer must point to the same object and + /// specifically to the same type ‒ the caller provides the type, it's destructor, etc. + /// * It also relies on the fact the same thing is not stuffed both inside an `Arc` and `Rc` or + /// something like that, but that sounds like a reasonable assumption. Someone storing it + /// through `ArcSwap<T>` and someone else with `ArcSwapOption<T>` will work. + #[inline] + pub(crate) fn pay<T: RefCnt>(&self, ptr: *const T::Base) -> bool { + self.0 + // If we don't change anything because there's something else, Relaxed is fine. + // + // The Release works as kind of Mutex. We make sure nothing from the debt-protected + // sections leaks below this point. + // + // Note that if it got paid already, it is inside the reference count. We don't + // necessarily observe that increment, but whoever destroys the pointer *must* see the + // up to date value, with all increments already counted in (the Arc takes care of that + // part). + .compare_exchange(ptr as usize, Self::NONE, Release, Relaxed) + .is_ok() + } + + /// Pays all the debts on the given pointer and the storage. + pub(crate) fn pay_all<T, R>(ptr: *const T::Base, storage_addr: usize, replacement: R) + where + T: RefCnt, + R: Fn() -> T, + { + LocalNode::with(|local| { + let val = unsafe { T::from_ptr(ptr) }; + // Pre-pay one ref count that can be safely put into a debt slot to pay it. 
+ T::inc(&val); + + Node::traverse::<(), _>(|node| { + // Make the cooldown trick know we are poking into this node. + let _reservation = node.reserve_writer(); + + local.help(node, storage_addr, &replacement); + + let all_slots = node + .fast_slots() + .chain(std::iter::once(node.helping_slot())); + for slot in all_slots { + // Note: Release is enough even here. That makes sure the increment is + // visible to whoever might acquire on this slot and can't leak below this. + // And we are the ones doing decrements anyway. + if slot.pay::<T>(ptr) { + // Pre-pay one more, for another future slot + T::inc(&val); + } + } + + None + }); + // Implicit dec by dropping val in here, pair for the above + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + /// Checks the assumption that arcs to ZSTs have different pointer values. + #[test] + fn arc_zst() { + struct A; + struct B; + + let a = Arc::new(A); + let b = Arc::new(B); + + let aref: &A = &a; + let bref: &B = &b; + + let aptr = aref as *const _ as usize; + let bptr = bref as *const _ as usize; + assert_ne!(aptr, bptr); + } +} diff --git a/src/docs/internal.rs b/src/docs/internal.rs new file mode 100644 index 0000000..f5ee001 --- /dev/null +++ b/src/docs/internal.rs @@ -0,0 +1,106 @@ +//! Internal details. +//! +//! While the other parts of documentation are useful to users of the crate, this part is probably +//! helpful only if you want to look into the code or are curious about how it works internally. +//! +//! Also note that any of these details may change in future versions and are not part of the +//! stability guarantees. Don't rely on anything here. +//! +//! # Storing the [`Arc`]. +//! +//! The [`Arc`] can be turned into a raw pointer and back. This is abstracted by the [`RefCnt`] +//! trait and it is technically possible to implement it for custom types (this crate also +//! implements it for [`Rc`] and [`Weak`], though the actual usefulness of these is a bit +//! questionable). +//! +//! The raw pointer is stored inside an [`AtomicPtr`]. +//! +//! # Protection of reference counts +//! +//! The first idea would be to just use [`AtomicPtr`] with whatever the [`Arc::into_raw`] returns. +//! Then replacing it would be fine (there's no need to update ref counts). The load needs to +//! increment the reference count ‒ one still stays inside and another is returned to the caller. +//! This is done by re-creating the Arc from the raw pointer and then cloning it, throwing one +//! instance away (without destroying it). +//! +//! This approach has a problem. There's a short time between we read the raw pointer and increment +//! the count. If some other thread replaces the stored Arc and throws it away, the ref count could +//! drop to 0, get destroyed and we would be trying to bump ref counts in a ghost, which would be +//! totally broken. +//! +//! To prevent this, we actually use two approaches in a hybrid manner. +//! +//! The first one is based on hazard pointers idea, but slightly modified. There's a global +//! repository of pointers that owe a reference. When someone swaps a pointer, it walks this list +//! and pays all the debts (and takes them out of the repository). +//! +//! For simplicity and performance, storing into the repository is fallible. If storing into the +//! repository fails (because the thread used up all its own slots, or because the pointer got +//! replaced in just the wrong moment and it can't confirm the reservation), unlike the full +//! 
hazard-pointers approach, we don't retry, but fall back onto secondary strategy. +//! +//! The secondary strategy is similar, but a bit more complex (and therefore slower, that's why it +//! is only a fallback). We first publish an intent to read a pointer (and where we are reading it +//! from). Then we actually do so and publish the debt, like previously. +//! +//! The writer pays the debts as usual. But also, if it sees the intent to read the value, it helps +//! along, reads it, bumps the reference and passes it to the reader. Therefore, if the reader +//! fails to do the protection itself, because it got interrupted by a writer, it finds a +//! ready-made replacement value it can just use and doesn't have to retry. Also, the writer +//! doesn't have to wait for the reader in any way, because it can just solve its problem and move +//! on. +//! +//! # Unsafety +//! +//! All the uses of the unsafe keyword is just to turn the raw pointer back to Arc. It originated +//! from an Arc in the first place, so the only thing to ensure is it is still valid. That means its +//! ref count never dropped to 0. +//! +//! At the beginning, there's ref count of 1 stored in the raw pointer (and maybe some others +//! elsewhere, but we can't rely on these). This 1 stays there for the whole time the pointer is +//! stored there. When the arc is replaced, this 1 is returned to the caller, so we just have to +//! make sure no more readers access it by that time. +//! +//! # Leases and debts +//! +//! Instead of incrementing the reference count, the pointer reference can be owed. In such case, it +//! is recorded into a global storage. As each thread has its own storage (the global storage is +//! composed of multiple thread storages), the readers don't contend. When the pointer is no longer +//! in use, the debt is erased. +//! +//! The writer pays all the existing debts, therefore the reader have the full Arc with ref count at +//! that time. The reader is made aware the debt was paid and decrements the reference count. +//! +//! # Memory orders +//! +//! ## Synchronizing the data pointed to by the pointer. +//! +//! We have AcqRel (well, SeqCst, but that's included) on the swap and Acquire on the loads. In case +//! of the double read around the debt allocation, we do that on the *second*, because of ABA. +//! That's also why that SeqCst on the allocation of debt itself is not enough. +//! the *latest* decrement. By making both the increment and decrement AcqRel, we effectively chain +//! the edges together. +//! +//! # Memory orders around debts +//! +//! The linked list of debt nodes only grows. The shape of the list (existence of nodes) is +//! synchronized through Release on creation and Acquire on load on the head pointer. +//! +//! The debts work similar to locks ‒ Acquire and Release make all the pointer manipulation at the +//! interval where it is written down. However, we use the SeqCst on the allocation of the debt +//! because when we see an empty slot, we need to make sure that it happened after we have +//! overwritten the pointer. +//! +//! In case the writer pays the debt, it sees the new enough data (for the same reasons the stale +//! empties are not seen). The reference count on the Arc is AcqRel and makes sure it is not +//! destroyed too soon. The writer traverses all the slots, therefore they don't need to synchronize +//! with each other. +//! +//! Further details are inside the internal `debt` module. +//! +//! [`RefCnt`]: crate::RefCnt +//! [`Arc`]: std::sync::Arc +//! 
[`Arc::into_raw`]: std::sync::Arc::into_raw +//! [`Rc`]: std::rc::Rc +//! [`Weak`]: std::sync::Weak +//! [`AtomicPtr`]: std::sync::atomic::AtomicPtr diff --git a/src/docs/limitations.rs b/src/docs/limitations.rs new file mode 100644 index 0000000..5d3ca00 --- /dev/null +++ b/src/docs/limitations.rs @@ -0,0 +1,53 @@ +//! Limitations and common pitfalls. +//! +//! # Sized types +//! +//! This currently works only for `Sized` types. Unsized types have „fat pointers“, which are twice +//! as large as the normal ones. The [`AtomicPtr`] doesn't support them. One could use something +//! like `AtomicU128` for them. The catch is this doesn't exist and the difference would make it +//! really hard to implement the debt storage/stripped down hazard pointers. +//! +//! A workaround is to use double indirection: +//! +//! ```rust +//! # use arc_swap::ArcSwap; +//! // This doesn't work: +//! // let data: ArcSwap<[u8]> = ArcSwap::new(Arc::from([1, 2, 3])); +//! +//! // But this does: +//! let data: ArcSwap<Box<[u8]>> = ArcSwap::from_pointee(Box::new([1, 2, 3])); +//! # drop(data); +//! ``` +//! +//! It also may be possible to use `ArcSwap` with the [`triomphe::ThinArc`] (that crate needs +//! enabling a feature flag to cooperate with `ArcSwap`). +//! +//! # Too many [`Guard`]s +//! +//! There's only limited number of "fast" slots for borrowing from [`ArcSwap`] for each single +//! thread (currently 8, but this might change in future versions). If these run out, the algorithm +//! falls back to slower path. +//! +//! If too many [`Guard`]s are kept around, the performance might be poor. These are not intended +//! to be stored in data structures or used across async yield points. +//! +//! [`ArcSwap`]: crate::ArcSwap +//! [`Guard`]: crate::Guard +//! [`AtomicPtr`]: std::sync::atomic::AtomicPtr +//! +//! # No `Clone` implementation +//! +//! Previous version implemented [`Clone`], but it turned out to be very confusing to people, since +//! it created fully independent [`ArcSwap`]. Users expected the instances to be tied to each +//! other, that store in one would change the result of future load of the other. +//! +//! To emulate the original behaviour, one can do something like this: +//! +//! ```rust +//! # use arc_swap::ArcSwap; +//! # let old = ArcSwap::from_pointee(42); +//! let new = ArcSwap::new(old.load_full()); +//! # let _ = new; +//! ``` +//! +//! [`triomphe::ThinArc`]: https://docs.rs/triomphe/latest/triomphe/struct.ThinArc.html diff --git a/src/docs/mod.rs b/src/docs/mod.rs new file mode 100644 index 0000000..9d5a45f --- /dev/null +++ b/src/docs/mod.rs @@ -0,0 +1,41 @@ +//! Additional documentation. +//! +//! Here we have some more general topics that might be good to know that just don't fit to the +//! crate level intro. +//! +//! Also, there were some previous blog posts about the crate which you might find interesting. +//! +//! # Atomic orderings +//! +//! Each operation on the [`ArcSwapAny`] with [`DefaultStrategy`] type callable concurrently (eg. +//! [`load`], but not [`into_inner`]) contains at least one [`SeqCst`] atomic read-write operation, +//! therefore even operations on different instances have a defined global order of operations. +//! +//! # Features +//! +//! The `weak` feature adds the ability to use arc-swap with the [`Weak`] pointer too, +//! through the [`ArcSwapWeak`] type. The needed std support is stabilized in rust version 1.45 (as +//! of now in beta). +//! +//! The `experimental-strategies` enables few more strategies that can be used. Note that these +//! 
**are not** part of the API stability guarantees and they may be changed, renamed or removed at +//! any time. +//! +//! # Minimal compiler version +//! +//! The `1` versions will compile on all compilers supporting the 2018 edition. Note that this +//! applies only if no additional feature flags are enabled and does not apply to compiling or +//! running tests. +//! +//! [`ArcSwapAny`]: crate::ArcSwapAny +//! [`ArcSwapWeak`]: crate::ArcSwapWeak +//! [`load`]: crate::ArcSwapAny::load +//! [`into_inner`]: crate::ArcSwapAny::into_inner +//! [`DefaultStrategy`]: crate::DefaultStrategy +//! [`SeqCst`]: std::sync::atomic::Ordering::SeqCst +//! [`Weak`]: std::sync::Weak + +pub mod internal; +pub mod limitations; +pub mod patterns; +pub mod performance; diff --git a/src/docs/patterns.rs b/src/docs/patterns.rs new file mode 100644 index 0000000..b2f91f5 --- /dev/null +++ b/src/docs/patterns.rs @@ -0,0 +1,271 @@ +//! Common use patterns +//! +//! Here are some common patterns one can use for inspiration. These are mostly covered by examples +//! at the right type in the crate, but this lists them at a single place. +//! +//! # Sharing of configuration data +//! +//! We want to share configuration from some source with rare updates to some high performance +//! worker threads. It can be configuration in its true sense, or a routing table. +//! +//! The idea here is, each new version is a newly allocated in its own [`Arc`]. It is then stored +//! into a *shared* `ArcSwap` instance. +//! +//! Each worker then loads the current version before each work chunk. In case a new version is +//! stored, the worker keeps using the loaded one until it ends the work chunk and, if it's the +//! last one to have the version, deallocates it automatically by dropping the [`Guard`] +//! +//! Note that the configuration needs to be passed through a *single shared* [`ArcSwap`]. That +//! means we need to share that instance and we do so through an [`Arc`] (one could use a global +//! variable instead). +//! +//! Therefore, what we have is `Arc<ArcSwap<Config>>`. +//! +//! ```rust +//! # use std::sync::Arc; +//! # use std::sync::atomic::{AtomicBool, Ordering}; +//! # use std::thread; +//! # use std::time::Duration; +//! # +//! # use arc_swap::ArcSwap; +//! # struct Work; +//! # impl Work { fn fetch() -> Self { Work } fn perform(&self, _: &Config) {} } +//! # +//! #[derive(Debug, Default)] +//! struct Config { +//! // ... Stuff in here ... +//! } +//! +//! // We wrap the ArcSwap into an Arc, so we can share it between threads. +//! let config = Arc::new(ArcSwap::from_pointee(Config::default())); +//! +//! let terminate = Arc::new(AtomicBool::new(false)); +//! let mut threads = Vec::new(); +//! +//! // The configuration thread +//! threads.push(thread::spawn({ +//! let config = Arc::clone(&config); +//! let terminate = Arc::clone(&terminate); +//! move || { +//! while !terminate.load(Ordering::Relaxed) { +//! thread::sleep(Duration::from_secs(6)); +//! // Actually, load it from somewhere +//! let new_config = Arc::new(Config::default()); +//! config.store(new_config); +//! } +//! } +//! })); +//! +//! // The worker thread +//! for _ in 0..10 { +//! threads.push(thread::spawn({ +//! let config = Arc::clone(&config); +//! let terminate = Arc::clone(&terminate); +//! move || { +//! while !terminate.load(Ordering::Relaxed) { +//! let work = Work::fetch(); +//! let config = config.load(); +//! work.perform(&config); +//! } +//! } +//! })); +//! } +//! +//! // Terminate gracefully +//! 
terminate.store(true, Ordering::Relaxed); +//! for thread in threads { +//! thread.join().unwrap(); +//! } +//! ``` +//! +//! # Consistent snapshots +//! +//! While one probably wants to get a fresh instance every time a work chunk is available, +//! therefore there would be one [`load`] for each work chunk, it is often also important that the +//! configuration doesn't change in the *middle* of processing of one chunk. Therefore, one +//! commonly wants *exactly* one [`load`] for the work chunk, not *at least* one. If the processing +//! had multiple phases, one would use something like this: +//! +//! ```rust +//! # use std::sync::Arc; +//! # +//! # use arc_swap::ArcSwap; +//! # struct Config; +//! # struct Work; +//! # impl Work { +//! # fn fetch() -> Self { Work } +//! # fn phase_1(&self, _: &Config) {} +//! # fn phase_2(&self, _: &Config) {} +//! # } +//! # let config = Arc::new(ArcSwap::from_pointee(Config)); +//! let work = Work::fetch(); +//! let config = config.load(); +//! work.phase_1(&config); +//! // We keep the same config value here +//! work.phase_2(&config); +//! ``` +//! +//! Over this: +//! +//! ```rust +//! # use std::sync::Arc; +//! # +//! # use arc_swap::ArcSwap; +//! # struct Config; +//! # struct Work; +//! # impl Work { +//! # fn fetch() -> Self { Work } +//! # fn phase_1(&self, _: &Config) {} +//! # fn phase_2(&self, _: &Config) {} +//! # } +//! # let config = Arc::new(ArcSwap::from_pointee(Config)); +//! let work = Work::fetch(); +//! work.phase_1(&config.load()); +//! // WARNING!! This is broken, because in between phase_1 and phase_2, the other thread could +//! // have replaced the config. Then each phase would be performed with a different one and that +//! // could lead to surprises. +//! work.phase_2(&config.load()); +//! ``` +//! +//! # Caching of the configuration +//! +//! Let's say that the work chunks are really small, but there's *a lot* of them to work on. Maybe +//! we are routing packets and the configuration is the routing table that can sometimes change, +//! but mostly doesn't. +//! +//! There's an overhead to [`load`]. If the work chunks are small enough, that could be measurable. +//! We can reach for [`Cache`]. It makes loads much faster (in the order of accessing local +//! variables) in case nothing has changed. It has two costs, it makes the load slightly slower in +//! case the thing *did* change (which is rare) and if the worker is inactive, it holds the old +//! cached value alive. +//! +//! This is OK for our use case, because the routing table is usually small enough so some stale +//! instances taking a bit of memory isn't an issue. +//! +//! The part that takes care of updates stays the same as above. +//! +//! ```rust +//! # use std::sync::Arc; +//! # use std::thread; +//! # use std::sync::atomic::{AtomicBool, Ordering}; +//! # use arc_swap::{ArcSwap, Cache}; +//! # struct Packet; impl Packet { fn receive() -> Self { Packet } } +//! +//! #[derive(Debug, Default)] +//! struct RoutingTable { +//! // ... Stuff in here ... +//! } +//! +//! impl RoutingTable { +//! fn route(&self, _: Packet) { +//! // ... Interesting things are done here ... +//! } +//! } +//! +//! let routing_table = Arc::new(ArcSwap::from_pointee(RoutingTable::default())); +//! +//! let terminate = Arc::new(AtomicBool::new(false)); +//! let mut threads = Vec::new(); +//! +//! for _ in 0..10 { +//! let t = thread::spawn({ +//! let routing_table = Arc::clone(&routing_table); +//! let terminate = Arc::clone(&terminate); +//! move || { +//! 
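// Each worker thread wraps the shared Arc<ArcSwap<_>> in its own, private Cache.
+//! 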
let mut routing_table = Cache::new(routing_table); +//! while !terminate.load(Ordering::Relaxed) { +//! let packet = Packet::receive(); +//! // This load is cheaper, because we cache in the private Cache thing. +//! // But if the above receive takes a long time, the Cache will keep the stale +//! // value alive until this time (when it will get replaced by up to date value). +//! let current = routing_table.load(); +//! current.route(packet); +//! } +//! } +//! }); +//! threads.push(t); +//! } +//! +//! // Shut down properly +//! terminate.store(true, Ordering::Relaxed); +//! for thread in threads { +//! thread.join().unwrap(); +//! } +//! ``` +//! +//! # Projecting into configuration field +//! +//! We have a larger application, composed of multiple components. Each component has its own +//! `ComponentConfig` structure. Then, the whole application has a `Config` structure that contains +//! a component config for each component: +//! +//! ```rust +//! # struct ComponentConfig; +//! +//! struct Config { +//! component: ComponentConfig, +//! // ... Some other components and things ... +//! } +//! # let c = Config { component: ComponentConfig }; +//! # let _ = c.component; +//! ``` +//! +//! We would like to use [`ArcSwap`] to push updates to the components. But for various reasons, +//! it's not a good idea to put the whole `ArcSwap<Config>` to each component, eg: +//! +//! * That would make each component depend on the top level config, which feels reversed. +//! * It doesn't allow reusing the same component in multiple applications, as these would have +//! different `Config` structures. +//! * One needs to build the whole `Config` for tests. +//! * There's a risk of entanglement, that the component would start looking at configuration of +//! different parts of code, which would be hard to debug. +//! +//! We also could have a separate `ArcSwap<ComponentConfig>` for each component, but that also +//! doesn't feel right, as we would have to push updates to multiple places and they could be +//! inconsistent for a while and we would have to decompose the `Config` structure into the parts, +//! because we need our things in [`Arc`]s to be put into [`ArcSwap`]. +//! +//! This is where the [`Access`] trait comes into play. The trait abstracts over things that can +//! give access to up to date version of specific T. That can be a [`Constant`] (which is useful +//! mostly for the tests, where one doesn't care about the updating), it can be an +//! [`ArcSwap<T>`][`ArcSwap`] itself, but it also can be an [`ArcSwap`] paired with a closure to +//! project into the specific field. The [`DynAccess`] is similar, but allows type erasure. That's +//! more convenient, but a little bit slower. +//! +//! ```rust +//! # use std::sync::Arc; +//! # use arc_swap::ArcSwap; +//! # use arc_swap::access::{DynAccess, Map}; +//! +//! #[derive(Debug, Default)] +//! struct ComponentConfig; +//! +//! struct Component { +//! config: Box<dyn DynAccess<ComponentConfig>>, +//! } +//! +//! #[derive(Debug, Default)] +//! struct Config { +//! component: ComponentConfig, +//! } +//! +//! let config = Arc::new(ArcSwap::from_pointee(Config::default())); +//! +//! let component = Component { +//! config: Box::new(Map::new(Arc::clone(&config), |config: &Config| &config.component)), +//! }; +//! # let _ = component.config; +//! ``` +//! +//! One would use `Box::new(Constant(ComponentConfig))` in unittests instead as the `config` field. +//! +//! 
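For instance, a unit test might plug in a fixed value through [`Constant`]. A small sketch (here
+//! `ComponentConfig` also derives `Clone`, which [`Constant`] needs):
+//!
+//! ```rust
+//! # use arc_swap::access::{Constant, DynAccess};
+//! # #[derive(Clone, Debug, Default)]
+//! # struct ComponentConfig;
+//! # struct Component { config: Box<dyn DynAccess<ComponentConfig>> }
+//! let component = Component {
+//!     config: Box::new(Constant(ComponentConfig)),
+//! };
+//! # let _ = component.config;
+//! ```
+//!
+//! 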
The [`Cache`] has its own [`Access`][crate::cache::Access] trait for similar purposes. +//! +//! [`Arc`]: std::sync::Arc +//! [`Guard`]: crate::Guard +//! [`load`]: crate::ArcSwapAny::load +//! [`ArcSwap`]: crate::ArcSwap +//! [`Cache`]: crate::cache::Cache +//! [`Access`]: crate::access::Access +//! [`DynAccess`]: crate::access::DynAccess +//! [`Constant`]: crate::access::Constant diff --git a/src/docs/performance.rs b/src/docs/performance.rs new file mode 100644 index 0000000..6358c87 --- /dev/null +++ b/src/docs/performance.rs @@ -0,0 +1,87 @@ +//! Performance characteristics. +//! +//! There are several performance advantages of [`ArcSwap`] over [`RwLock`]. +//! +//! ## Lock-free readers +//! +//! All the read operations are always [lock-free]. Most of the time, they are actually +//! [wait-free]. They are [lock-free] from time to time, with at least `usize::MAX / 4` accesses +//! that are [wait-free] in between. +//! +//! Writers are [lock-free]. +//! +//! Whenever the documentation talks about *contention* in the context of [`ArcSwap`], it talks +//! about contention on the CPU level ‒ multiple cores having to deal with accessing the same cache +//! line. This slows things down (compared to each one accessing its own cache line), but an +//! eventual progress is still guaranteed and the cost is significantly lower than parking threads +//! as with mutex-style contention. +//! +//! ## Speeds +//! +//! The base line speed of read operations is similar to using an *uncontended* [`Mutex`]. +//! However, [`load`] suffers no contention from any other read operations and only slight +//! ones during updates. The [`load_full`] operation is additionally contended only on +//! the reference count of the [`Arc`] inside ‒ so, in general, while [`Mutex`] rapidly +//! loses its performance when being in active use by multiple threads at once and +//! [`RwLock`] is slow to start with, [`ArcSwap`] mostly keeps its performance even when read by +//! many threads in parallel. +//! +//! Write operations are considered expensive. A write operation is more expensive than access to +//! an *uncontended* [`Mutex`] and on some architectures even slower than uncontended +//! [`RwLock`]. However, it is faster than either under contention. +//! +//! There are some (very unscientific) [benchmarks] within the source code of the library, and the +//! [`DefaultStrategy`][crate::DefaultStrategy] has some numbers measured on my computer. +//! +//! The exact numbers are highly dependant on the machine used (both absolute numbers and relative +//! between different data structures). Not only architectures have a huge impact (eg. x86 vs ARM), +//! but even AMD vs. Intel or two different Intel processors. Therefore, if what matters is more +//! the speed than the wait-free guarantees, you're advised to do your own measurements. +//! +//! Further speed improvements may be gained by the use of the [`Cache`]. +//! +//! ## Consistency +//! +//! The combination of [wait-free] guarantees of readers and no contention between concurrent +//! [`load`]s provides *consistent* performance characteristics of the synchronization mechanism. +//! This might be important for soft-realtime applications (the CPU-level contention caused by a +//! recent update/write operation might be problematic for some hard-realtime cases, though). +//! +//! ## Choosing the right reading operation +//! +//! There are several load operations available. While the general go-to one should be +//! 
[`load`], there may be situations in which the others are a better match. +//! +//! The [`load`] usually only borrows the instance from the shared [`ArcSwap`]. This makes +//! it faster, because different threads don't contend on the reference count. There are two +//! situations when this borrow isn't possible. If the content gets changed, all existing +//! [`Guard`]s are promoted to contain an owned instance. The promotion is done by the +//! writer, but the readers still need to decrement the reference counts of the old instance when +//! they no longer use it, contending on the count. +//! +//! The other situation derives from internal implementation. The number of borrows each thread can +//! have at each time (across all [`Guard`]s) is limited. If this limit is exceeded, an owned +//! instance is created instead. +//! +//! Therefore, if you intend to hold onto the loaded value for extended time span, you may prefer +//! [`load_full`]. It loads the pointer instance ([`Arc`]) without borrowing, which is +//! slower (because of the possible contention on the reference count), but doesn't consume one of +//! the borrow slots, which will make it more likely for following [`load`]s to have a slot +//! available. Similarly, if some API needs an owned `Arc`, [`load_full`] is more convenient and +//! potentially faster then first [`load`]ing and then cloning that [`Arc`]. +//! +//! Additionally, it is possible to use a [`Cache`] to get further speed improvement at the +//! cost of less comfortable API and possibly keeping the older values alive for longer than +//! necessary. +//! +//! [`ArcSwap`]: crate::ArcSwap +//! [`Cache`]: crate::cache::Cache +//! [`Guard`]: crate::Guard +//! [`load`]: crate::ArcSwapAny::load +//! [`load_full`]: crate::ArcSwapAny::load_full +//! [`Arc`]: std::sync::Arc +//! [`Mutex`]: std::sync::Mutex +//! [`RwLock`]: std::sync::RwLock +//! [benchmarks]: https://github.com/vorner/arc-swap/tree/master/benchmarks +//! [lock-free]: https://en.wikipedia.org/wiki/Non-blocking_algorithm#Lock-freedom +//! [wait-free]: https://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..598dfa0 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,1306 @@ +#![doc(test(attr(deny(warnings))))] +#![warn(missing_docs)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![allow(deprecated)] + +//! Making [`Arc`][Arc] itself atomic +//! +//! The [`ArcSwap`] type is a container for an `Arc` that can be changed atomically. Semantically, +//! it is similar to something like `Atomic<Arc<T>>` (if there was such a thing) or +//! `RwLock<Arc<T>>` (but without the need for the locking). It is optimized for read-mostly +//! scenarios, with consistent performance characteristics. +//! +//! # Motivation +//! +//! There are many situations in which one might want to have some data structure that is often +//! read and seldom updated. Some examples might be a configuration of a service, routing tables, +//! snapshot of some data that is renewed every few minutes, etc. +//! +//! In all these cases one needs: +//! * Being able to read the current value of the data structure, fast, often and concurrently from +//! many threads. +//! * Using the same version of the data structure over longer period of time ‒ a query should be +//! answered by a consistent version of data, a packet should be routed either by an old or by a +//! new version of the routing table but not by a combination, etc. +//! * Perform an update without disrupting the processing. 
+//! +//! The first idea would be to use [`RwLock<T>`][RwLock] and keep a read-lock for the whole time of +//! processing. Update would, however, pause all processing until done. +//! +//! Better option would be to have [`RwLock<Arc<T>>`][RwLock]. Then one would lock, clone the [Arc] +//! and unlock. This suffers from CPU-level contention (on the lock and on the reference count of +//! the [Arc]) which makes it relatively slow. Depending on the implementation, an update may be +//! blocked for arbitrary long time by a steady inflow of readers. +//! +//! ```rust +//! # use std::sync::{Arc, RwLock}; +//! # use once_cell::sync::Lazy; +//! # struct RoutingTable; struct Packet; impl RoutingTable { fn route(&self, _: Packet) {} } +//! static ROUTING_TABLE: Lazy<RwLock<Arc<RoutingTable>>> = Lazy::new(|| { +//! RwLock::new(Arc::new(RoutingTable)) +//! }); +//! +//! fn process_packet(packet: Packet) { +//! let table = Arc::clone(&ROUTING_TABLE.read().unwrap()); +//! table.route(packet); +//! } +//! # fn main() { process_packet(Packet); } +//! ``` +//! +//! The [ArcSwap] can be used instead, which solves the above problems and has better performance +//! characteristics than the [RwLock], both in contended and non-contended scenarios. +//! +//! ```rust +//! # use arc_swap::ArcSwap; +//! # use once_cell::sync::Lazy; +//! # struct RoutingTable; struct Packet; impl RoutingTable { fn route(&self, _: Packet) {} } +//! static ROUTING_TABLE: Lazy<ArcSwap<RoutingTable>> = Lazy::new(|| { +//! ArcSwap::from_pointee(RoutingTable) +//! }); +//! +//! fn process_packet(packet: Packet) { +//! let table = ROUTING_TABLE.load(); +//! table.route(packet); +//! } +//! # fn main() { process_packet(Packet); } +//! ``` +//! +//! # Crate contents +//! +//! At the heart of the crate there are [`ArcSwap`] and [`ArcSwapOption`] types, containers for an +//! [`Arc`] and [`Option<Arc>`][Option]. +//! +//! Technically, these are type aliases for partial instantiations of the [`ArcSwapAny`] type. The +//! [`ArcSwapAny`] is more flexible and allows tweaking of many things (can store other things than +//! [`Arc`]s, can configure the locking [`Strategy`]). For details about the tweaking, see the +//! documentation of the [`strategy`] module and the [`RefCnt`] trait. +//! +//! The [`cache`] module provides means for speeding up read access of the contained data at the +//! cost of delayed reclamation. +//! +//! The [`access`] module can be used to do projections into the contained data to separate parts +//! of application from each other (eg. giving a component access to only its own part of +//! configuration while still having it reloaded as a whole). +//! +//! # Before using +//! +//! The data structure is a bit niche. Before using, please check the +//! [limitations and common pitfalls][docs::limitations] and the [performance +//! characteristics][docs::performance], including choosing the right [read +//! operation][docs::performance#read-operations]. +//! +//! You can also get an inspiration about what's possible in the [common patterns][docs::patterns] +//! section. +//! +//! # Examples +//! +//! ```rust +//! use std::sync::Arc; +//! +//! use arc_swap::ArcSwap; +//! use crossbeam_utils::thread; +//! +//! fn main() { +//! let config = ArcSwap::from(Arc::new(String::default())); +//! thread::scope(|scope| { +//! scope.spawn(|_| { +//! let new_conf = Arc::new("New configuration".to_owned()); +//! config.store(new_conf); +//! }); +//! for _ in 0..10 { +//! scope.spawn(|_| { +//! loop { +//! let cfg = config.load(); +//! 
if !cfg.is_empty() { +//! assert_eq!(**cfg, "New configuration"); +//! return; +//! } +//! } +//! }); +//! } +//! }).unwrap(); +//! } +//! ``` +//! +//! [RwLock]: https://doc.rust-lang.org/std/sync/struct.RwLock.html + +pub mod access; +mod as_raw; +pub mod cache; +mod compile_fail_tests; +mod debt; +pub mod docs; +mod ref_cnt; +#[cfg(feature = "serde")] +mod serde; +pub mod strategy; +#[cfg(feature = "weak")] +mod weak; + +use std::borrow::Borrow; +use std::fmt::{Debug, Display, Formatter, Result as FmtResult}; +use std::marker::PhantomData; +use std::mem; +use std::ops::Deref; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::Arc; + +use crate::access::{Access, Map}; +pub use crate::as_raw::AsRaw; +pub use crate::cache::Cache; +pub use crate::ref_cnt::RefCnt; +use crate::strategy::hybrid::{DefaultConfig, HybridStrategy}; +use crate::strategy::sealed::Protected; +use crate::strategy::{CaS, Strategy}; +pub use crate::strategy::{DefaultStrategy, IndependentStrategy}; + +/// A temporary storage of the pointer. +/// +/// This guard object is returned from most loading methods (with the notable exception of +/// [`load_full`](struct.ArcSwapAny.html#method.load_full)). It dereferences to the smart pointer +/// loaded, so most operations are to be done using that. +pub struct Guard<T: RefCnt, S: Strategy<T> = DefaultStrategy> { + inner: S::Protected, +} + +impl<T: RefCnt, S: Strategy<T>> Guard<T, S> { + /// Converts it into the held value. + /// + /// This, on occasion, may be a tiny bit faster than cloning the Arc or whatever is being held + /// inside. + // Associated function on purpose, because of deref + #[allow(clippy::wrong_self_convention)] + #[inline] + pub fn into_inner(lease: Self) -> T { + lease.inner.into_inner() + } + + /// Create a guard for a given value `inner`. + /// + /// This can be useful on occasion to pass a specific object to code that expects or + /// wants to store a Guard. + /// + /// # Example + /// + /// ```rust + /// # use arc_swap::{ArcSwap, DefaultStrategy, Guard}; + /// # use std::sync::Arc; + /// # let p = ArcSwap::from_pointee(42); + /// // Create two guards pointing to the same object + /// let g1 = p.load(); + /// let g2 = Guard::<_, DefaultStrategy>::from_inner(Arc::clone(&*g1)); + /// # drop(g2); + /// ``` + pub fn from_inner(inner: T) -> Self { + Guard { + inner: S::Protected::from_inner(inner), + } + } +} + +impl<T: RefCnt, S: Strategy<T>> Deref for Guard<T, S> { + type Target = T; + #[inline] + fn deref(&self) -> &T { + self.inner.borrow() + } +} + +impl<T: RefCnt, S: Strategy<T>> From<T> for Guard<T, S> { + fn from(inner: T) -> Self { + Self::from_inner(inner) + } +} + +impl<T: Default + RefCnt, S: Strategy<T>> Default for Guard<T, S> { + fn default() -> Self { + Self::from(T::default()) + } +} + +impl<T: Debug + RefCnt, S: Strategy<T>> Debug for Guard<T, S> { + fn fmt(&self, formatter: &mut Formatter) -> FmtResult { + self.deref().fmt(formatter) + } +} + +impl<T: Display + RefCnt, S: Strategy<T>> Display for Guard<T, S> { + fn fmt(&self, formatter: &mut Formatter) -> FmtResult { + self.deref().fmt(formatter) + } +} + +/// Comparison of two pointer-like things. +// A and B are likely to *be* references, or thin wrappers around that. Calling that with extra +// reference is just annoying. 
+#[allow(clippy::needless_pass_by_value)] +fn ptr_eq<Base, A, B>(a: A, b: B) -> bool +where + A: AsRaw<Base>, + B: AsRaw<Base>, +{ + let a = a.as_raw(); + let b = b.as_raw(); + ptr::eq(a, b) +} + +/// An atomic storage for a reference counted smart pointer like [`Arc`] or `Option<Arc>`. +/// +/// This is a storage where a smart pointer may live. It can be read and written atomically from +/// several threads, but doesn't act like a pointer itself. +/// +/// One can be created [`from`] an [`Arc`]. To get the pointer back, use the +/// [`load`](#method.load). +/// +/// # Note +/// +/// This is the common generic implementation. This allows sharing the same code for storing +/// both `Arc` and `Option<Arc>` (and possibly other similar types). +/// +/// In your code, you most probably want to interact with it through the +/// [`ArcSwap`](type.ArcSwap.html) and [`ArcSwapOption`](type.ArcSwapOption.html) aliases. However, +/// the methods they share are described here and are applicable to both of them. That's why the +/// examples here use `ArcSwap` ‒ but they could as well be written with `ArcSwapOption` or +/// `ArcSwapAny`. +/// +/// # Type parameters +/// +/// * `T`: The smart pointer to be kept inside. This crate provides implementation for `Arc<_>` and +/// `Option<Arc<_>>` (`Rc` too, but that one is not practically useful). But third party could +/// provide implementations of the [`RefCnt`] trait and plug in others. +/// * `S`: Chooses the [strategy] used to protect the data inside. They come with various +/// performance trade offs, the default [`DefaultStrategy`] is good rule of thumb for most use +/// cases. +/// +/// # Examples +/// +/// ```rust +/// # use std::sync::Arc; +/// # use arc_swap::ArcSwap; +/// let arc = Arc::new(42); +/// let arc_swap = ArcSwap::from(arc); +/// assert_eq!(42, **arc_swap.load()); +/// // It can be read multiple times +/// assert_eq!(42, **arc_swap.load()); +/// +/// // Put a new one in there +/// let new_arc = Arc::new(0); +/// assert_eq!(42, *arc_swap.swap(new_arc)); +/// assert_eq!(0, **arc_swap.load()); +/// ``` +/// +/// # Known bugs +/// +/// Currently, things like `ArcSwapAny<Option<Option<Arc<_>>>>` (notice the double Option) don't +/// work properly. A proper solution is being looked into +/// ([#81](https://github.com/vorner/arc-swap/issues)). +/// +/// [`Arc`]: https://doc.rust-lang.org/std/sync/struct.Arc.html +/// [`from`]: https://doc.rust-lang.org/nightly/std/convert/trait.From.html#tymethod.from +/// [`RefCnt`]: trait.RefCnt.html +pub struct ArcSwapAny<T: RefCnt, S: Strategy<T> = DefaultStrategy> { + // Notes: AtomicPtr needs Sized + /// The actual pointer, extracted from the Arc. + ptr: AtomicPtr<T::Base>, + + /// We are basically an Arc in disguise. Inherit parameters from Arc by pretending to contain + /// it. + _phantom_arc: PhantomData<T>, + + /// Strategy to protect the data. 
+ strategy: S, +} + +impl<T: RefCnt, S: Default + Strategy<T>> From<T> for ArcSwapAny<T, S> { + fn from(val: T) -> Self { + Self::with_strategy(val, S::default()) + } +} + +impl<T: RefCnt, S: Strategy<T>> Drop for ArcSwapAny<T, S> { + fn drop(&mut self) { + let ptr = *self.ptr.get_mut(); + unsafe { + // To pay any possible debts + self.strategy.wait_for_readers(ptr, &self.ptr); + // We are getting rid of the one stored ref count + T::dec(ptr); + } + } +} + +impl<T, S: Strategy<T>> Debug for ArcSwapAny<T, S> +where + T: Debug + RefCnt, +{ + fn fmt(&self, formatter: &mut Formatter) -> FmtResult { + formatter + .debug_tuple("ArcSwapAny") + .field(&self.load()) + .finish() + } +} + +impl<T, S: Strategy<T>> Display for ArcSwapAny<T, S> +where + T: Display + RefCnt, +{ + fn fmt(&self, formatter: &mut Formatter) -> FmtResult { + self.load().fmt(formatter) + } +} + +impl<T: RefCnt + Default, S: Default + Strategy<T>> Default for ArcSwapAny<T, S> { + fn default() -> Self { + Self::new(T::default()) + } +} + +impl<T: RefCnt, S: Strategy<T>> ArcSwapAny<T, S> { + /// Constructs a new storage. + pub fn new(val: T) -> Self + where + S: Default, + { + Self::from(val) + } + + /// Constructs a new storage while customizing the protection strategy. + pub fn with_strategy(val: T, strategy: S) -> Self { + // The AtomicPtr requires *mut in its interface. We are more like *const, so we cast it. + // However, we always go back to *const right away when we get the pointer on the other + // side, so it should be fine. + let ptr = T::into_ptr(val); + Self { + ptr: AtomicPtr::new(ptr), + _phantom_arc: PhantomData, + strategy, + } + } + + /// Extracts the value inside. + pub fn into_inner(mut self) -> T { + let ptr = *self.ptr.get_mut(); + // To pay all the debts + unsafe { self.strategy.wait_for_readers(ptr, &self.ptr) }; + mem::forget(self); + unsafe { T::from_ptr(ptr) } + } + + /// Loads the value. + /// + /// This makes another copy of the held pointer and returns it, atomically (it is + /// safe even when other thread stores into the same instance at the same time). + /// + /// The method is lock-free and wait-free, but usually more expensive than + /// [`load`](#method.load). + pub fn load_full(&self) -> T { + Guard::into_inner(self.load()) + } + + /// Provides a temporary borrow of the object inside. + /// + /// This returns a proxy object allowing access to the thing held inside. However, there's + /// only limited amount of possible cheap proxies in existence for each thread ‒ if more are + /// created, it falls back to equivalent of [`load_full`](#method.load_full) internally. + /// + /// This is therefore a good choice to use for eg. searching a data structure or juggling the + /// pointers around a bit, but not as something to store in larger amounts. The rule of thumb + /// is this is suited for local variables on stack, but not in long-living data structures. + /// + /// # Consistency + /// + /// In case multiple related operations are to be done on the loaded value, it is generally + /// recommended to call `load` just once and keep the result over calling it multiple times. + /// First, keeping it is usually faster. But more importantly, the value can change between the + /// calls to load, returning different objects, which could lead to logical inconsistency. + /// Keeping the result makes sure the same object is used. 
+ /// + /// ```rust + /// # use arc_swap::ArcSwap; + /// struct Point { + /// x: usize, + /// y: usize, + /// } + /// + /// fn print_broken(p: &ArcSwap<Point>) { + /// // This is broken, because the x and y may come from different points, + /// // combining into an invalid point that never existed. + /// println!("X: {}", p.load().x); + /// // If someone changes the content now, between these two loads, we + /// // have a problem + /// println!("Y: {}", p.load().y); + /// } + /// + /// fn print_correct(p: &ArcSwap<Point>) { + /// // Here we take a snapshot of one specific point so both x and y come + /// // from the same one. + /// let point = p.load(); + /// println!("X: {}", point.x); + /// println!("Y: {}", point.y); + /// } + /// # let p = ArcSwap::from_pointee(Point { x: 10, y: 20 }); + /// # print_correct(&p); + /// # print_broken(&p); + /// ``` + #[inline] + pub fn load(&self) -> Guard<T, S> { + let protected = unsafe { self.strategy.load(&self.ptr) }; + Guard { inner: protected } + } + + /// Replaces the value inside this instance. + /// + /// Further loads will yield the new value. Uses [`swap`](#method.swap) internally. + pub fn store(&self, val: T) { + drop(self.swap(val)); + } + + /// Exchanges the value inside this instance. + pub fn swap(&self, new: T) -> T { + let new = T::into_ptr(new); + // AcqRel needed to publish the target of the new pointer and get the target of the old + // one. + // + // SeqCst to synchronize the time lines with the group counters. + let old = self.ptr.swap(new, Ordering::SeqCst); + unsafe { + self.strategy.wait_for_readers(old, &self.ptr); + T::from_ptr(old) + } + } + + /// Swaps the stored Arc if it equals to `current`. + /// + /// If the current value of the `ArcSwapAny` equals to `current`, the `new` is stored inside. + /// If not, nothing happens. + /// + /// The previous value (no matter if the swap happened or not) is returned. Therefore, if the + /// returned value is equal to `current`, the swap happened. You want to do a pointer-based + /// comparison to determine it. + /// + /// In other words, if the caller „guesses“ the value of current correctly, it acts like + /// [`swap`](#method.swap), otherwise it acts like [`load_full`](#method.load_full) (including + /// the limitations). + /// + /// The `current` can be specified as `&Arc`, [`Guard`](struct.Guard.html), + /// [`&Guards`](struct.Guards.html) or as a raw pointer (but _not_ owned `Arc`). See the + /// [`AsRaw`] trait. + pub fn compare_and_swap<C>(&self, current: C, new: T) -> Guard<T, S> + where + C: AsRaw<T::Base>, + S: CaS<T>, + { + let protected = unsafe { self.strategy.compare_and_swap(&self.ptr, current, new) }; + Guard { inner: protected } + } + + /// Read-Copy-Update of the pointer inside. + /// + /// This is useful in read-heavy situations with several threads that sometimes update the data + /// pointed to. The readers can just repeatedly use [`load`](#method.load) without any locking. + /// The writer uses this method to perform the update. + /// + /// In case there's only one thread that does updates or in case the next version is + /// independent of the previous one, simple [`swap`](#method.swap) or [`store`](#method.store) + /// is enough. Otherwise, it may be needed to retry the update operation if some other thread + /// made an update in between. This is what this method does. + /// + /// # Examples + /// + /// This will *not* work as expected, because between loading and storing, some other thread + /// might have updated the value. 
+ /// + /// ```rust + /// # use std::sync::Arc; + /// # + /// # use arc_swap::ArcSwap; + /// # use crossbeam_utils::thread; + /// # + /// let cnt = ArcSwap::from_pointee(0); + /// thread::scope(|scope| { + /// for _ in 0..10 { + /// scope.spawn(|_| { + /// let inner = cnt.load_full(); + /// // Another thread might have stored some other number than what we have + /// // between the load and store. + /// cnt.store(Arc::new(*inner + 1)); + /// }); + /// } + /// }).unwrap(); + /// // This will likely fail: + /// // assert_eq!(10, *cnt.load_full()); + /// ``` + /// + /// This will, but it can call the closure multiple times to retry: + /// + /// ```rust + /// # use arc_swap::ArcSwap; + /// # use crossbeam_utils::thread; + /// # + /// let cnt = ArcSwap::from_pointee(0); + /// thread::scope(|scope| { + /// for _ in 0..10 { + /// scope.spawn(|_| cnt.rcu(|inner| **inner + 1)); + /// } + /// }).unwrap(); + /// assert_eq!(10, *cnt.load_full()); + /// ``` + /// + /// Due to the retries, you might want to perform all the expensive operations *before* the + /// rcu. As an example, if there's a cache of some computations as a map, and the map is cheap + /// to clone but the computations are not, you could do something like this: + /// + /// ```rust + /// # use std::collections::HashMap; + /// # + /// # use arc_swap::ArcSwap; + /// # use once_cell::sync::Lazy; + /// # + /// fn expensive_computation(x: usize) -> usize { + /// x * 2 // Let's pretend multiplication is *really expensive expensive* + /// } + /// + /// type Cache = HashMap<usize, usize>; + /// + /// static CACHE: Lazy<ArcSwap<Cache>> = Lazy::new(|| ArcSwap::default()); + /// + /// fn cached_computation(x: usize) -> usize { + /// let cache = CACHE.load(); + /// if let Some(result) = cache.get(&x) { + /// return *result; + /// } + /// // Not in cache. Compute and store. + /// // The expensive computation goes outside, so it is not retried. + /// let result = expensive_computation(x); + /// CACHE.rcu(|cache| { + /// // The cheaper clone of the cache can be retried if need be. + /// let mut cache = HashMap::clone(&cache); + /// cache.insert(x, result); + /// cache + /// }); + /// result + /// } + /// + /// assert_eq!(42, cached_computation(21)); + /// assert_eq!(42, cached_computation(21)); + /// ``` + /// + /// # The cost of cloning + /// + /// Depending on the size of cache above, the cloning might not be as cheap. You can however + /// use persistent data structures ‒ each modification creates a new data structure, but it + /// shares most of the data with the old one (which is usually accomplished by using `Arc`s + /// inside to share the unchanged values). Something like + /// [`rpds`](https://crates.io/crates/rpds) or [`im`](https://crates.io/crates/im) might do + /// what you need. + pub fn rcu<R, F>(&self, mut f: F) -> T + where + F: FnMut(&T) -> R, + R: Into<T>, + S: CaS<T>, + { + let mut cur = self.load(); + loop { + let new = f(&cur).into(); + let prev = self.compare_and_swap(&*cur, new); + let swapped = ptr_eq(&*cur, &*prev); + if swapped { + return Guard::into_inner(prev); + } else { + cur = prev; + } + } + } + + /// Provides an access to an up to date projection of the carried data. + /// + /// # Motivation + /// + /// Sometimes, an application consists of components. Each component has its own configuration + /// structure. The whole configuration contains all the smaller config parts. + /// + /// For the sake of separation and abstraction, it is not desirable to pass the whole + /// configuration to each of the components. 
This allows the component to take only access to + /// its own part. + /// + /// # Lifetimes & flexibility + /// + /// This method is not the most flexible way, as the returned type borrows into the `ArcSwap`. + /// To provide access into eg. `Arc<ArcSwap<T>>`, you can create the [`Map`] type directly. See + /// the [`access`] module. + /// + /// # Performance + /// + /// As the provided function is called on each load from the shared storage, it should + /// generally be cheap. It is expected this will usually be just referencing of a field inside + /// the structure. + /// + /// # Examples + /// + /// ```rust + /// use std::sync::Arc; + /// + /// use arc_swap::ArcSwap; + /// use arc_swap::access::Access; + /// + /// struct Cfg { + /// value: usize, + /// } + /// + /// fn print_many_times<V: Access<usize>>(value: V) { + /// for _ in 0..25 { + /// let value = value.load(); + /// println!("{}", *value); + /// } + /// } + /// + /// let shared = ArcSwap::from_pointee(Cfg { value: 0 }); + /// let mapped = shared.map(|c: &Cfg| &c.value); + /// crossbeam_utils::thread::scope(|s| { + /// // Will print some zeroes and some twos + /// s.spawn(|_| print_many_times(mapped)); + /// s.spawn(|_| shared.store(Arc::new(Cfg { value: 2 }))); + /// }).expect("Something panicked in a thread"); + /// ``` + pub fn map<I, R, F>(&self, f: F) -> Map<&Self, I, F> + where + F: Fn(&I) -> &R + Clone, + Self: Access<I>, + { + Map::new(self, f) + } +} + +/// An atomic storage for `Arc`. +/// +/// This is a type alias only. Most of its methods are described on +/// [`ArcSwapAny`](struct.ArcSwapAny.html). +pub type ArcSwap<T> = ArcSwapAny<Arc<T>>; + +impl<T, S: Strategy<Arc<T>>> ArcSwapAny<Arc<T>, S> { + /// A convenience constructor directly from the pointed-to value. + /// + /// Direct equivalent for `ArcSwap::new(Arc::new(val))`. + pub fn from_pointee(val: T) -> Self + where + S: Default, + { + Self::from(Arc::new(val)) + } +} + +/// An atomic storage for `Option<Arc>`. +/// +/// This is very similar to [`ArcSwap`](type.ArcSwap.html), but allows storing NULL values, which +/// is useful in some situations. +/// +/// This is a type alias only. Most of the methods are described on +/// [`ArcSwapAny`](struct.ArcSwapAny.html). Even though the examples there often use `ArcSwap`, +/// they are applicable to `ArcSwapOption` with appropriate changes. +/// +/// # Examples +/// +/// ``` +/// use std::sync::Arc; +/// use arc_swap::ArcSwapOption; +/// +/// let shared = ArcSwapOption::from(None); +/// assert!(shared.load_full().is_none()); +/// assert!(shared.swap(Some(Arc::new(42))).is_none()); +/// assert_eq!(42, **shared.load_full().as_ref().unwrap()); +/// ``` +pub type ArcSwapOption<T> = ArcSwapAny<Option<Arc<T>>>; + +impl<T, S: Strategy<Option<Arc<T>>>> ArcSwapAny<Option<Arc<T>>, S> { + /// A convenience constructor directly from a pointed-to value. + /// + /// This just allocates the `Arc` under the hood. + /// + /// # Examples + /// + /// ```rust + /// use arc_swap::ArcSwapOption; + /// + /// let empty: ArcSwapOption<usize> = ArcSwapOption::from_pointee(None); + /// assert!(empty.load().is_none()); + /// let non_empty: ArcSwapOption<usize> = ArcSwapOption::from_pointee(42); + /// assert_eq!(42, **non_empty.load().as_ref().unwrap()); + /// ``` + pub fn from_pointee<V: Into<Option<T>>>(val: V) -> Self + where + S: Default, + { + Self::new(val.into().map(Arc::new)) + } + + /// A convenience constructor for an empty value. + /// + /// This is equivalent to `ArcSwapOption::new(None)`. 
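+ ///
+ /// # Examples
+ ///
+ /// A minimal illustration:
+ ///
+ /// ```rust
+ /// # use arc_swap::ArcSwapOption;
+ /// let shared = ArcSwapOption::<usize>::empty();
+ /// assert!(shared.load().is_none());
+ /// ```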
+ pub fn empty() -> Self + where + S: Default, + { + Self::new(None) + } +} + +impl<T> ArcSwapOption<T> { + /// A const-fn equivalent of [empty]. + /// + /// Just like [empty], this creates an `None`-holding `ArcSwapOption`. The [empty] is, however, + /// more general ‒ this is available only for the default strategy, while [empty] is for any + /// [Default]-constructible strategy (current or future one). + /// + /// [empty]: ArcSwapAny::empty + /// + /// # Examples + /// + /// ```rust + /// # use std::sync::Arc; + /// # use arc_swap::ArcSwapOption; + /// static GLOBAL_DATA: ArcSwapOption<usize> = ArcSwapOption::const_empty(); + /// + /// assert!(GLOBAL_DATA.load().is_none()); + /// GLOBAL_DATA.store(Some(Arc::new(42))); + /// assert_eq!(42, **GLOBAL_DATA.load().as_ref().unwrap()); + /// ``` + pub const fn const_empty() -> Self { + Self { + ptr: AtomicPtr::new(ptr::null_mut()), + _phantom_arc: PhantomData, + strategy: HybridStrategy { + _config: DefaultConfig, + }, + } + } +} + +/// An atomic storage that doesn't share the internal generation locks with others. +/// +/// This makes it bigger and it also might suffer contention (on the HW level) if used from many +/// threads at once. On the other hand, it can't block writes in other instances. +/// +/// See the [`IndependentStrategy`] for further details. +// Being phased out. Will deprecate once we verify in production that the new strategy works fine. +#[doc(hidden)] +pub type IndependentArcSwap<T> = ArcSwapAny<Arc<T>, IndependentStrategy>; + +/// Arc swap for the [Weak] pointer. +/// +/// This is similar to [ArcSwap], but it doesn't store [Arc], it stores [Weak]. It doesn't keep the +/// data alive when pointed to. +/// +/// This is a type alias only. Most of the methods are described on the +/// [`ArcSwapAny`](struct.ArcSwapAny.html). +/// +/// Needs the `weak` feature turned on. +/// +/// [Weak]: std::sync::Weak +#[cfg(feature = "weak")] +pub type ArcSwapWeak<T> = ArcSwapAny<std::sync::Weak<T>>; + +macro_rules! t { + ($name: ident, $strategy: ty) => { + #[cfg(test)] + mod $name { + use std::panic; + use std::sync::atomic::{self, AtomicUsize}; + + use adaptive_barrier::{Barrier, PanicMode}; + use crossbeam_utils::thread; + + use super::*; + + const ITERATIONS: usize = 10; + + #[allow(deprecated)] // We use "deprecated" testing strategies in here. + type As<T> = ArcSwapAny<Arc<T>, $strategy>; + #[allow(deprecated)] // We use "deprecated" testing strategies in here. + type Aso<T> = ArcSwapAny<Option<Arc<T>>, $strategy>; + + /// Similar to the one in doc tests of the lib, but more times and more intensive (we + /// want to torture it a bit). + #[test] + #[cfg_attr(miri, ignore)] // Takes like 1 or 2 infinities to run under miri + fn publish() { + const READERS: usize = 2; + for _ in 0..ITERATIONS { + let config = As::<String>::default(); + let ended = AtomicUsize::new(0); + thread::scope(|scope| { + for _ in 0..READERS { + scope.spawn(|_| loop { + let cfg = config.load_full(); + if !cfg.is_empty() { + assert_eq!(*cfg, "New configuration"); + ended.fetch_add(1, Ordering::Relaxed); + return; + } + atomic::spin_loop_hint(); + }); + } + scope.spawn(|_| { + let new_conf = Arc::new("New configuration".to_owned()); + config.store(new_conf); + }); + }) + .unwrap(); + assert_eq!(READERS, ended.load(Ordering::Relaxed)); + let arc = config.load_full(); + assert_eq!(2, Arc::strong_count(&arc)); + assert_eq!(0, Arc::weak_count(&arc)); + } + } + + /// Similar to the doc tests of ArcSwap, but happens more times. 
+ #[test] + fn swap_load() { + for _ in 0..100 { + let arc = Arc::new(42); + let arc_swap = As::from(Arc::clone(&arc)); + assert_eq!(42, **arc_swap.load()); + // It can be read multiple times + assert_eq!(42, **arc_swap.load()); + + // Put a new one in there + let new_arc = Arc::new(0); + assert_eq!(42, *arc_swap.swap(Arc::clone(&new_arc))); + assert_eq!(0, **arc_swap.load()); + // One loaded here, one in the arc_swap, one in new_arc + let loaded = arc_swap.load_full(); + assert_eq!(3, Arc::strong_count(&loaded)); + assert_eq!(0, Arc::weak_count(&loaded)); + // The original got released from the arc_swap + assert_eq!(1, Arc::strong_count(&arc)); + assert_eq!(0, Arc::weak_count(&arc)); + } + } + + /// Two different writers publish two series of values. The readers check that it is + /// always increasing in each serie. + /// + /// For performance, we try to reuse the threads here. + #[test] + fn multi_writers() { + let first_value = Arc::new((0, 0)); + let shared = As::from(Arc::clone(&first_value)); + const WRITER_CNT: usize = 2; + const READER_CNT: usize = 3; + #[cfg(miri)] + const ITERATIONS: usize = 5; + #[cfg(not(miri))] + const ITERATIONS: usize = 100; + const SEQ: usize = 50; + let barrier = Barrier::new(PanicMode::Poison); + thread::scope(|scope| { + for w in 0..WRITER_CNT { + // We need to move w into the closure. But we want to just reference the + // other things. + let mut barrier = barrier.clone(); + let shared = &shared; + let first_value = &first_value; + scope.spawn(move |_| { + for _ in 0..ITERATIONS { + barrier.wait(); + shared.store(Arc::clone(&first_value)); + barrier.wait(); + for i in 0..SEQ { + shared.store(Arc::new((w, i + 1))); + } + } + }); + } + for _ in 0..READER_CNT { + let mut barrier = barrier.clone(); + let shared = &shared; + let first_value = &first_value; + scope.spawn(move |_| { + for _ in 0..ITERATIONS { + barrier.wait(); + barrier.wait(); + let mut previous = [0; WRITER_CNT]; + let mut last = Arc::clone(&first_value); + loop { + let cur = shared.load(); + if Arc::ptr_eq(&last, &cur) { + atomic::spin_loop_hint(); + continue; + } + let (w, s) = **cur; + assert!(previous[w] < s, "{:?} vs {:?}", previous, cur); + previous[w] = s; + last = Guard::into_inner(cur); + if s == SEQ { + break; + } + } + } + }); + } + + drop(barrier); + }) + .unwrap(); + } + + #[test] + fn load_null() { + let shared = Aso::<usize>::default(); + let guard = shared.load(); + assert!(guard.is_none()); + shared.store(Some(Arc::new(42))); + assert_eq!(42, **shared.load().as_ref().unwrap()); + } + + #[test] + fn from_into() { + let a = Arc::new(42); + let shared = As::new(a); + let guard = shared.load(); + let a = shared.into_inner(); + assert_eq!(42, *a); + assert_eq!(2, Arc::strong_count(&a)); + drop(guard); + assert_eq!(1, Arc::strong_count(&a)); + } + + // Note on the Relaxed order here. This should be enough, because there's that + // barrier.wait in between that should do the synchronization of happens-before for us. + // And using SeqCst would probably not help either, as there's nothing else with SeqCst + // here in this test to relate it to. + #[derive(Default)] + struct ReportDrop(Arc<AtomicUsize>); + impl Drop for ReportDrop { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::Relaxed); + } + } + + /// Interaction of two threads about a guard and dropping it. + /// + /// We make sure everything works in timely manner (eg. dropping of stuff) even if multiple + /// threads interact. + /// + /// The idea is: + /// * Thread 1 loads a value. 
+ /// * Thread 2 replaces the shared value. The original value is not destroyed. + /// * Thread 1 drops the guard. The value is destroyed and this is observable in both threads. + #[test] + fn guard_drop_in_thread() { + for _ in 0..ITERATIONS { + let cnt = Arc::new(AtomicUsize::new(0)); + + let shared = As::from_pointee(ReportDrop(cnt.clone())); + assert_eq!(cnt.load(Ordering::Relaxed), 0, "Dropped prematurely"); + // We need the threads to wait for each other at places. + let sync = Barrier::new(PanicMode::Poison); + + thread::scope(|scope| { + scope.spawn({ + let sync = sync.clone(); + |_| { + let mut sync = sync; // Move into the closure + let guard = shared.load(); + sync.wait(); + // Thread 2 replaces the shared value. We wait for it to confirm. + sync.wait(); + drop(guard); + assert_eq!(cnt.load(Ordering::Relaxed), 1, "Value not dropped"); + // Let thread 2 know we already dropped it. + sync.wait(); + } + }); + + scope.spawn(|_| { + let mut sync = sync; + // Thread 1 loads, we wait for that + sync.wait(); + shared.store(Default::default()); + assert_eq!( + cnt.load(Ordering::Relaxed), + 0, + "Dropped while still in use" + ); + // Let thread 2 know we replaced it + sync.wait(); + // Thread 1 drops its guard. We wait for it to confirm. + sync.wait(); + assert_eq!(cnt.load(Ordering::Relaxed), 1, "Value not dropped"); + }); + }) + .unwrap(); + } + } + + /// Check dropping a lease in a different thread than it was created doesn't cause any + /// problems. + #[test] + fn guard_drop_in_another_thread() { + for _ in 0..ITERATIONS { + let cnt = Arc::new(AtomicUsize::new(0)); + let shared = As::from_pointee(ReportDrop(cnt.clone())); + assert_eq!(cnt.load(Ordering::Relaxed), 0, "Dropped prematurely"); + let guard = shared.load(); + + drop(shared); + assert_eq!(cnt.load(Ordering::Relaxed), 0, "Dropped prematurely"); + + thread::scope(|scope| { + scope.spawn(|_| { + drop(guard); + }); + }) + .unwrap(); + + assert_eq!(cnt.load(Ordering::Relaxed), 1, "Not dropped"); + } + } + + #[test] + fn load_option() { + let shared = Aso::from_pointee(42); + // The type here is not needed in real code, it's just addition test the type matches. + let opt: Option<_> = Guard::into_inner(shared.load()); + assert_eq!(42, *opt.unwrap()); + + shared.store(None); + assert!(shared.load().is_none()); + } + + // Check stuff can get formatted + #[test] + fn debug_impl() { + let shared = As::from_pointee(42); + assert_eq!("ArcSwapAny(42)", &format!("{:?}", shared)); + assert_eq!("42", &format!("{:?}", shared.load())); + } + + #[test] + fn display_impl() { + let shared = As::from_pointee(42); + assert_eq!("42", &format!("{}", shared)); + assert_eq!("42", &format!("{}", shared.load())); + } + + // The following "tests" are not run, only compiled. They check that things that should be + // Send/Sync actually are. + fn _check_stuff_is_send_sync() { + let shared = As::from_pointee(42); + let moved = As::from_pointee(42); + let shared_ref = &shared; + let lease = shared.load(); + let lease_ref = &lease; + let lease = shared.load(); + thread::scope(|s| { + s.spawn(move |_| { + let _ = lease; + let _ = lease_ref; + let _ = shared_ref; + let _ = moved; + }); + }) + .unwrap(); + } + + /// We have a callback in RCU. Check what happens if we access the value from within. 
+ #[test] + fn recursive() { + let shared = ArcSwap::from(Arc::new(0)); + + shared.rcu(|i| { + if **i < 10 { + shared.rcu(|i| **i + 1); + } + **i + }); + assert_eq!(10, **shared.load()); + assert_eq!(2, Arc::strong_count(&shared.load_full())); + } + + /// A panic from within the rcu callback should not change anything. + #[test] + fn rcu_panic() { + let shared = ArcSwap::from(Arc::new(0)); + assert!(panic::catch_unwind(|| shared.rcu(|_| -> usize { panic!() })).is_err()); + assert_eq!(1, Arc::strong_count(&shared.swap(Arc::new(42)))); + } + + /// Handling null/none values + #[test] + fn nulls() { + let shared = ArcSwapOption::from(Some(Arc::new(0))); + let orig = shared.swap(None); + assert_eq!(1, Arc::strong_count(&orig.unwrap())); + let null = shared.load(); + assert!(null.is_none()); + let a = Arc::new(42); + let orig = shared.compare_and_swap(ptr::null(), Some(Arc::clone(&a))); + assert!(orig.is_none()); + assert_eq!(2, Arc::strong_count(&a)); + let orig = Guard::into_inner(shared.compare_and_swap(&None::<Arc<_>>, None)); + assert_eq!(3, Arc::strong_count(&a)); + assert!(ptr_eq(&a, &orig)); + } + + #[test] + /// Multiple RCUs interacting. + fn rcu() { + const ITERATIONS: usize = 50; + const THREADS: usize = 10; + let shared = ArcSwap::from(Arc::new(0)); + thread::scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..ITERATIONS { + shared.rcu(|old| **old + 1); + } + }); + } + }) + .unwrap(); + assert_eq!(THREADS * ITERATIONS, **shared.load()); + } + + #[test] + /// Make sure the reference count and compare_and_swap works as expected. + fn cas_ref_cnt() { + #[cfg(miri)] + const ITERATIONS: usize = 10; + #[cfg(not(miri))] + const ITERATIONS: usize = 50; + let shared = ArcSwap::from(Arc::new(0)); + for i in 0..ITERATIONS { + let orig = shared.load_full(); + assert_eq!(i, *orig); + if i % 2 == 1 { + // One for orig, one for shared + assert_eq!(2, Arc::strong_count(&orig)); + } + let n1 = Arc::new(i + 1); + // Fill up the slots sometimes + let fillup = || { + if i % 2 == 0 { + Some((0..ITERATIONS).map(|_| shared.load()).collect::<Vec<_>>()) + } else { + None + } + }; + let guards = fillup(); + // Success + let prev = shared.compare_and_swap(&orig, Arc::clone(&n1)); + assert!(ptr_eq(&orig, &prev)); + drop(guards); + // One for orig, one for prev + assert_eq!(2, Arc::strong_count(&orig)); + // One for n1, one for shared + assert_eq!(2, Arc::strong_count(&n1)); + assert_eq!(i + 1, **shared.load()); + let n2 = Arc::new(i); + drop(prev); + let guards = fillup(); + // Failure + let prev = Guard::into_inner(shared.compare_and_swap(&orig, Arc::clone(&n2))); + drop(guards); + assert!(ptr_eq(&n1, &prev)); + // One for orig + assert_eq!(1, Arc::strong_count(&orig)); + // One for n1, one for shared, one for prev + assert_eq!(3, Arc::strong_count(&n1)); + // n2 didn't get increased + assert_eq!(1, Arc::strong_count(&n2)); + assert_eq!(i + 1, **shared.load()); + } + + let a = shared.load_full(); + // One inside shared, one for a + assert_eq!(2, Arc::strong_count(&a)); + drop(shared); + // Only a now + assert_eq!(1, Arc::strong_count(&a)); + } + } + }; +} + +t!(tests_default, DefaultStrategy); +#[cfg(all(feature = "internal-test-strategies", test))] +#[allow(deprecated)] +mod internal_strategies { + use super::*; + t!( + tests_full_slots, + crate::strategy::test_strategies::FillFastSlots + ); +} + +/// These tests assume details about the used strategy. 
+#[cfg(test)] +mod tests { + use super::*; + + /// Accessing the value inside ArcSwap with Guards (and checks for the reference + /// counts). + #[test] + fn load_cnt() { + let a = Arc::new(0); + let shared = ArcSwap::from(Arc::clone(&a)); + // One in shared, one in a + assert_eq!(2, Arc::strong_count(&a)); + let guard = shared.load(); + assert_eq!(0, **guard); + // The guard doesn't have its own ref count now + assert_eq!(2, Arc::strong_count(&a)); + let guard_2 = shared.load(); + // Unlike with guard, this does not deadlock + shared.store(Arc::new(1)); + // But now, each guard got a full Arc inside it + assert_eq!(3, Arc::strong_count(&a)); + // And when we get rid of them, they disappear + drop(guard_2); + assert_eq!(2, Arc::strong_count(&a)); + let _b = Arc::clone(&guard); + assert_eq!(3, Arc::strong_count(&a)); + // We can drop the guard it came from + drop(guard); + assert_eq!(2, Arc::strong_count(&a)); + let guard = shared.load(); + assert_eq!(1, **guard); + drop(shared); + // We can still use the guard after the shared disappears + assert_eq!(1, **guard); + let ptr = Arc::clone(&guard); + // One in shared, one in guard + assert_eq!(2, Arc::strong_count(&ptr)); + drop(guard); + assert_eq!(1, Arc::strong_count(&ptr)); + } + + /// There can be only limited amount of leases on one thread. Following ones are + /// created, but contain full Arcs. + #[test] + fn lease_overflow() { + #[cfg(miri)] + const GUARD_COUNT: usize = 100; + #[cfg(not(miri))] + const GUARD_COUNT: usize = 1000; + let a = Arc::new(0); + let shared = ArcSwap::from(Arc::clone(&a)); + assert_eq!(2, Arc::strong_count(&a)); + let mut guards = (0..GUARD_COUNT).map(|_| shared.load()).collect::<Vec<_>>(); + let count = Arc::strong_count(&a); + assert!(count > 2); + let guard = shared.load(); + assert_eq!(count + 1, Arc::strong_count(&a)); + drop(guard); + assert_eq!(count, Arc::strong_count(&a)); + // When we delete the first one, it didn't have an Arc in it, so the ref count + // doesn't drop + guards.swap_remove(0); + assert_eq!(count, Arc::strong_count(&a)); + // But new one reuses now vacant the slot and doesn't create a new Arc + let _guard = shared.load(); + assert_eq!(count, Arc::strong_count(&a)); + } +} diff --git a/src/ref_cnt.rs b/src/ref_cnt.rs new file mode 100644 index 0000000..b46070b --- /dev/null +++ b/src/ref_cnt.rs @@ -0,0 +1,175 @@ +use std::mem; +use std::ptr; +use std::rc::Rc; +use std::sync::Arc; + +/// A trait describing smart reference counted pointers. +/// +/// Note that in a way [`Option<Arc<T>>`][Option] is also a smart reference counted pointer, just +/// one that can hold NULL. +/// +/// The trait is unsafe, because a wrong implementation will break the [ArcSwapAny] +/// implementation and lead to UB. +/// +/// This is not actually expected for downstream crate to implement, this is just means to reuse +/// code for [Arc] and [`Option<Arc>`][Option] variants. However, it is theoretically possible (if +/// you have your own [Arc] implementation). +/// +/// It is also implemented for [Rc], but that is not considered very useful (because the +/// [ArcSwapAny] is not `Send` or `Sync`, therefore there's very little advantage for it to be +/// atomic). 
+/// +/// # Safety +/// +/// Aside from the obvious properties (like that incrementing and decrementing a reference count +/// cancel each out and that having less references tracked than how many things actually point to +/// the value is fine as long as the count doesn't drop to 0), it also must satisfy that if two +/// pointers have the same value, they point to the same object. This is specifically not true for +/// ZSTs, but it is true for `Arc`s of ZSTs, because they have the reference counts just after the +/// value. It would be fine to point to a type-erased version of the same object, though (if one +/// could use this trait with unsized types in the first place). +/// +/// Furthermore, the type should be Pin (eg. if the type is cloned or moved, it should still +/// point/deref to the same place in memory). +/// +/// [Arc]: std::sync::Arc +/// [Rc]: std::rc::Rc +/// [ArcSwapAny]: crate::ArcSwapAny +pub unsafe trait RefCnt: Clone { + /// The base type the pointer points to. + type Base; + + /// Converts the smart pointer into a raw pointer, without affecting the reference count. + /// + /// This can be seen as kind of freezing the pointer ‒ it'll be later converted back using + /// [`from_ptr`](#method.from_ptr). + /// + /// The pointer must point to the value stored (and the value must be the same as one returned + /// by [`as_ptr`](#method.as_ptr). + fn into_ptr(me: Self) -> *mut Self::Base; + + /// Provides a view into the smart pointer as a raw pointer. + /// + /// This must not affect the reference count ‒ the pointer is only borrowed. + fn as_ptr(me: &Self) -> *mut Self::Base; + + /// Converts a raw pointer back into the smart pointer, without affecting the reference count. + /// + /// This is only called on values previously returned by [`into_ptr`](#method.into_ptr). + /// However, it is not guaranteed to be 1:1 relation ‒ `from_ptr` may be called more times than + /// `into_ptr` temporarily provided the reference count never drops under 1 during that time + /// (the implementation sometimes owes a reference). These extra pointers will either be + /// converted back using `into_ptr` or forgotten. + /// + /// # Safety + /// + /// This must not be called by code outside of this crate. + unsafe fn from_ptr(ptr: *const Self::Base) -> Self; + + /// Increments the reference count by one. + /// + /// Return the pointer to the inner thing as a side effect. + fn inc(me: &Self) -> *mut Self::Base { + Self::into_ptr(Self::clone(me)) + } + + /// Decrements the reference count by one. + /// + /// Note this is called on a raw pointer (one previously returned by + /// [`into_ptr`](#method.into_ptr). This may lead to dropping of the reference count to 0 and + /// destruction of the internal pointer. + /// + /// # Safety + /// + /// This must not be called by code outside of this crate. + unsafe fn dec(ptr: *const Self::Base) { + drop(Self::from_ptr(ptr)); + } +} + +unsafe impl<T> RefCnt for Arc<T> { + type Base = T; + fn into_ptr(me: Arc<T>) -> *mut T { + Arc::into_raw(me) as *mut T + } + fn as_ptr(me: &Arc<T>) -> *mut T { + // Slightly convoluted way to do this, but this avoids stacked borrows violations. The same + // intention as + // + // me as &T as *const T as *mut T + // + // We first create a "shallow copy" of me - one that doesn't really own its ref count + // (that's OK, me _does_ own it, so it can't be destroyed in the meantime). + // Then we can use into_raw (which preserves not having the ref count). + // + // We need to "revert" the changes we did. 
In current std implementation, the combination + // of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw + // and that read shall be paired with forget to properly "close the brackets". In future + // versions of STD, these may become something else that's not really no-op (unlikely, but + // possible), so we future-proof it a bit. + + // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads + let ptr = Arc::into_raw(unsafe { std::ptr::read(me) }); + let ptr = ptr as *mut T; + + // SAFETY: We got the pointer from into_raw just above + mem::forget(unsafe { Arc::from_raw(ptr) }); + + ptr + } + unsafe fn from_ptr(ptr: *const T) -> Arc<T> { + Arc::from_raw(ptr) + } +} + +unsafe impl<T> RefCnt for Rc<T> { + type Base = T; + fn into_ptr(me: Rc<T>) -> *mut T { + Rc::into_raw(me) as *mut T + } + fn as_ptr(me: &Rc<T>) -> *mut T { + // Slightly convoluted way to do this, but this avoids stacked borrows violations. The same + // intention as + // + // me as &T as *const T as *mut T + // + // We first create a "shallow copy" of me - one that doesn't really own its ref count + // (that's OK, me _does_ own it, so it can't be destroyed in the meantime). + // Then we can use into_raw (which preserves not having the ref count). + // + // We need to "revert" the changes we did. In current std implementation, the combination + // of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw + // and that read shall be paired with forget to properly "close the brackets". In future + // versions of STD, these may become something else that's not really no-op (unlikely, but + // possible), so we future-proof it a bit. + + // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads + let ptr = Rc::into_raw(unsafe { std::ptr::read(me) }); + let ptr = ptr as *mut T; + + // SAFETY: We got the pointer from into_raw just above + mem::forget(unsafe { Rc::from_raw(ptr) }); + + ptr + } + unsafe fn from_ptr(ptr: *const T) -> Rc<T> { + Rc::from_raw(ptr) + } +} + +unsafe impl<T: RefCnt> RefCnt for Option<T> { + type Base = T::Base; + fn into_ptr(me: Option<T>) -> *mut T::Base { + me.map(T::into_ptr).unwrap_or_else(ptr::null_mut) + } + fn as_ptr(me: &Option<T>) -> *mut T::Base { + me.as_ref().map(T::as_ptr).unwrap_or_else(ptr::null_mut) + } + unsafe fn from_ptr(ptr: *const T::Base) -> Option<T> { + if ptr.is_null() { + None + } else { + Some(T::from_ptr(ptr)) + } + } +} diff --git a/src/serde.rs b/src/serde.rs new file mode 100644 index 0000000..95ecf3f --- /dev/null +++ b/src/serde.rs @@ -0,0 +1,132 @@ +use crate::{ArcSwapAny, RefCnt, Strategy}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +impl<T, S> Serialize for ArcSwapAny<T, S> +where + T: RefCnt + Serialize, + S: Strategy<T>, +{ + fn serialize<Ser: Serializer>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error> { + self.load().serialize(serializer) + } +} + +impl<'de, T, S> Deserialize<'de> for ArcSwapAny<T, S> +where + T: RefCnt + Deserialize<'de>, + S: Strategy<T> + Default, +{ + fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> { + Ok(Self::from(T::deserialize(deserializer)?)) + } +} + +#[cfg(test)] +mod tests { + use crate::{ArcSwap, ArcSwapAny, ArcSwapOption, RefCnt}; + use serde_derive::{Deserialize, Serialize}; + use serde_test::{assert_tokens, Token}; + use std::sync::Arc; + + #[derive(Debug, Serialize, Deserialize)] + #[serde(transparent)] + struct ArcSwapAnyEq<T: RefCnt>(ArcSwapAny<T>); + 
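// How the Serialize/Deserialize impls above behave in practice, as a small
// illustrative sketch. The `serde_json` crate is used here purely as an example
// format (it is not a dependency of this crate), and the crate's `serde`
// feature has to be enabled.
//
//     use arc_swap::ArcSwap;
//
//     let value = ArcSwap::from_pointee(42u64);
//     // Serialization goes through `load()`, so it snapshots the currently
//     // stored value and writes it out transparently, without any wrapper.
//     let json = serde_json::to_string(&value).unwrap();
//     assert_eq!("42", json);
//     // Deserialization builds a fresh ArcSwap around the decoded value.
//     let back: ArcSwap<u64> = serde_json::from_str(&json).unwrap();
//     assert_eq!(42, **back.load());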
impl<T: RefCnt + PartialEq> PartialEq for ArcSwapAnyEq<T> { + fn eq(&self, other: &Self) -> bool { + self.0.load().eq(&other.0.load()) + } + } + impl<T: RefCnt + PartialEq> Eq for ArcSwapAnyEq<T> {} + + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] + struct Foo { + field0: u64, + field1: String, + } + + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] + struct Bar { + field0: ArcSwapAnyEq<Arc<u64>>, + field1: ArcSwapAnyEq<Option<Arc<String>>>, + } + + #[test] + fn test_serialize_deserialize() { + let field0 = u64::MAX; + let field1 = "FOO_-0123456789"; + + let data_orig = Foo { + field0, + field1: field1.to_string(), + }; + let data = ArcSwapAnyEq(ArcSwap::from_pointee(data_orig)); + assert_tokens( + &data, + &[ + Token::Struct { + name: "Foo", + len: 2, + }, + Token::Str("field0"), + Token::U64(u64::MAX), + Token::Str("field1"), + Token::String(field1), + Token::StructEnd, + ], + ); + + let data = Bar { + field0: ArcSwapAnyEq(ArcSwap::from_pointee(field0)), + field1: ArcSwapAnyEq(ArcSwapOption::from_pointee(field1.to_string())), + }; + assert_tokens( + &data, + &[ + Token::Struct { + name: "Bar", + len: 2, + }, + Token::Str("field0"), + Token::U64(u64::MAX), + Token::Str("field1"), + Token::Some, + Token::String(field1), + Token::StructEnd, + ], + ); + } + + #[test] + fn test_serialize_deserialize_option() { + let field0 = u64::MAX; + let field1 = "FOO_-0123456789"; + + let data_orig = Foo { + field0, + field1: field1.to_string(), + }; + let data = ArcSwapAnyEq(ArcSwapOption::from_pointee(data_orig)); + assert_tokens( + &data, + &[ + Token::Some, + Token::Struct { + name: "Foo", + len: 2, + }, + Token::Str("field0"), + Token::U64(u64::MAX), + Token::Str("field1"), + Token::String(field1), + Token::StructEnd, + ], + ); + } + + #[test] + fn test_serialize_deserialize_option_none() { + let data = ArcSwapAnyEq(ArcSwapOption::<Foo>::from_pointee(None)); + + assert_tokens(&data, &[Token::None]); + } +} diff --git a/src/strategy/hybrid.rs b/src/strategy/hybrid.rs new file mode 100644 index 0000000..c47d6b8 --- /dev/null +++ b/src/strategy/hybrid.rs @@ -0,0 +1,235 @@ +//! A hybrid strategy. +//! +//! This is based on debts ‒ an Arc may owe a reference, but it is marked in the debt. It is either +//! put back (by stopping using it), or if the pointer is replaced, the writer bumps the reference +//! count and removes the debt. +//! +//! The strategy uses two different slots for the debts. The first ones are faster, but fallible. +//! If they fail (either because there's interference from a writer at the same time, or because +//! they are full), the secondary one that is slower, but always succeeds, is used. In the latter +//! case, the reference is bumped and this secondary debt slot is released, so it is available for +//! further loads. +//! +//! See the [crate::debt] module for the actual slot manipulation. Here we just wrap them into the +//! strategy. 
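// A minimal, self-contained illustration of the visible effect of the two kinds
// of slots described above (it mirrors the `load_cnt` and `lease_overflow` tests
// elsewhere in this import; the exact number of fast slots is an implementation
// detail and not guaranteed):
//
//     use std::sync::Arc;
//     use arc_swap::ArcSwap;
//
//     let a = Arc::new(0usize);
//     let shared = ArcSwap::from(Arc::clone(&a));
//     // A single guard fits into a fast slot: it only *owes* a reference, so the
//     // strong count stays at 2 (one in `a`, one inside `shared`).
//     let guard = shared.load();
//     assert_eq!(2, Arc::strong_count(&a));
//     drop(guard);
//     // Holding many guards at once exhausts the fast slots; the remaining loads
//     // fall back to the slower path, which bumps the reference count.
//     let guards = (0..1000).map(|_| shared.load()).collect::<Vec<_>>();
//     assert!(Arc::strong_count(&a) > 2);
//     drop(guards);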
+ +use std::borrow::Borrow; +use std::mem::{self, ManuallyDrop}; +use std::ops::Deref; +use std::ptr; +use std::sync::atomic::AtomicPtr; +use std::sync::atomic::Ordering::*; + +use super::sealed::{CaS, InnerStrategy, Protected}; +use crate::debt::{Debt, LocalNode}; +use crate::ref_cnt::RefCnt; + +pub struct HybridProtection<T: RefCnt> { + debt: Option<&'static Debt>, + ptr: ManuallyDrop<T>, +} + +impl<T: RefCnt> HybridProtection<T> { + pub(super) unsafe fn new(ptr: *const T::Base, debt: Option<&'static Debt>) -> Self { + Self { + debt, + ptr: ManuallyDrop::new(T::from_ptr(ptr)), + } + } + + /// Try getting a dept into a fast slot. + #[inline] + fn attempt(node: &LocalNode, storage: &AtomicPtr<T::Base>) -> Option<Self> { + // Relaxed is good enough here, see the Acquire below + let ptr = storage.load(Relaxed); + // Try to get a debt slot. If not possible, fail. + let debt = node.new_fast(ptr as usize)?; + + // Acquire to get the data. + // + // SeqCst to make sure the storage vs. the debt are well ordered. + let confirm = storage.load(SeqCst); + if ptr == confirm { + // Successfully got a debt + Some(unsafe { Self::new(ptr, Some(debt)) }) + } else if debt.pay::<T>(ptr) { + // It changed in the meantime, we return the debt (that is on the outdated pointer, + // possibly destroyed) and fail. + None + } else { + // It changed in the meantime, but the debt for the previous pointer was already paid + // for by someone else, so we are fine using it. + Some(unsafe { Self::new(ptr, None) }) + } + } + + /// Get a debt slot using the slower but always successful mechanism. + fn fallback(node: &LocalNode, storage: &AtomicPtr<T::Base>) -> Self { + // First, we claim a debt slot and store the address of the atomic pointer there, so the + // writer can optionally help us out with loading and protecting something. + let gen = node.new_helping(storage as *const _ as usize); + // We already synchronized the start of the sequence by SeqCst in the new_helping vs swap on + // the pointer. We just need to make sure to bring the pointee in (this can be newer than + // what we got in the Debt) + let candidate = storage.load(Acquire); + + // Try to replace the debt with our candidate. If it works, we get the debt slot to use. If + // not, we get a replacement value, already protected and a debt to take care of. + match node.confirm_helping(gen, candidate as usize) { + Ok(debt) => { + // The fast path -> we got the debt confirmed alright. + Self::from_inner(unsafe { Self::new(candidate, Some(debt)).into_inner() }) + } + Err((unused_debt, replacement)) => { + // The debt is on the candidate we provided and it is unused, we so we just pay it + // back right away. + if !unused_debt.pay::<T>(candidate) { + unsafe { T::dec(candidate) }; + } + // We got a (possibly) different pointer out. But that one is already protected and + // the slot is paid back. + unsafe { Self::new(replacement as *mut _, None) } + } + } + } + + #[inline] + fn as_ptr(&self) -> *const T::Base { + T::as_ptr(self.ptr.deref()) + } +} + +impl<T: RefCnt> Drop for HybridProtection<T> { + #[inline] + fn drop(&mut self) { + match self.debt.take() { + // We have our own copy of Arc, so we don't need a protection. Do nothing (but release + // the Arc below). + None => (), + // If we owed something, just return the debt. We don't have a pointer owned, so + // nothing to release. 
+ Some(debt) => { + let ptr = T::as_ptr(&self.ptr); + if debt.pay::<T>(ptr) { + return; + } + // But if the debt was already paid for us, we need to release the pointer, as we + // were effectively already in the Unprotected mode. + } + } + // Equivalent to T::dec(ptr) + unsafe { ManuallyDrop::drop(&mut self.ptr) }; + } +} + +impl<T: RefCnt> Protected<T> for HybridProtection<T> { + #[inline] + fn from_inner(ptr: T) -> Self { + Self { + debt: None, + ptr: ManuallyDrop::new(ptr), + } + } + + #[inline] + fn into_inner(mut self) -> T { + // Drop any debt and release any lock held by the given guard and return a + // full-featured value that even can outlive the ArcSwap it originated from. + match self.debt.take() { + None => (), // We have a fully loaded ref-counted pointer. + Some(debt) => { + let ptr = T::inc(&self.ptr); + if !debt.pay::<T>(ptr) { + unsafe { T::dec(ptr) }; + } + } + } + + // The ptr::read & forget is something like a cheating move. We can't move it out, because + // we have a destructor and Rust doesn't allow us to do that. + let inner = unsafe { ptr::read(self.ptr.deref()) }; + mem::forget(self); + inner + } +} + +impl<T: RefCnt> Borrow<T> for HybridProtection<T> { + #[inline] + fn borrow(&self) -> &T { + &self.ptr + } +} + +pub trait Config { + // Mostly for testing, way to disable the fast slo + const USE_FAST: bool; +} + +#[derive(Clone, Default)] +pub struct DefaultConfig; + +impl Config for DefaultConfig { + const USE_FAST: bool = true; +} + +#[derive(Clone, Default)] +pub struct HybridStrategy<Cfg> { + pub(crate) _config: Cfg, +} + +impl<T, Cfg> InnerStrategy<T> for HybridStrategy<Cfg> +where + T: RefCnt, + Cfg: Config, +{ + type Protected = HybridProtection<T>; + unsafe fn load(&self, storage: &AtomicPtr<T::Base>) -> Self::Protected { + LocalNode::with(|node| { + let fast = if Cfg::USE_FAST { + HybridProtection::attempt(node, storage) + } else { + None + }; + fast.unwrap_or_else(|| HybridProtection::fallback(node, storage)) + }) + } + unsafe fn wait_for_readers(&self, old: *const T::Base, storage: &AtomicPtr<T::Base>) { + // The pay_all may need to provide fresh replacement values if someone else is loading from + // this particular storage. We do so by the exact same way, by `load` ‒ it's OK, a writer + // does not hold a slot and the reader doesn't recurse back into writer, so we won't run + // out of slots. + let replacement = || self.load(storage).into_inner(); + Debt::pay_all::<T, _>(old, storage as *const _ as usize, replacement); + } +} + +impl<T: RefCnt, Cfg: Config> CaS<T> for HybridStrategy<Cfg> { + unsafe fn compare_and_swap<C: crate::as_raw::AsRaw<T::Base>>( + &self, + storage: &AtomicPtr<T::Base>, + current: C, + new: T, + ) -> Self::Protected { + loop { + let old = <Self as InnerStrategy<T>>::load(self, storage); + // Observation of their inequality is enough to make a verdict + if old.as_ptr() != current.as_raw() { + return old; + } + // If they are still equal, put the new one in. + let new_raw = T::as_ptr(&new); + if storage + .compare_exchange_weak(current.as_raw(), new_raw, SeqCst, Relaxed) + .is_ok() + { + // We successfully put the new value in. The ref count went in there too. + T::into_ptr(new); + <Self as InnerStrategy<T>>::wait_for_readers(self, old.as_ptr(), storage); + // We just got one ref count out of the storage and we have one in old. We don't + // need two. 
+ T::dec(old.as_ptr()); + return old; + } + } + } +} diff --git a/src/strategy/mod.rs b/src/strategy/mod.rs new file mode 100644 index 0000000..50e8c5a --- /dev/null +++ b/src/strategy/mod.rs @@ -0,0 +1,160 @@ +//! Strategies for protecting the reference counts. +//! +//! There are multiple algorithms how to protect the reference counts while they're being updated +//! by multiple threads, each with its own set of pros and cons. The [`DefaultStrategy`] is used by +//! default and should generally be the least surprising option. It is possible to pick a different +//! strategy. +//! +//! For now, the traits in here are sealed and don't expose any methods to the users of the crate. +//! This is because we are not confident about the details just yet. In the future it may be +//! possible for downstream users to implement their own, but for now it is only so users can +//! choose one of the provided. +//! +//! It is expected that future strategies would come with different capabilities and limitations. +//! In particular, some that are not "tight" in the cleanup (delay the cleanup) or not support the +//! compare and swap operations. +//! +//! Currently, we have these strategies: +//! +//! * [`DefaultStrategy`] (this one is used implicitly) +//! * [`RwLock<()>`][std::sync::RwLock] +//! +//! # Testing +//! +//! Formally, the [`RwLock<()>`][std::sync::RwLock] may be used as a strategy too. It doesn't have +//! the performance characteristics or lock-free guarantees of the others, but it is much simpler +//! and contains less `unsafe` code (actually, less code altogether). Therefore, it can be used for +//! testing purposes and cross-checking. +//! +//! Note that generally, using [`RwLock<Arc<T>>`][std::sync::RwLock] is likely to be better +//! performance wise. So if the goal is to not use third-party unsafe code, only the one in +//! [`std`], that is the better option. This is provided mostly for investigation and testing of +//! [`ArcSwap`] itself or algorithms written to use [`ArcSwap`]. +//! +//! *This is not meant to be used in production code*. +//! +//! [`ArcSwap`]: crate::ArcSwap +//! [`load`]: crate::ArcSwapAny::load + +use std::borrow::Borrow; +use std::sync::atomic::AtomicPtr; + +use crate::ref_cnt::RefCnt; + +pub(crate) mod hybrid; +mod rw_lock; +// Do not use from outside of the crate. +#[cfg(feature = "internal-test-strategies")] +#[doc(hidden)] +pub mod test_strategies; + +use self::hybrid::{DefaultConfig, HybridStrategy}; + +/// The default strategy. +/// +/// It is used by the type aliases [`ArcSwap`][crate::ArcSwap] and +/// [`ArcSwapOption`][crate::ArcSwapOption]. Only the other strategies need to be used explicitly. +/// +/// # Performance characteristics +/// +/// * It is optimized for read-heavy situations, with possibly many concurrent read accesses from +/// multiple threads. Readers don't contend each other at all. +/// * Readers are wait-free (with the exception of at most once in `usize::MAX / 4` accesses, which +/// is only lock-free). +/// * Writers are lock-free. +/// * Reclamation is exact ‒ the resource is released as soon as possible (works like RAII, not +/// like a traditional garbage collector; can contain non-`'static` data). +/// +/// Each thread has a limited number of fast slots (currently 8, but the exact number is not +/// guaranteed). If it holds at most that many [`Guard`]s at once, acquiring them is fast. Once +/// these slots are used up (by holding to these many [`Guard`]s), acquiring more of them will be +/// slightly slower, but still wait-free. 
+/// +/// If you expect to hold a lot of "handles" to the data around, or hold onto it for a long time, +/// you may want to prefer the [`load_full`][crate::ArcSwapAny::load_full] method. +/// +/// The speed of the fast slots is in the ballpark of locking an *uncontented* mutex. The advantage +/// over the mutex is the stability of speed in the face of contention from other threads ‒ while +/// the performance of mutex goes rapidly down, the slowdown of running out of held slots or heavy +/// concurrent writer thread in the area of single-digit multiples. +/// +/// The ballpark benchmark figures (my older computer) are around these, but you're welcome to run +/// the benchmarks in the git repository or write your own. +/// +/// * Load (both uncontented and contented by other loads): ~30ns +/// * `load_full`: ~50ns uncontented, goes up a bit with other `load_full` in other threads on the +/// same `Arc` value (~80-100ns). +/// * Loads after running out of the slots ‒ about 10-20ns slower than `load_full`. +/// * Stores: Dependent on number of threads, but generally low microseconds. +/// * Loads with heavy concurrent writer (to the same `ArcSwap`): ~250ns. +/// +/// [`load`]: crate::ArcSwapAny::load +/// [`Guard`]: crate::Guard +pub type DefaultStrategy = HybridStrategy<DefaultConfig>; + +/// Strategy for isolating instances. +/// +/// It is similar to [`DefaultStrategy`], however the spin lock is not sharded (therefore multiple +/// concurrent threads might get bigger hit when multiple threads have to fall back). Nevertheless, +/// each instance has a private spin lock, not influencing the other instances. That also makes +/// them bigger in memory. +/// +/// The hazard pointers are still shared between all instances. +/// +/// The purpose of this strategy is meant for cases where a single instance is going to be +/// "tortured" a lot, so it should not overflow to other instances. +/// +/// This too may be changed for something else (but with at least as good guarantees, primarily +/// that other instances won't get influenced by the "torture"). +// Testing if the DefaultStrategy is good enough to replace it fully and then deprecate. +#[doc(hidden)] +pub type IndependentStrategy = DefaultStrategy; + +// TODO: When we are ready to un-seal, should these traits become unsafe? + +pub(crate) mod sealed { + use super::*; + use crate::as_raw::AsRaw; + + pub trait Protected<T>: Borrow<T> { + fn into_inner(self) -> T; + fn from_inner(ptr: T) -> Self; + } + + pub trait InnerStrategy<T: RefCnt> { + // Drop „unlocks“ + type Protected: Protected<T>; + unsafe fn load(&self, storage: &AtomicPtr<T::Base>) -> Self::Protected; + unsafe fn wait_for_readers(&self, old: *const T::Base, storage: &AtomicPtr<T::Base>); + } + + pub trait CaS<T: RefCnt>: InnerStrategy<T> { + unsafe fn compare_and_swap<C: AsRaw<T::Base>>( + &self, + storage: &AtomicPtr<T::Base>, + current: C, + new: T, + ) -> Self::Protected; + } +} + +/// A strategy for protecting the reference counted pointer `T`. +/// +/// This chooses the algorithm for how the reference counts are protected. Note that the user of +/// the crate can't implement the trait and can't access any method; this is hopefully temporary +/// measure to make sure the interface is not part of the stability guarantees of the crate. Once +/// enough experience is gained with implementing various strategies, it will be un-sealed and +/// users will be able to provide their own implementation. 
+/// +/// For now, the trait works only as a bound to talk about the types that represent strategies. +pub trait Strategy<T: RefCnt>: sealed::InnerStrategy<T> {} +impl<T: RefCnt, S: sealed::InnerStrategy<T>> Strategy<T> for S {} + +/// An extension of the [`Strategy`], allowing for compare and swap operation. +/// +/// The compare and swap operation is "advanced" and not all strategies need to support them. +/// Therefore, it is a separate trait. +/// +/// Similarly, it is not yet made publicly usable or implementable and works only as a bound. +pub trait CaS<T: RefCnt>: sealed::CaS<T> {} +impl<T: RefCnt, S: sealed::CaS<T>> CaS<T> for S {} diff --git a/src/strategy/rw_lock.rs b/src/strategy/rw_lock.rs new file mode 100644 index 0000000..5a3fd48 --- /dev/null +++ b/src/strategy/rw_lock.rs @@ -0,0 +1,62 @@ +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::RwLock; + +use super::sealed::{CaS, InnerStrategy, Protected}; +use crate::as_raw::AsRaw; +use crate::ref_cnt::RefCnt; + +impl<T: RefCnt> Protected<T> for T { + #[inline] + fn from_inner(ptr: T) -> Self { + ptr + } + + #[inline] + fn into_inner(self) -> T { + self + } +} + +impl<T: RefCnt> InnerStrategy<T> for RwLock<()> { + type Protected = T; + unsafe fn load(&self, storage: &AtomicPtr<T::Base>) -> T { + let _guard = self.read().expect("We don't panic in here"); + let ptr = storage.load(Ordering::Acquire); + let ptr = T::from_ptr(ptr as *const T::Base); + T::inc(&ptr); + + ptr + } + + unsafe fn wait_for_readers(&self, _: *const T::Base, _: &AtomicPtr<T::Base>) { + // By acquiring the write lock, we make sure there are no read locks present across it. + drop(self.write().expect("We don't panic in here")); + } +} + +impl<T: RefCnt> CaS<T> for RwLock<()> { + unsafe fn compare_and_swap<C: AsRaw<T::Base>>( + &self, + storage: &AtomicPtr<T::Base>, + current: C, + new: T, + ) -> Self::Protected { + let _lock = self.write(); + let cur = current.as_raw() as *mut T::Base; + let new = T::into_ptr(new); + let swapped = storage.compare_exchange(cur, new, Ordering::AcqRel, Ordering::Relaxed); + let old = match swapped { + Ok(old) => old, + Err(old) => old, + }; + let old = T::from_ptr(old as *const T::Base); + if swapped.is_err() { + // If the new didn't go in, we need to destroy it and increment count in the old that + // we just duplicated + T::inc(&old); + drop(T::from_ptr(new)); + } + drop(current); + old + } +} diff --git a/src/strategy/test_strategies.rs b/src/strategy/test_strategies.rs new file mode 100644 index 0000000..76bffd2 --- /dev/null +++ b/src/strategy/test_strategies.rs @@ -0,0 +1,22 @@ +#![deprecated(note = "Only for internal testing. Do not use")] +#![allow(deprecated)] // We need to allow ourselves the stuff we deprecate here. +//! Some strategies for internal testing. +//! +//! # Warning +//! +//! They come with no guarantees of correctness, stability, performance or anything at all. *DO NOT +//! USE*. + +use super::hybrid::{Config, HybridStrategy}; + +/// Config for no fast slots. +#[derive(Clone, Copy, Default)] +pub struct NoFastSlots; + +impl Config for NoFastSlots { + const USE_FAST: bool = false; +} + +/// A strategy that fills the slots with some crap to make sure we test the fallbacks too. +#[deprecated(note = "Only for internal testing. 
Do not use")] +pub type FillFastSlots = HybridStrategy<NoFastSlots>; diff --git a/src/weak.rs b/src/weak.rs new file mode 100644 index 0000000..23df26d --- /dev/null +++ b/src/weak.rs @@ -0,0 +1,117 @@ +use std::ptr; +use std::rc::Weak as RcWeak; +use std::sync::Weak; + +use crate::RefCnt; + +unsafe impl<T> RefCnt for Weak<T> { + type Base = T; + fn as_ptr(me: &Self) -> *mut T { + if Weak::ptr_eq(&Weak::new(), me) { + ptr::null_mut() + } else { + Weak::as_ptr(me) as *mut T + } + } + fn into_ptr(me: Self) -> *mut T { + if Weak::ptr_eq(&Weak::new(), &me) { + ptr::null_mut() + } else { + Weak::into_raw(me) as *mut T + } + } + unsafe fn from_ptr(ptr: *const T) -> Self { + if ptr.is_null() { + Weak::new() + } else { + Weak::from_raw(ptr) + } + } +} + +unsafe impl<T> RefCnt for RcWeak<T> { + type Base = T; + fn as_ptr(me: &Self) -> *mut T { + if RcWeak::ptr_eq(&RcWeak::new(), me) { + ptr::null_mut() + } else { + RcWeak::as_ptr(me) as *mut T + } + } + fn into_ptr(me: Self) -> *mut T { + if RcWeak::ptr_eq(&RcWeak::new(), &me) { + ptr::null_mut() + } else { + RcWeak::into_raw(me) as *mut T + } + } + unsafe fn from_ptr(ptr: *const T) -> Self { + if ptr.is_null() { + RcWeak::new() + } else { + RcWeak::from_raw(ptr) + } + } +} + +macro_rules! t { + ($name: ident, $strategy: ty) => { + #[cfg(test)] + mod $name { + use std::sync::{Arc, Weak}; + + use crate::ArcSwapAny; + + #[allow(deprecated)] // We use "deprecated" testing strategies in here. + type ArcSwapWeak<T> = ArcSwapAny<Weak<T>, $strategy>; + + // Convert to weak, push it through the shared and pull it out again. + #[test] + fn there_and_back() { + let data = Arc::new("Hello"); + let shared = ArcSwapWeak::new(Arc::downgrade(&data)); + assert_eq!(1, Arc::strong_count(&data)); + assert_eq!(1, Arc::weak_count(&data)); + let weak = shared.load(); + assert_eq!("Hello", *weak.upgrade().unwrap()); + assert!(Arc::ptr_eq(&data, &weak.upgrade().unwrap())); + } + + // Replace a weak pointer with a NULL one + #[test] + fn reset() { + let data = Arc::new("Hello"); + let shared = ArcSwapWeak::new(Arc::downgrade(&data)); + assert_eq!(1, Arc::strong_count(&data)); + assert_eq!(1, Arc::weak_count(&data)); + + // An empty weak (eg. NULL) + shared.store(Weak::new()); + assert_eq!(1, Arc::strong_count(&data)); + assert_eq!(0, Arc::weak_count(&data)); + + let weak = shared.load(); + assert!(weak.upgrade().is_none()); + } + + // Destroy the underlying data while the weak is still stored inside. Should make it go + // NULL-ish + #[test] + fn destroy() { + let data = Arc::new("Hello"); + let shared = ArcSwapWeak::new(Arc::downgrade(&data)); + + drop(data); + let weak = shared.load(); + assert!(weak.upgrade().is_none()); + } + } + }; +} + +t!(tests_default, crate::DefaultStrategy); +#[cfg(feature = "internal-test-strategies")] +t!( + tests_full_slots, + crate::strategy::test_strategies::FillFastSlots +); diff --git a/tests/random.rs b/tests/random.rs new file mode 100644 index 0000000..5d8d32c --- /dev/null +++ b/tests/random.rs @@ -0,0 +1,125 @@ +//! Let it torture the implementation with some randomized operations. 
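// A brief illustration of the `weak` support implemented in src/weak.rs above
// (it mirrors the tests there and needs the `weak` feature): the stored Weak
// does not keep the pointed-to data alive.
//
//     use std::sync::Arc;
//     use arc_swap::ArcSwapWeak;
//
//     let data = Arc::new("Hello");
//     let shared = ArcSwapWeak::new(Arc::downgrade(&data));
//     assert!(shared.load().upgrade().is_some()); // alive while `data` exists
//     drop(data);
//     assert!(shared.load().upgrade().is_none()); // gone with the last Arc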
+ +use std::mem; +use std::sync::Arc; + +use arc_swap::{ArcSwapAny, DefaultStrategy, IndependentStrategy}; +use once_cell::sync::Lazy; +use proptest::prelude::*; + +#[derive(Copy, Clone, Debug)] +enum OpsInstruction { + Store(usize), + Swap(usize), + LoadFull, + Load, +} + +impl OpsInstruction { + fn random() -> impl Strategy<Value = Self> { + prop_oneof![ + any::<usize>().prop_map(Self::Store), + any::<usize>().prop_map(Self::Swap), + Just(Self::LoadFull), + Just(Self::Load), + ] + } +} + +proptest! {} + +const LIMIT: usize = 5; +#[cfg(not(miri))] +const SIZE: usize = 100; +#[cfg(miri)] +const SIZE: usize = 10; +static ARCS: Lazy<Vec<Arc<usize>>> = Lazy::new(|| (0..LIMIT).map(Arc::new).collect()); + +#[derive(Copy, Clone, Debug)] +enum SelInstruction { + Swap(usize), + Cas(usize, usize), +} + +impl SelInstruction { + fn random() -> impl Strategy<Value = Self> { + prop_oneof![ + (0..LIMIT).prop_map(Self::Swap), + (0..LIMIT, 0..LIMIT).prop_map(|(cur, new)| Self::Cas(cur, new)), + ] + } +} + +// Generate the same tests for bunch of strategies (one module for one strategy) +macro_rules! t { + (@full => $name: ident, $strategy: ty) => { + t!(@compose => $name, $strategy, + #[test] + fn selection( + instructions in proptest::collection::vec(SelInstruction::random(), 1..SIZE), + ) { + let mut bare = Arc::clone(&ARCS[0]); + #[allow(deprecated)] // We use "deprecated" testing strategies in here. + let a = ArcSwapAny::<_, $strategy>::from(Arc::clone(&ARCS[0])); + for ins in instructions { + match ins { + SelInstruction::Swap(idx) => { + let expected = mem::replace(&mut bare, Arc::clone(&ARCS[idx])); + let actual = a.swap(Arc::clone(&ARCS[idx])); + assert!(Arc::ptr_eq(&expected, &actual)); + } + SelInstruction::Cas(cur, new) => { + let expected = Arc::clone(&bare); + if bare == ARCS[cur] { + bare = Arc::clone(&ARCS[new]); + } + let actual = a.compare_and_swap(&ARCS[cur], Arc::clone(&ARCS[new])); + assert!(Arc::ptr_eq(&expected, &actual)); + } + } + } + } + ); + }; + (@nocas => $name: ident, $strategy: ty) => { + t!(@compose => $name, $strategy, ); + }; + (@compose => $name: ident, $strategy: ty, $($extra: tt)*) => { + mod $name { + use super::*; + + proptest! { + $($extra)* + + #[test] + fn ops( + instructions in proptest::collection::vec(OpsInstruction::random(), 1..SIZE), + ) { + use crate::OpsInstruction::*; + let mut m = 0; + #[allow(deprecated)] // We use "deprecated" testing strategies in here. + let a = ArcSwapAny::<_, $strategy>::new(Arc::new(0usize)); + for ins in instructions { + match ins { + Store(v) => { + m = v; + a.store(Arc::new(v)); + } + Swap(v) => { + let old = mem::replace(&mut m, v); + assert_eq!(old, *a.swap(Arc::new(v))); + } + Load => assert_eq!(m, **a.load()), + LoadFull => assert_eq!(m, *a.load_full()), + } + } + } + } + } + }; +} + +t!(@full => default, DefaultStrategy); +t!(@full => independent, IndependentStrategy); +#[cfg(feature = "internal-test-strategies")] +t!(@full => full_slots, arc_swap::strategy::test_strategies::FillFastSlots); diff --git a/tests/stress.rs b/tests/stress.rs new file mode 100644 index 0000000..15e60fc --- /dev/null +++ b/tests/stress.rs @@ -0,0 +1,310 @@ +//! Stress-tests +//! +//! The tests in here try to torture the implementation with multiple threads, in an attempt to +//! discover any possible race condition. 
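// The core pattern these stress tests hammer, shown as a minimal single-threaded
// sketch (the `Node` type and the values are illustrative; the real tests below
// run the same idea from many threads at once):
//
//     use std::sync::Arc;
//     use arc_swap::ArcSwapOption;
//
//     struct Node {
//         next: Option<Arc<Node>>,
//         value: usize,
//     }
//
//     let head = ArcSwapOption::<Node>::empty();
//     for value in 0..3 {
//         // `rcu` re-runs the closure until its compare-and-swap succeeds, so a
//         // prepend cannot silently overwrite a concurrent update.
//         head.rcu(|old| Some(Arc::new(Node { next: old.clone(), value })));
//     }
//     // The last prepended node ends up at the head of the list.
//     assert_eq!(2, head.load().as_ref().unwrap().value);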
+ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; + +use adaptive_barrier::{Barrier, PanicMode}; +use arc_swap::strategy::{CaS, DefaultStrategy, IndependentStrategy, Strategy}; +use arc_swap::ArcSwapAny; +use crossbeam_utils::thread; +use itertools::Itertools; +use once_cell::sync::Lazy; + +static LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(())); + +/// We want to prevent these tests from running concurrently, because they run multi-threaded. +fn lock() -> MutexGuard<'static, ()> { + LOCK.lock().unwrap_or_else(PoisonError::into_inner) +} + +struct LLNode<S: Strategy<Option<Arc<LLNode<S>>>>> { + next: ArcSwapAny<Option<Arc<LLNode<S>>>, S>, + num: usize, + owner: usize, +} + +/// A test that repeatedly builds a linked list concurrently with multiple threads. +/// +/// The idea here is to stress-test the RCU implementation and see that no items get lost and that +/// the ref counts are correct afterwards. +fn storm_link_list<S>(node_cnt: usize, iters: usize) +where + S: Default + CaS<Option<Arc<LLNode<S>>>> + Send + Sync, +{ + let _lock = lock(); + let head = ArcSwapAny::<_, S>::from(None::<Arc<LLNode<S>>>); + #[cfg(not(miri))] + let cpus = num_cpus::get(); + #[cfg(miri)] + let cpus = 2; + let barr = Barrier::new(PanicMode::Poison); + thread::scope(|scope| { + for thread in 0..cpus { + // We want to borrow these, but that kind-of conflicts with the move closure mode + let mut barr = barr.clone(); + let head = &head; + scope.spawn(move |_| { + let nodes = (0..node_cnt) + .map(|i| LLNode { + next: ArcSwapAny::from(None), + num: i, + owner: thread, + }) + .map(Arc::new) + .collect::<Vec<_>>(); + for iter in 0..iters { + barr.wait(); // Start synchronously + for n in nodes.iter().rev() { + head.rcu(|head| { + n.next.store(head.clone()); // Cloning the optional Arc + Some(Arc::clone(n)) + }); + } + // And do the checks once everyone finishes + barr.wait(); + // First, check that all our numbers are increasing by one and all are present + let mut node = head.load(); + let mut expecting = 0; + while node.is_some() { + // A bit of gymnastics, we don't have NLL yet and we need to persuade the + // borrow checker this is safe. + let next = { + let inner = node.as_ref().unwrap(); + if inner.owner == thread { + assert_eq!(expecting, inner.num); + expecting += 1; + } + inner.next.load() + }; + node = next; + } + assert_eq!(node_cnt, expecting); + // We don't want to count the ref-counts while someone still plays around with + // them and loading. + barr.wait(); + // Now that we've checked we have everything, check that all the nodes have ref + // count 2 ‒ once in the vector, once in the linked list. + for n in &nodes { + assert_eq!( + 2, + Arc::strong_count(n), + "Wrong number of counts in item {} in iteration {}", + n.num, + iter, + ); + } + // Reset the head so we don't mix the runs together, which would create a mess. + // Also, the tails might disturb the ref counts. + barr.wait(); + head.store(None); + nodes.last().unwrap().next.store(None); + } + barr.wait(); + // We went through all the iterations. Dismantle the list and see that everything + // has ref count 1. 
+ head.store(None); + for n in &nodes { + n.next.store(None); + } + barr.wait(); // Wait until everyone resets their own nexts + for n in &nodes { + assert_eq!(1, Arc::strong_count(n)); + } + }); + } + + drop(barr); + }) + .unwrap(); +} + +struct LLNodeCnt<'a> { + next: Option<Arc<LLNodeCnt<'a>>>, + num: usize, + owner: usize, + live_cnt: &'a AtomicUsize, +} + +impl<'a> Drop for LLNodeCnt<'a> { + fn drop(&mut self) { + self.live_cnt.fetch_sub(1, Ordering::Relaxed); + } +} + +/// Test where we build and then deconstruct a linked list using multiple threads. +fn storm_unroll<S>(node_cnt: usize, iters: usize) +where + S: Default + Send + Sync, + for<'a> S: CaS<Option<Arc<LLNodeCnt<'a>>>>, +{ + let _lock = lock(); + + #[cfg(not(miri))] + let cpus = num_cpus::get(); + #[cfg(miri)] + let cpus = 2; + let barr = Barrier::new(PanicMode::Poison); + let global_cnt = AtomicUsize::new(0); + // We plan to create this many nodes during the whole test. + let live_cnt = AtomicUsize::new(cpus * node_cnt * iters); + let head = ArcSwapAny::<_, S>::from(None); + thread::scope(|scope| { + for thread in 0..cpus { + // Borrow these instead of moving. + let head = &head; + let mut barr = barr.clone(); + let global_cnt = &global_cnt; + let live_cnt = &live_cnt; + scope.spawn(move |_| { + for iter in 0..iters { + barr.wait(); + // Create bunch of nodes and put them into the list. + for i in 0..node_cnt { + let mut node = Arc::new(LLNodeCnt { + next: None, + num: i, + owner: thread, + live_cnt, + }); + head.rcu(|head| { + // Clone Option<Arc> + Arc::get_mut(&mut node).unwrap().next = head.clone(); + Arc::clone(&node) + }); + } + if barr.wait().is_leader() { + let mut cnt = 0; + let mut node = head.load_full(); + while let Some(n) = node.as_ref() { + cnt += 1; + node = n.next.clone(); + } + assert_eq!(cnt, node_cnt * cpus); + } + barr.wait(); + // Keep removing items, count how many there are and that they increase in each + // thread's list. + let mut last_seen = vec![node_cnt; cpus]; + let mut cnt = 0; + while let Some(node) = + head.rcu(|head| head.as_ref().and_then(|h| h.next.clone())) + { + assert!(last_seen[node.owner] > node.num); + last_seen[node.owner] = node.num; + cnt += 1; + } + global_cnt.fetch_add(cnt, Ordering::Relaxed); + if barr.wait().is_leader() { + assert_eq!(node_cnt * cpus, global_cnt.swap(0, Ordering::Relaxed)); + } + assert_eq!( + (iters - iter - 1) * node_cnt * cpus, + live_cnt.load(Ordering::Relaxed), + ); + } + }); + } + + drop(barr); + }) + .unwrap(); + // Everything got destroyed properly. + assert_eq!(0, live_cnt.load(Ordering::Relaxed)); +} + +fn load_parallel<S>(iters: usize) +where + S: Default + Strategy<Arc<usize>> + Send + Sync, +{ + let _lock = lock(); + #[cfg(not(miri))] + let cpus = num_cpus::get(); + #[cfg(miri)] + let cpus = 2; + let shared = ArcSwapAny::<_, S>::from(Arc::new(0)); + thread::scope(|scope| { + scope.spawn(|_| { + for i in 0..iters { + shared.store(Arc::new(i)); + } + }); + for _ in 0..cpus { + scope.spawn(|_| { + for _ in 0..iters { + let guards = (0..256).map(|_| shared.load()).collect::<Vec<_>>(); + for (l, h) in guards.iter().tuple_windows() { + assert!(**l <= **h, "{} > {}", l, h); + } + } + }); + } + }) + .unwrap(); + let v = shared.load_full(); + assert_eq!(2, Arc::strong_count(&v)); +} + +#[cfg(not(miri))] +const ITER_SMALL: usize = 100; +#[cfg(not(miri))] +const ITER_MID: usize = 1000; + +#[cfg(miri)] +const ITER_SMALL: usize = 2; +#[cfg(miri)] +const ITER_MID: usize = 5; + +macro_rules! 
t { + ($name: ident, $strategy: ty) => { + mod $name { + use super::*; + + #[allow(deprecated)] // We use some "deprecated" testing strategies + type Strategy = $strategy; + + #[test] + fn storm_link_list_small() { + storm_link_list::<Strategy>(ITER_SMALL, 5); + } + + #[test] + #[ignore] + fn storm_link_list_large() { + storm_link_list::<Strategy>(10_000, 50); + } + + #[test] + fn storm_unroll_small() { + storm_unroll::<Strategy>(ITER_SMALL, 5); + } + + #[test] + #[ignore] + fn storm_unroll_large() { + storm_unroll::<Strategy>(10_000, 50); + } + + #[test] + fn load_parallel_small() { + load_parallel::<Strategy>(ITER_MID); + } + + #[test] + #[ignore] + fn load_parallel_large() { + load_parallel::<Strategy>(100_000); + } + } + }; +} + +t!(default, DefaultStrategy); +t!(independent, IndependentStrategy); +#[cfg(feature = "internal-test-strategies")] +t!( + full_slots, + arc_swap::strategy::test_strategies::FillFastSlots +); |
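For reference, the read-mostly pattern the imported crate is designed for (and which the tests above exercise) looks roughly like this in downstream code; a minimal sketch using only the public API shown in this import:

    use std::sync::Arc;
    use std::thread;

    use arc_swap::ArcSwap;

    fn main() {
        // One writer publishes new values, many readers take cheap snapshots.
        let config = Arc::new(ArcSwap::from_pointee(String::from("initial")));

        let readers: Vec<_> = (0..4)
            .map(|_| {
                let config = Arc::clone(&config);
                thread::spawn(move || {
                    // `load` does not block and does not clone the String; the
                    // guard keeps this snapshot valid even if a writer swaps in
                    // a new value in the meantime.
                    let snapshot = config.load();
                    snapshot.len()
                })
            })
            .collect();

        // Replace the whole value atomically; existing guards keep the old one.
        config.store(Arc::new(String::from("updated configuration")));

        for reader in readers {
            reader.join().unwrap();
        }
    }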