aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/context.rs20
-rw-r--r--src/flavors/array.rs8
-rw-r--r--src/flavors/zero.rs63
-rw-r--r--src/select.rs5
-rw-r--r--src/waker.rs51
5 files changed, 80 insertions, 67 deletions
diff --git a/src/context.rs b/src/context.rs
index e2e8480..7467b80 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -1,7 +1,8 @@
//! Thread-local context used in select.
use std::cell::Cell;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::ptr;
+use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread, ThreadId};
use std::time::Instant;
@@ -11,6 +12,7 @@ use crossbeam_utils::Backoff;
use crate::select::Selected;
/// Thread-local context used in select.
+// This is a private API that is used by the select macro.
#[derive(Debug, Clone)]
pub struct Context {
inner: Arc<Inner>,
@@ -23,7 +25,7 @@ struct Inner {
select: AtomicUsize,
/// A slot into which another thread may store a pointer to its `Packet`.
- packet: AtomicUsize,
+ packet: AtomicPtr<()>,
/// Thread handle.
thread: Thread,
@@ -45,7 +47,7 @@ impl Context {
}
let mut f = Some(f);
- let mut f = move |cx: &Context| -> R {
+ let mut f = |cx: &Context| -> R {
let f = f.take().unwrap();
f(cx)
};
@@ -69,7 +71,7 @@ impl Context {
Context {
inner: Arc::new(Inner {
select: AtomicUsize::new(Selected::Waiting.into()),
- packet: AtomicUsize::new(0),
+ packet: AtomicPtr::new(ptr::null_mut()),
thread: thread::current(),
thread_id: thread::current().id(),
}),
@@ -82,7 +84,7 @@ impl Context {
self.inner
.select
.store(Selected::Waiting.into(), Ordering::Release);
- self.inner.packet.store(0, Ordering::Release);
+ self.inner.packet.store(ptr::null_mut(), Ordering::Release);
}
/// Attempts to select an operation.
@@ -112,19 +114,19 @@ impl Context {
///
/// This method must be called after `try_select` succeeds and there is a packet to provide.
#[inline]
- pub fn store_packet(&self, packet: usize) {
- if packet != 0 {
+ pub fn store_packet(&self, packet: *mut ()) {
+ if !packet.is_null() {
self.inner.packet.store(packet, Ordering::Release);
}
}
/// Waits until a packet is provided and returns it.
#[inline]
- pub fn wait_packet(&self) -> usize {
+ pub fn wait_packet(&self) -> *mut () {
let backoff = Backoff::new();
loop {
let packet = self.inner.packet.load(Ordering::Acquire);
- if packet != 0 {
+ if !packet.is_null() {
return packet;
}
backoff.snooze();
diff --git a/src/flavors/array.rs b/src/flavors/array.rs
index c49eef1..871768c 100644
--- a/src/flavors/array.rs
+++ b/src/flavors/array.rs
@@ -10,7 +10,7 @@
use std::cell::UnsafeCell;
use std::marker::PhantomData;
-use std::mem::{self, MaybeUninit};
+use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::time::Instant;
@@ -110,7 +110,7 @@ impl<T> Channel<T> {
// Allocate a buffer of `cap` slots initialized
// with stamps.
let buffer = {
- let mut boxed: Box<[Slot<T>]> = (0..cap)
+ let boxed: Box<[Slot<T>]> = (0..cap)
.map(|i| {
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
Slot {
@@ -119,9 +119,7 @@ impl<T> Channel<T> {
}
})
.collect();
- let ptr = boxed.as_mut_ptr();
- mem::forget(boxed);
- ptr
+ Box::into_raw(boxed) as *mut Slot<T>
};
Channel {
diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs
index 9790b77..4afbd8f 100644
--- a/src/flavors/zero.rs
+++ b/src/flavors/zero.rs
@@ -6,6 +6,7 @@ use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
+use std::{fmt, ptr};
use crossbeam_utils::Backoff;
@@ -16,7 +17,19 @@ use crate::utils::Spinlock;
use crate::waker::Waker;
/// A pointer to a packet.
-pub(crate) type ZeroToken = usize;
+pub struct ZeroToken(*mut ());
+
+impl Default for ZeroToken {
+ fn default() -> Self {
+ Self(ptr::null_mut())
+ }
+}
+
+impl fmt::Debug for ZeroToken {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&(self.0 as usize), f)
+ }
+}
/// A slot for passing one message from a sender to a receiver.
struct Packet<T> {
@@ -117,10 +130,10 @@ impl<T> Channel<T> {
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
- token.zero = 0;
+ token.zero.0 = ptr::null_mut();
true
} else {
false
@@ -130,11 +143,11 @@ impl<T> Channel<T> {
/// Writes a message into the packet.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no packet, the channel is disconnected.
- if token.zero == 0 {
+ if token.zero.0.is_null() {
return Err(msg);
}
- let packet = &*(token.zero as *const Packet<T>);
+ let packet = &*(token.zero.0 as *const Packet<T>);
packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
Ok(())
@@ -146,10 +159,10 @@ impl<T> Channel<T> {
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
- token.zero = 0;
+ token.zero.0 = ptr::null_mut();
true
} else {
false
@@ -159,11 +172,11 @@ impl<T> Channel<T> {
/// Reads a message from the packet.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
// If there is no packet, the channel is disconnected.
- if token.zero == 0 {
+ if token.zero.0.is_null() {
return Err(());
}
- let packet = &*(token.zero as *const Packet<T>);
+ let packet = &*(token.zero.0 as *const Packet<T>);
if packet.on_stack {
// The message has been in the packet from the beginning, so there is no need to wait
@@ -177,7 +190,7 @@ impl<T> Channel<T> {
// heap-allocated packet.
packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap();
- drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>));
+ drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
Ok(msg)
}
}
@@ -189,7 +202,7 @@ impl<T> Channel<T> {
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
@@ -213,7 +226,7 @@ impl<T> Channel<T> {
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
@@ -228,10 +241,10 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
- let packet = Packet::<T>::message_on_stack(msg);
+ let mut packet = Packet::<T>::message_on_stack(msg);
inner
.senders
- .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
+ .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
inner.receivers.notify();
drop(inner);
@@ -266,7 +279,7 @@ impl<T> Channel<T> {
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else if inner.is_disconnected {
@@ -283,7 +296,7 @@ impl<T> Channel<T> {
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
- token.zero = operation.packet;
+ token.zero.0 = operation.packet;
drop(inner);
unsafe {
return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
@@ -297,10 +310,12 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
- let packet = Packet::<T>::empty_on_stack();
- inner
- .receivers
- .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
+ let mut packet = Packet::<T>::empty_on_stack();
+ inner.receivers.register_with_packet(
+ oper,
+ &mut packet as *mut Packet<T> as *mut (),
+ cx,
+ );
inner.senders.notify();
drop(inner);
@@ -385,7 +400,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
let mut inner = self.0.inner.lock();
inner
.receivers
- .register_with_packet(oper, packet as usize, cx);
+ .register_with_packet(oper, packet as *mut (), cx);
inner.senders.notify();
inner.senders.can_select() || inner.is_disconnected
}
@@ -399,7 +414,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
- token.zero = cx.wait_packet();
+ token.zero.0 = cx.wait_packet();
true
}
@@ -435,7 +450,7 @@ impl<T> SelectHandle for Sender<'_, T> {
let mut inner = self.0.inner.lock();
inner
.senders
- .register_with_packet(oper, packet as usize, cx);
+ .register_with_packet(oper, packet as *mut (), cx);
inner.receivers.notify();
inner.receivers.can_select() || inner.is_disconnected
}
@@ -449,7 +464,7 @@ impl<T> SelectHandle for Sender<'_, T> {
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
- token.zero = cx.wait_packet();
+ token.zero.0 = cx.wait_packet();
true
}
diff --git a/src/select.rs b/src/select.rs
index 5259328..6103ef4 100644
--- a/src/select.rs
+++ b/src/select.rs
@@ -19,6 +19,7 @@ use crate::utils;
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor.
+// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
pub at: flavors::at::AtToken,
@@ -93,6 +94,7 @@ impl Into<usize> for Selected {
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
+// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
pub trait SelectHandle {
/// Attempts to select an operation and returns `true` on success.
fn try_select(&self, token: &mut Token) -> bool;
@@ -442,6 +444,7 @@ fn run_ready(
}
/// Attempts to select one of the operations without blocking.
+// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn try_select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
@@ -458,6 +461,7 @@ pub fn try_select<'a>(
}
/// Blocks until one of the operations becomes ready and selects it.
+// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
@@ -476,6 +480,7 @@ pub fn select<'a>(
}
/// Blocks for a limited time until one of the operations becomes ready and selects it.
+// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
diff --git a/src/waker.rs b/src/waker.rs
index 62defa2..dec73a9 100644
--- a/src/waker.rs
+++ b/src/waker.rs
@@ -1,5 +1,6 @@
//! Waking mechanism for threads blocked on channel operations.
+use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, ThreadId};
@@ -13,7 +14,7 @@ pub(crate) struct Entry {
pub(crate) oper: Operation,
/// Optional packet.
- pub(crate) packet: usize,
+ pub(crate) packet: *mut (),
/// Context associated with the thread owning this operation.
pub(crate) cx: Context,
@@ -44,12 +45,12 @@ impl Waker {
/// Registers a select operation.
#[inline]
pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
- self.register_with_packet(oper, 0, cx);
+ self.register_with_packet(oper, ptr::null_mut(), cx);
}
/// Registers a select operation and a packet.
#[inline]
- pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
+ pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
self.selectors.push(Entry {
oper,
packet,
@@ -76,34 +77,26 @@ impl Waker {
/// Attempts to find another thread's entry, select the operation, and wake it up.
#[inline]
pub(crate) fn try_select(&mut self) -> Option<Entry> {
- let mut entry = None;
-
- if !self.selectors.is_empty() {
- let thread_id = current_thread_id();
-
- for i in 0..self.selectors.len() {
+ self.selectors
+ .iter()
+ .position(|selector| {
// Does the entry belong to a different thread?
- if self.selectors[i].cx.thread_id() != thread_id {
- // Try selecting this operation.
- let sel = Selected::Operation(self.selectors[i].oper);
- let res = self.selectors[i].cx.try_select(sel);
-
- if res.is_ok() {
+ selector.cx.thread_id() != current_thread_id()
+ && selector // Try selecting this operation.
+ .cx
+ .try_select(Selected::Operation(selector.oper))
+ .is_ok()
+ && {
// Provide the packet.
- self.selectors[i].cx.store_packet(self.selectors[i].packet);
+ selector.cx.store_packet(selector.packet);
// Wake the thread up.
- self.selectors[i].cx.unpark();
-
- // Remove the entry from the queue to keep it clean and improve
- // performance.
- entry = Some(self.selectors.remove(i));
- break;
+ selector.cx.unpark();
+ true
}
- }
- }
- }
-
- entry
+ })
+ // Remove the entry from the queue to keep it clean and improve
+ // performance.
+ .map(|pos| self.selectors.remove(pos))
}
/// Returns `true` if there is an entry which can be selected by the current thread.
@@ -125,7 +118,7 @@ impl Waker {
pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
self.observers.push(Entry {
oper,
- packet: 0,
+ packet: ptr::null_mut(),
cx: cx.clone(),
});
}
@@ -269,7 +262,7 @@ impl SyncWaker {
impl Drop for SyncWaker {
#[inline]
fn drop(&mut self) {
- debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
+ debug_assert!(self.is_empty.load(Ordering::SeqCst));
}
}