diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/context.rs | 20 | ||||
-rw-r--r-- | src/flavors/array.rs | 8 | ||||
-rw-r--r-- | src/flavors/zero.rs | 63 | ||||
-rw-r--r-- | src/select.rs | 5 | ||||
-rw-r--r-- | src/waker.rs | 51 |
5 files changed, 80 insertions, 67 deletions
diff --git a/src/context.rs b/src/context.rs index e2e8480..7467b80 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,7 +1,8 @@ //! Thread-local context used in select. use std::cell::Cell; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::ptr; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use std::sync::Arc; use std::thread::{self, Thread, ThreadId}; use std::time::Instant; @@ -11,6 +12,7 @@ use crossbeam_utils::Backoff; use crate::select::Selected; /// Thread-local context used in select. +// This is a private API that is used by the select macro. #[derive(Debug, Clone)] pub struct Context { inner: Arc<Inner>, @@ -23,7 +25,7 @@ struct Inner { select: AtomicUsize, /// A slot into which another thread may store a pointer to its `Packet`. - packet: AtomicUsize, + packet: AtomicPtr<()>, /// Thread handle. thread: Thread, @@ -45,7 +47,7 @@ impl Context { } let mut f = Some(f); - let mut f = move |cx: &Context| -> R { + let mut f = |cx: &Context| -> R { let f = f.take().unwrap(); f(cx) }; @@ -69,7 +71,7 @@ impl Context { Context { inner: Arc::new(Inner { select: AtomicUsize::new(Selected::Waiting.into()), - packet: AtomicUsize::new(0), + packet: AtomicPtr::new(ptr::null_mut()), thread: thread::current(), thread_id: thread::current().id(), }), @@ -82,7 +84,7 @@ impl Context { self.inner .select .store(Selected::Waiting.into(), Ordering::Release); - self.inner.packet.store(0, Ordering::Release); + self.inner.packet.store(ptr::null_mut(), Ordering::Release); } /// Attempts to select an operation. @@ -112,19 +114,19 @@ impl Context { /// /// This method must be called after `try_select` succeeds and there is a packet to provide. #[inline] - pub fn store_packet(&self, packet: usize) { - if packet != 0 { + pub fn store_packet(&self, packet: *mut ()) { + if !packet.is_null() { self.inner.packet.store(packet, Ordering::Release); } } /// Waits until a packet is provided and returns it. #[inline] - pub fn wait_packet(&self) -> usize { + pub fn wait_packet(&self) -> *mut () { let backoff = Backoff::new(); loop { let packet = self.inner.packet.load(Ordering::Acquire); - if packet != 0 { + if !packet.is_null() { return packet; } backoff.snooze(); diff --git a/src/flavors/array.rs b/src/flavors/array.rs index c49eef1..871768c 100644 --- a/src/flavors/array.rs +++ b/src/flavors/array.rs @@ -10,7 +10,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; -use std::mem::{self, MaybeUninit}; +use std::mem::MaybeUninit; use std::ptr; use std::sync::atomic::{self, AtomicUsize, Ordering}; use std::time::Instant; @@ -110,7 +110,7 @@ impl<T> Channel<T> { // Allocate a buffer of `cap` slots initialized // with stamps. let buffer = { - let mut boxed: Box<[Slot<T>]> = (0..cap) + let boxed: Box<[Slot<T>]> = (0..cap) .map(|i| { // Set the stamp to `{ lap: 0, mark: 0, index: i }`. Slot { @@ -119,9 +119,7 @@ impl<T> Channel<T> { } }) .collect(); - let ptr = boxed.as_mut_ptr(); - mem::forget(boxed); - ptr + Box::into_raw(boxed) as *mut Slot<T> }; Channel { diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs index 9790b77..4afbd8f 100644 --- a/src/flavors/zero.rs +++ b/src/flavors/zero.rs @@ -6,6 +6,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Instant; +use std::{fmt, ptr}; use crossbeam_utils::Backoff; @@ -16,7 +17,19 @@ use crate::utils::Spinlock; use crate::waker::Waker; /// A pointer to a packet. -pub(crate) type ZeroToken = usize; +pub struct ZeroToken(*mut ()); + +impl Default for ZeroToken { + fn default() -> Self { + Self(ptr::null_mut()) + } +} + +impl fmt::Debug for ZeroToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&(self.0 as usize), f) + } +} /// A slot for passing one message from a sender to a receiver. struct Packet<T> { @@ -117,10 +130,10 @@ impl<T> Channel<T> { // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; true } else if inner.is_disconnected { - token.zero = 0; + token.zero.0 = ptr::null_mut(); true } else { false @@ -130,11 +143,11 @@ impl<T> Channel<T> { /// Writes a message into the packet. pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no packet, the channel is disconnected. - if token.zero == 0 { + if token.zero.0.is_null() { return Err(msg); } - let packet = &*(token.zero as *const Packet<T>); + let packet = &*(token.zero.0 as *const Packet<T>); packet.msg.get().write(Some(msg)); packet.ready.store(true, Ordering::Release); Ok(()) @@ -146,10 +159,10 @@ impl<T> Channel<T> { // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; true } else if inner.is_disconnected { - token.zero = 0; + token.zero.0 = ptr::null_mut(); true } else { false @@ -159,11 +172,11 @@ impl<T> Channel<T> { /// Reads a message from the packet. pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { // If there is no packet, the channel is disconnected. - if token.zero == 0 { + if token.zero.0.is_null() { return Err(()); } - let packet = &*(token.zero as *const Packet<T>); + let packet = &*(token.zero.0 as *const Packet<T>); if packet.on_stack { // The message has been in the packet from the beginning, so there is no need to wait @@ -177,7 +190,7 @@ impl<T> Channel<T> { // heap-allocated packet. packet.wait_ready(); let msg = packet.msg.get().replace(None).unwrap(); - drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>)); + drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); Ok(msg) } } @@ -189,7 +202,7 @@ impl<T> Channel<T> { // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.write(token, msg).ok().unwrap(); @@ -213,7 +226,7 @@ impl<T> Channel<T> { // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.write(token, msg).ok().unwrap(); @@ -228,10 +241,10 @@ impl<T> Channel<T> { Context::with(|cx| { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); - let packet = Packet::<T>::message_on_stack(msg); + let mut packet = Packet::<T>::message_on_stack(msg); inner .senders - .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); inner.receivers.notify(); drop(inner); @@ -266,7 +279,7 @@ impl<T> Channel<T> { // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } } else if inner.is_disconnected { @@ -283,7 +296,7 @@ impl<T> Channel<T> { // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { - token.zero = operation.packet; + token.zero.0 = operation.packet; drop(inner); unsafe { return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); @@ -297,10 +310,12 @@ impl<T> Channel<T> { Context::with(|cx| { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); - let packet = Packet::<T>::empty_on_stack(); - inner - .receivers - .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); + let mut packet = Packet::<T>::empty_on_stack(); + inner.receivers.register_with_packet( + oper, + &mut packet as *mut Packet<T> as *mut (), + cx, + ); inner.senders.notify(); drop(inner); @@ -385,7 +400,7 @@ impl<T> SelectHandle for Receiver<'_, T> { let mut inner = self.0.inner.lock(); inner .receivers - .register_with_packet(oper, packet as usize, cx); + .register_with_packet(oper, packet as *mut (), cx); inner.senders.notify(); inner.senders.can_select() || inner.is_disconnected } @@ -399,7 +414,7 @@ impl<T> SelectHandle for Receiver<'_, T> { } fn accept(&self, token: &mut Token, cx: &Context) -> bool { - token.zero = cx.wait_packet(); + token.zero.0 = cx.wait_packet(); true } @@ -435,7 +450,7 @@ impl<T> SelectHandle for Sender<'_, T> { let mut inner = self.0.inner.lock(); inner .senders - .register_with_packet(oper, packet as usize, cx); + .register_with_packet(oper, packet as *mut (), cx); inner.receivers.notify(); inner.receivers.can_select() || inner.is_disconnected } @@ -449,7 +464,7 @@ impl<T> SelectHandle for Sender<'_, T> { } fn accept(&self, token: &mut Token, cx: &Context) -> bool { - token.zero = cx.wait_packet(); + token.zero.0 = cx.wait_packet(); true } diff --git a/src/select.rs b/src/select.rs index 5259328..6103ef4 100644 --- a/src/select.rs +++ b/src/select.rs @@ -19,6 +19,7 @@ use crate::utils; /// `read` or `write`. /// /// Each field contains data associated with a specific channel flavor. +// This is a private API that is used by the select macro. #[derive(Debug, Default)] pub struct Token { pub at: flavors::at::AtToken, @@ -93,6 +94,7 @@ impl Into<usize> for Selected { /// /// This is a handle that assists select in executing an operation, registration, deciding on the /// appropriate deadline for blocking, etc. +// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. pub trait SelectHandle { /// Attempts to select an operation and returns `true` on success. fn try_select(&self, token: &mut Token) -> bool; @@ -442,6 +444,7 @@ fn run_ready( } /// Attempts to select one of the operations without blocking. +// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], @@ -458,6 +461,7 @@ pub fn try_select<'a>( } /// Blocks until one of the operations becomes ready and selects it. +// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], @@ -476,6 +480,7 @@ pub fn select<'a>( } /// Blocks for a limited time until one of the operations becomes ready and selects it. +// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], diff --git a/src/waker.rs b/src/waker.rs index 62defa2..dec73a9 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -1,5 +1,6 @@ //! Waking mechanism for threads blocked on channel operations. +use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{self, ThreadId}; @@ -13,7 +14,7 @@ pub(crate) struct Entry { pub(crate) oper: Operation, /// Optional packet. - pub(crate) packet: usize, + pub(crate) packet: *mut (), /// Context associated with the thread owning this operation. pub(crate) cx: Context, @@ -44,12 +45,12 @@ impl Waker { /// Registers a select operation. #[inline] pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { - self.register_with_packet(oper, 0, cx); + self.register_with_packet(oper, ptr::null_mut(), cx); } /// Registers a select operation and a packet. #[inline] - pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) { + pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { self.selectors.push(Entry { oper, packet, @@ -76,34 +77,26 @@ impl Waker { /// Attempts to find another thread's entry, select the operation, and wake it up. #[inline] pub(crate) fn try_select(&mut self) -> Option<Entry> { - let mut entry = None; - - if !self.selectors.is_empty() { - let thread_id = current_thread_id(); - - for i in 0..self.selectors.len() { + self.selectors + .iter() + .position(|selector| { // Does the entry belong to a different thread? - if self.selectors[i].cx.thread_id() != thread_id { - // Try selecting this operation. - let sel = Selected::Operation(self.selectors[i].oper); - let res = self.selectors[i].cx.try_select(sel); - - if res.is_ok() { + selector.cx.thread_id() != current_thread_id() + && selector // Try selecting this operation. + .cx + .try_select(Selected::Operation(selector.oper)) + .is_ok() + && { // Provide the packet. - self.selectors[i].cx.store_packet(self.selectors[i].packet); + selector.cx.store_packet(selector.packet); // Wake the thread up. - self.selectors[i].cx.unpark(); - - // Remove the entry from the queue to keep it clean and improve - // performance. - entry = Some(self.selectors.remove(i)); - break; + selector.cx.unpark(); + true } - } - } - } - - entry + }) + // Remove the entry from the queue to keep it clean and improve + // performance. + .map(|pos| self.selectors.remove(pos)) } /// Returns `true` if there is an entry which can be selected by the current thread. @@ -125,7 +118,7 @@ impl Waker { pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) { self.observers.push(Entry { oper, - packet: 0, + packet: ptr::null_mut(), cx: cx.clone(), }); } @@ -269,7 +262,7 @@ impl SyncWaker { impl Drop for SyncWaker { #[inline] fn drop(&mut self) { - debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true); + debug_assert!(self.is_empty.load(Ordering::SeqCst)); } } |