Diffstat (limited to 'src/sync/mpsc/block.rs')
-rw-r--r-- | src/sync/mpsc/block.rs | 143
1 file changed, 103 insertions, 40 deletions
diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs
index 58f4a9f..39c3e1b 100644
--- a/src/sync/mpsc/block.rs
+++ b/src/sync/mpsc/block.rs
@@ -1,6 +1,7 @@
 use crate::loom::cell::UnsafeCell;
 use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
+use std::alloc::Layout;
 use std::mem::MaybeUninit;
 use std::ops;
 use std::ptr::{self, NonNull};
@@ -10,6 +11,17 @@ use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
 ///
 /// Each block in the list can hold up to `BLOCK_CAP` messages.
 pub(crate) struct Block<T> {
+    /// The header fields.
+    header: BlockHeader<T>,
+
+    /// Array containing values pushed into the block. Values are stored in a
+    /// continuous array in order to improve cache line behavior when reading.
+    /// The values must be manually dropped.
+    values: Values<T>,
+}
+
+/// Extra fields for a `Block<T>`.
+struct BlockHeader<T> {
     /// The start index of this block.
     ///
     /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
@@ -24,11 +36,6 @@ pub(crate) struct Block<T> {
     /// The observed `tail_position` value *after* the block has been passed by
     /// `block_tail`.
     observed_tail_position: UnsafeCell<usize>,
-
-    /// Array containing values pushed into the block. Values are stored in a
-    /// continuous array in order to improve cache line behavior when reading.
-    /// The values must be manually dropped.
-    values: Values<T>,
 }
 
 pub(crate) enum Read<T> {
@@ -36,6 +43,7 @@ pub(crate) enum Read<T> {
     Closed,
 }
 
+#[repr(transparent)]
 struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
 
 use super::BLOCK_CAP;
@@ -71,28 +79,56 @@ pub(crate) fn offset(slot_index: usize) -> usize {
     SLOT_MASK & slot_index
 }
 
+generate_addr_of_methods! {
+    impl<T> Block<T> {
+        unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
+            &self.header
+        }
+
+        unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
+            &self.values
+        }
+    }
+}
+
 impl<T> Block<T> {
-    pub(crate) fn new(start_index: usize) -> Block<T> {
-        Block {
-            // The absolute index in the channel of the first slot in the block.
-            start_index,
+    pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
+        unsafe {
+            // Allocate the block on the heap.
+            // SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
+            let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
+            let block = match NonNull::new(block) {
+                Some(block) => block,
+                None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
+            };
+
+            // Write the header to the block.
+            Block::addr_of_header(block).as_ptr().write(BlockHeader {
+                // The absolute index in the channel of the first slot in the block.
+                start_index,
 
-            // Pointer to the next block in the linked list.
-            next: AtomicPtr::new(ptr::null_mut()),
+                // Pointer to the next block in the linked list.
+                next: AtomicPtr::new(ptr::null_mut()),
 
-            ready_slots: AtomicUsize::new(0),
+                ready_slots: AtomicUsize::new(0),
 
-            observed_tail_position: UnsafeCell::new(0),
+                observed_tail_position: UnsafeCell::new(0),
+            });
 
-            // Value storage
-            values: unsafe { Values::uninitialized() },
+            // Initialize the values array.
+            Values::initialize(Block::addr_of_values(block));
+
+            // Convert the pointer to a `Box`.
+            // Safety: The raw pointer was allocated using the global allocator, and with
+            // the layout for a `Block<T>`, so it's valid to convert it to box.
+            Box::from_raw(block.as_ptr())
         }
     }
 
     /// Returns `true` if the block matches the given index.
     pub(crate) fn is_at_index(&self, index: usize) -> bool {
         debug_assert!(offset(index) == 0);
-        self.start_index == index
+        self.header.start_index == index
     }
 
     /// Returns the number of blocks between `self` and the block at the
@@ -101,7 +137,7 @@ impl<T> Block<T> {
     /// `start_index` must represent a block *after* `self`.
     pub(crate) fn distance(&self, other_index: usize) -> usize {
         debug_assert!(offset(other_index) == 0);
-        other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+        other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
     }
 
     /// Reads the value at the given offset.
@@ -116,7 +152,7 @@ impl<T> Block<T> {
     pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
         let offset = offset(slot_index);
 
-        let ready_bits = self.ready_slots.load(Acquire);
+        let ready_bits = self.header.ready_slots.load(Acquire);
 
         if !is_ready(ready_bits, offset) {
             if is_tx_closed(ready_bits) {
@@ -156,7 +192,7 @@ impl<T> Block<T> {
 
     /// Signal to the receiver that the sender half of the list is closed.
     pub(crate) unsafe fn tx_close(&self) {
-        self.ready_slots.fetch_or(TX_CLOSED, Release);
+        self.header.ready_slots.fetch_or(TX_CLOSED, Release);
     }
 
     /// Resets the block to a blank state. This enables reusing blocks in the
@@ -169,9 +205,9 @@ impl<T> Block<T> {
     /// * All slots are empty.
     /// * The caller holds a unique pointer to the block.
     pub(crate) unsafe fn reclaim(&mut self) {
-        self.start_index = 0;
-        self.next = AtomicPtr::new(ptr::null_mut());
-        self.ready_slots = AtomicUsize::new(0);
+        self.header.start_index = 0;
+        self.header.next = AtomicPtr::new(ptr::null_mut());
+        self.header.ready_slots = AtomicUsize::new(0);
     }
 
     /// Releases the block to the rx half for freeing.
@@ -187,19 +223,20 @@ impl<T> Block<T> {
     pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
         // Track the observed tail_position. Any sender targeting a greater
         // tail_position is guaranteed to not access this block.
-        self.observed_tail_position
+        self.header
+            .observed_tail_position
             .with_mut(|ptr| *ptr = tail_position);
 
         // Set the released bit, signalling to the receiver that it is safe to
         // free the block's memory as soon as all slots **prior** to
         // `observed_tail_position` have been filled.
-        self.ready_slots.fetch_or(RELEASED, Release);
+        self.header.ready_slots.fetch_or(RELEASED, Release);
     }
 
     /// Mark a slot as ready
     fn set_ready(&self, slot: usize) {
         let mask = 1 << slot;
-        self.ready_slots.fetch_or(mask, Release);
+        self.header.ready_slots.fetch_or(mask, Release);
     }
 
     /// Returns `true` when all slots have their `ready` bits set.
@@ -214,25 +251,31 @@ impl<T> Block<T> {
     /// single atomic cell. However, this could have negative impact on cache
     /// behavior as there would be many more mutations to a single slot.
     pub(crate) fn is_final(&self) -> bool {
-        self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+        self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
     }
 
     /// Returns the `observed_tail_position` value, if set
     pub(crate) fn observed_tail_position(&self) -> Option<usize> {
-        if 0 == RELEASED & self.ready_slots.load(Acquire) {
+        if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
             None
         } else {
-            Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+            Some(
+                self.header
+                    .observed_tail_position
+                    .with(|ptr| unsafe { *ptr }),
+            )
         }
     }
 
     /// Loads the next block
     pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
-        let ret = NonNull::new(self.next.load(ordering));
+        let ret = NonNull::new(self.header.next.load(ordering));
 
         debug_assert!(unsafe {
-            ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
-                .unwrap_or(true)
+            ret.map(|block| {
+                block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
+            })
+            .unwrap_or(true)
         });
 
         ret
@@ -260,9 +303,10 @@ impl<T> Block<T> {
         success: Ordering,
         failure: Ordering,
     ) -> Result<(), NonNull<Block<T>>> {
-        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+        block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);
 
         let next_ptr = self
+            .header
             .next
             .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
             .unwrap_or_else(|x| x);
@@ -291,7 +335,7 @@ impl<T> Block<T> {
         // Create the new block. It is assumed that the block will become the
         // next one after `&self`. If this turns out to not be the case,
         // `start_index` is updated accordingly.
-        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+        let new_block = Block::new(self.header.start_index + BLOCK_CAP);
 
         let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
 
@@ -308,7 +352,8 @@ impl<T> Block<T> {
         // `Release` ensures that the newly allocated block is available to
         // other threads acquiring the next pointer.
         let next = NonNull::new(
-            self.next
+            self.header
+                .next
                 .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                 .unwrap_or_else(|x| x),
         );
@@ -360,19 +405,20 @@ fn is_tx_closed(bits: usize) -> bool {
 }
 
 impl<T> Values<T> {
-    unsafe fn uninitialized() -> Values<T> {
-        let mut vals = MaybeUninit::uninit();
-
+    /// Initialize a `Values` struct from a pointer.
+    ///
+    /// # Safety
+    ///
+    /// The raw pointer must be valid for writing a `Values<T>`.
+    unsafe fn initialize(_value: NonNull<Values<T>>) {
         // When fuzzing, `UnsafeCell` needs to be initialized.
         if_loom! {
-            let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
+            let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
             for i in 0..BLOCK_CAP {
                 p.add(i)
                     .write(UnsafeCell::new(MaybeUninit::uninit()));
             }
         }
-
-        Values(vals.assume_init())
     }
 }
 
@@ -383,3 +429,20 @@ impl<T> ops::Index<usize> for Values<T> {
         self.0.index(index)
     }
 }
+
+#[cfg(all(test, not(loom)))]
+#[test]
+fn assert_no_stack_overflow() {
+    // https://github.com/tokio-rs/tokio/issues/5293
+
+    struct Foo {
+        _a: [u8; 2_000_000],
+    }
+
+    assert_eq!(
+        Layout::new::<MaybeUninit<Block<Foo>>>(),
+        Layout::new::<Block<Foo>>()
+    );
+
+    let _block = Block::<Foo>::new(0);
+}
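
Why this fixes the stack overflow: `Block::new` previously returned the block by value, so callers wrote `Box::new(Block::new(..))`, and the whole block, header plus the `BLOCK_CAP`-slot values array, was materialized on the stack before being moved into the heap allocation. With a large message type that temporary alone could overflow the stack (tokio-rs/tokio#5293). After the change, a `Block<T>` is never constructed by value: uninitialized memory is allocated directly on the heap, only the small header is written through a raw field pointer, and the pointer is converted into a `Box`. Below is a minimal sketch of that pattern outside tokio; the names `Big` and `new_big` are hypothetical, and `std::ptr::addr_of_mut!` stands in for the field-projection helpers generated by tokio's internal `generate_addr_of_methods!` macro.

use std::alloc::{alloc, handle_alloc_error, Layout};
use std::mem::MaybeUninit;
use std::ptr::{addr_of_mut, NonNull};

// Hypothetical stand-in for Block<T>: a small header next to a payload
// too large to construct on the stack.
struct Big {
    header: u64,
    payload: [MaybeUninit<u8>; 2_000_000],
}

fn new_big(header: u64) -> Box<Big> {
    let layout = Layout::new::<Big>();
    unsafe {
        // SAFETY: `Big` has non-zero size, so allocating with its layout is allowed.
        let ptr = match NonNull::new(alloc(layout) as *mut Big) {
            Some(ptr) => ptr,
            None => handle_alloc_error(layout),
        };

        // Write only the header. `addr_of_mut!` takes the field's address
        // without creating a reference to the still-uninitialized struct;
        // the payload is left uninitialized, which is sound because every
        // element is `MaybeUninit`.
        addr_of_mut!((*ptr.as_ptr()).header).write(header);

        // SAFETY: the pointer came from the global allocator with the
        // layout of `Big`, so it may be converted into a `Box`.
        Box::from_raw(ptr.as_ptr())
    }
}

Two supporting details in the diff serve the same initialization path: the new `#[repr(transparent)]` on `Values<T>` guarantees that a `NonNull<Values<T>>` can be cast to a pointer to its first `UnsafeCell<MaybeUninit<T>>` element, which `Values::initialize` relies on under loom, and the `assert_no_stack_overflow` regression test exercises the whole path by building a block whose payload type is 2 MB.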