path: root/src/sync/mpsc/block.rs
Diffstat (limited to 'src/sync/mpsc/block.rs')
-rw-r--r--  src/sync/mpsc/block.rs  143
1 file changed, 103 insertions(+), 40 deletions(-)
diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs
index 58f4a9f..39c3e1b 100644
--- a/src/sync/mpsc/block.rs
+++ b/src/sync/mpsc/block.rs
@@ -1,6 +1,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
+use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
@@ -10,6 +11,17 @@ use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
+ /// The header fields.
+ header: BlockHeader<T>,
+
+ /// Array containing values pushed into the block. Values are stored in a
+ /// contiguous array in order to improve cache line behavior when reading.
+ /// The values must be manually dropped.
+ values: Values<T>,
+}
+
+/// Extra fields for a `Block<T>`.
+struct BlockHeader<T> {
/// The start index of this block.
///
/// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
@@ -24,11 +36,6 @@ pub(crate) struct Block<T> {
/// The observed `tail_position` value *after* the block has been passed by
/// `block_tail`.
observed_tail_position: UnsafeCell<usize>,
-
- /// Array containing values pushed into the block. Values are stored in a
- /// continuous array in order to improve cache line behavior when reading.
- /// The values must be manually dropped.
- values: Values<T>,
}
pub(crate) enum Read<T> {
@@ -36,6 +43,7 @@ pub(crate) enum Read<T> {
Closed,
}
+#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
use super::BLOCK_CAP;
@@ -71,28 +79,56 @@ pub(crate) fn offset(slot_index: usize) -> usize {
SLOT_MASK & slot_index
}
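
Note: assuming the crate's usual definitions (`BLOCK_CAP` a power of two and `SLOT_MASK = BLOCK_CAP - 1`), the mask extracts a slot's position within its block. A minimal standalone sketch with the illustrative non-loom value `BLOCK_CAP = 32`:

    const BLOCK_CAP: usize = 32;            // illustrative, power of two
    const SLOT_MASK: usize = BLOCK_CAP - 1; // low-bits mask

    fn offset(slot_index: usize) -> usize {
        SLOT_MASK & slot_index
    }

    fn main() {
        // Index 70 falls in the block starting at 64, in slot 6.
        assert_eq!(offset(70), 6);
    }
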
+generate_addr_of_methods! {
+ impl<T> Block<T> {
+ unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
+ &self.header
+ }
+
+ unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
+ &self.values
+ }
+ }
+}
+
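
Note: `generate_addr_of_methods!` is a tokio-internal macro; the pattern it is understood to generate projects to a field through `addr_of_mut!`, so no intermediate reference to the (partially uninitialized) block is ever created. A hand-written sketch under that assumption, with stand-in types rather than tokio's:

    use std::ptr::NonNull;

    struct Header { start_index: usize }
    struct Node<T> { header: Header, values: [T; 4] }

    // Hypothetical equivalent of one generated method.
    unsafe fn addr_of_header<T>(me: NonNull<Node<T>>) -> NonNull<Header> {
        // SAFETY: a field projection of a non-null pointer is non-null,
        // and `addr_of_mut!` does not materialize a reference.
        unsafe { NonNull::new_unchecked(std::ptr::addr_of_mut!((*me.as_ptr()).header)) }
    }
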
impl<T> Block<T> {
- pub(crate) fn new(start_index: usize) -> Block<T> {
- Block {
- // The absolute index in the channel of the first slot in the block.
- start_index,
+ pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
+ unsafe {
+ // Allocate the block on the heap.
+ // SAFETY: The size of `Block<T>` is non-zero, since it is at least the size of the header.
+ let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
+ let block = match NonNull::new(block) {
+ Some(block) => block,
+ None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
+ };
+
+ // Write the header to the block.
+ Block::addr_of_header(block).as_ptr().write(BlockHeader {
+ // The absolute index in the channel of the first slot in the block.
+ start_index,
- // Pointer to the next block in the linked list.
- next: AtomicPtr::new(ptr::null_mut()),
+ // Pointer to the next block in the linked list.
+ next: AtomicPtr::new(ptr::null_mut()),
- ready_slots: AtomicUsize::new(0),
+ ready_slots: AtomicUsize::new(0),
- observed_tail_position: UnsafeCell::new(0),
+ observed_tail_position: UnsafeCell::new(0),
+ });
- // Value storage
- values: unsafe { Values::uninitialized() },
+ // Initialize the values array.
+ Values::initialize(Block::addr_of_values(block));
+
+ // Convert the pointer to a `Box`.
+ // SAFETY: The raw pointer was allocated using the global allocator with
+ // the layout for a `Block<T>`, so it is valid to convert it to a `Box`.
+ Box::from_raw(block.as_ptr())
}
}
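
Note: the motivation for the raw-allocation dance is that the old path, `Box::new(Block::new(..))`, first built the whole `Block<T>` in the caller's stack frame and then moved it into the box, which overflows the stack once `BLOCK_CAP * size_of::<T>()` is large (see the test for issue #5293 at the bottom of this diff). A minimal generic sketch of the same in-place pattern; the names are illustrative, not tokio's API, and it assumes `T` is not zero-sized:

    use std::alloc::{alloc, handle_alloc_error, Layout};
    use std::ptr::NonNull;

    /// Allocate a `T` on the heap and initialize it in place through a raw
    /// pointer, so no stack-sized temporary of `T` is ever created.
    unsafe fn boxed_in_place<T>(init: impl FnOnce(*mut T)) -> Box<T> {
        let layout = Layout::new::<T>();
        // SAFETY (assumption): `T` has non-zero size, mirroring the diff's
        // argument that `Block<T>` is at least as large as its header.
        let ptr = match NonNull::new(unsafe { alloc(layout) } as *mut T) {
            Some(ptr) => ptr,
            None => handle_alloc_error(layout),
        };
        init(ptr.as_ptr()); // the caller writes the fields in place
        unsafe { Box::from_raw(ptr.as_ptr()) }
    }

    fn main() {
        // An 8 MB value like this would risk a stack overflow if built
        // with `Box::new([0u8; 8_000_000])` on common stack sizes.
        let big: Box<[u8; 8_000_000]> = unsafe {
            boxed_in_place(|p| unsafe { std::ptr::write_bytes(p as *mut u8, 0, 8_000_000) })
        };
        assert_eq!(big[42], 0);
    }
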
/// Returns `true` if the block matches the given index.
pub(crate) fn is_at_index(&self, index: usize) -> bool {
debug_assert!(offset(index) == 0);
- self.start_index == index
+ self.header.start_index == index
}
/// Returns the number of blocks between `self` and the block at the
@@ -101,7 +137,7 @@ impl<T> Block<T> {
/// `start_index` must represent a block *after* `self`.
pub(crate) fn distance(&self, other_index: usize) -> usize {
debug_assert!(offset(other_index) == 0);
- other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+ other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
}
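
Note: a worked example of `distance`, again with the illustrative `BLOCK_CAP = 32`: a block starting at index 64 is two blocks away from the block starting at index 128.

    fn main() {
        const BLOCK_CAP: usize = 32; // illustrative
        assert_eq!(128usize.wrapping_sub(64) / BLOCK_CAP, 2);
    }
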
/// Reads the value at the given offset.
@@ -116,7 +152,7 @@ impl<T> Block<T> {
pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
let offset = offset(slot_index);
- let ready_bits = self.ready_slots.load(Acquire);
+ let ready_bits = self.header.ready_slots.load(Acquire);
if !is_ready(ready_bits, offset) {
if is_tx_closed(ready_bits) {
@@ -156,7 +192,7 @@ impl<T> Block<T> {
/// Signal to the receiver that the sender half of the list is closed.
pub(crate) unsafe fn tx_close(&self) {
- self.ready_slots.fetch_or(TX_CLOSED, Release);
+ self.header.ready_slots.fetch_or(TX_CLOSED, Release);
}
/// Resets the block to a blank state. This enables reusing blocks in the
@@ -169,9 +205,9 @@ impl<T> Block<T> {
/// * All slots are empty.
/// * The caller holds a unique pointer to the block.
pub(crate) unsafe fn reclaim(&mut self) {
- self.start_index = 0;
- self.next = AtomicPtr::new(ptr::null_mut());
- self.ready_slots = AtomicUsize::new(0);
+ self.header.start_index = 0;
+ self.header.next = AtomicPtr::new(ptr::null_mut());
+ self.header.ready_slots = AtomicUsize::new(0);
}
/// Releases the block to the rx half for freeing.
@@ -187,19 +223,20 @@ impl<T> Block<T> {
pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
// Track the observed tail_position. Any sender targeting a greater
// tail_position is guaranteed to not access this block.
- self.observed_tail_position
+ self.header
+ .observed_tail_position
.with_mut(|ptr| *ptr = tail_position);
// Set the released bit, signalling to the receiver that it is safe to
// free the block's memory as soon as all slots **prior** to
// `observed_tail_position` have been filled.
- self.ready_slots.fetch_or(RELEASED, Release);
+ self.header.ready_slots.fetch_or(RELEASED, Release);
}
/// Mark a slot as ready
fn set_ready(&self, slot: usize) {
let mask = 1 << slot;
- self.ready_slots.fetch_or(mask, Release);
+ self.header.ready_slots.fetch_or(mask, Release);
}
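
Note: the ready bookkeeping is one bit per slot in `ready_slots`, with `TX_CLOSED` and `RELEASED` as extra flag bits above the slot bits. A plain, non-atomic sketch of the `set_ready`/`is_final` arithmetic; the constants here are illustrative stand-ins for the crate's definitions:

    const BLOCK_CAP: usize = 32;                    // illustrative
    const READY_MASK: usize = (1 << BLOCK_CAP) - 1; // one bit per slot

    fn main() {
        let mut ready_slots = 0usize;
        for slot in 0..BLOCK_CAP {
            ready_slots |= 1 << slot; // what `set_ready` does with fetch_or
        }
        // `is_final` condition: every slot bit observed set.
        assert_eq!(ready_slots & READY_MASK, READY_MASK);
    }
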
/// Returns `true` when all slots have their `ready` bits set.
@@ -214,25 +251,31 @@ impl<T> Block<T> {
/// single atomic cell. However, this could have negative impact on cache
/// behavior as there would be many more mutations to a single slot.
pub(crate) fn is_final(&self) -> bool {
- self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+ self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
}
/// Returns the `observed_tail_position` value, if set
pub(crate) fn observed_tail_position(&self) -> Option<usize> {
- if 0 == RELEASED & self.ready_slots.load(Acquire) {
+ if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
None
} else {
- Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+ Some(
+ self.header
+ .observed_tail_position
+ .with(|ptr| unsafe { *ptr }),
+ )
}
}
/// Loads the next block
pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
- let ret = NonNull::new(self.next.load(ordering));
+ let ret = NonNull::new(self.header.next.load(ordering));
debug_assert!(unsafe {
- ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
- .unwrap_or(true)
+ ret.map(|block| {
+ block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
+ })
+ .unwrap_or(true)
});
ret
@@ -260,9 +303,10 @@ impl<T> Block<T> {
success: Ordering,
failure: Ordering,
) -> Result<(), NonNull<Block<T>>> {
- block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+ block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);
let next_ptr = self
+ .header
.next
.compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
.unwrap_or_else(|x| x);
@@ -291,7 +335,7 @@ impl<T> Block<T> {
// Create the new block. It is assumed that the block will become the
// next one after `&self`. If this turns out to not be the case,
// `start_index` is updated accordingly.
- let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+ let new_block = Block::new(self.header.start_index + BLOCK_CAP);
let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
@@ -308,7 +352,8 @@ impl<T> Block<T> {
// `Release` ensures that the newly allocated block is available to
// other threads acquiring the next pointer.
let next = NonNull::new(
- self.next
+ self.header
+ .next
.compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
.unwrap_or_else(|x| x),
);
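
Note: the `compare_exchange(..).unwrap_or_else(|x| x)` idiom used in both places above collapses the `Result`: on success it yields the previous value (null), on failure it yields the pointer another thread already installed, so the caller can detect a lost race and reuse the winner's block. A minimal sketch with plain pointers, not tokio's types:

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire}};

    /// Try to link `new` as the next node; returns null when we won the
    /// race, or the already-linked pointer when another thread beat us.
    fn try_link(next: &AtomicPtr<u8>, new: *mut u8) -> *mut u8 {
        next.compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .unwrap_or_else(|observed| observed)
    }
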
@@ -360,19 +405,20 @@ fn is_tx_closed(bits: usize) -> bool {
}
impl<T> Values<T> {
- unsafe fn uninitialized() -> Values<T> {
- let mut vals = MaybeUninit::uninit();
-
+ /// Initialize a `Values` struct from a pointer.
+ ///
+ /// # Safety
+ ///
+ /// The raw pointer must be valid for writing a `Values<T>`.
+ unsafe fn initialize(_value: NonNull<Values<T>>) {
// When fuzzing, `UnsafeCell` needs to be initialized.
if_loom! {
- let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
+ let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)
.write(UnsafeCell::new(MaybeUninit::uninit()));
}
}
-
- Values(vals.assume_init())
}
}
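
Note: `initialize` relies on the `#[repr(transparent)]` added earlier in this diff: it guarantees `Values<T>` has exactly the layout of its inner array, which is what makes casting a `*mut Values<T>` to a pointer to the first `UnsafeCell<MaybeUninit<T>>` (and indexing `BLOCK_CAP` elements through it) sound. A standalone sketch of that layout argument, with array length 4 standing in for `BLOCK_CAP`:

    use std::cell::UnsafeCell;
    use std::mem::MaybeUninit;

    #[repr(transparent)]
    struct Values<T>([UnsafeCell<MaybeUninit<T>>; 4]);

    // repr(transparent) makes this cast layout-sound: the wrapper and the
    // array start at the same address and have the same size.
    unsafe fn first_slot<T>(v: *mut Values<T>) -> *mut UnsafeCell<MaybeUninit<T>> {
        v as *mut UnsafeCell<MaybeUninit<T>>
    }

    fn main() {
        assert_eq!(
            std::mem::size_of::<Values<u64>>(),
            std::mem::size_of::<[UnsafeCell<MaybeUninit<u64>>; 4]>(),
        );
    }
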
@@ -383,3 +429,20 @@ impl<T> ops::Index<usize> for Values<T> {
self.0.index(index)
}
}
+
+#[cfg(all(test, not(loom)))]
+#[test]
+fn assert_no_stack_overflow() {
+ // https://github.com/tokio-rs/tokio/issues/5293
+
+ struct Foo {
+ _a: [u8; 2_000_000],
+ }
+
+ assert_eq!(
+ Layout::new::<MaybeUninit<Block<Foo>>>(),
+ Layout::new::<Block<Foo>>()
+ );
+
+ let _block = Block::<Foo>::new(0);
+}