diff options
author | Jason Macnak <natsu@google.com> | 2021-09-02 17:04:54 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-09-02 17:04:54 +0000 |
commit | 65d6e889d55d59dd988c261d934a8bcd53d7a39c (patch) | |
tree | 4b825dc642cb6eb9a060e54bf8d69288fbee4904 | |
parent | fcdc67c2534f80015265d95e5998b9dc38425f09 (diff) | |
parent | 2dc1b6d1db7d4dd227a22e26e3087e2c710747b4 (diff) | |
download | crossbeam-queue-65d6e889d55d59dd988c261d934a8bcd53d7a39c.tar.gz |
Revert "Import crossbeam-queue rust crate" am: 2dc1b6d1db
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/crossbeam-queue/+/1818473
Change-Id: I18afcaf02282aa429468a467d577eb8f7e21d2ba
-rw-r--r-- | Android.bp | 108 | ||||
-rw-r--r-- | CHANGELOG.md | 42 | ||||
-rw-r--r-- | Cargo.toml | 38 | ||||
-rw-r--r-- | Cargo.toml.orig | 49 | ||||
l--------- | LICENSE | 1 | ||||
-rw-r--r-- | LICENSE-APACHE | 201 | ||||
-rw-r--r-- | LICENSE-MIT | 27 | ||||
-rw-r--r-- | METADATA | 20 | ||||
-rw-r--r-- | MODULE_LICENSE_APACHE2 | 0 | ||||
-rw-r--r-- | OWNERS | 1 | ||||
-rw-r--r-- | README.md | 54 | ||||
-rw-r--r-- | build.rs | 32 | ||||
-rw-r--r-- | no_atomic.rs | 59 | ||||
-rw-r--r-- | src/array_queue.rs | 434 | ||||
-rw-r--r-- | src/lib.rs | 34 | ||||
-rw-r--r-- | src/seg_queue.rs | 486 | ||||
-rw-r--r-- | tests/array_queue.rs | 250 | ||||
-rw-r--r-- | tests/seg_queue.rs | 163 |
18 files changed, 0 insertions, 1999 deletions
diff --git a/Android.bp b/Android.bp deleted file mode 100644 index 51832d2..0000000 --- a/Android.bp +++ /dev/null @@ -1,108 +0,0 @@ -// This file is generated by cargo2android.py --run --device --dependencies --tests. -// Do not modify this file as changes will be overridden on upgrade. - - - -rust_defaults { - name: "crossbeam-queue_test_defaults", - crate_name: "crossbeam_queue", - srcs: ["src/lib.rs"], - test_suites: ["general-tests"], - auto_gen_config: true, - edition: "2018", - features: [ - "alloc", - "default", - "std", - ], - rustlibs: [ - "libcfg_if", - "libcrossbeam_utils", - "librand", - ], -} - -rust_test_host { - name: "crossbeam-queue_host_test_src_lib", - defaults: ["crossbeam-queue_test_defaults"], - test_options: { - unit_test: true, - }, -} - -rust_test { - name: "crossbeam-queue_device_test_src_lib", - defaults: ["crossbeam-queue_test_defaults"], -} - -rust_defaults { - name: "crossbeam-queue_test_defaults_crossbeam_queue", - crate_name: "crossbeam_queue", - test_suites: ["general-tests"], - auto_gen_config: true, - edition: "2018", - features: [ - "alloc", - "default", - "std", - ], - rustlibs: [ - "libcfg_if", - "libcrossbeam_queue", - "libcrossbeam_utils", - "librand", - ], -} - -rust_test_host { - name: "crossbeam-queue_host_test_tests_array_queue", - defaults: ["crossbeam-queue_test_defaults_crossbeam_queue"], - srcs: ["tests/array_queue.rs"], - test_options: { - unit_test: true, - }, -} - -rust_test { - name: "crossbeam-queue_device_test_tests_array_queue", - defaults: ["crossbeam-queue_test_defaults_crossbeam_queue"], - srcs: ["tests/array_queue.rs"], -} - -rust_test_host { - name: "crossbeam-queue_host_test_tests_seg_queue", - defaults: ["crossbeam-queue_test_defaults_crossbeam_queue"], - srcs: ["tests/seg_queue.rs"], - test_options: { - unit_test: true, - }, -} - -rust_test { - name: "crossbeam-queue_device_test_tests_seg_queue", - defaults: ["crossbeam-queue_test_defaults_crossbeam_queue"], - srcs: ["tests/seg_queue.rs"], -} - -rust_library { - name: "libcrossbeam_queue", - host_supported: true, - crate_name: "crossbeam_queue", - cargo_env_compat: true, - cargo_pkg_version: "0.3.2", - srcs: ["src/lib.rs"], - edition: "2018", - features: [ - "alloc", - "default", - "std", - ], - rustlibs: [ - "libcfg_if", - "libcrossbeam_utils", - ], - apex_available: [ - "//apex_available:platform", - "com.android.virt", - ], -} diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index 68306c8..0000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,42 +0,0 @@ -# Version 0.3.2 - -- Support targets that do not have atomic CAS on stable Rust (#698) - -# Version 0.3.1 - -- Make `SegQueue::new` const fn. (#584) -- Change license to "MIT OR Apache-2.0". - -# Version 0.3.0 - -- Bump the minimum supported Rust version to 1.36. -- Remove `PushError` and `PopError`. - -# Version 0.2.3 - -- Fix bug in release (yanking 0.2.2) - -# Version 0.2.2 - -- Fix unsoundness issues by adopting `MaybeUninit`. (#458) - -# Version 0.2.1 - -- Add `no_std` support. - -# Version 0.2.0 - -- Bump the minimum required version to 1.28. -- Bump `crossbeam-utils` to `0.7`. - -# Version 0.1.2 - -- Update `crossbeam-utils` to `0.6.5`. - -# Version 0.1.1 - -- Update `crossbeam-utils` to `0.6.4`. - -# Version 0.1.0 - -- Initial version with `ArrayQueue` and `SegQueue`. diff --git a/Cargo.toml b/Cargo.toml deleted file mode 100644 index dc16305..0000000 --- a/Cargo.toml +++ /dev/null @@ -1,38 +0,0 @@ -# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO -# -# When uploading crates to the registry Cargo will automatically -# "normalize" Cargo.toml files for maximal compatibility -# with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies -# -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) - -[package] -edition = "2018" -name = "crossbeam-queue" -version = "0.3.2" -authors = ["The Crossbeam Project Developers"] -description = "Concurrent queues" -homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-queue" -documentation = "https://docs.rs/crossbeam-queue" -keywords = ["queue", "mpmc", "lock-free", "producer", "consumer"] -categories = ["concurrency", "data-structures", "no-std"] -license = "MIT OR Apache-2.0" -repository = "https://github.com/crossbeam-rs/crossbeam" -[dependencies.cfg-if] -version = "1" - -[dependencies.crossbeam-utils] -version = "0.8.5" -default-features = false -[dev-dependencies.rand] -version = "0.8" - -[features] -alloc = [] -default = ["std"] -nightly = ["crossbeam-utils/nightly"] -std = ["alloc", "crossbeam-utils/std"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig deleted file mode 100644 index dbefdc5..0000000 --- a/Cargo.toml.orig +++ /dev/null @@ -1,49 +0,0 @@ -[package] -name = "crossbeam-queue" -# When publishing a new version: -# - Update CHANGELOG.md -# - Update README.md -# - Create "crossbeam-queue-X.Y.Z" git tag -version = "0.3.2" -authors = ["The Crossbeam Project Developers"] -edition = "2018" -license = "MIT OR Apache-2.0" -repository = "https://github.com/crossbeam-rs/crossbeam" -homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-queue" -documentation = "https://docs.rs/crossbeam-queue" -description = "Concurrent queues" -keywords = ["queue", "mpmc", "lock-free", "producer", "consumer"] -categories = ["concurrency", "data-structures", "no-std"] - -[features] -default = ["std"] - -# Enable to use APIs that require `std`. -# This is enabled by default. -std = ["alloc", "crossbeam-utils/std"] - -# Enable to use APIs that require `alloc`. -# This is enabled by default and also enabled if the `std` feature is enabled. -# -# NOTE: Disabling both `std` *and* `alloc` features is not supported yet. -alloc = [] - -# These features are no longer used. -# TODO: remove in the next major version. -# Enable to use of unstable functionality. -# This is disabled by default and requires recent nightly compiler. -# -# NOTE: This feature is outside of the normal semver guarantees and minor or -# patch versions of crossbeam may make breaking changes to them at any time. -nightly = ["crossbeam-utils/nightly"] - -[dependencies] -cfg-if = "1" - -[dependencies.crossbeam-utils] -version = "0.8.5" -path = "../crossbeam-utils" -default-features = false - -[dev-dependencies] -rand = "0.8" diff --git a/LICENSE b/LICENSE deleted file mode 120000 index 6b579aa..0000000 --- a/LICENSE +++ /dev/null @@ -1 +0,0 @@ -LICENSE-APACHE
\ No newline at end of file diff --git a/LICENSE-APACHE b/LICENSE-APACHE deleted file mode 100644 index 16fe87b..0000000 --- a/LICENSE-APACHE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - -TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - -1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - -2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - -3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - -4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - -5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - -6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - -7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - -8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - -9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - -END OF TERMS AND CONDITIONS - -APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - -Copyright [yyyy] [name of copyright owner] - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT deleted file mode 100644 index 068d491..0000000 --- a/LICENSE-MIT +++ /dev/null @@ -1,27 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2019 The Crossbeam Project Developers - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/METADATA b/METADATA deleted file mode 100644 index 37b8868..0000000 --- a/METADATA +++ /dev/null @@ -1,20 +0,0 @@ -name: "crossbeam-queue" -description: "Concurrent queues" -third_party { - url { - type: HOMEPAGE - value: "https://crates.io/crates/crossbeam-queue" - } - url { - type: ARCHIVE - value: "https://static.crates.io/crates/crossbeam-queue/crossbeam-queue-0.3.2.crate" - } - version: "0.3.2" - # Dual-licensed, using the least restrictive per go/thirdpartylicenses#same. - license_type: NOTICE - last_upgrade_date { - year: 2021 - month: 8 - day: 30 - } -} diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2 deleted file mode 100644 index e69de29..0000000 --- a/MODULE_LICENSE_APACHE2 +++ /dev/null @@ -1 +0,0 @@ -include platform/prebuilts/rust:master:/OWNERS diff --git a/README.md b/README.md deleted file mode 100644 index 2f30b39..0000000 --- a/README.md +++ /dev/null @@ -1,54 +0,0 @@ -# Crossbeam Queue - -[![Build Status](https://github.com/crossbeam-rs/crossbeam/workflows/CI/badge.svg)]( -https://github.com/crossbeam-rs/crossbeam/actions) -[![License](https://img.shields.io/badge/license-MIT_OR_Apache--2.0-blue.svg)]( -https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-queue#license) -[![Cargo](https://img.shields.io/crates/v/crossbeam-queue.svg)]( -https://crates.io/crates/crossbeam-queue) -[![Documentation](https://docs.rs/crossbeam-queue/badge.svg)]( -https://docs.rs/crossbeam-queue) -[![Rust 1.36+](https://img.shields.io/badge/rust-1.36+-lightgray.svg)]( -https://www.rust-lang.org) -[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ) - -This crate provides concurrent queues that can be shared among threads: - -* [`ArrayQueue`], a bounded MPMC queue that allocates a fixed-capacity buffer on construction. -* [`SegQueue`], an unbounded MPMC queue that allocates small buffers, segments, on demand. - -Everything in this crate can be used in `no_std` environments, provided that `alloc` feature is -enabled. - -[`ArrayQueue`]: https://docs.rs/crossbeam-queue/*/crossbeam_queue/struct.ArrayQueue.html -[`SegQueue`]: https://docs.rs/crossbeam-queue/*/crossbeam_queue/struct.SegQueue.html - -## Usage - -Add this to your `Cargo.toml`: - -```toml -[dependencies] -crossbeam-queue = "0.3" -``` - -## Compatibility - -Crossbeam Queue supports stable Rust releases going back at least six months, -and every time the minimum supported Rust version is increased, a new minor -version is released. Currently, the minimum supported Rust version is 1.36. - -## License - -Licensed under either of - - * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) - * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) - -at your option. - -#### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in the work by you, as defined in the Apache-2.0 license, shall be -dual licensed as above, without any additional terms or conditions. diff --git a/build.rs b/build.rs deleted file mode 100644 index 4ef1279..0000000 --- a/build.rs +++ /dev/null @@ -1,32 +0,0 @@ -#![warn(rust_2018_idioms)] - -use std::env; - -include!("no_atomic.rs"); - -// The rustc-cfg strings below are *not* public API. Please let us know by -// opening a GitHub issue if your build environment requires some way to enable -// these cfgs other than by executing our build script. -fn main() { - let target = match env::var("TARGET") { - Ok(target) => target, - Err(e) => { - println!( - "cargo:warning={}: unable to get TARGET environment variable: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Note that this is `no_*`, not `has_*`. This allows treating - // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't - // run. This is needed for compatibility with non-cargo build systems that - // don't run the build script. - if NO_ATOMIC_CAS.contains(&&*target) { - println!("cargo:rustc-cfg=crossbeam_no_atomic_cas"); - } - - println!("cargo:rerun-if-changed=no_atomic.rs"); -} diff --git a/no_atomic.rs b/no_atomic.rs deleted file mode 100644 index 522b3b8..0000000 --- a/no_atomic.rs +++ /dev/null @@ -1,59 +0,0 @@ -// This file is @generated by no_atomic.sh. -// It is not intended for manual editing. - -const NO_ATOMIC_CAS: &[&str] = &[ - "avr-unknown-gnu-atmega328", - "msp430-none-elf", - "riscv32i-unknown-none-elf", - "riscv32imc-unknown-none-elf", - "thumbv4t-none-eabi", - "thumbv6m-none-eabi", -]; -#[allow(dead_code)] -const NO_ATOMIC_64: &[&str] = &[ - "arm-linux-androideabi", - "armebv7r-none-eabi", - "armebv7r-none-eabihf", - "armv4t-unknown-linux-gnueabi", - "armv5te-unknown-linux-gnueabi", - "armv5te-unknown-linux-musleabi", - "armv5te-unknown-linux-uclibceabi", - "armv7r-none-eabi", - "armv7r-none-eabihf", - "hexagon-unknown-linux-musl", - "mips-unknown-linux-gnu", - "mips-unknown-linux-musl", - "mips-unknown-linux-uclibc", - "mipsel-unknown-linux-gnu", - "mipsel-unknown-linux-musl", - "mipsel-unknown-linux-uclibc", - "mipsel-unknown-none", - "mipsisa32r6-unknown-linux-gnu", - "mipsisa32r6el-unknown-linux-gnu", - "powerpc-unknown-linux-gnu", - "powerpc-unknown-linux-gnuspe", - "powerpc-unknown-linux-musl", - "powerpc-unknown-netbsd", - "powerpc-unknown-openbsd", - "powerpc-wrs-vxworks", - "powerpc-wrs-vxworks-spe", - "riscv32gc-unknown-linux-gnu", - "riscv32gc-unknown-linux-musl", - "riscv32imac-unknown-none-elf", - "thumbv7em-none-eabi", - "thumbv7em-none-eabihf", - "thumbv7m-none-eabi", - "thumbv8m.base-none-eabi", - "thumbv8m.main-none-eabi", - "thumbv8m.main-none-eabihf", - "mipsel-sony-psp", - "thumbv4t-none-eabi", - "thumbv6m-none-eabi", -]; -#[allow(dead_code)] -const NO_ATOMIC: &[&str] = &[ - "avr-unknown-gnu-atmega328", - "msp430-none-elf", - "riscv32i-unknown-none-elf", - "riscv32imc-unknown-none-elf", -]; diff --git a/src/array_queue.rs b/src/array_queue.rs deleted file mode 100644 index ff1efaa..0000000 --- a/src/array_queue.rs +++ /dev/null @@ -1,434 +0,0 @@ -//! The implementation is based on Dmitry Vyukov's bounded MPMC queue. -//! -//! Source: -//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue> - -use alloc::boxed::Box; -use core::cell::UnsafeCell; -use core::fmt; -use core::marker::PhantomData; -use core::mem::{self, MaybeUninit}; -use core::sync::atomic::{self, AtomicUsize, Ordering}; - -use crossbeam_utils::{Backoff, CachePadded}; - -/// A slot in a queue. -struct Slot<T> { - /// The current stamp. - /// - /// If the stamp equals the tail, this node will be next written to. If it equals head + 1, - /// this node will be next read from. - stamp: AtomicUsize, - - /// The value in this slot. - value: UnsafeCell<MaybeUninit<T>>, -} - -/// A bounded multi-producer multi-consumer queue. -/// -/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed -/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an -/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit -/// faster than [`SegQueue`]. -/// -/// [`SegQueue`]: super::SegQueue -/// -/// # Examples -/// -/// ``` -/// use crossbeam_queue::ArrayQueue; -/// -/// let q = ArrayQueue::new(2); -/// -/// assert_eq!(q.push('a'), Ok(())); -/// assert_eq!(q.push('b'), Ok(())); -/// assert_eq!(q.push('c'), Err('c')); -/// assert_eq!(q.pop(), Some('a')); -/// ``` -pub struct ArrayQueue<T> { - /// The head of the queue. - /// - /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a - /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. - /// - /// Elements are popped from the head of the queue. - head: CachePadded<AtomicUsize>, - - /// The tail of the queue. - /// - /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a - /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. - /// - /// Elements are pushed into the tail of the queue. - tail: CachePadded<AtomicUsize>, - - /// The buffer holding slots. - buffer: *mut Slot<T>, - - /// The queue capacity. - cap: usize, - - /// A stamp with the value of `{ lap: 1, index: 0 }`. - one_lap: usize, - - /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`. - _marker: PhantomData<T>, -} - -unsafe impl<T: Send> Sync for ArrayQueue<T> {} -unsafe impl<T: Send> Send for ArrayQueue<T> {} - -impl<T> ArrayQueue<T> { - /// Creates a new bounded queue with the given capacity. - /// - /// # Panics - /// - /// Panics if the capacity is zero. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::<i32>::new(100); - /// ``` - pub fn new(cap: usize) -> ArrayQueue<T> { - assert!(cap > 0, "capacity must be non-zero"); - - // Head is initialized to `{ lap: 0, index: 0 }`. - // Tail is initialized to `{ lap: 0, index: 0 }`. - let head = 0; - let tail = 0; - - // Allocate a buffer of `cap` slots initialized - // with stamps. - let buffer = { - let mut boxed: Box<[Slot<T>]> = (0..cap) - .map(|i| { - // Set the stamp to `{ lap: 0, index: i }`. - Slot { - stamp: AtomicUsize::new(i), - value: UnsafeCell::new(MaybeUninit::uninit()), - } - }) - .collect(); - let ptr = boxed.as_mut_ptr(); - mem::forget(boxed); - ptr - }; - - // One lap is the smallest power of two greater than `cap`. - let one_lap = (cap + 1).next_power_of_two(); - - ArrayQueue { - buffer, - cap, - one_lap, - head: CachePadded::new(AtomicUsize::new(head)), - tail: CachePadded::new(AtomicUsize::new(tail)), - _marker: PhantomData, - } - } - - /// Attempts to push an element into the queue. - /// - /// If the queue is full, the element is returned back as an error. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::new(1); - /// - /// assert_eq!(q.push(10), Ok(())); - /// assert_eq!(q.push(20), Err(20)); - /// ``` - pub fn push(&self, value: T) -> Result<(), T> { - let backoff = Backoff::new(); - let mut tail = self.tail.load(Ordering::Relaxed); - - loop { - // Deconstruct the tail. - let index = tail & (self.one_lap - 1); - let lap = tail & !(self.one_lap - 1); - - // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; - let stamp = slot.stamp.load(Ordering::Acquire); - - // If the tail and the stamp match, we may attempt to push. - if tail == stamp { - let new_tail = if index + 1 < self.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, index: index + 1 }`. - tail + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. - lap.wrapping_add(self.one_lap) - }; - - // Try moving the tail. - match self.tail.compare_exchange_weak( - tail, - new_tail, - Ordering::SeqCst, - Ordering::Relaxed, - ) { - Ok(_) => { - // Write the value into the slot and update the stamp. - unsafe { - slot.value.get().write(MaybeUninit::new(value)); - } - slot.stamp.store(tail + 1, Ordering::Release); - return Ok(()); - } - Err(t) => { - tail = t; - backoff.spin(); - } - } - } else if stamp.wrapping_add(self.one_lap) == tail + 1 { - atomic::fence(Ordering::SeqCst); - let head = self.head.load(Ordering::Relaxed); - - // If the head lags one lap behind the tail as well... - if head.wrapping_add(self.one_lap) == tail { - // ...then the queue is full. - return Err(value); - } - - backoff.spin(); - tail = self.tail.load(Ordering::Relaxed); - } else { - // Snooze because we need to wait for the stamp to get updated. - backoff.snooze(); - tail = self.tail.load(Ordering::Relaxed); - } - } - } - - /// Attempts to pop an element from the queue. - /// - /// If the queue is empty, `None` is returned. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::new(1); - /// assert_eq!(q.push(10), Ok(())); - /// - /// assert_eq!(q.pop(), Some(10)); - /// assert!(q.pop().is_none()); - /// ``` - pub fn pop(&self) -> Option<T> { - let backoff = Backoff::new(); - let mut head = self.head.load(Ordering::Relaxed); - - loop { - // Deconstruct the head. - let index = head & (self.one_lap - 1); - let lap = head & !(self.one_lap - 1); - - // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; - let stamp = slot.stamp.load(Ordering::Acquire); - - // If the the stamp is ahead of the head by 1, we may attempt to pop. - if head + 1 == stamp { - let new = if index + 1 < self.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, index: index + 1 }`. - head + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. - lap.wrapping_add(self.one_lap) - }; - - // Try moving the head. - match self.head.compare_exchange_weak( - head, - new, - Ordering::SeqCst, - Ordering::Relaxed, - ) { - Ok(_) => { - // Read the value from the slot and update the stamp. - let msg = unsafe { slot.value.get().read().assume_init() }; - slot.stamp - .store(head.wrapping_add(self.one_lap), Ordering::Release); - return Some(msg); - } - Err(h) => { - head = h; - backoff.spin(); - } - } - } else if stamp == head { - atomic::fence(Ordering::SeqCst); - let tail = self.tail.load(Ordering::Relaxed); - - // If the tail equals the head, that means the channel is empty. - if tail == head { - return None; - } - - backoff.spin(); - head = self.head.load(Ordering::Relaxed); - } else { - // Snooze because we need to wait for the stamp to get updated. - backoff.snooze(); - head = self.head.load(Ordering::Relaxed); - } - } - } - - /// Returns the capacity of the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::<i32>::new(100); - /// - /// assert_eq!(q.capacity(), 100); - /// ``` - pub fn capacity(&self) -> usize { - self.cap - } - - /// Returns `true` if the queue is empty. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::new(100); - /// - /// assert!(q.is_empty()); - /// q.push(1).unwrap(); - /// assert!(!q.is_empty()); - /// ``` - pub fn is_empty(&self) -> bool { - let head = self.head.load(Ordering::SeqCst); - let tail = self.tail.load(Ordering::SeqCst); - - // Is the tail lagging one lap behind head? - // Is the tail equal to the head? - // - // Note: If the head changes just before we load the tail, that means there was a moment - // when the channel was not empty, so it is safe to just return `false`. - tail == head - } - - /// Returns `true` if the queue is full. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::new(1); - /// - /// assert!(!q.is_full()); - /// q.push(1).unwrap(); - /// assert!(q.is_full()); - /// ``` - pub fn is_full(&self) -> bool { - let tail = self.tail.load(Ordering::SeqCst); - let head = self.head.load(Ordering::SeqCst); - - // Is the head lagging one lap behind tail? - // - // Note: If the tail changes just before we load the head, that means there was a moment - // when the queue was not full, so it is safe to just return `false`. - head.wrapping_add(self.one_lap) == tail - } - - /// Returns the number of elements in the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::new(100); - /// assert_eq!(q.len(), 0); - /// - /// q.push(10).unwrap(); - /// assert_eq!(q.len(), 1); - /// - /// q.push(20).unwrap(); - /// assert_eq!(q.len(), 2); - /// ``` - pub fn len(&self) -> usize { - loop { - // Load the tail, then load the head. - let tail = self.tail.load(Ordering::SeqCst); - let head = self.head.load(Ordering::SeqCst); - - // If the tail didn't change, we've got consistent values to work with. - if self.tail.load(Ordering::SeqCst) == tail { - let hix = head & (self.one_lap - 1); - let tix = tail & (self.one_lap - 1); - - return if hix < tix { - tix - hix - } else if hix > tix { - self.cap - hix + tix - } else if tail == head { - 0 - } else { - self.cap - }; - } - } - } -} - -impl<T> Drop for ArrayQueue<T> { - fn drop(&mut self) { - // Get the index of the head. - let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); - - // Loop over all slots that hold a message and drop them. - for i in 0..self.len() { - // Compute the index of the next slot holding a message. - let index = if hix + i < self.cap { - hix + i - } else { - hix + i - self.cap - }; - - unsafe { - let p = { - let slot = &mut *self.buffer.add(index); - let value = &mut *slot.value.get(); - value.as_mut_ptr() - }; - p.drop_in_place(); - } - } - - // Finally, deallocate the buffer, but don't run any destructors. - unsafe { - // Create a slice from the buffer to make - // a fat pointer. Then, use Box::from_raw - // to deallocate it. - let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; - Box::from_raw(ptr); - } - } -} - -impl<T> fmt::Debug for ArrayQueue<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("ArrayQueue { .. }") - } -} diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index 846d7c2..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,34 +0,0 @@ -//! Concurrent queues. -//! -//! This crate provides concurrent queues that can be shared among threads: -//! -//! * [`ArrayQueue`], a bounded MPMC queue that allocates a fixed-capacity buffer on construction. -//! * [`SegQueue`], an unbounded MPMC queue that allocates small buffers, segments, on demand. - -#![doc(test( - no_crate_inject, - attr( - deny(warnings, rust_2018_idioms), - allow(dead_code, unused_assignments, unused_variables) - ) -))] -#![warn( - missing_docs, - missing_debug_implementations, - rust_2018_idioms, - unreachable_pub -)] -#![cfg_attr(not(feature = "std"), no_std)] - -#[cfg(not(crossbeam_no_atomic_cas))] -cfg_if::cfg_if! { - if #[cfg(feature = "alloc")] { - extern crate alloc; - - mod array_queue; - mod seg_queue; - - pub use self::array_queue::ArrayQueue; - pub use self::seg_queue::SegQueue; - } -} diff --git a/src/seg_queue.rs b/src/seg_queue.rs deleted file mode 100644 index 8545541..0000000 --- a/src/seg_queue.rs +++ /dev/null @@ -1,486 +0,0 @@ -use alloc::boxed::Box; -use core::cell::UnsafeCell; -use core::fmt; -use core::marker::PhantomData; -use core::mem::MaybeUninit; -use core::ptr; -use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; - -use crossbeam_utils::{Backoff, CachePadded}; - -// Bits indicating the state of a slot: -// * If a value has been written into the slot, `WRITE` is set. -// * If a value has been read from the slot, `READ` is set. -// * If the block is being destroyed, `DESTROY` is set. -const WRITE: usize = 1; -const READ: usize = 2; -const DESTROY: usize = 4; - -// Each block covers one "lap" of indices. -const LAP: usize = 32; -// The maximum number of values a block can hold. -const BLOCK_CAP: usize = LAP - 1; -// How many lower bits are reserved for metadata. -const SHIFT: usize = 1; -// Indicates that the block is not the last one. -const HAS_NEXT: usize = 1; - -/// A slot in a block. -struct Slot<T> { - /// The value. - value: UnsafeCell<MaybeUninit<T>>, - - /// The state of the slot. - state: AtomicUsize, -} - -impl<T> Slot<T> { - /// Waits until a value is written into the slot. - fn wait_write(&self) { - let backoff = Backoff::new(); - while self.state.load(Ordering::Acquire) & WRITE == 0 { - backoff.snooze(); - } - } -} - -/// A block in a linked list. -/// -/// Each block in the list can hold up to `BLOCK_CAP` values. -struct Block<T> { - /// The next block in the linked list. - next: AtomicPtr<Block<T>>, - - /// Slots for values. - slots: [Slot<T>; BLOCK_CAP], -} - -impl<T> Block<T> { - /// Creates an empty block that starts at `start_index`. - fn new() -> Block<T> { - // SAFETY: This is safe because: - // [1] `Block::next` (AtomicPtr) may be safely zero initialized. - // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. - // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it - // holds a MaybeUninit. - // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. - unsafe { MaybeUninit::zeroed().assume_init() } - } - - /// Waits until the next pointer is set. - fn wait_next(&self) -> *mut Block<T> { - let backoff = Backoff::new(); - loop { - let next = self.next.load(Ordering::Acquire); - if !next.is_null() { - return next; - } - backoff.snooze(); - } - } - - /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. - unsafe fn destroy(this: *mut Block<T>, start: usize) { - // It is not necessary to set the `DESTROY` bit in the last slot because that slot has - // begun destruction of the block. - for i in start..BLOCK_CAP - 1 { - let slot = (*this).slots.get_unchecked(i); - - // Mark the `DESTROY` bit if a thread is still using the slot. - if slot.state.load(Ordering::Acquire) & READ == 0 - && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 - { - // If a thread is still using the slot, it will continue destruction of the block. - return; - } - } - - // No thread is using the block, now it is safe to destroy it. - drop(Box::from_raw(this)); - } -} - -/// A position in a queue. -struct Position<T> { - /// The index in the queue. - index: AtomicUsize, - - /// The block in the linked list. - block: AtomicPtr<Block<T>>, -} - -/// An unbounded multi-producer multi-consumer queue. -/// -/// This queue is implemented as a linked list of segments, where each segment is a small buffer -/// that can hold a handful of elements. There is no limit to how many elements can be in the queue -/// at a time. However, since segments need to be dynamically allocated as elements get pushed, -/// this queue is somewhat slower than [`ArrayQueue`]. -/// -/// [`ArrayQueue`]: super::ArrayQueue -/// -/// # Examples -/// -/// ``` -/// use crossbeam_queue::SegQueue; -/// -/// let q = SegQueue::new(); -/// -/// q.push('a'); -/// q.push('b'); -/// -/// assert_eq!(q.pop(), Some('a')); -/// assert_eq!(q.pop(), Some('b')); -/// assert!(q.pop().is_none()); -/// ``` -pub struct SegQueue<T> { - /// The head of the queue. - head: CachePadded<Position<T>>, - - /// The tail of the queue. - tail: CachePadded<Position<T>>, - - /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`. - _marker: PhantomData<T>, -} - -unsafe impl<T: Send> Send for SegQueue<T> {} -unsafe impl<T: Send> Sync for SegQueue<T> {} - -impl<T> SegQueue<T> { - /// Creates a new unbounded queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::SegQueue; - /// - /// let q = SegQueue::<i32>::new(); - /// ``` - pub const fn new() -> SegQueue<T> { - SegQueue { - head: CachePadded::new(Position { - block: AtomicPtr::new(ptr::null_mut()), - index: AtomicUsize::new(0), - }), - tail: CachePadded::new(Position { - block: AtomicPtr::new(ptr::null_mut()), - index: AtomicUsize::new(0), - }), - _marker: PhantomData, - } - } - - /// Pushes an element into the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::SegQueue; - /// - /// let q = SegQueue::new(); - /// - /// q.push(10); - /// q.push(20); - /// ``` - pub fn push(&self, value: T) { - let backoff = Backoff::new(); - let mut tail = self.tail.index.load(Ordering::Acquire); - let mut block = self.tail.block.load(Ordering::Acquire); - let mut next_block = None; - - loop { - // Calculate the offset of the index into the block. - let offset = (tail >> SHIFT) % LAP; - - // If we reached the end of the block, wait until the next one is installed. - if offset == BLOCK_CAP { - backoff.snooze(); - tail = self.tail.index.load(Ordering::Acquire); - block = self.tail.block.load(Ordering::Acquire); - continue; - } - - // If we're going to have to install the next block, allocate it in advance in order to - // make the wait for other threads as short as possible. - if offset + 1 == BLOCK_CAP && next_block.is_none() { - next_block = Some(Box::new(Block::<T>::new())); - } - - // If this is the first push operation, we need to allocate the first block. - if block.is_null() { - let new = Box::into_raw(Box::new(Block::<T>::new())); - - if self - .tail - .block - .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) - .is_ok() - { - self.head.block.store(new, Ordering::Release); - block = new; - } else { - next_block = unsafe { Some(Box::from_raw(new)) }; - tail = self.tail.index.load(Ordering::Acquire); - block = self.tail.block.load(Ordering::Acquire); - continue; - } - } - - let new_tail = tail + (1 << SHIFT); - - // Try advancing the tail forward. - match self.tail.index.compare_exchange_weak( - tail, - new_tail, - Ordering::SeqCst, - Ordering::Acquire, - ) { - Ok(_) => unsafe { - // If we've reached the end of the block, install the next one. - if offset + 1 == BLOCK_CAP { - let next_block = Box::into_raw(next_block.unwrap()); - let next_index = new_tail.wrapping_add(1 << SHIFT); - - self.tail.block.store(next_block, Ordering::Release); - self.tail.index.store(next_index, Ordering::Release); - (*block).next.store(next_block, Ordering::Release); - } - - // Write the value into the slot. - let slot = (*block).slots.get_unchecked(offset); - slot.value.get().write(MaybeUninit::new(value)); - slot.state.fetch_or(WRITE, Ordering::Release); - - return; - }, - Err(t) => { - tail = t; - block = self.tail.block.load(Ordering::Acquire); - backoff.spin(); - } - } - } - } - - /// Pops an element from the queue. - /// - /// If the queue is empty, `None` is returned. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::SegQueue; - /// - /// let q = SegQueue::new(); - /// - /// q.push(10); - /// assert_eq!(q.pop(), Some(10)); - /// assert!(q.pop().is_none()); - /// ``` - pub fn pop(&self) -> Option<T> { - let backoff = Backoff::new(); - let mut head = self.head.index.load(Ordering::Acquire); - let mut block = self.head.block.load(Ordering::Acquire); - - loop { - // Calculate the offset of the index into the block. - let offset = (head >> SHIFT) % LAP; - - // If we reached the end of the block, wait until the next one is installed. - if offset == BLOCK_CAP { - backoff.snooze(); - head = self.head.index.load(Ordering::Acquire); - block = self.head.block.load(Ordering::Acquire); - continue; - } - - let mut new_head = head + (1 << SHIFT); - - if new_head & HAS_NEXT == 0 { - atomic::fence(Ordering::SeqCst); - let tail = self.tail.index.load(Ordering::Relaxed); - - // If the tail equals the head, that means the queue is empty. - if head >> SHIFT == tail >> SHIFT { - return None; - } - - // If head and tail are not in the same block, set `HAS_NEXT` in head. - if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { - new_head |= HAS_NEXT; - } - } - - // The block can be null here only if the first push operation is in progress. In that - // case, just wait until it gets initialized. - if block.is_null() { - backoff.snooze(); - head = self.head.index.load(Ordering::Acquire); - block = self.head.block.load(Ordering::Acquire); - continue; - } - - // Try moving the head index forward. - match self.head.index.compare_exchange_weak( - head, - new_head, - Ordering::SeqCst, - Ordering::Acquire, - ) { - Ok(_) => unsafe { - // If we've reached the end of the block, move to the next one. - if offset + 1 == BLOCK_CAP { - let next = (*block).wait_next(); - let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); - if !(*next).next.load(Ordering::Relaxed).is_null() { - next_index |= HAS_NEXT; - } - - self.head.block.store(next, Ordering::Release); - self.head.index.store(next_index, Ordering::Release); - } - - // Read the value. - let slot = (*block).slots.get_unchecked(offset); - slot.wait_write(); - let value = slot.value.get().read().assume_init(); - - // Destroy the block if we've reached the end, or if another thread wanted to - // destroy but couldn't because we were busy reading from the slot. - if offset + 1 == BLOCK_CAP { - Block::destroy(block, 0); - } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { - Block::destroy(block, offset + 1); - } - - return Some(value); - }, - Err(h) => { - head = h; - block = self.head.block.load(Ordering::Acquire); - backoff.spin(); - } - } - } - } - - /// Returns `true` if the queue is empty. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::SegQueue; - /// - /// let q = SegQueue::new(); - /// - /// assert!(q.is_empty()); - /// q.push(1); - /// assert!(!q.is_empty()); - /// ``` - pub fn is_empty(&self) -> bool { - let head = self.head.index.load(Ordering::SeqCst); - let tail = self.tail.index.load(Ordering::SeqCst); - head >> SHIFT == tail >> SHIFT - } - - /// Returns the number of elements in the queue. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::SegQueue; - /// - /// let q = SegQueue::new(); - /// assert_eq!(q.len(), 0); - /// - /// q.push(10); - /// assert_eq!(q.len(), 1); - /// - /// q.push(20); - /// assert_eq!(q.len(), 2); - /// ``` - pub fn len(&self) -> usize { - loop { - // Load the tail index, then load the head index. - let mut tail = self.tail.index.load(Ordering::SeqCst); - let mut head = self.head.index.load(Ordering::SeqCst); - - // If the tail index didn't change, we've got consistent indices to work with. - if self.tail.index.load(Ordering::SeqCst) == tail { - // Erase the lower bits. - tail &= !((1 << SHIFT) - 1); - head &= !((1 << SHIFT) - 1); - - // Fix up indices if they fall onto block ends. - if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { - tail = tail.wrapping_add(1 << SHIFT); - } - if (head >> SHIFT) & (LAP - 1) == LAP - 1 { - head = head.wrapping_add(1 << SHIFT); - } - - // Rotate indices so that head falls into the first block. - let lap = (head >> SHIFT) / LAP; - tail = tail.wrapping_sub((lap * LAP) << SHIFT); - head = head.wrapping_sub((lap * LAP) << SHIFT); - - // Remove the lower bits. - tail >>= SHIFT; - head >>= SHIFT; - - // Return the difference minus the number of blocks between tail and head. - return tail - head - tail / LAP; - } - } - } -} - -impl<T> Drop for SegQueue<T> { - fn drop(&mut self) { - let mut head = self.head.index.load(Ordering::Relaxed); - let mut tail = self.tail.index.load(Ordering::Relaxed); - let mut block = self.head.block.load(Ordering::Relaxed); - - // Erase the lower bits. - head &= !((1 << SHIFT) - 1); - tail &= !((1 << SHIFT) - 1); - - unsafe { - // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. - while head != tail { - let offset = (head >> SHIFT) % LAP; - - if offset < BLOCK_CAP { - // Drop the value in the slot. - let slot = (*block).slots.get_unchecked(offset); - let p = &mut *slot.value.get(); - p.as_mut_ptr().drop_in_place(); - } else { - // Deallocate the block and move to the next one. - let next = (*block).next.load(Ordering::Relaxed); - drop(Box::from_raw(block)); - block = next; - } - - head = head.wrapping_add(1 << SHIFT); - } - - // Deallocate the last remaining block. - if !block.is_null() { - drop(Box::from_raw(block)); - } - } - } -} - -impl<T> fmt::Debug for SegQueue<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("SegQueue { .. }") - } -} - -impl<T> Default for SegQueue<T> { - fn default() -> SegQueue<T> { - SegQueue::new() - } -} diff --git a/tests/array_queue.rs b/tests/array_queue.rs deleted file mode 100644 index 63007eb..0000000 --- a/tests/array_queue.rs +++ /dev/null @@ -1,250 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; - -use crossbeam_queue::ArrayQueue; -use crossbeam_utils::thread::scope; -use rand::{thread_rng, Rng}; - -#[test] -fn smoke() { - let q = ArrayQueue::new(1); - - q.push(7).unwrap(); - assert_eq!(q.pop(), Some(7)); - - q.push(8).unwrap(); - assert_eq!(q.pop(), Some(8)); - assert!(q.pop().is_none()); -} - -#[test] -fn capacity() { - for i in 1..10 { - let q = ArrayQueue::<i32>::new(i); - assert_eq!(q.capacity(), i); - } -} - -#[test] -#[should_panic(expected = "capacity must be non-zero")] -fn zero_capacity() { - let _ = ArrayQueue::<i32>::new(0); -} - -#[test] -fn len_empty_full() { - let q = ArrayQueue::new(2); - - assert_eq!(q.len(), 0); - assert_eq!(q.is_empty(), true); - assert_eq!(q.is_full(), false); - - q.push(()).unwrap(); - - assert_eq!(q.len(), 1); - assert_eq!(q.is_empty(), false); - assert_eq!(q.is_full(), false); - - q.push(()).unwrap(); - - assert_eq!(q.len(), 2); - assert_eq!(q.is_empty(), false); - assert_eq!(q.is_full(), true); - - q.pop().unwrap(); - - assert_eq!(q.len(), 1); - assert_eq!(q.is_empty(), false); - assert_eq!(q.is_full(), false); -} - -#[test] -fn len() { - const COUNT: usize = 25_000; - const CAP: usize = 1000; - - let q = ArrayQueue::new(CAP); - assert_eq!(q.len(), 0); - - for _ in 0..CAP / 10 { - for i in 0..50 { - q.push(i).unwrap(); - assert_eq!(q.len(), i + 1); - } - - for i in 0..50 { - q.pop().unwrap(); - assert_eq!(q.len(), 50 - i - 1); - } - } - assert_eq!(q.len(), 0); - - for i in 0..CAP { - q.push(i).unwrap(); - assert_eq!(q.len(), i + 1); - } - - for _ in 0..CAP { - q.pop().unwrap(); - } - assert_eq!(q.len(), 0); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - loop { - if let Some(x) = q.pop() { - assert_eq!(x, i); - break; - } - } - let len = q.len(); - assert!(len <= CAP); - } - }); - - scope.spawn(|_| { - for i in 0..COUNT { - while q.push(i).is_err() {} - let len = q.len(); - assert!(len <= CAP); - } - }); - }) - .unwrap(); - assert_eq!(q.len(), 0); -} - -#[test] -fn spsc() { - const COUNT: usize = 100_000; - - let q = ArrayQueue::new(3); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - loop { - if let Some(x) = q.pop() { - assert_eq!(x, i); - break; - } - } - } - assert!(q.pop().is_none()); - }); - - scope.spawn(|_| { - for i in 0..COUNT { - while q.push(i).is_err() {} - } - }); - }) - .unwrap(); -} - -#[test] -fn mpmc() { - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let q = ArrayQueue::<usize>::new(3); - let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - let n = loop { - if let Some(x) = q.pop() { - break x; - } - }; - v[n].fetch_add(1, Ordering::SeqCst); - } - }); - } - for _ in 0..THREADS { - scope.spawn(|_| { - for i in 0..COUNT { - while q.push(i).is_err() {} - } - }); - } - }) - .unwrap(); - - for c in v { - assert_eq!(c.load(Ordering::SeqCst), THREADS); - } -} - -#[test] -fn drops() { - const RUNS: usize = 100; - - static DROPS: AtomicUsize = AtomicUsize::new(0); - - #[derive(Debug, PartialEq)] - struct DropCounter; - - impl Drop for DropCounter { - fn drop(&mut self) { - DROPS.fetch_add(1, Ordering::SeqCst); - } - } - - let mut rng = thread_rng(); - - for _ in 0..RUNS { - let steps = rng.gen_range(0..10_000); - let additional = rng.gen_range(0..50); - - DROPS.store(0, Ordering::SeqCst); - let q = ArrayQueue::new(50); - - scope(|scope| { - scope.spawn(|_| { - for _ in 0..steps { - while q.pop().is_none() {} - } - }); - - scope.spawn(|_| { - for _ in 0..steps { - while q.push(DropCounter).is_err() { - DROPS.fetch_sub(1, Ordering::SeqCst); - } - } - }); - }) - .unwrap(); - - for _ in 0..additional { - q.push(DropCounter).unwrap(); - } - - assert_eq!(DROPS.load(Ordering::SeqCst), steps); - drop(q); - assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); - } -} - -#[test] -fn linearizable() { - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let q = ArrayQueue::new(THREADS); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - while q.push(0).is_err() {} - q.pop().unwrap(); - } - }); - } - }) - .unwrap(); -} diff --git a/tests/seg_queue.rs b/tests/seg_queue.rs deleted file mode 100644 index 63df9a0..0000000 --- a/tests/seg_queue.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; - -use crossbeam_queue::SegQueue; -use crossbeam_utils::thread::scope; -use rand::{thread_rng, Rng}; - -#[test] -fn smoke() { - let q = SegQueue::new(); - q.push(7); - assert_eq!(q.pop(), Some(7)); - - q.push(8); - assert_eq!(q.pop(), Some(8)); - assert!(q.pop().is_none()); -} - -#[test] -fn len_empty_full() { - let q = SegQueue::new(); - - assert_eq!(q.len(), 0); - assert_eq!(q.is_empty(), true); - - q.push(()); - - assert_eq!(q.len(), 1); - assert_eq!(q.is_empty(), false); - - q.pop().unwrap(); - - assert_eq!(q.len(), 0); - assert_eq!(q.is_empty(), true); -} - -#[test] -fn len() { - let q = SegQueue::new(); - - assert_eq!(q.len(), 0); - - for i in 0..50 { - q.push(i); - assert_eq!(q.len(), i + 1); - } - - for i in 0..50 { - q.pop().unwrap(); - assert_eq!(q.len(), 50 - i - 1); - } - - assert_eq!(q.len(), 0); -} - -#[test] -fn spsc() { - const COUNT: usize = 100_000; - - let q = SegQueue::new(); - - scope(|scope| { - scope.spawn(|_| { - for i in 0..COUNT { - loop { - if let Some(x) = q.pop() { - assert_eq!(x, i); - break; - } - } - } - assert!(q.pop().is_none()); - }); - scope.spawn(|_| { - for i in 0..COUNT { - q.push(i); - } - }); - }) - .unwrap(); -} - -#[test] -fn mpmc() { - const COUNT: usize = 25_000; - const THREADS: usize = 4; - - let q = SegQueue::<usize>::new(); - let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); - - scope(|scope| { - for _ in 0..THREADS { - scope.spawn(|_| { - for _ in 0..COUNT { - let n = loop { - if let Some(x) = q.pop() { - break x; - } - }; - v[n].fetch_add(1, Ordering::SeqCst); - } - }); - } - for _ in 0..THREADS { - scope.spawn(|_| { - for i in 0..COUNT { - q.push(i); - } - }); - } - }) - .unwrap(); - - for c in v { - assert_eq!(c.load(Ordering::SeqCst), THREADS); - } -} - -#[test] -fn drops() { - static DROPS: AtomicUsize = AtomicUsize::new(0); - - #[derive(Debug, PartialEq)] - struct DropCounter; - - impl Drop for DropCounter { - fn drop(&mut self) { - DROPS.fetch_add(1, Ordering::SeqCst); - } - } - - let mut rng = thread_rng(); - - for _ in 0..100 { - let steps = rng.gen_range(0..10_000); - let additional = rng.gen_range(0..1000); - - DROPS.store(0, Ordering::SeqCst); - let q = SegQueue::new(); - - scope(|scope| { - scope.spawn(|_| { - for _ in 0..steps { - while q.pop().is_none() {} - } - }); - - scope.spawn(|_| { - for _ in 0..steps { - q.push(DropCounter); - } - }); - }) - .unwrap(); - - for _ in 0..additional { - q.push(DropCounter); - } - - assert_eq!(DROPS.load(Ordering::SeqCst), steps); - drop(q); - assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); - } -} |