aboutsummaryrefslogtreecommitdiff
path: root/src/array_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/array_queue.rs')
-rw-r--r--src/array_queue.rs145
1 files changed, 108 insertions, 37 deletions
diff --git a/src/array_queue.rs b/src/array_queue.rs
index 5f3061b..e07fde8 100644
--- a/src/array_queue.rs
+++ b/src/array_queue.rs
@@ -27,9 +27,11 @@ struct Slot<T> {
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
-/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
-/// faster than [`SegQueue`].
+/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for
+/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue
+/// a bit faster than [`SegQueue`].
///
+/// [`force_push`]: ArrayQueue::force_push
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
@@ -120,21 +122,10 @@ impl<T> ArrayQueue<T> {
}
}
- /// Attempts to push an element into the queue.
- ///
- /// If the queue is full, the element is returned back as an error.
- ///
- /// # Examples
- ///
- /// ```
- /// use crossbeam_queue::ArrayQueue;
- ///
- /// let q = ArrayQueue::new(1);
- ///
- /// assert_eq!(q.push(10), Ok(()));
- /// assert_eq!(q.push(20), Err(20));
- /// ```
- pub fn push(&self, value: T) -> Result<(), T> {
+ fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
+ where
+ F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
+ {
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
@@ -143,6 +134,16 @@ impl<T> ArrayQueue<T> {
let index = tail & (self.one_lap - 1);
let lap = tail & !(self.one_lap - 1);
+ let new_tail = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, index: index + 1 }`.
+ tail + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
@@ -150,16 +151,6 @@ impl<T> ArrayQueue<T> {
// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
- let new_tail = if index + 1 < self.cap {
- // Same lap, incremented index.
- // Set to `{ lap: lap, index: index + 1 }`.
- tail + 1
- } else {
- // One lap forward, index wraps around to zero.
- // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
- lap.wrapping_add(self.one_lap)
- };
-
// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
@@ -182,14 +173,7 @@ impl<T> ArrayQueue<T> {
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
- let head = self.head.load(Ordering::Relaxed);
-
- // If the head lags one lap behind the tail as well...
- if head.wrapping_add(self.one_lap) == tail {
- // ...then the queue is full.
- return Err(value);
- }
-
+ value = f(value, tail, new_tail, slot)?;
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
@@ -200,6 +184,79 @@ impl<T> ArrayQueue<T> {
}
}
+ /// Attempts to push an element into the queue.
+ ///
+ /// If the queue is full, the element is returned back as an error.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_queue::ArrayQueue;
+ ///
+ /// let q = ArrayQueue::new(1);
+ ///
+ /// assert_eq!(q.push(10), Ok(()));
+ /// assert_eq!(q.push(20), Err(20));
+ /// ```
+ pub fn push(&self, value: T) -> Result<(), T> {
+ self.push_or_else(value, |v, tail, _, _| {
+ let head = self.head.load(Ordering::Relaxed);
+
+ // If the head lags one lap behind the tail as well...
+ if head.wrapping_add(self.one_lap) == tail {
+ // ...then the queue is full.
+ Err(v)
+ } else {
+ Ok(v)
+ }
+ })
+ }
+
+ /// Pushes an element into the queue, replacing the oldest element if necessary.
+ ///
+ /// If the queue is full, the oldest element is replaced and returned,
+ /// otherwise `None` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_queue::ArrayQueue;
+ ///
+ /// let q = ArrayQueue::new(2);
+ ///
+ /// assert_eq!(q.force_push(10), None);
+ /// assert_eq!(q.force_push(20), None);
+ /// assert_eq!(q.force_push(30), Some(10));
+ /// assert_eq!(q.pop(), Some(20));
+ /// ```
+ pub fn force_push(&self, value: T) -> Option<T> {
+ self.push_or_else(value, |v, tail, new_tail, slot| {
+ let head = tail.wrapping_sub(self.one_lap);
+ let new_head = new_tail.wrapping_sub(self.one_lap);
+
+ // Try moving the head.
+ if self
+ .head
+ .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
+ .is_ok()
+ {
+ // Move the tail.
+ self.tail.store(new_tail, Ordering::SeqCst);
+
+ // Swap the previous value.
+ let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };
+
+ // Update the stamp.
+ slot.stamp.store(tail + 1, Ordering::Release);
+
+ Err(old)
+ } else {
+ Ok(v)
+ }
+ })
+ .err()
+ }
+
/// Attempts to pop an element from the queue.
///
/// If the queue is empty, `None` is returned.
@@ -387,10 +444,24 @@ impl<T> ArrayQueue<T> {
impl<T> Drop for ArrayQueue<T> {
fn drop(&mut self) {
// Get the index of the head.
- let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);
+ let head = *self.head.get_mut();
+ let tail = *self.tail.get_mut();
+
+ let hix = head & (self.one_lap - 1);
+ let tix = tail & (self.one_lap - 1);
+
+ let len = if hix < tix {
+ tix - hix
+ } else if hix > tix {
+ self.cap - hix + tix
+ } else if tail == head {
+ 0
+ } else {
+ self.cap
+ };
// Loop over all slots that hold a message and drop them.
- for i in 0..self.len() {
+ for i in 0..len {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap {
hix + i