diff options
Diffstat (limited to 'src/array_queue.rs')
-rw-r--r-- | src/array_queue.rs | 145 |
1 files changed, 108 insertions, 37 deletions
diff --git a/src/array_queue.rs b/src/array_queue.rs index 5f3061b..e07fde8 100644 --- a/src/array_queue.rs +++ b/src/array_queue.rs @@ -27,9 +27,11 @@ struct Slot<T> { /// /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed /// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an -/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit -/// faster than [`SegQueue`]. +/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for +/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue +/// a bit faster than [`SegQueue`]. /// +/// [`force_push`]: ArrayQueue::force_push /// [`SegQueue`]: super::SegQueue /// /// # Examples @@ -120,21 +122,10 @@ impl<T> ArrayQueue<T> { } } - /// Attempts to push an element into the queue. - /// - /// If the queue is full, the element is returned back as an error. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::new(1); - /// - /// assert_eq!(q.push(10), Ok(())); - /// assert_eq!(q.push(20), Err(20)); - /// ``` - pub fn push(&self, value: T) -> Result<(), T> { + fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T> + where + F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>, + { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); @@ -143,6 +134,16 @@ impl<T> ArrayQueue<T> { let index = tail & (self.one_lap - 1); let lap = tail & !(self.one_lap - 1); + let new_tail = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, index: index + 1 }`. + tail + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + // Inspect the corresponding slot. debug_assert!(index < self.buffer.len()); let slot = unsafe { self.buffer.get_unchecked(index) }; @@ -150,16 +151,6 @@ impl<T> ArrayQueue<T> { // If the tail and the stamp match, we may attempt to push. if tail == stamp { - let new_tail = if index + 1 < self.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, index: index + 1 }`. - tail + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. - lap.wrapping_add(self.one_lap) - }; - // Try moving the tail. match self.tail.compare_exchange_weak( tail, @@ -182,14 +173,7 @@ impl<T> ArrayQueue<T> { } } else if stamp.wrapping_add(self.one_lap) == tail + 1 { atomic::fence(Ordering::SeqCst); - let head = self.head.load(Ordering::Relaxed); - - // If the head lags one lap behind the tail as well... - if head.wrapping_add(self.one_lap) == tail { - // ...then the queue is full. - return Err(value); - } - + value = f(value, tail, new_tail, slot)?; backoff.spin(); tail = self.tail.load(Ordering::Relaxed); } else { @@ -200,6 +184,79 @@ impl<T> ArrayQueue<T> { } } + /// Attempts to push an element into the queue. + /// + /// If the queue is full, the element is returned back as an error. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_queue::ArrayQueue; + /// + /// let q = ArrayQueue::new(1); + /// + /// assert_eq!(q.push(10), Ok(())); + /// assert_eq!(q.push(20), Err(20)); + /// ``` + pub fn push(&self, value: T) -> Result<(), T> { + self.push_or_else(value, |v, tail, _, _| { + let head = self.head.load(Ordering::Relaxed); + + // If the head lags one lap behind the tail as well... + if head.wrapping_add(self.one_lap) == tail { + // ...then the queue is full. + Err(v) + } else { + Ok(v) + } + }) + } + + /// Pushes an element into the queue, replacing the oldest element if necessary. + /// + /// If the queue is full, the oldest element is replaced and returned, + /// otherwise `None` is returned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_queue::ArrayQueue; + /// + /// let q = ArrayQueue::new(2); + /// + /// assert_eq!(q.force_push(10), None); + /// assert_eq!(q.force_push(20), None); + /// assert_eq!(q.force_push(30), Some(10)); + /// assert_eq!(q.pop(), Some(20)); + /// ``` + pub fn force_push(&self, value: T) -> Option<T> { + self.push_or_else(value, |v, tail, new_tail, slot| { + let head = tail.wrapping_sub(self.one_lap); + let new_head = new_tail.wrapping_sub(self.one_lap); + + // Try moving the head. + if self + .head + .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // Move the tail. + self.tail.store(new_tail, Ordering::SeqCst); + + // Swap the previous value. + let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() }; + + // Update the stamp. + slot.stamp.store(tail + 1, Ordering::Release); + + Err(old) + } else { + Ok(v) + } + }) + .err() + } + /// Attempts to pop an element from the queue. /// /// If the queue is empty, `None` is returned. @@ -387,10 +444,24 @@ impl<T> ArrayQueue<T> { impl<T> Drop for ArrayQueue<T> { fn drop(&mut self) { // Get the index of the head. - let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); + let head = *self.head.get_mut(); + let tail = *self.tail.get_mut(); + + let hix = head & (self.one_lap - 1); + let tix = tail & (self.one_lap - 1); + + let len = if hix < tix { + tix - hix + } else if hix > tix { + self.cap - hix + tix + } else if tail == head { + 0 + } else { + self.cap + }; // Loop over all slots that hold a message and drop them. - for i in 0..self.len() { + for i in 0..len { // Compute the index of the next slot holding a message. let index = if hix + i < self.cap { hix + i |