aboutsummaryrefslogtreecommitdiff
path: root/src/sync/mpsc/chan.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mpsc/chan.rs')
-rw-r--r--src/sync/mpsc/chan.rs64
1 files changed, 42 insertions, 22 deletions
diff --git a/src/sync/mpsc/chan.rs b/src/sync/mpsc/chan.rs
index c3007de..edd3e95 100644
--- a/src/sync/mpsc/chan.rs
+++ b/src/sync/mpsc/chan.rs
@@ -2,15 +2,14 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
-use crate::park::thread::CachedParkThread;
-use crate::park::Park;
+use crate::runtime::park::CachedParkThread;
use crate::sync::mpsc::error::TryRecvError;
-use crate::sync::mpsc::list;
+use crate::sync::mpsc::{bounded, list, unbounded};
use crate::sync::notify::Notify;
use std::fmt;
use std::process;
-use std::sync::atomic::Ordering::{AcqRel, Relaxed};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
@@ -46,7 +45,7 @@ pub(crate) trait Semaphore {
fn is_closed(&self) -> bool;
}
-struct Chan<T, S> {
+pub(super) struct Chan<T, S> {
/// Notifies all tasks listening for the receiver being dropped.
notify_rx_closed: Notify,
@@ -129,6 +128,30 @@ impl<T, S> Tx<T, S> {
Tx { inner: chan }
}
+ pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> {
+ self.inner.clone()
+ }
+
+ // Returns the upgraded channel or None if the upgrade failed.
+ pub(super) fn upgrade(chan: Arc<Chan<T, S>>) -> Option<Self> {
+ let mut tx_count = chan.tx_count.load(Acquire);
+
+ loop {
+ if tx_count == 0 {
+ // channel is closed
+ return None;
+ }
+
+ match chan
+ .tx_count
+ .compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
+ {
+ Ok(_) => return Some(Tx { inner: chan }),
+ Err(prev_count) => tx_count = prev_count,
+ }
+ }
+ }
+
pub(super) fn semaphore(&self) -> &S {
&self.inner.semaphore
}
@@ -220,7 +243,7 @@ impl<T, S: Semaphore> Rx<T, S> {
use super::block::Read::*;
// Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
+ let coop = ready!(crate::runtime::coop::poll_proceed(cx));
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
@@ -301,13 +324,13 @@ impl<T, S: Semaphore> Rx<T, S> {
// Park the thread until the problematic send has completed.
let mut park = CachedParkThread::new();
- let waker = park.unpark().into_waker();
+ let waker = park.waker().unwrap();
loop {
self.inner.rx_waker.register_by_ref(&waker);
// It is possible that the problematic send has now completed,
// so we have to check for messages again.
try_recv!();
- park.park().expect("park failed");
+ park.park();
}
})
}
@@ -345,7 +368,7 @@ impl<T, S> Drop for Chan<T, S> {
fn drop(&mut self) {
use super::block::Read::Value;
- // Safety: the only owner of the rx fields is Chan, and eing
+ // Safety: the only owner of the rx fields is Chan, and being
// inside its own Drop means we're the last ones to touch it.
self.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
@@ -358,32 +381,29 @@ impl<T, S> Drop for Chan<T, S> {
// ===== impl Semaphore for (::Semaphore, capacity) =====
-impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
+impl Semaphore for bounded::Semaphore {
fn add_permit(&self) {
- self.0.release(1)
+ self.semaphore.release(1)
}
fn is_idle(&self) -> bool {
- self.0.available_permits() == self.1
+ self.semaphore.available_permits() == self.bound
}
fn close(&self) {
- self.0.close();
+ self.semaphore.close();
}
fn is_closed(&self) -> bool {
- self.0.is_closed()
+ self.semaphore.is_closed()
}
}
// ===== impl Semaphore for AtomicUsize =====
-use std::sync::atomic::Ordering::{Acquire, Release};
-use std::usize;
-
-impl Semaphore for AtomicUsize {
+impl Semaphore for unbounded::Semaphore {
fn add_permit(&self) {
- let prev = self.fetch_sub(2, Release);
+ let prev = self.0.fetch_sub(2, Release);
if prev >> 1 == 0 {
// Something went wrong
@@ -392,14 +412,14 @@ impl Semaphore for AtomicUsize {
}
fn is_idle(&self) -> bool {
- self.load(Acquire) >> 1 == 0
+ self.0.load(Acquire) >> 1 == 0
}
fn close(&self) {
- self.fetch_or(1, Release);
+ self.0.fetch_or(1, Release);
}
fn is_closed(&self) -> bool {
- self.load(Acquire) & 1 == 1
+ self.0.load(Acquire) & 1 == 1
}
}