aboutsummaryrefslogtreecommitdiff
path: root/src/waker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/waker.rs')
-rw-r--r--src/waker.rs51
1 files changed, 22 insertions, 29 deletions
diff --git a/src/waker.rs b/src/waker.rs
index 62defa2..dec73a9 100644
--- a/src/waker.rs
+++ b/src/waker.rs
@@ -1,5 +1,6 @@
//! Waking mechanism for threads blocked on channel operations.
+use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, ThreadId};
@@ -13,7 +14,7 @@ pub(crate) struct Entry {
pub(crate) oper: Operation,
/// Optional packet.
- pub(crate) packet: usize,
+ pub(crate) packet: *mut (),
/// Context associated with the thread owning this operation.
pub(crate) cx: Context,
@@ -44,12 +45,12 @@ impl Waker {
/// Registers a select operation.
#[inline]
pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
- self.register_with_packet(oper, 0, cx);
+ self.register_with_packet(oper, ptr::null_mut(), cx);
}
/// Registers a select operation and a packet.
#[inline]
- pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
+ pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
self.selectors.push(Entry {
oper,
packet,
@@ -76,34 +77,26 @@ impl Waker {
/// Attempts to find another thread's entry, select the operation, and wake it up.
#[inline]
pub(crate) fn try_select(&mut self) -> Option<Entry> {
- let mut entry = None;
-
- if !self.selectors.is_empty() {
- let thread_id = current_thread_id();
-
- for i in 0..self.selectors.len() {
+ self.selectors
+ .iter()
+ .position(|selector| {
// Does the entry belong to a different thread?
- if self.selectors[i].cx.thread_id() != thread_id {
- // Try selecting this operation.
- let sel = Selected::Operation(self.selectors[i].oper);
- let res = self.selectors[i].cx.try_select(sel);
-
- if res.is_ok() {
+ selector.cx.thread_id() != current_thread_id()
+ && selector // Try selecting this operation.
+ .cx
+ .try_select(Selected::Operation(selector.oper))
+ .is_ok()
+ && {
// Provide the packet.
- self.selectors[i].cx.store_packet(self.selectors[i].packet);
+ selector.cx.store_packet(selector.packet);
// Wake the thread up.
- self.selectors[i].cx.unpark();
-
- // Remove the entry from the queue to keep it clean and improve
- // performance.
- entry = Some(self.selectors.remove(i));
- break;
+ selector.cx.unpark();
+ true
}
- }
- }
- }
-
- entry
+ })
+ // Remove the entry from the queue to keep it clean and improve
+ // performance.
+ .map(|pos| self.selectors.remove(pos))
}
/// Returns `true` if there is an entry which can be selected by the current thread.
@@ -125,7 +118,7 @@ impl Waker {
pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
self.observers.push(Entry {
oper,
- packet: 0,
+ packet: ptr::null_mut(),
cx: cx.clone(),
});
}
@@ -269,7 +262,7 @@ impl SyncWaker {
impl Drop for SyncWaker {
#[inline]
fn drop(&mut self) {
- debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
+ debug_assert!(self.is_empty.load(Ordering::SeqCst));
}
}