aboutsummaryrefslogtreecommitdiff
path: root/src/flavors/list.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/flavors/list.rs')
-rw-r--r--src/flavors/list.rs116
1 files changed, 95 insertions, 21 deletions
diff --git a/src/flavors/list.rs b/src/flavors/list.rs
index 532e8b6..5056aa4 100644
--- a/src/flavors/list.rs
+++ b/src/flavors/list.rs
@@ -151,7 +151,7 @@ impl Default for ListToken {
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
-pub struct Channel<T> {
+pub(crate) struct Channel<T> {
/// The head of the channel.
head: CachePadded<Position<T>>,
@@ -167,7 +167,7 @@ pub struct Channel<T> {
impl<T> Channel<T> {
/// Creates a new unbounded channel.
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
Channel {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
@@ -183,12 +183,12 @@ impl<T> Channel<T> {
}
/// Returns a receiver handle to the channel.
- pub fn receiver(&self) -> Receiver<'_, T> {
+ pub(crate) fn receiver(&self) -> Receiver<'_, T> {
Receiver(self)
}
/// Returns a sender handle to the channel.
- pub fn sender(&self) -> Sender<'_, T> {
+ pub(crate) fn sender(&self) -> Sender<'_, T> {
Sender(self)
}
@@ -231,8 +231,8 @@ impl<T> Channel<T> {
if self
.tail
.block
- .compare_and_swap(block, new, Ordering::Release)
- == block
+ .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
{
self.head.block.store(new, Ordering::Release);
block = new;
@@ -276,7 +276,7 @@ impl<T> Channel<T> {
}
/// Writes a message into the channel.
- pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.list.block.is_null() {
return Err(msg);
@@ -380,7 +380,7 @@ impl<T> Channel<T> {
}
/// Reads a message from the channel.
- pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.list.block.is_null() {
// The channel is disconnected.
return Err(());
@@ -405,7 +405,7 @@ impl<T> Channel<T> {
}
/// Attempts to send a message into the channel.
- pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.send(msg, None).map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
@@ -413,7 +413,11 @@ impl<T> Channel<T> {
}
/// Sends a message into the channel.
- pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ _deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
assert!(self.start_send(token));
unsafe {
@@ -423,7 +427,7 @@ impl<T> Channel<T> {
}
/// Attempts to receive a message without blocking.
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
if self.start_recv(token) {
@@ -434,7 +438,7 @@ impl<T> Channel<T> {
}
/// Receives a message from the channel.
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
loop {
// Try receiving a message several times.
@@ -486,7 +490,7 @@ impl<T> Channel<T> {
}
/// Returns the current number of messages inside the channel.
- pub fn len(&self) -> usize {
+ pub(crate) fn len(&self) -> usize {
loop {
// Load the tail index, then load the head index.
let mut tail = self.tail.index.load(Ordering::SeqCst);
@@ -522,14 +526,14 @@ impl<T> Channel<T> {
}
/// Returns the capacity of the channel.
- pub fn capacity(&self) -> Option<usize> {
+ pub(crate) fn capacity(&self) -> Option<usize> {
None
}
- /// Disconnects the channel and wakes up all blocked receivers.
+ /// Disconnects senders and wakes up all blocked receivers.
///
/// Returns `true` if this call disconnected the channel.
- pub fn disconnect(&self) -> bool {
+ pub(crate) fn disconnect_senders(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
if tail & MARK_BIT == 0 {
@@ -540,20 +544,90 @@ impl<T> Channel<T> {
}
}
+ /// Disconnects receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect_receivers(&self) -> bool {
+ let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+ if tail & MARK_BIT == 0 {
+ // If receivers are dropped first, discard all messages to free
+ // memory eagerly.
+ self.discard_all_messages();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Discards all messages.
+ ///
+ /// This method should only be called when all receivers are dropped.
+ fn discard_all_messages(&self) {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.index.load(Ordering::Acquire);
+ loop {
+ let offset = (tail >> SHIFT) % LAP;
+ if offset != BLOCK_CAP {
+ break;
+ }
+
+ // New updates to tail will be rejected by MARK_BIT and aborted unless it's
+ // at boundary. We need to wait for the updates take affect otherwise there
+ // can be memory leaks.
+ backoff.snooze();
+ tail = self.tail.index.load(Ordering::Acquire);
+ }
+
+ let mut head = self.head.index.load(Ordering::Acquire);
+ let mut block = self.head.block.load(Ordering::Acquire);
+
+ unsafe {
+ // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+ while head >> SHIFT != tail >> SHIFT {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the message in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let p = &mut *slot.msg.get();
+ p.as_mut_ptr().drop_in_place();
+ } else {
+ (*block).wait_next();
+ // Deallocate the block and move to the next one.
+ let next = (*block).next.load(Ordering::Acquire);
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ if !block.is_null() {
+ drop(Box::from_raw(block));
+ }
+ }
+ head &= !MARK_BIT;
+ self.head.block.store(ptr::null_mut(), Ordering::Release);
+ self.head.index.store(head, Ordering::Release);
+ }
+
/// Returns `true` if the channel is disconnected.
- pub fn is_disconnected(&self) -> bool {
+ pub(crate) fn is_disconnected(&self) -> bool {
self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
}
/// Returns `true` if the channel is empty.
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
let head = self.head.index.load(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::SeqCst);
head >> SHIFT == tail >> SHIFT
}
/// Returns `true` if the channel is full.
- pub fn is_full(&self) -> bool {
+ pub(crate) fn is_full(&self) -> bool {
false
}
}
@@ -597,10 +671,10 @@ impl<T> Drop for Channel<T> {
}
/// Receiver handle to a channel.
-pub struct Receiver<'a, T>(&'a Channel<T>);
+pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
-pub struct Sender<'a, T>(&'a Channel<T>);
+pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {