diff options
Diffstat (limited to 'src/sync/mpsc/chan.rs')
-rw-r--r-- | src/sync/mpsc/chan.rs | 64 |
1 files changed, 42 insertions, 22 deletions
diff --git a/src/sync/mpsc/chan.rs b/src/sync/mpsc/chan.rs index c3007de..edd3e95 100644 --- a/src/sync/mpsc/chan.rs +++ b/src/sync/mpsc/chan.rs @@ -2,15 +2,14 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; -use crate::park::thread::CachedParkThread; -use crate::park::Park; +use crate::runtime::park::CachedParkThread; use crate::sync::mpsc::error::TryRecvError; -use crate::sync::mpsc::list; +use crate::sync::mpsc::{bounded, list, unbounded}; use crate::sync::notify::Notify; use std::fmt; use std::process; -use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; @@ -46,7 +45,7 @@ pub(crate) trait Semaphore { fn is_closed(&self) -> bool; } -struct Chan<T, S> { +pub(super) struct Chan<T, S> { /// Notifies all tasks listening for the receiver being dropped. notify_rx_closed: Notify, @@ -129,6 +128,30 @@ impl<T, S> Tx<T, S> { Tx { inner: chan } } + pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> { + self.inner.clone() + } + + // Returns the upgraded channel or None if the upgrade failed. + pub(super) fn upgrade(chan: Arc<Chan<T, S>>) -> Option<Self> { + let mut tx_count = chan.tx_count.load(Acquire); + + loop { + if tx_count == 0 { + // channel is closed + return None; + } + + match chan + .tx_count + .compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire) + { + Ok(_) => return Some(Tx { inner: chan }), + Err(prev_count) => tx_count = prev_count, + } + } + } + pub(super) fn semaphore(&self) -> &S { &self.inner.semaphore } @@ -220,7 +243,7 @@ impl<T, S: Semaphore> Rx<T, S> { use super::block::Read::*; // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -301,13 +324,13 @@ impl<T, S: Semaphore> Rx<T, S> { // Park the thread until the problematic send has completed. let mut park = CachedParkThread::new(); - let waker = park.unpark().into_waker(); + let waker = park.waker().unwrap(); loop { self.inner.rx_waker.register_by_ref(&waker); // It is possible that the problematic send has now completed, // so we have to check for messages again. try_recv!(); - park.park().expect("park failed"); + park.park(); } }) } @@ -345,7 +368,7 @@ impl<T, S> Drop for Chan<T, S> { fn drop(&mut self) { use super::block::Read::Value; - // Safety: the only owner of the rx fields is Chan, and eing + // Safety: the only owner of the rx fields is Chan, and being // inside its own Drop means we're the last ones to touch it. self.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -358,32 +381,29 @@ impl<T, S> Drop for Chan<T, S> { // ===== impl Semaphore for (::Semaphore, capacity) ===== -impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) { +impl Semaphore for bounded::Semaphore { fn add_permit(&self) { - self.0.release(1) + self.semaphore.release(1) } fn is_idle(&self) -> bool { - self.0.available_permits() == self.1 + self.semaphore.available_permits() == self.bound } fn close(&self) { - self.0.close(); + self.semaphore.close(); } fn is_closed(&self) -> bool { - self.0.is_closed() + self.semaphore.is_closed() } } // ===== impl Semaphore for AtomicUsize ===== -use std::sync::atomic::Ordering::{Acquire, Release}; -use std::usize; - -impl Semaphore for AtomicUsize { +impl Semaphore for unbounded::Semaphore { fn add_permit(&self) { - let prev = self.fetch_sub(2, Release); + let prev = self.0.fetch_sub(2, Release); if prev >> 1 == 0 { // Something went wrong @@ -392,14 +412,14 @@ impl Semaphore for AtomicUsize { } fn is_idle(&self) -> bool { - self.load(Acquire) >> 1 == 0 + self.0.load(Acquire) >> 1 == 0 } fn close(&self) { - self.fetch_or(1, Release); + self.0.fetch_or(1, Release); } fn is_closed(&self) -> bool { - self.load(Acquire) & 1 == 1 + self.0.load(Acquire) & 1 == 1 } } |