aboutsummaryrefslogtreecommitdiff
path: root/src/sync/mutex.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mutex.rs')
-rw-r--r--src/sync/mutex.rs237
1 files changed, 224 insertions, 13 deletions
diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 4d9f988..024755c 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -1,6 +1,8 @@
#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
use crate::sync::batch_semaphore as semaphore;
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+use crate::util::trace;
use std::cell::UnsafeCell;
use std::error::Error;
@@ -124,6 +126,8 @@ use std::{fmt, marker, mem};
/// [`Send`]: trait@std::marker::Send
/// [`lock`]: method@Mutex::lock
pub struct Mutex<T: ?Sized> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
s: semaphore::Semaphore,
c: UnsafeCell<T>,
}
@@ -137,7 +141,10 @@ pub struct Mutex<T: ?Sized> {
///
/// The lock is automatically released whenever the guard is dropped, at which
/// point `lock` will succeed yet again.
+#[must_use = "if unused the Mutex will immediately unlock"]
pub struct MutexGuard<'a, T: ?Sized> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
lock: &'a Mutex<T>,
}
@@ -157,6 +164,8 @@ pub struct MutexGuard<'a, T: ?Sized> {
///
/// [`Arc`]: std::sync::Arc
pub struct OwnedMutexGuard<T: ?Sized> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
lock: Arc<Mutex<T>>,
}
@@ -191,8 +200,8 @@ unsafe impl<'a, T> Send for MappedMutexGuard<'a, T> where T: ?Sized + Send + 'a
/// `RwLock::try_read` operation will only fail if the lock is currently held
/// by an exclusive writer.
///
-/// `RwLock::try_write` operation will if lock is held by any reader or by an
-/// exclusive writer.
+/// `RwLock::try_write` operation will only fail if the lock is currently held
+/// by any reader or by an exclusive writer.
///
/// [`Mutex::try_lock`]: Mutex::try_lock
/// [`RwLock::try_read`]: fn@super::RwLock::try_read
@@ -242,13 +251,42 @@ impl<T: ?Sized> Mutex<T> {
///
/// let lock = Mutex::new(5);
/// ```
+ #[track_caller]
pub fn new(t: T) -> Self
where
T: Sized,
{
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = {
+ let location = std::panic::Location::caller();
+
+ tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Mutex",
+ kind = "Sync",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ )
+ };
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let s = resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ locked = false,
+ );
+ semaphore::Semaphore::new(1)
+ });
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ let s = semaphore::Semaphore::new(1);
+
Self {
c: UnsafeCell::new(t),
- s: semaphore::Semaphore::new(1),
+ s,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
}
}
@@ -270,6 +308,8 @@ impl<T: ?Sized> Mutex<T> {
Self {
c: UnsafeCell::new(t),
s: semaphore::Semaphore::const_new(1),
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span::none(),
}
}
@@ -297,16 +337,50 @@ impl<T: ?Sized> Mutex<T> {
/// }
/// ```
pub async fn lock(&self) -> MutexGuard<'_, T> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ trace::async_op(
+ || self.acquire(),
+ self.resource_span.clone(),
+ "Mutex::lock",
+ "poll",
+ false,
+ )
+ .await;
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ locked = true,
+ );
+ });
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
self.acquire().await;
- MutexGuard { lock: self }
+
+ MutexGuard {
+ lock: self,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: self.resource_span.clone(),
+ }
}
- /// Blocking lock this mutex. When the lock has been acquired, function returns a
+ /// Blockingly locks this `Mutex`. When the lock has been acquired, function returns a
/// [`MutexGuard`].
///
/// This method is intended for use cases where you
/// need to use this mutex in asynchronous code as well as in synchronous code.
///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution context.
+ ///
+ /// - If you find yourself in an asynchronous execution context and needing
+ /// to call some (synchronous) function which performs one of these
+ /// `blocking_` operations, then consider wrapping that call inside
+ /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
+ /// (or [`block_in_place()`][crate::task::block_in_place]).
+ ///
/// # Examples
///
/// ```
@@ -316,25 +390,90 @@ impl<T: ?Sized> Mutex<T> {
/// #[tokio::main]
/// async fn main() {
/// let mutex = Arc::new(Mutex::new(1));
+ /// let lock = mutex.lock().await;
///
/// let mutex1 = Arc::clone(&mutex);
- /// let sync_code = tokio::task::spawn_blocking(move || {
+ /// let blocking_task = tokio::task::spawn_blocking(move || {
+ /// // This shall block until the `lock` is released.
/// let mut n = mutex1.blocking_lock();
/// *n = 2;
/// });
///
- /// sync_code.await.unwrap();
+ /// assert_eq!(*lock, 1);
+ /// // Release the lock.
+ /// drop(lock);
///
- /// let n = mutex.lock().await;
+ /// // Await the completion of the blocking task.
+ /// blocking_task.await.unwrap();
+ ///
+ /// // Assert uncontended.
+ /// let n = mutex.try_lock().unwrap();
/// assert_eq!(*n, 2);
/// }
///
/// ```
+ #[track_caller]
#[cfg(feature = "sync")]
pub fn blocking_lock(&self) -> MutexGuard<'_, T> {
crate::future::block_on(self.lock())
}
+ /// Blockingly locks this `Mutex`. When the lock has been acquired, function returns an
+ /// [`OwnedMutexGuard`].
+ ///
+ /// This method is identical to [`Mutex::blocking_lock`], except that the returned
+ /// guard references the `Mutex` with an [`Arc`] rather than by borrowing
+ /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this
+ /// method, and the guard will live for the `'static` lifetime, as it keeps
+ /// the `Mutex` alive by holding an `Arc`.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution context.
+ ///
+ /// - If you find yourself in an asynchronous execution context and needing
+ /// to call some (synchronous) function which performs one of these
+ /// `blocking_` operations, then consider wrapping that call inside
+ /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
+ /// (or [`block_in_place()`][crate::task::block_in_place]).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::Mutex;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mutex = Arc::new(Mutex::new(1));
+ /// let lock = mutex.lock().await;
+ ///
+ /// let mutex1 = Arc::clone(&mutex);
+ /// let blocking_task = tokio::task::spawn_blocking(move || {
+ /// // This shall block until the `lock` is released.
+ /// let mut n = mutex1.blocking_lock_owned();
+ /// *n = 2;
+ /// });
+ ///
+ /// assert_eq!(*lock, 1);
+ /// // Release the lock.
+ /// drop(lock);
+ ///
+ /// // Await the completion of the blocking task.
+ /// blocking_task.await.unwrap();
+ ///
+ /// // Assert uncontended.
+ /// let n = mutex.try_lock().unwrap();
+ /// assert_eq!(*n, 2);
+ /// }
+ ///
+ /// ```
+ #[track_caller]
+ #[cfg(feature = "sync")]
+ pub fn blocking_lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
+ crate::future::block_on(self.lock_owned())
+ }
+
/// Locks this mutex, causing the current task to yield until the lock has
/// been acquired. When the lock has been acquired, this returns an
/// [`OwnedMutexGuard`].
@@ -368,8 +507,35 @@ impl<T: ?Sized> Mutex<T> {
///
/// [`Arc`]: std::sync::Arc
pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ trace::async_op(
+ || self.acquire(),
+ self.resource_span.clone(),
+ "Mutex::lock_owned",
+ "poll",
+ false,
+ )
+ .await;
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ locked = true,
+ );
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
self.acquire().await;
- OwnedMutexGuard { lock: self }
+
+ OwnedMutexGuard {
+ lock: self,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ }
}
async fn acquire(&self) {
@@ -399,7 +565,21 @@ impl<T: ?Sized> Mutex<T> {
/// ```
pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
match self.s.try_acquire(1) {
- Ok(_) => Ok(MutexGuard { lock: self }),
+ Ok(_) => {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ locked = true,
+ );
+ });
+
+ Ok(MutexGuard {
+ lock: self,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: self.resource_span.clone(),
+ })
+ }
Err(_) => Err(TryLockError(())),
}
}
@@ -454,7 +634,24 @@ impl<T: ?Sized> Mutex<T> {
/// # }
pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> {
match self.s.try_acquire(1) {
- Ok(_) => Ok(OwnedMutexGuard { lock: self }),
+ Ok(_) => {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ locked = true,
+ );
+ });
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+
+ Ok(OwnedMutexGuard {
+ lock: self,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span,
+ })
+ }
Err(_) => Err(TryLockError(())),
}
}
@@ -626,7 +823,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
/// # async fn main() {
/// # let mutex = Mutex::new(0u32);
/// # let guard = mutex.lock().await;
- /// # unlock_and_relock(guard).await;
+ /// # let _guard = unlock_and_relock(guard).await;
/// # }
/// ```
#[inline]
@@ -637,7 +834,14 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
- self.lock.s.release(1)
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ locked = false,
+ );
+ });
+ self.lock.s.release(1);
}
}
@@ -699,6 +903,13 @@ impl<T: ?Sized> OwnedMutexGuard<T> {
impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
fn drop(&mut self) {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ locked = false,
+ );
+ });
self.lock.s.release(1)
}
}