diff options
Diffstat (limited to 'src/sync/semaphore.rs')
-rw-r--r-- | src/sync/semaphore.rs | 141 |
1 files changed, 133 insertions, 8 deletions
diff --git a/src/sync/semaphore.rs b/src/sync/semaphore.rs index 839b523..6e5a1a8 100644 --- a/src/sync/semaphore.rs +++ b/src/sync/semaphore.rs @@ -1,5 +1,7 @@ use super::batch_semaphore as ll; // low level implementation use super::{AcquireError, TryAcquireError}; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::sync::Arc; /// Counting semaphore performing asynchronous permit acquisition. @@ -77,6 +79,8 @@ use std::sync::Arc; pub struct Semaphore { /// The low level semaphore ll_sem: ll::Semaphore, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } /// A permit from the semaphore. @@ -119,10 +123,41 @@ fn bounds() { } impl Semaphore { + /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >>> 3`. + /// + /// Exceeding this limit typically results in a panic. + pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS; + /// Creates a new semaphore with the initial number of permits. + /// + /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`]. + #[track_caller] pub fn new(permits: usize) -> Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + + tracing::trace_span!( + "runtime.resource", + concrete_type = "Semaphore", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + inherits_child_attrs = true, + ) + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits)); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let ll_sem = ll::Semaphore::new(permits); + Self { - ll_sem: ll::Semaphore::new(permits), + ll_sem, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -139,9 +174,16 @@ impl Semaphore { #[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] pub const fn const_new(permits: usize) -> Self { - Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Self { ll_sem: ll::Semaphore::const_new(permits), - } + resource_span: tracing::Span::none(), + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return Self { + ll_sem: ll::Semaphore::const_new(permits), + }; } /// Returns the current number of available permits. @@ -151,7 +193,7 @@ impl Semaphore { /// Adds `n` new permits to the semaphore. /// - /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded. + /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded. pub fn add_permits(&self, n: usize) { self.ll_sem.release(n); } @@ -191,7 +233,18 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> { - self.ll_sem.acquire(1).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(1), + self.resource_span.clone(), + "Semaphore::acquire", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(1); + + inner.await?; Ok(SemaphorePermit { sem: self, permits: 1, @@ -227,7 +280,19 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + trace::async_op( + || self.ll_sem.acquire(n), + self.resource_span.clone(), + "Semaphore::acquire_many", + "poll", + true, + ) + .await?; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] self.ll_sem.acquire(n).await?; + Ok(SemaphorePermit { sem: self, permits: n, @@ -350,7 +415,18 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> { - self.ll_sem.acquire(1).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(1), + self.resource_span.clone(), + "Semaphore::acquire_owned", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(1); + + inner.await?; Ok(OwnedSemaphorePermit { sem: self, permits: 1, @@ -403,7 +479,18 @@ impl Semaphore { self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, AcquireError> { - self.ll_sem.acquire(n).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(n), + self.resource_span.clone(), + "Semaphore::acquire_many_owned", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(n); + + inner.await?; Ok(OwnedSemaphorePermit { sem: self, permits: n, @@ -540,6 +627,25 @@ impl<'a> SemaphorePermit<'a> { pub fn forget(mut self) { self.permits = 0; } + + /// Merge two [`SemaphorePermit`] instances together, consuming `other` + /// without releasing the permits it holds. + /// + /// Permits held by both `self` and `other` are released when `self` drops. + /// + /// # Panics + /// + /// This function panics if permits from different [`Semaphore`] instances + /// are merged. + #[track_caller] + pub fn merge(&mut self, mut other: Self) { + assert!( + std::ptr::eq(self.sem, other.sem), + "merging permits from different semaphore instances" + ); + self.permits += other.permits; + other.permits = 0; + } } impl OwnedSemaphorePermit { @@ -549,9 +655,28 @@ impl OwnedSemaphorePermit { pub fn forget(mut self) { self.permits = 0; } + + /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other` + /// without releasing the permits it holds. + /// + /// Permits held by both `self` and `other` are released when `self` drops. + /// + /// # Panics + /// + /// This function panics if permits from different [`Semaphore`] instances + /// are merged. + #[track_caller] + pub fn merge(&mut self, mut other: Self) { + assert!( + Arc::ptr_eq(&self.sem, &other.sem), + "merging permits from different semaphore instances" + ); + self.permits += other.permits; + other.permits = 0; + } } -impl<'a> Drop for SemaphorePermit<'_> { +impl Drop for SemaphorePermit<'_> { fn drop(&mut self) { self.sem.add_permits(self.permits as usize); } |