diff options
Diffstat (limited to 'src/waker.rs')
-rw-r--r-- | src/waker.rs | 51 |
1 files changed, 22 insertions, 29 deletions
diff --git a/src/waker.rs b/src/waker.rs index 62defa2..dec73a9 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -1,5 +1,6 @@ //! Waking mechanism for threads blocked on channel operations. +use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{self, ThreadId}; @@ -13,7 +14,7 @@ pub(crate) struct Entry { pub(crate) oper: Operation, /// Optional packet. - pub(crate) packet: usize, + pub(crate) packet: *mut (), /// Context associated with the thread owning this operation. pub(crate) cx: Context, @@ -44,12 +45,12 @@ impl Waker { /// Registers a select operation. #[inline] pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { - self.register_with_packet(oper, 0, cx); + self.register_with_packet(oper, ptr::null_mut(), cx); } /// Registers a select operation and a packet. #[inline] - pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) { + pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { self.selectors.push(Entry { oper, packet, @@ -76,34 +77,26 @@ impl Waker { /// Attempts to find another thread's entry, select the operation, and wake it up. #[inline] pub(crate) fn try_select(&mut self) -> Option<Entry> { - let mut entry = None; - - if !self.selectors.is_empty() { - let thread_id = current_thread_id(); - - for i in 0..self.selectors.len() { + self.selectors + .iter() + .position(|selector| { // Does the entry belong to a different thread? - if self.selectors[i].cx.thread_id() != thread_id { - // Try selecting this operation. - let sel = Selected::Operation(self.selectors[i].oper); - let res = self.selectors[i].cx.try_select(sel); - - if res.is_ok() { + selector.cx.thread_id() != current_thread_id() + && selector // Try selecting this operation. + .cx + .try_select(Selected::Operation(selector.oper)) + .is_ok() + && { // Provide the packet. - self.selectors[i].cx.store_packet(self.selectors[i].packet); + selector.cx.store_packet(selector.packet); // Wake the thread up. - self.selectors[i].cx.unpark(); - - // Remove the entry from the queue to keep it clean and improve - // performance. - entry = Some(self.selectors.remove(i)); - break; + selector.cx.unpark(); + true } - } - } - } - - entry + }) + // Remove the entry from the queue to keep it clean and improve + // performance. + .map(|pos| self.selectors.remove(pos)) } /// Returns `true` if there is an entry which can be selected by the current thread. @@ -125,7 +118,7 @@ impl Waker { pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) { self.observers.push(Entry { oper, - packet: 0, + packet: ptr::null_mut(), cx: cx.clone(), }); } @@ -269,7 +262,7 @@ impl SyncWaker { impl Drop for SyncWaker { #[inline] fn drop(&mut self) { - debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true); + debug_assert!(self.is_empty.load(Ordering::SeqCst)); } } |