aboutsummaryrefslogtreecommitdiff
path: root/src/sync/barrier.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/barrier.rs')
-rw-r--r--src/sync/barrier.rs65
1 files changed, 64 insertions, 1 deletions
diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs
index 0e39dac..2ce1d73 100644
--- a/src/sync/barrier.rs
+++ b/src/sync/barrier.rs
@@ -1,5 +1,7 @@
use crate::loom::sync::Mutex;
use crate::sync::watch;
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+use crate::util::trace;
/// A barrier enables multiple tasks to synchronize the beginning of some computation.
///
@@ -41,6 +43,8 @@ pub struct Barrier {
state: Mutex<BarrierState>,
wait: watch::Receiver<usize>,
n: usize,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
}
#[derive(Debug)]
@@ -55,6 +59,7 @@ impl Barrier {
///
/// A barrier will block `n`-1 tasks which call [`Barrier::wait`] and then wake up all
/// tasks at once when the `n`th task calls `wait`.
+ #[track_caller]
pub fn new(mut n: usize) -> Barrier {
let (waker, wait) = crate::sync::watch::channel(0);
@@ -65,6 +70,32 @@ impl Barrier {
n = 1;
}
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = {
+ let location = std::panic::Location::caller();
+ let resource_span = tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Barrier",
+ kind = "Sync",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+
+ resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ size = n,
+ );
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ arrived = 0,
+ )
+ });
+ resource_span
+ };
+
Barrier {
state: Mutex::new(BarrierState {
waker,
@@ -73,6 +104,8 @@ impl Barrier {
}),
n,
wait,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
}
}
@@ -85,10 +118,24 @@ impl Barrier {
/// [`BarrierWaitResult::is_leader`] when returning from this function, and all other tasks
/// will receive a result that will return `false` from `is_leader`.
pub async fn wait(&self) -> BarrierWaitResult {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ return trace::async_op(
+ || self.wait_internal(),
+ self.resource_span.clone(),
+ "Barrier::wait",
+ "poll",
+ false,
+ )
+ .await;
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ return self.wait_internal().await;
+ }
+ async fn wait_internal(&self) -> BarrierWaitResult {
// NOTE: we are taking a _synchronous_ lock here.
// It is okay to do so because the critical section is fast and never yields, so it cannot
// deadlock even if another future is concurrently holding the lock.
- // It is _desireable_ to do so as synchronous Mutexes are, at least in theory, faster than
+ // It is _desirable_ to do so as synchronous Mutexes are, at least in theory, faster than
// the asynchronous counter-parts, so we should use them where possible [citation needed].
// NOTE: the extra scope here is so that the compiler doesn't think `state` is held across
// a yield point, and thus marks the returned future as !Send.
@@ -96,7 +143,23 @@ impl Barrier {
let mut state = self.state.lock();
let generation = state.generation;
state.arrived += 1;
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ arrived = 1,
+ arrived.op = "add",
+ );
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ tracing::trace!(
+ target: "runtime::resource::async_op::state_update",
+ arrived = true,
+ );
if state.arrived == self.n {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ tracing::trace!(
+ target: "runtime::resource::async_op::state_update",
+ is_leader = true,
+ );
// we are the leader for this generation
// wake everyone, increment the generation, and return
state