Diffstat (limited to 'src/sync')
-rw-r--r--  src/sync/mod.rs              2
-rw-r--r--  src/sync/once_lock.rs      103
-rw-r--r--  src/sync/parker.rs           6
-rw-r--r--  src/sync/sharded_lock.rs    22
-rw-r--r--  src/sync/wait_group.rs       5
5 files changed, 124 insertions, 14 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index eeb740c..f9eec71 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -4,6 +4,8 @@
//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads.
//! * [`WaitGroup`], for synchronizing the beginning or end of some computation.
+#[cfg(not(crossbeam_loom))]
+mod once_lock;
mod parker;
#[cfg(not(crossbeam_loom))]
mod sharded_lock;
diff --git a/src/sync/once_lock.rs b/src/sync/once_lock.rs
new file mode 100644
index 0000000..c1fefc9
--- /dev/null
+++ b/src/sync/once_lock.rs
@@ -0,0 +1,103 @@
+// Based on unstable std::sync::OnceLock.
+//
+// Source: https://github.com/rust-lang/rust/blob/8e9c93df464b7ada3fc7a1c8ccddd9dcb24ee0a0/library/std/src/sync/once_lock.rs
+
+use core::cell::UnsafeCell;
+use core::mem::MaybeUninit;
+use core::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Once;
+
+pub(crate) struct OnceLock<T> {
+ once: Once,
+ // Once::is_completed requires Rust 1.43, so use this to track whether the value has been initialized.
+ is_initialized: AtomicBool,
+ value: UnsafeCell<MaybeUninit<T>>,
+ // Unlike std::sync::OnceLock, we don't need PhantomData here because
+ // we don't use #[may_dangle].
+}
+
+unsafe impl<T: Sync + Send> Sync for OnceLock<T> {}
+unsafe impl<T: Send> Send for OnceLock<T> {}
+
+impl<T> OnceLock<T> {
+ /// Creates a new empty cell.
+ #[must_use]
+ pub(crate) const fn new() -> Self {
+ Self {
+ once: Once::new(),
+ is_initialized: AtomicBool::new(false),
+ value: UnsafeCell::new(MaybeUninit::uninit()),
+ }
+ }
+
+ /// Gets the contents of the cell, initializing it with `f` if the cell
+ /// was empty.
+ ///
+ /// Many threads may call `get_or_init` concurrently with different
+ /// initializing functions, but it is guaranteed that only one function
+ /// will be executed.
+ ///
+ /// # Panics
+ ///
+ /// If `f` panics, the panic is propagated to the caller, and the cell
+ /// remains uninitialized.
+ ///
+ /// It is an error to reentrantly initialize the cell from `f`. The
+ /// exact outcome is unspecified. The current implementation deadlocks, but
+ /// this may be changed to a panic in the future.
+ pub(crate) fn get_or_init<F>(&self, f: F) -> &T
+ where
+ F: FnOnce() -> T,
+ {
+ // Fast path check
+ if self.is_initialized() {
+ // SAFETY: The inner value has been initialized
+ return unsafe { self.get_unchecked() };
+ }
+ self.initialize(f);
+
+ debug_assert!(self.is_initialized());
+
+ // SAFETY: The inner value has been initialized
+ unsafe { self.get_unchecked() }
+ }
+
+ #[inline]
+ fn is_initialized(&self) -> bool {
+ self.is_initialized.load(Ordering::Acquire)
+ }
+
+ #[cold]
+ fn initialize<F>(&self, f: F)
+ where
+ F: FnOnce() -> T,
+ {
+ let slot = self.value.get().cast::<T>();
+ let is_initialized = &self.is_initialized;
+
+ self.once.call_once(|| {
+ let value = f();
+ unsafe {
+ slot.write(value);
+ }
+ is_initialized.store(true, Ordering::Release);
+ });
+ }
+
+ /// # Safety
+ ///
+ /// The value must be initialized
+ unsafe fn get_unchecked(&self) -> &T {
+ debug_assert!(self.is_initialized());
+ &*self.value.get().cast::<T>()
+ }
+}
+
+impl<T> Drop for OnceLock<T> {
+ fn drop(&mut self) {
+ if self.is_initialized() {
+ // SAFETY: The inner value has been initialized
+ unsafe { self.value.get().cast::<T>().drop_in_place() };
+ }
+ }
+}
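
A minimal usage sketch of the crate-internal `OnceLock` added above, assuming access from inside the crate; the names `CELL`, `CALLS`, and `demo` are hypothetical and exist only to illustrate `get_or_init`. Several threads may race to initialize the cell, but the initializer closure runs exactly once and every caller observes the same value:

    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::sync::once_lock::OnceLock;

    // Hypothetical statics for illustration: CALLS counts how many times
    // the initializer actually ran.
    static CELL: OnceLock<u64> = OnceLock::new();
    static CALLS: AtomicUsize = AtomicUsize::new(0);

    fn demo() {
        let handles: Vec<_> = (0..4)
            .map(|_| {
                std::thread::spawn(|| {
                    // Only one thread's closure runs; the rest see the stored value.
                    *CELL.get_or_init(|| {
                        CALLS.fetch_add(1, Ordering::Relaxed);
                        42
                    })
                })
            })
            .collect();
        for handle in handles {
            assert_eq!(handle.join().unwrap(), 42);
        }
        assert_eq!(CALLS.load(Ordering::Relaxed), 1);
    }

The fast path of `get_or_init` is a single Acquire load of `is_initialized`; only threads that lose the race fall through to the inner `Once`.
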
diff --git a/src/sync/parker.rs b/src/sync/parker.rs
index 531f5a5..e791c44 100644
--- a/src/sync/parker.rs
+++ b/src/sync/parker.rs
@@ -44,6 +44,7 @@ use std::time::{Duration, Instant};
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
+/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads to finish: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
@@ -241,6 +242,7 @@ impl Unparker {
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads to finish: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
@@ -262,7 +264,7 @@ impl Unparker {
/// # let _ = unsafe { Unparker::from_raw(raw) };
/// ```
pub fn into_raw(this: Unparker) -> *const () {
- Arc::into_raw(this.inner) as *const ()
+ Arc::into_raw(this.inner).cast::<()>()
}
/// Converts a raw pointer into an `Unparker`.
@@ -284,7 +286,7 @@ impl Unparker {
/// ```
pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
Unparker {
- inner: Arc::from_raw(ptr as *const Inner),
+ inner: Arc::from_raw(ptr.cast::<Inner>()),
}
}
}
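
The two `Unparker` hunks above replace `as` casts with `cast::<T>()`, which can only change the pointee type and will not silently alter mutability or convert between pointers and integers. A self-contained sketch of the same `Arc` round-trip pattern, with a hypothetical `Inner` type standing in for crossbeam's private one:

    use std::sync::Arc;

    struct Inner {
        value: u64,
    }

    // Erase the concrete type behind an opaque `*const ()`, e.g. for a handle.
    fn into_raw(arc: Arc<Inner>) -> *const () {
        Arc::into_raw(arc).cast::<()>()
    }

    // SAFETY: `ptr` must come from `into_raw` and be consumed at most once,
    // so the Arc's reference count stays balanced.
    unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
        unsafe { Arc::from_raw(ptr.cast::<Inner>()) }
    }

    fn main() {
        let raw = into_raw(Arc::new(Inner { value: 7 }));
        let arc = unsafe { from_raw(raw) };
        assert_eq!(arc.value, 7);
    }
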
diff --git a/src/sync/sharded_lock.rs b/src/sync/sharded_lock.rs
index 163d34c..b43c55e 100644
--- a/src/sync/sharded_lock.rs
+++ b/src/sync/sharded_lock.rs
@@ -9,8 +9,8 @@ use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread::{self, ThreadId};
+use crate::sync::once_lock::OnceLock;
use crate::CachePadded;
-use lazy_static::lazy_static;
/// The number of shards per sharded lock. Must be a power of two.
const NUM_SHARDS: usize = 8;
@@ -583,12 +583,16 @@ struct ThreadIndices {
next_index: usize,
}
-lazy_static! {
- static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices {
- mapping: HashMap::new(),
- free_list: Vec::new(),
- next_index: 0,
- });
+fn thread_indices() -> &'static Mutex<ThreadIndices> {
+ static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new();
+ fn init() -> Mutex<ThreadIndices> {
+ Mutex::new(ThreadIndices {
+ mapping: HashMap::new(),
+ free_list: Vec::new(),
+ next_index: 0,
+ })
+ }
+ THREAD_INDICES.get_or_init(init)
}
/// A registration of a thread with an index.
@@ -601,7 +605,7 @@ struct Registration {
impl Drop for Registration {
fn drop(&mut self) {
- let mut indices = THREAD_INDICES.lock().unwrap();
+ let mut indices = thread_indices().lock().unwrap();
indices.mapping.remove(&self.thread_id);
indices.free_list.push(self.index);
}
@@ -610,7 +614,7 @@ impl Drop for Registration {
thread_local! {
static REGISTRATION: Registration = {
let thread_id = thread::current().id();
- let mut indices = THREAD_INDICES.lock().unwrap();
+ let mut indices = thread_indices().lock().unwrap();
let index = match indices.free_list.pop() {
Some(i) => i,
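
The `sharded_lock` change above replaces a `lazy_static!` block with an accessor function holding a `static OnceLock`. The same shape works with std's `OnceLock` (stable since Rust 1.70), which exposes the same `get_or_init` signature; `counters` and `bump` below are made-up names used only to illustrate the pattern:

    use std::collections::HashMap;
    use std::sync::{Mutex, OnceLock};

    // Hypothetical global map, mirroring the accessor-function pattern
    // that `thread_indices()` uses above.
    fn counters() -> &'static Mutex<HashMap<String, u64>> {
        static COUNTERS: OnceLock<Mutex<HashMap<String, u64>>> = OnceLock::new();
        COUNTERS.get_or_init(|| Mutex::new(HashMap::new()))
    }

    fn bump(name: &str) -> u64 {
        let mut map = counters().lock().unwrap();
        let entry = map.entry(name.to_string()).or_insert(0);
        *entry += 1;
        *entry
    }
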
diff --git a/src/sync/wait_group.rs b/src/sync/wait_group.rs
index 4206ee4..19d6074 100644
--- a/src/sync/wait_group.rs
+++ b/src/sync/wait_group.rs
@@ -1,6 +1,3 @@
-// Necessary for using `Mutex<usize>` for conditional variables
-#![allow(clippy::mutex_atomic)]
-
use crate::primitive::sync::{Arc, Condvar, Mutex};
use std::fmt;
@@ -42,6 +39,7 @@ use std::fmt;
///
/// // Block until all threads have finished their work.
/// wg.wait();
+/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads to finish: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`Barrier`]: std::sync::Barrier
@@ -100,6 +98,7 @@ impl WaitGroup {
///
/// // Block until both threads have reached `wait()`.
/// wg.wait();
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads to finish: https://github.com/rust-lang/miri/issues/1371
/// ```
pub fn wait(self) {
if *self.inner.count.lock().unwrap() == 1 {
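
For context on the removed `clippy::mutex_atomic` allow: the wait-group count is kept in a `Mutex<usize>` paired with a `Condvar` rather than an atomic, because waiters must block on a condition variable tied to the lock that guards the count. A simplified, hypothetical sketch of that counting pattern (`SimpleWaitGroup` is not crossbeam's actual `WaitGroup` implementation):

    use std::sync::{Arc, Condvar, Mutex};

    // The count lives under a Mutex (not an atomic) so that waiters can
    // block on the Condvar associated with it.
    struct SimpleWaitGroup {
        count: Mutex<usize>,
        cvar: Condvar,
    }

    impl SimpleWaitGroup {
        fn new(workers: usize) -> Arc<Self> {
            Arc::new(Self {
                count: Mutex::new(workers),
                cvar: Condvar::new(),
            })
        }

        // Called by each worker when it finishes.
        fn done(&self) {
            let mut count = self.count.lock().unwrap();
            *count -= 1;
            if *count == 0 {
                self.cvar.notify_all();
            }
        }

        // Blocks until every worker has called `done`.
        fn wait(&self) {
            let mut count = self.count.lock().unwrap();
            while *count > 0 {
                count = self.cvar.wait(count).unwrap();
            }
        }
    }

    fn main() {
        let wg = SimpleWaitGroup::new(4);
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let wg = Arc::clone(&wg);
                std::thread::spawn(move || wg.done())
            })
            .collect();
        wg.wait();
        for handle in handles {
            handle.join().unwrap();
        }
    }
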