diff options
Diffstat (limited to 'src/sync/mutex.rs')
-rw-r--r-- | src/sync/mutex.rs | 237 |
1 files changed, 224 insertions, 13 deletions
diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 4d9f988..024755c 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -1,6 +1,8 @@ #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] use crate::sync::batch_semaphore as semaphore; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::cell::UnsafeCell; use std::error::Error; @@ -124,6 +126,8 @@ use std::{fmt, marker, mem}; /// [`Send`]: trait@std::marker::Send /// [`lock`]: method@Mutex::lock pub struct Mutex<T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, s: semaphore::Semaphore, c: UnsafeCell<T>, } @@ -137,7 +141,10 @@ pub struct Mutex<T: ?Sized> { /// /// The lock is automatically released whenever the guard is dropped, at which /// point `lock` will succeed yet again. +#[must_use = "if unused the Mutex will immediately unlock"] pub struct MutexGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, lock: &'a Mutex<T>, } @@ -157,6 +164,8 @@ pub struct MutexGuard<'a, T: ?Sized> { /// /// [`Arc`]: std::sync::Arc pub struct OwnedMutexGuard<T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, lock: Arc<Mutex<T>>, } @@ -191,8 +200,8 @@ unsafe impl<'a, T> Send for MappedMutexGuard<'a, T> where T: ?Sized + Send + 'a /// `RwLock::try_read` operation will only fail if the lock is currently held /// by an exclusive writer. /// -/// `RwLock::try_write` operation will if lock is held by any reader or by an -/// exclusive writer. +/// `RwLock::try_write` operation will only fail if the lock is currently held +/// by any reader or by an exclusive writer. /// /// [`Mutex::try_lock`]: Mutex::try_lock /// [`RwLock::try_read`]: fn@super::RwLock::try_read @@ -242,13 +251,42 @@ impl<T: ?Sized> Mutex<T> { /// /// let lock = Mutex::new(5); /// ``` + #[track_caller] pub fn new(t: T) -> Self where T: Sized, { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + + tracing::trace_span!( + "runtime.resource", + concrete_type = "Mutex", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ) + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let s = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + semaphore::Semaphore::new(1) + }); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let s = semaphore::Semaphore::new(1); + Self { c: UnsafeCell::new(t), - s: semaphore::Semaphore::new(1), + s, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -270,6 +308,8 @@ impl<T: ?Sized> Mutex<T> { Self { c: UnsafeCell::new(t), s: semaphore::Semaphore::const_new(1), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span::none(), } } @@ -297,16 +337,50 @@ impl<T: ?Sized> Mutex<T> { /// } /// ``` pub async fn lock(&self) -> MutexGuard<'_, T> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + trace::async_op( + || self.acquire(), + self.resource_span.clone(), + "Mutex::lock", + "poll", + false, + ) + .await; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] self.acquire().await; - MutexGuard { lock: self } + + MutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), + } } - /// Blocking lock this mutex. When the lock has been acquired, function returns a + /// Blockingly locks this `Mutex`. When the lock has been acquired, function returns a /// [`MutexGuard`]. /// /// This method is intended for use cases where you /// need to use this mutex in asynchronous code as well as in synchronous code. /// + /// # Panics + /// + /// This function panics if called within an asynchronous execution context. + /// + /// - If you find yourself in an asynchronous execution context and needing + /// to call some (synchronous) function which performs one of these + /// `blocking_` operations, then consider wrapping that call inside + /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking] + /// (or [`block_in_place()`][crate::task::block_in_place]). + /// /// # Examples /// /// ``` @@ -316,25 +390,90 @@ impl<T: ?Sized> Mutex<T> { /// #[tokio::main] /// async fn main() { /// let mutex = Arc::new(Mutex::new(1)); + /// let lock = mutex.lock().await; /// /// let mutex1 = Arc::clone(&mutex); - /// let sync_code = tokio::task::spawn_blocking(move || { + /// let blocking_task = tokio::task::spawn_blocking(move || { + /// // This shall block until the `lock` is released. /// let mut n = mutex1.blocking_lock(); /// *n = 2; /// }); /// - /// sync_code.await.unwrap(); + /// assert_eq!(*lock, 1); + /// // Release the lock. + /// drop(lock); /// - /// let n = mutex.lock().await; + /// // Await the completion of the blocking task. + /// blocking_task.await.unwrap(); + /// + /// // Assert uncontended. + /// let n = mutex.try_lock().unwrap(); /// assert_eq!(*n, 2); /// } /// /// ``` + #[track_caller] #[cfg(feature = "sync")] pub fn blocking_lock(&self) -> MutexGuard<'_, T> { crate::future::block_on(self.lock()) } + /// Blockingly locks this `Mutex`. When the lock has been acquired, function returns an + /// [`OwnedMutexGuard`]. + /// + /// This method is identical to [`Mutex::blocking_lock`], except that the returned + /// guard references the `Mutex` with an [`Arc`] rather than by borrowing + /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this + /// method, and the guard will live for the `'static` lifetime, as it keeps + /// the `Mutex` alive by holding an `Arc`. + /// + /// # Panics + /// + /// This function panics if called within an asynchronous execution context. + /// + /// - If you find yourself in an asynchronous execution context and needing + /// to call some (synchronous) function which performs one of these + /// `blocking_` operations, then consider wrapping that call inside + /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking] + /// (or [`block_in_place()`][crate::task::block_in_place]). + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::Mutex; + /// + /// #[tokio::main] + /// async fn main() { + /// let mutex = Arc::new(Mutex::new(1)); + /// let lock = mutex.lock().await; + /// + /// let mutex1 = Arc::clone(&mutex); + /// let blocking_task = tokio::task::spawn_blocking(move || { + /// // This shall block until the `lock` is released. + /// let mut n = mutex1.blocking_lock_owned(); + /// *n = 2; + /// }); + /// + /// assert_eq!(*lock, 1); + /// // Release the lock. + /// drop(lock); + /// + /// // Await the completion of the blocking task. + /// blocking_task.await.unwrap(); + /// + /// // Assert uncontended. + /// let n = mutex.try_lock().unwrap(); + /// assert_eq!(*n, 2); + /// } + /// + /// ``` + #[track_caller] + #[cfg(feature = "sync")] + pub fn blocking_lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> { + crate::future::block_on(self.lock_owned()) + } + /// Locks this mutex, causing the current task to yield until the lock has /// been acquired. When the lock has been acquired, this returns an /// [`OwnedMutexGuard`]. @@ -368,8 +507,35 @@ impl<T: ?Sized> Mutex<T> { /// /// [`Arc`]: std::sync::Arc pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + trace::async_op( + || self.acquire(), + self.resource_span.clone(), + "Mutex::lock_owned", + "poll", + false, + ) + .await; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] self.acquire().await; - OwnedMutexGuard { lock: self } + + OwnedMutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } async fn acquire(&self) { @@ -399,7 +565,21 @@ impl<T: ?Sized> Mutex<T> { /// ``` pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> { match self.s.try_acquire(1) { - Ok(_) => Ok(MutexGuard { lock: self }), + Ok(_) => { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + Ok(MutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), + }) + } Err(_) => Err(TryLockError(())), } } @@ -454,7 +634,24 @@ impl<T: ?Sized> Mutex<T> { /// # } pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> { match self.s.try_acquire(1) { - Ok(_) => Ok(OwnedMutexGuard { lock: self }), + Ok(_) => { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + Ok(OwnedMutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + }) + } Err(_) => Err(TryLockError(())), } } @@ -626,7 +823,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { /// # async fn main() { /// # let mutex = Mutex::new(0u32); /// # let guard = mutex.lock().await; - /// # unlock_and_relock(guard).await; + /// # let _guard = unlock_and_relock(guard).await; /// # } /// ``` #[inline] @@ -637,7 +834,14 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { impl<T: ?Sized> Drop for MutexGuard<'_, T> { fn drop(&mut self) { - self.lock.s.release(1) + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + }); + self.lock.s.release(1); } } @@ -699,6 +903,13 @@ impl<T: ?Sized> OwnedMutexGuard<T> { impl<T: ?Sized> Drop for OwnedMutexGuard<T> { fn drop(&mut self) { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + }); self.lock.s.release(1) } } |