diff options
-rw-r--r-- | .cargo_vcs_info.json | 6 | ||||
-rw-r--r-- | Android.bp | 4 | ||||
-rw-r--r-- | CHANGELOG.md | 10 | ||||
-rw-r--r-- | Cargo.toml | 35 | ||||
-rw-r--r-- | Cargo.toml.orig | 5 | ||||
-rw-r--r-- | METADATA | 11 | ||||
-rw-r--r-- | build.rs | 15 | ||||
-rw-r--r-- | no_atomic.rs | 22 | ||||
-rw-r--r-- | src/array_queue.rs | 107 | ||||
-rw-r--r-- | src/seg_queue.rs | 59 | ||||
-rw-r--r-- | tests/array_queue.rs | 34 | ||||
-rw-r--r-- | tests/seg_queue.rs | 35 |
12 files changed, 262 insertions, 81 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..1c48586 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "2988f873f87d2263a7fd2b9465fb9c28f43a6490" + }, + "path_in_vcs": "crossbeam-queue" +}
\ No newline at end of file @@ -43,7 +43,7 @@ rust_defaults { name: "crossbeam-queue_test_defaults", crate_name: "crossbeam_queue", cargo_env_compat: true, - cargo_pkg_version: "0.3.2", + cargo_pkg_version: "0.3.4", test_suites: ["general-tests"], auto_gen_config: true, edition: "2018", @@ -85,7 +85,7 @@ rust_library { host_supported: true, crate_name: "crossbeam_queue", cargo_env_compat: true, - cargo_pkg_version: "0.3.2", + cargo_pkg_version: "0.3.4", srcs: ["src/lib.rs"], edition: "2018", features: [ diff --git a/CHANGELOG.md b/CHANGELOG.md index 68306c8..bf79c5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,14 @@ +# Version 0.3.4 + +- Implement `IntoIterator` for `ArrayQueue` and `SegQueue`. (#772) + +# Version 0.3.3 + +- Fix stacked borrows violation in `ArrayQueue` when `-Zmiri-tag-raw-pointers` is enabled. (#763) + # Version 0.3.2 -- Support targets that do not have atomic CAS on stable Rust (#698) +- Support targets that do not have atomic CAS on stable Rust. (#698) # Version 0.3.1 @@ -3,31 +3,41 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" +rust-version = "1.36" name = "crossbeam-queue" -version = "0.3.2" -authors = ["The Crossbeam Project Developers"] +version = "0.3.4" description = "Concurrent queues" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-queue" -documentation = "https://docs.rs/crossbeam-queue" -keywords = ["queue", "mpmc", "lock-free", "producer", "consumer"] -categories = ["concurrency", "data-structures", "no-std"] +keywords = [ + "queue", + "mpmc", + "lock-free", + "producer", + "consumer", +] +categories = [ + "concurrency", + "data-structures", + "no-std", +] license = "MIT OR Apache-2.0" repository = "https://github.com/crossbeam-rs/crossbeam" + [dependencies.cfg-if] version = "1" [dependencies.crossbeam-utils] version = "0.8.5" default-features = false + [dev-dependencies.rand] version = "0.8" @@ -35,4 +45,7 @@ version = "0.8" alloc = [] default = ["std"] nightly = ["crossbeam-utils/nightly"] -std = ["alloc", "crossbeam-utils/std"] +std = [ + "alloc", + "crossbeam-utils/std", +] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index dbefdc5..ac68694 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,13 +4,12 @@ name = "crossbeam-queue" # - Update CHANGELOG.md # - Update README.md # - Create "crossbeam-queue-X.Y.Z" git tag -version = "0.3.2" -authors = ["The Crossbeam Project Developers"] +version = "0.3.4" edition = "2018" +rust-version = "1.36" license = "MIT OR Apache-2.0" repository = "https://github.com/crossbeam-rs/crossbeam" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-queue" -documentation = "https://docs.rs/crossbeam-queue" description = "Concurrent queues" keywords = ["queue", "mpmc", "lock-free", "producer", "consumer"] categories = ["concurrency", "data-structures", "no-std"] @@ -7,14 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/crossbeam-queue/crossbeam-queue-0.3.2.crate" + value: "https://static.crates.io/crates/crossbeam-queue/crossbeam-queue-0.3.4.crate" } - version: "0.3.2" - # Dual-licensed, using the least restrictive per go/thirdpartylicenses#same. + version: "0.3.4" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 8 - day: 30 + year: 2022 + month: 3 + day: 1 } } @@ -1,12 +1,21 @@ +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `crossbeam_no_atomic_cas` +// Assume the target does *not* support atomic CAS operations. +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg emitted by the build +// script are *not* public API. + #![warn(rust_2018_idioms)] use std::env; include!("no_atomic.rs"); -// The rustc-cfg strings below are *not* public API. Please let us know by -// opening a GitHub issue if your build environment requires some way to enable -// these cfgs other than by executing our build script. fn main() { let target = match env::var("TARGET") { Ok(target) => target, diff --git a/no_atomic.rs b/no_atomic.rs index 522b3b8..90ac60a 100644 --- a/no_atomic.rs +++ b/no_atomic.rs @@ -3,13 +3,16 @@ const NO_ATOMIC_CAS: &[&str] = &[ "avr-unknown-gnu-atmega328", + "bpfeb-unknown-none", + "bpfel-unknown-none", "msp430-none-elf", "riscv32i-unknown-none-elf", "riscv32imc-unknown-none-elf", "thumbv4t-none-eabi", "thumbv6m-none-eabi", ]; -#[allow(dead_code)] + +#[allow(dead_code)] // Only crossbeam-utils uses this. const NO_ATOMIC_64: &[&str] = &[ "arm-linux-androideabi", "armebv7r-none-eabi", @@ -18,18 +21,24 @@ const NO_ATOMIC_64: &[&str] = &[ "armv5te-unknown-linux-gnueabi", "armv5te-unknown-linux-musleabi", "armv5te-unknown-linux-uclibceabi", + "armv6k-nintendo-3ds", "armv7r-none-eabi", "armv7r-none-eabihf", + "avr-unknown-gnu-atmega328", "hexagon-unknown-linux-musl", + "m68k-unknown-linux-gnu", "mips-unknown-linux-gnu", "mips-unknown-linux-musl", "mips-unknown-linux-uclibc", + "mipsel-sony-psp", "mipsel-unknown-linux-gnu", "mipsel-unknown-linux-musl", "mipsel-unknown-linux-uclibc", "mipsel-unknown-none", "mipsisa32r6-unknown-linux-gnu", "mipsisa32r6el-unknown-linux-gnu", + "msp430-none-elf", + "powerpc-unknown-freebsd", "powerpc-unknown-linux-gnu", "powerpc-unknown-linux-gnuspe", "powerpc-unknown-linux-musl", @@ -39,18 +48,21 @@ const NO_ATOMIC_64: &[&str] = &[ "powerpc-wrs-vxworks-spe", "riscv32gc-unknown-linux-gnu", "riscv32gc-unknown-linux-musl", + "riscv32i-unknown-none-elf", "riscv32imac-unknown-none-elf", + "riscv32imc-esp-espidf", + "riscv32imc-unknown-none-elf", + "thumbv4t-none-eabi", + "thumbv6m-none-eabi", "thumbv7em-none-eabi", "thumbv7em-none-eabihf", "thumbv7m-none-eabi", "thumbv8m.base-none-eabi", "thumbv8m.main-none-eabi", "thumbv8m.main-none-eabihf", - "mipsel-sony-psp", - "thumbv4t-none-eabi", - "thumbv6m-none-eabi", ]; -#[allow(dead_code)] + +#[allow(dead_code)] // Only crossbeam-utils uses this. const NO_ATOMIC: &[&str] = &[ "avr-unknown-gnu-atmega328", "msp430-none-elf", diff --git a/src/array_queue.rs b/src/array_queue.rs index ff1efaa..5f3061b 100644 --- a/src/array_queue.rs +++ b/src/array_queue.rs @@ -6,8 +6,7 @@ use alloc::boxed::Box; use core::cell::UnsafeCell; use core::fmt; -use core::marker::PhantomData; -use core::mem::{self, MaybeUninit}; +use core::mem::MaybeUninit; use core::sync::atomic::{self, AtomicUsize, Ordering}; use crossbeam_utils::{Backoff, CachePadded}; @@ -63,16 +62,13 @@ pub struct ArrayQueue<T> { tail: CachePadded<AtomicUsize>, /// The buffer holding slots. - buffer: *mut Slot<T>, + buffer: Box<[Slot<T>]>, /// The queue capacity. cap: usize, /// A stamp with the value of `{ lap: 1, index: 0 }`. one_lap: usize, - - /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`. - _marker: PhantomData<T>, } unsafe impl<T: Send> Sync for ArrayQueue<T> {} @@ -102,20 +98,15 @@ impl<T> ArrayQueue<T> { // Allocate a buffer of `cap` slots initialized // with stamps. - let buffer = { - let mut boxed: Box<[Slot<T>]> = (0..cap) - .map(|i| { - // Set the stamp to `{ lap: 0, index: i }`. - Slot { - stamp: AtomicUsize::new(i), - value: UnsafeCell::new(MaybeUninit::uninit()), - } - }) - .collect(); - let ptr = boxed.as_mut_ptr(); - mem::forget(boxed); - ptr - }; + let buffer: Box<[Slot<T>]> = (0..cap) + .map(|i| { + // Set the stamp to `{ lap: 0, index: i }`. + Slot { + stamp: AtomicUsize::new(i), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + }) + .collect(); // One lap is the smallest power of two greater than `cap`. let one_lap = (cap + 1).next_power_of_two(); @@ -126,7 +117,6 @@ impl<T> ArrayQueue<T> { one_lap, head: CachePadded::new(AtomicUsize::new(head)), tail: CachePadded::new(AtomicUsize::new(tail)), - _marker: PhantomData, } } @@ -154,7 +144,8 @@ impl<T> ArrayQueue<T> { let lap = tail & !(self.one_lap - 1); // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the tail and the stamp match, we may attempt to push. @@ -234,7 +225,8 @@ impl<T> ArrayQueue<T> { let lap = head & !(self.one_lap - 1); // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the the stamp is ahead of the head by 1, we may attempt to pop. @@ -407,23 +399,12 @@ impl<T> Drop for ArrayQueue<T> { }; unsafe { - let p = { - let slot = &mut *self.buffer.add(index); - let value = &mut *slot.value.get(); - value.as_mut_ptr() - }; - p.drop_in_place(); + debug_assert!(index < self.buffer.len()); + let slot = self.buffer.get_unchecked_mut(index); + let value = &mut *slot.value.get(); + value.as_mut_ptr().drop_in_place(); } } - - // Finally, deallocate the buffer, but don't run any destructors. - unsafe { - // Create a slice from the buffer to make - // a fat pointer. Then, use Box::from_raw - // to deallocate it. - let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; - Box::from_raw(ptr); - } } } @@ -432,3 +413,53 @@ impl<T> fmt::Debug for ArrayQueue<T> { f.pad("ArrayQueue { .. }") } } + +impl<T> IntoIterator for ArrayQueue<T> { + type Item = T; + + type IntoIter = IntoIter<T>; + + fn into_iter(self) -> Self::IntoIter { + IntoIter { value: self } + } +} + +#[derive(Debug)] +pub struct IntoIter<T> { + value: ArrayQueue<T>, +} + +impl<T> Iterator for IntoIter<T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + let value = &mut self.value; + let head = *value.head.get_mut(); + if value.head.get_mut() != value.tail.get_mut() { + let index = head & (value.one_lap - 1); + let lap = head & !(value.one_lap - 1); + // SAFETY: We have mutable access to this, so we can read without + // worrying about concurrency. Furthermore, we know this is + // initialized because it is the value pointed at by `value.head` + // and this is a non-empty queue. + let val = unsafe { + debug_assert!(index < value.buffer.len()); + let slot = value.buffer.get_unchecked_mut(index); + slot.value.get().read().assume_init() + }; + let new = if index + 1 < value.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. + lap.wrapping_add(value.one_lap) + }; + *value.head.get_mut() = new; + Option::Some(val) + } else { + Option::None + } + } +} diff --git a/src/seg_queue.rs b/src/seg_queue.rs index 8545541..1767775 100644 --- a/src/seg_queue.rs +++ b/src/seg_queue.rs @@ -484,3 +484,62 @@ impl<T> Default for SegQueue<T> { SegQueue::new() } } + +impl<T> IntoIterator for SegQueue<T> { + type Item = T; + + type IntoIter = IntoIter<T>; + + fn into_iter(self) -> Self::IntoIter { + IntoIter { value: self } + } +} + +#[derive(Debug)] +pub struct IntoIter<T> { + value: SegQueue<T>, +} + +impl<T> Iterator for IntoIter<T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + let value = &mut self.value; + let head = *value.head.index.get_mut(); + let tail = *value.tail.index.get_mut(); + if head >> SHIFT == tail >> SHIFT { + None + } else { + let block = *value.head.block.get_mut(); + let offset = (head >> SHIFT) % LAP; + + // SAFETY: We have mutable access to this, so we can read without + // worrying about concurrency. Furthermore, we know this is + // initialized because it is the value pointed at by `value.head` + // and this is a non-empty queue. + let item = unsafe { + let slot = (*block).slots.get_unchecked(offset); + let p = &mut *slot.value.get(); + p.as_mut_ptr().read() + }; + if offset + 1 == BLOCK_CAP { + // Deallocate the block and move to the next one. + // SAFETY: The block is initialized because we've been reading + // from it this entire time. We can drop it b/c everything has + // been read out of it, so nothing is pointing to it anymore. + unsafe { + let next = *(*block).next.get_mut(); + drop(Box::from_raw(block)); + *value.head.block.get_mut() = next; + } + // The last value in a block is empty, so skip it + *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT); + // Double-check that we're pointing to the first item in a block. + debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0); + } else { + *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT); + } + Some(item) + } + } +} diff --git a/tests/array_queue.rs b/tests/array_queue.rs index 63007eb..a23e082 100644 --- a/tests/array_queue.rs +++ b/tests/array_queue.rs @@ -35,28 +35,29 @@ fn len_empty_full() { let q = ArrayQueue::new(2); assert_eq!(q.len(), 0); - assert_eq!(q.is_empty(), true); - assert_eq!(q.is_full(), false); + assert!(q.is_empty()); + assert!(!q.is_full()); q.push(()).unwrap(); assert_eq!(q.len(), 1); - assert_eq!(q.is_empty(), false); - assert_eq!(q.is_full(), false); + assert!(!q.is_empty()); + assert!(!q.is_full()); q.push(()).unwrap(); assert_eq!(q.len(), 2); - assert_eq!(q.is_empty(), false); - assert_eq!(q.is_full(), true); + assert!(!q.is_empty()); + assert!(q.is_full()); q.pop().unwrap(); assert_eq!(q.len(), 1); - assert_eq!(q.is_empty(), false); - assert_eq!(q.is_full(), false); + assert!(!q.is_empty()); + assert!(!q.is_full()); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn len() { const COUNT: usize = 25_000; @@ -114,6 +115,7 @@ fn len() { assert_eq!(q.len(), 0); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn spsc() { const COUNT: usize = 100_000; @@ -142,6 +144,7 @@ fn spsc() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn mpmc() { const COUNT: usize = 25_000; @@ -178,6 +181,7 @@ fn mpmc() { } } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { const RUNS: usize = 100; @@ -231,6 +235,9 @@ fn drops() { #[test] fn linearizable() { + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -248,3 +255,14 @@ fn linearizable() { }) .unwrap(); } + +#[test] +fn into_iter() { + let q = ArrayQueue::new(100); + for i in 0..100 { + q.push(i).unwrap(); + } + for (i, j) in q.into_iter().enumerate() { + assert_eq!(i, j); + } +} diff --git a/tests/seg_queue.rs b/tests/seg_queue.rs index 63df9a0..f1304ed 100644 --- a/tests/seg_queue.rs +++ b/tests/seg_queue.rs @@ -20,17 +20,17 @@ fn len_empty_full() { let q = SegQueue::new(); assert_eq!(q.len(), 0); - assert_eq!(q.is_empty(), true); + assert!(q.is_empty()); q.push(()); assert_eq!(q.len(), 1); - assert_eq!(q.is_empty(), false); + assert!(!q.is_empty()); q.pop().unwrap(); assert_eq!(q.len(), 0); - assert_eq!(q.is_empty(), true); + assert!(q.is_empty()); } #[test] @@ -52,6 +52,7 @@ fn len() { assert_eq!(q.len(), 0); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn spsc() { const COUNT: usize = 100_000; @@ -79,6 +80,7 @@ fn spsc() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn mpmc() { const COUNT: usize = 25_000; @@ -115,8 +117,11 @@ fn mpmc() { } } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { + const RUNS: usize = 100; + static DROPS: AtomicUsize = AtomicUsize::new(0); #[derive(Debug, PartialEq)] @@ -130,7 +135,7 @@ fn drops() { let mut rng = thread_rng(); - for _ in 0..100 { + for _ in 0..RUNS { let steps = rng.gen_range(0..10_000); let additional = rng.gen_range(0..1000); @@ -161,3 +166,25 @@ fn drops() { assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); } } + +#[test] +fn into_iter() { + let q = SegQueue::new(); + for i in 0..100 { + q.push(i); + } + for (i, j) in q.into_iter().enumerate() { + assert_eq!(i, j); + } +} + +#[test] +fn into_iter_drop() { + let q = SegQueue::new(); + for i in 0..100 { + q.push(i); + } + for (i, j) in q.into_iter().enumerate().take(50) { + assert_eq!(i, j); + } +} |