diff options
Diffstat (limited to 'src/flavors/list.rs')
-rw-r--r-- | src/flavors/list.rs | 116 |
1 files changed, 95 insertions, 21 deletions
diff --git a/src/flavors/list.rs b/src/flavors/list.rs index 532e8b6..5056aa4 100644 --- a/src/flavors/list.rs +++ b/src/flavors/list.rs @@ -151,7 +151,7 @@ impl Default for ListToken { /// /// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and /// improve cache efficiency. -pub struct Channel<T> { +pub(crate) struct Channel<T> { /// The head of the channel. head: CachePadded<Position<T>>, @@ -167,7 +167,7 @@ pub struct Channel<T> { impl<T> Channel<T> { /// Creates a new unbounded channel. - pub fn new() -> Self { + pub(crate) fn new() -> Self { Channel { head: CachePadded::new(Position { block: AtomicPtr::new(ptr::null_mut()), @@ -183,12 +183,12 @@ impl<T> Channel<T> { } /// Returns a receiver handle to the channel. - pub fn receiver(&self) -> Receiver<'_, T> { + pub(crate) fn receiver(&self) -> Receiver<'_, T> { Receiver(self) } /// Returns a sender handle to the channel. - pub fn sender(&self) -> Sender<'_, T> { + pub(crate) fn sender(&self) -> Sender<'_, T> { Sender(self) } @@ -231,8 +231,8 @@ impl<T> Channel<T> { if self .tail .block - .compare_and_swap(block, new, Ordering::Release) - == block + .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) + .is_ok() { self.head.block.store(new, Ordering::Release); block = new; @@ -276,7 +276,7 @@ impl<T> Channel<T> { } /// Writes a message into the channel. - pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no slot, the channel is disconnected. if token.list.block.is_null() { return Err(msg); @@ -380,7 +380,7 @@ impl<T> Channel<T> { } /// Reads a message from the channel. - pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { if token.list.block.is_null() { // The channel is disconnected. return Err(()); @@ -405,7 +405,7 @@ impl<T> Channel<T> { } /// Attempts to send a message into the channel. - pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { self.send(msg, None).map_err(|err| match err { SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), SendTimeoutError::Timeout(_) => unreachable!(), @@ -413,7 +413,11 @@ impl<T> Channel<T> { } /// Sends a message into the channel. - pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { + pub(crate) fn send( + &self, + msg: T, + _deadline: Option<Instant>, + ) -> Result<(), SendTimeoutError<T>> { let token = &mut Token::default(); assert!(self.start_send(token)); unsafe { @@ -423,7 +427,7 @@ impl<T> Channel<T> { } /// Attempts to receive a message without blocking. - pub fn try_recv(&self) -> Result<T, TryRecvError> { + pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { let token = &mut Token::default(); if self.start_recv(token) { @@ -434,7 +438,7 @@ impl<T> Channel<T> { } /// Receives a message from the channel. - pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { + pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { let token = &mut Token::default(); loop { // Try receiving a message several times. @@ -486,7 +490,7 @@ impl<T> Channel<T> { } /// Returns the current number of messages inside the channel. - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { loop { // Load the tail index, then load the head index. let mut tail = self.tail.index.load(Ordering::SeqCst); @@ -522,14 +526,14 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - pub fn capacity(&self) -> Option<usize> { + pub(crate) fn capacity(&self) -> Option<usize> { None } - /// Disconnects the channel and wakes up all blocked receivers. + /// Disconnects senders and wakes up all blocked receivers. /// /// Returns `true` if this call disconnected the channel. - pub fn disconnect(&self) -> bool { + pub(crate) fn disconnect_senders(&self) -> bool { let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); if tail & MARK_BIT == 0 { @@ -540,20 +544,90 @@ impl<T> Channel<T> { } } + /// Disconnects receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub(crate) fn disconnect_receivers(&self) -> bool { + let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); + + if tail & MARK_BIT == 0 { + // If receivers are dropped first, discard all messages to free + // memory eagerly. + self.discard_all_messages(); + true + } else { + false + } + } + + /// Discards all messages. + /// + /// This method should only be called when all receivers are dropped. + fn discard_all_messages(&self) { + let backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + loop { + let offset = (tail >> SHIFT) % LAP; + if offset != BLOCK_CAP { + break; + } + + // New updates to tail will be rejected by MARK_BIT and aborted unless it's + // at boundary. We need to wait for the updates take affect otherwise there + // can be memory leaks. + backoff.snooze(); + tail = self.tail.index.load(Ordering::Acquire); + } + + let mut head = self.head.index.load(Ordering::Acquire); + let mut block = self.head.block.load(Ordering::Acquire); + + unsafe { + // Drop all messages between head and tail and deallocate the heap-allocated blocks. + while head >> SHIFT != tail >> SHIFT { + let offset = (head >> SHIFT) % LAP; + + if offset < BLOCK_CAP { + // Drop the message in the slot. + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); + let p = &mut *slot.msg.get(); + p.as_mut_ptr().drop_in_place(); + } else { + (*block).wait_next(); + // Deallocate the block and move to the next one. + let next = (*block).next.load(Ordering::Acquire); + drop(Box::from_raw(block)); + block = next; + } + + head = head.wrapping_add(1 << SHIFT); + } + + // Deallocate the last remaining block. + if !block.is_null() { + drop(Box::from_raw(block)); + } + } + head &= !MARK_BIT; + self.head.block.store(ptr::null_mut(), Ordering::Release); + self.head.index.store(head, Ordering::Release); + } + /// Returns `true` if the channel is disconnected. - pub fn is_disconnected(&self) -> bool { + pub(crate) fn is_disconnected(&self) -> bool { self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 } /// Returns `true` if the channel is empty. - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { let head = self.head.index.load(Ordering::SeqCst); let tail = self.tail.index.load(Ordering::SeqCst); head >> SHIFT == tail >> SHIFT } /// Returns `true` if the channel is full. - pub fn is_full(&self) -> bool { + pub(crate) fn is_full(&self) -> bool { false } } @@ -597,10 +671,10 @@ impl<T> Drop for Channel<T> { } /// Receiver handle to a channel. -pub struct Receiver<'a, T>(&'a Channel<T>); +pub(crate) struct Receiver<'a, T>(&'a Channel<T>); /// Sender handle to a channel. -pub struct Sender<'a, T>(&'a Channel<T>); +pub(crate) struct Sender<'a, T>(&'a Channel<T>); impl<T> SelectHandle for Receiver<'_, T> { fn try_select(&self, token: &mut Token) -> bool { |