diff options
author | Haibo Huang <hhb@google.com> | 2021-01-08 09:28:15 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-01-08 09:28:15 +0000 |
commit | 2a4b501d2542baa253eb6d681255d084ae600562 (patch) | |
tree | ba0351c97d9ed66538c4c26015dd411b0ad9f0be | |
parent | a07647964f99c2e99620b88eb3f9b7d58fec4efc (diff) | |
parent | 7cd98bbfd7a374a005eda423eeace5539ec0a066 (diff) | |
download | thread_local-2a4b501d2542baa253eb6d681255d084ae600562.tar.gz |
Upgrade rust/crates/thread_local to 1.1.0 am: 7cd98bbfd7
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/thread_local/+/1543627
MUST ONLY BE SUBMITTED BY AUTOMERGER
Change-Id: I5976728b25e530a067e4aaf05b0df6e2f10954f8
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | .travis.yml | 6 | ||||
-rw-r--r-- | Cargo.toml | 13 | ||||
-rw-r--r-- | Cargo.toml.orig | 12 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | README.md | 12 | ||||
-rw-r--r-- | TEST_MAPPING | 7 | ||||
-rw-r--r-- | benches/thread_local.rs | 36 | ||||
-rw-r--r-- | src/cached.rs | 103 | ||||
-rw-r--r-- | src/lib.rs | 383 | ||||
-rw-r--r-- | src/thread_id.rs | 110 | ||||
-rw-r--r-- | src/unreachable.rs | 35 |
12 files changed, 335 insertions, 394 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 4fc0fe6..6382fe2 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "34011020194908f3aa852cac59a83e81a325767e" + "sha1": "d9e93cbc8b8ff57351c65eb432323808da67744d" } } diff --git a/.travis.yml b/.travis.yml index e835e9f..1698520 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ rust: - nightly - beta - stable -- 1.27.2 +- 1.28.0 before_script: - | @@ -15,12 +15,12 @@ before_script: script: - travis-cargo build - travis-cargo test -- travis-cargo bench +# Criterion doesn't build on 1.28.0 +- travis-cargo --skip 1.28.0 bench -- --features criterion - travis-cargo doc -- --no-deps after_success: - travis-cargo --only nightly doc-upload - env: global: - TRAVIS_CARGO_NIGHTLY_FEATURE="" @@ -12,7 +12,7 @@ [package] name = "thread_local" -version = "1.0.1" +version = "1.1.0" authors = ["Amanieu d'Antras <amanieu@gmail.com>"] description = "Per-object thread-local storage" documentation = "https://amanieu.github.io/thread_local-rs/thread_local/index.html" @@ -20,7 +20,18 @@ readme = "README.md" keywords = ["thread_local", "concurrent", "thread"] license = "Apache-2.0/MIT" repository = "https://github.com/Amanieu/thread_local-rs" + +[[bench]] +name = "thread_local" +harness = false +required-features = ["criterion"] +[dependencies.criterion] +version = "0.3.3" +optional = true + [dependencies.lazy_static] version = "1.0" + +[dev-dependencies] [badges.travis-ci] repository = "Amanieu/thread_local-rs" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 83921af..d81a344 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "thread_local" -version = "1.0.1" +version = "1.1.0" authors = ["Amanieu d'Antras <amanieu@gmail.com>"] description = "Per-object thread-local storage" documentation = "https://amanieu.github.io/thread_local-rs/thread_local/index.html" @@ -14,3 +14,13 @@ travis-ci = { repository = "Amanieu/thread_local-rs" } [dependencies] lazy_static = "1.0" + +# This is actually a dev-dependency, see https://github.com/rust-lang/cargo/issues/1596 +criterion = { version = "0.3.3", optional = true } + +[dev-dependencies] + +[[bench]] +name = "thread_local" +required-features = ["criterion"] +harness = false @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/thread_local/thread_local-1.0.1.crate" + value: "https://static.crates.io/crates/thread_local/thread_local-1.1.0.crate" } - version: "1.0.1" + version: "1.1.0" license_type: NOTICE last_upgrade_date { - year: 2020 - month: 3 - day: 31 + year: 2021 + month: 1 + day: 7 } } @@ -3,10 +3,10 @@ thread_local [![Build Status](https://travis-ci.org/Amanieu/thread_local-rs.svg?branch=master)](https://travis-ci.org/Amanieu/thread_local-rs) [![Crates.io](https://img.shields.io/crates/v/thread_local.svg)](https://crates.io/crates/thread_local) -This library provides the `ThreadLocal` and `CachedThreadLocal` types which -allow a separate copy of an object to be used for each thread. This allows for -per-object thread-local storage, unlike the standard library's `thread_local!` -macro which only allows static thread-local storage. +This library provides the `ThreadLocal` type which allow a separate copy of an +object to be used for each thread. This allows for per-object thread-local +storage, unlike the standard library's `thread_local!` macro which only allows +static thread-local storage. [Documentation](https://amanieu.github.io/thread_local-rs/thread_local/index.html) @@ -25,6 +25,10 @@ and this to your crate root: extern crate thread_local; ``` +## Minimum Rust version + +This crate's minimum supported Rust version (MSRV) is 1.28.0. + ## License Licensed under either of diff --git a/TEST_MAPPING b/TEST_MAPPING index 1412bd3..9e7c1a6 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -4,13 +4,6 @@ { "host": true, "name": "thread_local_host_test_src_lib" - }, - { - "host": true, - "name": "libsqlite3-sys_host_test_src_lib" - }, - { - "name": "libsqlite3-sys_device_test_src_lib" } ] } diff --git a/benches/thread_local.rs b/benches/thread_local.rs index 4cfaacd..ccad665 100644 --- a/benches/thread_local.rs +++ b/benches/thread_local.rs @@ -1,18 +1,28 @@ -#![feature(test)] - +extern crate criterion; extern crate thread_local; -extern crate test; -use thread_local::{ThreadLocal, CachedThreadLocal}; +use criterion::{black_box, BatchSize}; -#[bench] -fn thread_local(b: &mut test::Bencher) { - let local = ThreadLocal::new(); - b.iter(|| { let _: &i32 = local.get_or(|| Box::new(0)); }); -} +use thread_local::ThreadLocal; + +fn main() { + let mut c = criterion::Criterion::default().configure_from_args(); + + c.bench_function("get", |b| { + let local = ThreadLocal::new(); + local.get_or(|| Box::new(0)); + b.iter(|| { + black_box(local.get()); + }); + }); -#[bench] -fn cached_thread_local(b: &mut test::Bencher) { - let local = CachedThreadLocal::new(); - b.iter(|| { let _: &i32 = local.get_or(|| Box::new(0)); }); + c.bench_function("insert", |b| { + b.iter_batched_ref( + ThreadLocal::new, + |local| { + black_box(local.get_or(|| 0)); + }, + BatchSize::SmallInput, + ) + }); } diff --git a/src/cached.rs b/src/cached.rs index ab43c86..16f6516 100644 --- a/src/cached.rs +++ b/src/cached.rs @@ -1,25 +1,19 @@ +#![allow(deprecated)] + use super::{IntoIter, IterMut, ThreadLocal}; -use std::cell::UnsafeCell; use std::fmt; use std::panic::UnwindSafe; -use std::sync::atomic::{AtomicUsize, Ordering}; -use thread_id; -use unreachable::{UncheckedOptionExt, UncheckedResultExt}; +use std::usize; -/// Wrapper around `ThreadLocal` which adds a fast path for a single thread. +/// Wrapper around [`ThreadLocal`]. /// -/// This has the same API as `ThreadLocal`, but will register the first thread -/// that sets a value as its owner. All accesses by the owner will go through -/// a special fast path which is much faster than the normal `ThreadLocal` path. +/// This used to add a fast path for a single thread, however that has been +/// obsoleted by performance improvements to [`ThreadLocal`] itself. +#[deprecated(since = "1.1.0", note = "Use `ThreadLocal` instead")] pub struct CachedThreadLocal<T: Send> { - owner: AtomicUsize, - local: UnsafeCell<Option<Box<T>>>, - global: ThreadLocal<T>, + inner: ThreadLocal<T>, } -// CachedThreadLocal is always Sync, even if T isn't -unsafe impl<T: Send> Sync for CachedThreadLocal<T> {} - impl<T: Send> Default for CachedThreadLocal<T> { fn default() -> CachedThreadLocal<T> { CachedThreadLocal::new() @@ -28,71 +22,38 @@ impl<T: Send> Default for CachedThreadLocal<T> { impl<T: Send> CachedThreadLocal<T> { /// Creates a new empty `CachedThreadLocal`. + #[inline] pub fn new() -> CachedThreadLocal<T> { CachedThreadLocal { - owner: AtomicUsize::new(0), - local: UnsafeCell::new(None), - global: ThreadLocal::new(), + inner: ThreadLocal::new(), } } /// Returns the element for the current thread, if it exists. + #[inline] pub fn get(&self) -> Option<&T> { - let id = thread_id::get(); - let owner = self.owner.load(Ordering::Relaxed); - if owner == id { - return unsafe { Some((*self.local.get()).as_ref().unchecked_unwrap()) }; - } - if owner == 0 { - return None; - } - self.global.get_fast(id) + self.inner.get() } /// Returns the element for the current thread, or creates it if it doesn't /// exist. - #[inline(always)] + #[inline] pub fn get_or<F>(&self, create: F) -> &T where F: FnOnce() -> T, { - unsafe { - self.get_or_try(|| Ok::<T, ()>(create())) - .unchecked_unwrap_ok() - } + self.inner.get_or(create) } /// Returns the element for the current thread, or creates it if it doesn't /// exist. If `create` fails, that error is returned and no element is /// added. + #[inline] pub fn get_or_try<F, E>(&self, create: F) -> Result<&T, E> where F: FnOnce() -> Result<T, E>, { - let id = thread_id::get(); - let owner = self.owner.load(Ordering::Relaxed); - if owner == id { - return Ok(unsafe { (*self.local.get()).as_ref().unchecked_unwrap() }); - } - self.get_or_try_slow(id, owner, create) - } - - #[cold] - #[inline(never)] - fn get_or_try_slow<F, E>(&self, id: usize, owner: usize, create: F) -> Result<&T, E> - where - F: FnOnce() -> Result<T, E>, - { - if owner == 0 && self.owner.compare_and_swap(0, id, Ordering::Relaxed) == 0 { - unsafe { - (*self.local.get()) = Some(Box::new(create()?)); - return Ok((*self.local.get()).as_ref().unchecked_unwrap()); - } - } - match self.global.get_fast(id) { - Some(x) => Ok(x), - None => Ok(self.global.insert(id, Box::new(create()?), true)), - } + self.inner.get_or_try(create) } /// Returns a mutable iterator over the local values of all threads. @@ -100,10 +61,10 @@ impl<T: Send> CachedThreadLocal<T> { /// Since this call borrows the `ThreadLocal` mutably, this operation can /// be done safely---the mutable borrow statically guarantees no other /// threads are currently accessing their associated values. + #[inline] pub fn iter_mut(&mut self) -> CachedIterMut<T> { CachedIterMut { - local: unsafe { (*self.local.get()).as_mut().map(|x| &mut **x) }, - global: self.global.iter_mut(), + inner: self.inner.iter_mut(), } } @@ -113,8 +74,9 @@ impl<T: Send> CachedThreadLocal<T> { /// Since this call borrows the `ThreadLocal` mutably, this operation can /// be done safely---the mutable borrow statically guarantees no other /// threads are currently accessing their associated values. + #[inline] pub fn clear(&mut self) { - *self = CachedThreadLocal::new(); + self.inner.clear(); } } @@ -124,8 +86,7 @@ impl<T: Send> IntoIterator for CachedThreadLocal<T> { fn into_iter(self) -> CachedIntoIter<T> { CachedIntoIter { - local: unsafe { (*self.local.get()).take().map(|x| *x) }, - global: self.global.into_iter(), + inner: self.inner.into_iter(), } } } @@ -156,42 +117,44 @@ impl<T: Send + fmt::Debug> fmt::Debug for CachedThreadLocal<T> { impl<T: Send + UnwindSafe> UnwindSafe for CachedThreadLocal<T> {} /// Mutable iterator over the contents of a `CachedThreadLocal`. +#[deprecated(since = "1.1.0", note = "Use `IterMut` instead")] pub struct CachedIterMut<'a, T: Send + 'a> { - local: Option<&'a mut T>, - global: IterMut<'a, T>, + inner: IterMut<'a, T>, } impl<'a, T: Send + 'a> Iterator for CachedIterMut<'a, T> { type Item = &'a mut T; + #[inline] fn next(&mut self) -> Option<&'a mut T> { - self.local.take().or_else(|| self.global.next()) + self.inner.next() } + #[inline] fn size_hint(&self) -> (usize, Option<usize>) { - let len = self.global.size_hint().0 + self.local.is_some() as usize; - (len, Some(len)) + self.inner.size_hint() } } impl<'a, T: Send + 'a> ExactSizeIterator for CachedIterMut<'a, T> {} /// An iterator that moves out of a `CachedThreadLocal`. +#[deprecated(since = "1.1.0", note = "Use `IntoIter` instead")] pub struct CachedIntoIter<T: Send> { - local: Option<T>, - global: IntoIter<T>, + inner: IntoIter<T>, } impl<T: Send> Iterator for CachedIntoIter<T> { type Item = T; + #[inline] fn next(&mut self) -> Option<T> { - self.local.take().or_else(|| self.global.next()) + self.inner.next() } + #[inline] fn size_hint(&self) -> (usize, Option<usize>) { - let len = self.global.size_hint().0 + self.local.is_some() as usize; - (len, Some(len)) + self.inner.size_hint() } } @@ -20,11 +20,6 @@ //! only be done if you have mutable access to the `ThreadLocal` object, which //! guarantees that you are the only thread currently accessing it. //! -//! A `CachedThreadLocal` type is also provided which wraps a `ThreadLocal` but -//! also uses a special fast path for the first thread that writes into it. The -//! fast path has very low overhead (<1ns per access) while keeping the same -//! performance as `ThreadLocal` for other threads. -//! //! Note that since thread IDs are recycled when a thread exits, it is possible //! for one thread to retrieve the object of another thread. Since this can only //! occur after a thread has exited this does not lead to any race conditions. @@ -69,57 +64,55 @@ //! ``` #![warn(missing_docs)] +#![allow(clippy::mutex_atomic)] #[macro_use] extern crate lazy_static; +mod cached; mod thread_id; mod unreachable; -mod cached; +#[allow(deprecated)] pub use cached::{CachedIntoIter, CachedIterMut, CachedThreadLocal}; use std::cell::UnsafeCell; use std::fmt; use std::marker::PhantomData; +use std::mem; use std::panic::UnwindSafe; -use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Mutex; +use thread_id::Thread; use unreachable::{UncheckedOptionExt, UncheckedResultExt}; +// Use usize::BITS once it has stabilized and the MSRV has been bumped. +#[cfg(target_pointer_width = "16")] +const POINTER_WIDTH: u8 = 16; +#[cfg(target_pointer_width = "32")] +const POINTER_WIDTH: u8 = 32; +#[cfg(target_pointer_width = "64")] +const POINTER_WIDTH: u8 = 64; + +/// The total number of buckets stored in each thread local. +const BUCKETS: usize = (POINTER_WIDTH + 1) as usize; + /// Thread-local variable wrapper /// /// See the [module-level documentation](index.html) for more. pub struct ThreadLocal<T: Send> { - // Pointer to the current top-level hash table - table: AtomicPtr<Table<T>>, - - // Lock used to guard against concurrent modifications. This is only taken - // while writing to the table, not when reading from it. This also guards - // the counter for the total number of values in the hash table. + /// The buckets in the thread local. The nth bucket contains `2^(n-1)` + /// elements. Each bucket is lazily allocated. + buckets: [AtomicPtr<UnsafeCell<Option<T>>>; BUCKETS], + + /// Lock used to guard against concurrent modifications. This is taken when + /// there is a possibility of allocating a new bucket, which only occurs + /// when inserting values. This also guards the counter for the total number + /// of values in the thread local. lock: Mutex<usize>, } -struct Table<T: Send> { - // Hash entries for the table - entries: Box<[TableEntry<T>]>, - - // Number of bits used for the hash function - hash_bits: usize, - - // Previous table, half the size of the current one - prev: Option<Box<Table<T>>>, -} - -struct TableEntry<T: Send> { - // Current owner of this entry, or 0 if this is an empty entry - owner: AtomicUsize, - - // The object associated with this entry. This is only ever accessed by the - // owner of the entry. - data: UnsafeCell<Option<Box<T>>>, -} - // ThreadLocal is always Sync, even if T isn't unsafe impl<T: Send> Sync for ThreadLocal<T> {} @@ -131,56 +124,63 @@ impl<T: Send> Default for ThreadLocal<T> { impl<T: Send> Drop for ThreadLocal<T> { fn drop(&mut self) { - unsafe { - Box::from_raw(self.table.load(Ordering::Relaxed)); - } - } -} + let mut bucket_size = 1; + + // Free each non-null bucket + for (i, bucket) in self.buckets.iter_mut().enumerate() { + let bucket_ptr = *bucket.get_mut(); + + let this_bucket_size = bucket_size; + if i != 0 { + bucket_size <<= 1; + } + + if bucket_ptr.is_null() { + continue; + } -// Implementation of Clone for TableEntry, needed to make vec![] work -impl<T: Send> Clone for TableEntry<T> { - fn clone(&self) -> TableEntry<T> { - TableEntry { - owner: AtomicUsize::new(0), - data: UnsafeCell::new(None), + unsafe { Box::from_raw(std::slice::from_raw_parts_mut(bucket_ptr, this_bucket_size)) }; } } } -// Hash function for the thread id -#[cfg(target_pointer_width = "32")] -#[inline] -fn hash(id: usize, bits: usize) -> usize { - id.wrapping_mul(0x9E3779B9) >> (32 - bits) -} -#[cfg(target_pointer_width = "64")] -#[inline] -fn hash(id: usize, bits: usize) -> usize { - id.wrapping_mul(0x9E37_79B9_7F4A_7C15) >> (64 - bits) -} - impl<T: Send> ThreadLocal<T> { /// Creates a new empty `ThreadLocal`. pub fn new() -> ThreadLocal<T> { - let entry = TableEntry { - owner: AtomicUsize::new(0), - data: UnsafeCell::new(None), - }; - let table = Table { - entries: vec![entry; 2].into_boxed_slice(), - hash_bits: 1, - prev: None, - }; + Self::with_capacity(2) + } + + /// Creates a new `ThreadLocal` with an initial capacity. If less than the capacity threads + /// access the thread local it will never reallocate. The capacity may be rounded up to the + /// nearest power of two. + pub fn with_capacity(capacity: usize) -> ThreadLocal<T> { + let allocated_buckets = capacity + .checked_sub(1) + .map(|c| usize::from(POINTER_WIDTH) - (c.leading_zeros() as usize) + 1) + .unwrap_or(0); + + let mut buckets = [ptr::null_mut(); BUCKETS]; + let mut bucket_size = 1; + for (i, bucket) in buckets[..allocated_buckets].iter_mut().enumerate() { + *bucket = allocate_bucket::<T>(bucket_size); + + if i != 0 { + bucket_size <<= 1; + } + } + ThreadLocal { - table: AtomicPtr::new(Box::into_raw(Box::new(table))), + // Safety: AtomicPtr has the same representation as a pointer and arrays have the same + // representation as a sequence of their inner type. + buckets: unsafe { mem::transmute(buckets) }, lock: Mutex::new(0), } } /// Returns the element for the current thread, if it exists. pub fn get(&self) -> Option<&T> { - let id = thread_id::get(); - self.get_fast(id) + let thread = thread_id::get(); + self.get_inner(thread) } /// Returns the element for the current thread, or creates it if it doesn't @@ -202,117 +202,64 @@ impl<T: Send> ThreadLocal<T> { where F: FnOnce() -> Result<T, E>, { - let id = thread_id::get(); - match self.get_fast(id) { + let thread = thread_id::get(); + match self.get_inner(thread) { Some(x) => Ok(x), - None => Ok(self.insert(id, Box::new(create()?), true)), - } - } - - // Simple hash table lookup function - fn lookup(id: usize, table: &Table<T>) -> Option<&UnsafeCell<Option<Box<T>>>> { - // Because we use a Mutex to prevent concurrent modifications (but not - // reads) of the hash table, we can avoid any memory barriers here. No - // elements between our hash bucket and our value can have been modified - // since we inserted our thread-local value into the table. - for entry in table.entries.iter().cycle().skip(hash(id, table.hash_bits)) { - let owner = entry.owner.load(Ordering::Relaxed); - if owner == id { - return Some(&entry.data); - } - if owner == 0 { - return None; - } + None => Ok(self.insert(thread, create()?)), } - unreachable!(); } - // Fast path: try to find our thread in the top-level hash table - fn get_fast(&self, id: usize) -> Option<&T> { - let table = unsafe { &*self.table.load(Ordering::Acquire) }; - match Self::lookup(id, table) { - Some(x) => unsafe { Some((*x.get()).as_ref().unchecked_unwrap()) }, - None => self.get_slow(id, table), - } - } - - // Slow path: try to find our thread in the other hash tables, and then - // move it to the top-level hash table. - #[cold] - fn get_slow(&self, id: usize, table_top: &Table<T>) -> Option<&T> { - let mut current = &table_top.prev; - while let Some(ref table) = *current { - if let Some(x) = Self::lookup(id, table) { - let data = unsafe { (*x.get()).take().unchecked_unwrap() }; - return Some(self.insert(id, data, false)); - } - current = &table.prev; + fn get_inner(&self, thread: Thread) -> Option<&T> { + let bucket_ptr = + unsafe { self.buckets.get_unchecked(thread.bucket) }.load(Ordering::Acquire); + if bucket_ptr.is_null() { + return None; } - None + unsafe { (&*(&*bucket_ptr.add(thread.index)).get()).as_ref() } } #[cold] - fn insert(&self, id: usize, data: Box<T>, new: bool) -> &T { - // Lock the Mutex to ensure only a single thread is modify the hash - // table at once. + fn insert(&self, thread: Thread, data: T) -> &T { + // Lock the Mutex to ensure only a single thread is allocating buckets at once let mut count = self.lock.lock().unwrap(); - if new { - *count += 1; - } - let table_raw = self.table.load(Ordering::Relaxed); - let table = unsafe { &*table_raw }; - - // If the current top-level hash table is more than 75% full, add a new - // level with 2x the capacity. Elements will be moved up to the new top - // level table as they are accessed. - let table = if *count > table.entries.len() * 3 / 4 { - let entry = TableEntry { - owner: AtomicUsize::new(0), - data: UnsafeCell::new(None), - }; - let new_table = Box::into_raw(Box::new(Table { - entries: vec![entry; table.entries.len() * 2].into_boxed_slice(), - hash_bits: table.hash_bits + 1, - prev: unsafe { Some(Box::from_raw(table_raw)) }, - })); - self.table.store(new_table, Ordering::Release); - unsafe { &*new_table } + *count += 1; + + let bucket_atomic_ptr = unsafe { self.buckets.get_unchecked(thread.bucket) }; + + let bucket_ptr: *const _ = bucket_atomic_ptr.load(Ordering::Acquire); + let bucket_ptr = if bucket_ptr.is_null() { + // Allocate a new bucket + let bucket_ptr = allocate_bucket(thread.bucket_size); + bucket_atomic_ptr.store(bucket_ptr, Ordering::Release); + bucket_ptr } else { - table + bucket_ptr }; - // Insert the new element into the top-level hash table - for entry in table.entries.iter().cycle().skip(hash(id, table.hash_bits)) { - let owner = entry.owner.load(Ordering::Relaxed); - if owner == 0 { - unsafe { - entry.owner.store(id, Ordering::Relaxed); - *entry.data.get() = Some(data); - return (*entry.data.get()).as_ref().unchecked_unwrap(); - } - } - if owner == id { - // This can happen if create() inserted a value into this - // ThreadLocal between our calls to get_fast() and insert(). We - // just return the existing value and drop the newly-allocated - // Box. - unsafe { - return (*entry.data.get()).as_ref().unchecked_unwrap(); - } - } + drop(count); + + // Insert the new element into the bucket + unsafe { + let value_ptr = (&*bucket_ptr.add(thread.index)).get(); + *value_ptr = Some(data); + (&*value_ptr).as_ref().unchecked_unwrap() } - unreachable!(); } fn raw_iter(&mut self) -> RawIter<T> { RawIter { remaining: *self.lock.get_mut().unwrap(), + buckets: unsafe { + *(&self.buckets as *const _ as *const [*const UnsafeCell<Option<T>>; BUCKETS]) + }, + bucket: 0, + bucket_size: 1, index: 0, - table: self.table.load(Ordering::Relaxed), } } - /// Returns a mutable iterator over the local values of all threads. + /// Returns a mutable iterator over the local values of all threads in + /// unspecified order. /// /// Since this call borrows the `ThreadLocal` mutably, this operation can /// be done safely---the mutable borrow statically guarantees no other @@ -374,30 +321,42 @@ impl<T: Send + UnwindSafe> UnwindSafe for ThreadLocal<T> {} struct RawIter<T: Send> { remaining: usize, + buckets: [*const UnsafeCell<Option<T>>; BUCKETS], + bucket: usize, + bucket_size: usize, index: usize, - table: *const Table<T>, } impl<T: Send> Iterator for RawIter<T> { - type Item = *mut Option<Box<T>>; + type Item = *mut Option<T>; - fn next(&mut self) -> Option<*mut Option<Box<T>>> { + fn next(&mut self) -> Option<Self::Item> { if self.remaining == 0 { return None; } loop { - let entries = unsafe { &(*self.table).entries[..] }; - while self.index < entries.len() { - let val = entries[self.index].data.get(); - self.index += 1; - if unsafe { (*val).is_some() } { - self.remaining -= 1; - return Some(val); + let bucket = unsafe { *self.buckets.get_unchecked(self.bucket) }; + + if !bucket.is_null() { + while self.index < self.bucket_size { + let item = unsafe { (&*bucket.add(self.index)).get() }; + + self.index += 1; + + if unsafe { &*item }.is_some() { + self.remaining -= 1; + return Some(item); + } } } + + if self.bucket != 0 { + self.bucket_size <<= 1; + } + self.bucket += 1; + self.index = 0; - self.table = unsafe { &**(*self.table).prev.as_ref().unchecked_unwrap() }; } } @@ -418,7 +377,7 @@ impl<'a, T: Send + 'a> Iterator for IterMut<'a, T> { fn next(&mut self) -> Option<&'a mut T> { self.raw .next() - .map(|x| unsafe { &mut **(*x).as_mut().unchecked_unwrap() }) + .map(|x| unsafe { &mut *(*x).as_mut().unchecked_unwrap() }) } fn size_hint(&self) -> (usize, Option<usize>) { @@ -440,7 +399,7 @@ impl<T: Send> Iterator for IntoIter<T> { fn next(&mut self) -> Option<T> { self.raw .next() - .map(|x| unsafe { *(*x).take().unchecked_unwrap() }) + .map(|x| unsafe { (*x).take().unchecked_unwrap() }) } fn size_hint(&self) -> (usize, Option<usize>) { @@ -450,9 +409,18 @@ impl<T: Send> Iterator for IntoIter<T> { impl<T: Send> ExactSizeIterator for IntoIter<T> {} +fn allocate_bucket<T>(size: usize) -> *mut UnsafeCell<Option<T>> { + Box::into_raw( + (0..size) + .map(|_| UnsafeCell::new(None::<T>)) + .collect::<Vec<_>>() + .into_boxed_slice(), + ) as *mut _ +} + #[cfg(test)] mod tests { - use super::{CachedThreadLocal, ThreadLocal}; + use super::ThreadLocal; use std::cell::RefCell; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -482,23 +450,6 @@ mod tests { } #[test] - fn same_thread_cached() { - let create = make_create(); - let mut tls = CachedThreadLocal::new(); - assert_eq!(None, tls.get()); - assert_eq!("ThreadLocal { local_data: None }", format!("{:?}", &tls)); - assert_eq!(0, *tls.get_or(|| create())); - assert_eq!(Some(&0), tls.get()); - assert_eq!(0, *tls.get_or(|| create())); - assert_eq!(Some(&0), tls.get()); - assert_eq!(0, *tls.get_or(|| create())); - assert_eq!(Some(&0), tls.get()); - assert_eq!("ThreadLocal { local_data: Some(0) }", format!("{:?}", &tls)); - tls.clear(); - assert_eq!(None, tls.get()); - } - - #[test] fn different_thread() { let create = make_create(); let tls = Arc::new(ThreadLocal::new()); @@ -521,28 +472,6 @@ mod tests { } #[test] - fn different_thread_cached() { - let create = make_create(); - let tls = Arc::new(CachedThreadLocal::new()); - assert_eq!(None, tls.get()); - assert_eq!(0, *tls.get_or(|| create())); - assert_eq!(Some(&0), tls.get()); - - let tls2 = tls.clone(); - let create2 = create.clone(); - thread::spawn(move || { - assert_eq!(None, tls2.get()); - assert_eq!(1, *tls2.get_or(|| create2())); - assert_eq!(Some(&1), tls2.get()); - }) - .join() - .unwrap(); - - assert_eq!(Some(&0), tls.get()); - assert_eq!(0, *tls.get_or(|| create())); - } - - #[test] fn iter() { let tls = Arc::new(ThreadLocal::new()); tls.get_or(|| Box::new(1)); @@ -556,43 +485,17 @@ mod tests { }) .join() .unwrap(); + drop(tls2); }) .join() .unwrap(); let mut tls = Arc::try_unwrap(tls).unwrap(); let mut v = tls.iter_mut().map(|x| **x).collect::<Vec<i32>>(); - v.sort(); - assert_eq!(vec![1, 2, 3], v); - let mut v = tls.into_iter().map(|x| *x).collect::<Vec<i32>>(); - v.sort(); - assert_eq!(vec![1, 2, 3], v); - } - - #[test] - fn iter_cached() { - let tls = Arc::new(CachedThreadLocal::new()); - tls.get_or(|| Box::new(1)); - - let tls2 = tls.clone(); - thread::spawn(move || { - tls2.get_or(|| Box::new(2)); - let tls3 = tls2.clone(); - thread::spawn(move || { - tls3.get_or(|| Box::new(3)); - }) - .join() - .unwrap(); - }) - .join() - .unwrap(); - - let mut tls = Arc::try_unwrap(tls).unwrap(); - let mut v = tls.iter_mut().map(|x| **x).collect::<Vec<i32>>(); - v.sort(); + v.sort_unstable(); assert_eq!(vec![1, 2, 3], v); let mut v = tls.into_iter().map(|x| *x).collect::<Vec<i32>>(); - v.sort(); + v.sort_unstable(); assert_eq!(vec![1, 2, 3], v); } @@ -601,7 +504,5 @@ mod tests { fn foo<T: Sync>() {} foo::<ThreadLocal<String>>(); foo::<ThreadLocal<RefCell<String>>>(); - foo::<CachedThreadLocal<String>>(); - foo::<CachedThreadLocal<RefCell<String>>>(); } } diff --git a/src/thread_id.rs b/src/thread_id.rs index e757948..397f772 100644 --- a/src/thread_id.rs +++ b/src/thread_id.rs @@ -5,57 +5,123 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. +use std::cmp::Reverse; use std::collections::BinaryHeap; use std::sync::Mutex; use std::usize; +use POINTER_WIDTH; -// Thread ID manager which allocates thread IDs. It attempts to aggressively -// reuse thread IDs where possible to avoid cases where a ThreadLocal grows -// indefinitely when it is used by many short-lived threads. +/// Thread ID manager which allocates thread IDs. It attempts to aggressively +/// reuse thread IDs where possible to avoid cases where a ThreadLocal grows +/// indefinitely when it is used by many short-lived threads. struct ThreadIdManager { - limit: usize, - free_list: BinaryHeap<usize>, + free_from: usize, + free_list: BinaryHeap<Reverse<usize>>, } impl ThreadIdManager { fn new() -> ThreadIdManager { ThreadIdManager { - limit: usize::MAX, + free_from: 0, free_list: BinaryHeap::new(), } } fn alloc(&mut self) -> usize { if let Some(id) = self.free_list.pop() { - id + id.0 } else { - let id = self.limit; - self.limit = self.limit.checked_sub(1).expect("Ran out of thread IDs"); + let id = self.free_from; + self.free_from = self + .free_from + .checked_add(1) + .expect("Ran out of thread IDs"); id } } fn free(&mut self, id: usize) { - self.free_list.push(id); + self.free_list.push(Reverse(id)); } } lazy_static! { static ref THREAD_ID_MANAGER: Mutex<ThreadIdManager> = Mutex::new(ThreadIdManager::new()); } -// Non-zero integer which is unique to the current thread while it is running. -// A thread ID may be reused after a thread exits. -struct ThreadId(usize); -impl ThreadId { - fn new() -> ThreadId { - ThreadId(THREAD_ID_MANAGER.lock().unwrap().alloc()) +/// Data which is unique to the current thread while it is running. +/// A thread ID may be reused after a thread exits. +#[derive(Clone, Copy)] +pub(crate) struct Thread { + /// The thread ID obtained from the thread ID manager. + pub(crate) id: usize, + /// The bucket this thread's local storage will be in. + pub(crate) bucket: usize, + /// The size of the bucket this thread's local storage will be in. + pub(crate) bucket_size: usize, + /// The index into the bucket this thread's local storage is in. + pub(crate) index: usize, +} +impl Thread { + fn new(id: usize) -> Thread { + let bucket = usize::from(POINTER_WIDTH) - id.leading_zeros() as usize; + let bucket_size = 1 << bucket.saturating_sub(1); + let index = if id != 0 { id ^ bucket_size } else { 0 }; + + Thread { + id, + bucket, + bucket_size, + index, + } + } +} + +/// Wrapper around `Thread` that allocates and deallocates the ID. +struct ThreadHolder(Thread); +impl ThreadHolder { + fn new() -> ThreadHolder { + ThreadHolder(Thread::new(THREAD_ID_MANAGER.lock().unwrap().alloc())) } } -impl Drop for ThreadId { +impl Drop for ThreadHolder { fn drop(&mut self) { - THREAD_ID_MANAGER.lock().unwrap().free(self.0); + THREAD_ID_MANAGER.lock().unwrap().free(self.0.id); } } -thread_local!(static THREAD_ID: ThreadId = ThreadId::new()); -/// Returns a non-zero ID for the current thread -pub fn get() -> usize { - THREAD_ID.with(|x| x.0) +thread_local!(static THREAD_HOLDER: ThreadHolder = ThreadHolder::new()); + +/// Get the current thread. +pub(crate) fn get() -> Thread { + THREAD_HOLDER.with(|holder| holder.0) +} + +#[test] +fn test_thread() { + let thread = Thread::new(0); + assert_eq!(thread.id, 0); + assert_eq!(thread.bucket, 0); + assert_eq!(thread.bucket_size, 1); + assert_eq!(thread.index, 0); + + let thread = Thread::new(1); + assert_eq!(thread.id, 1); + assert_eq!(thread.bucket, 1); + assert_eq!(thread.bucket_size, 1); + assert_eq!(thread.index, 0); + + let thread = Thread::new(2); + assert_eq!(thread.id, 2); + assert_eq!(thread.bucket, 2); + assert_eq!(thread.bucket_size, 2); + assert_eq!(thread.index, 0); + + let thread = Thread::new(3); + assert_eq!(thread.id, 3); + assert_eq!(thread.bucket, 2); + assert_eq!(thread.bucket_size, 2); + assert_eq!(thread.index, 1); + + let thread = Thread::new(19); + assert_eq!(thread.id, 19); + assert_eq!(thread.bucket, 5); + assert_eq!(thread.bucket_size, 16); + assert_eq!(thread.index, 3); } diff --git a/src/unreachable.rs b/src/unreachable.rs index baff766..db4d831 100644 --- a/src/unreachable.rs +++ b/src/unreachable.rs @@ -5,26 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. -//! # unreachable -//! inlined from https://github.com/reem/rust-unreachable/ -//! -//! An unreachable code optimization hint in stable rust, and some useful -//! extension traits for `Option` and `Result`. -//! - -/// Hint to the optimizer that any code path which calls this function is -/// statically unreachable and can be removed. -/// -/// Calling this function in reachable code invokes undefined behavior. Be -/// very, very sure this is what you want; often, a simple `panic!` is more -/// suitable. -#[inline] -pub unsafe fn unreachable() -> ! { - /// The empty type for cases which can't occur. - enum Void { } - let x: &Void = ::std::mem::transmute(1usize); - match *x {} -} +use std::hint::unreachable_unchecked; /// An extension trait for `Option<T>` providing unchecked unwrapping methods. pub trait UncheckedOptionExt<T> { @@ -48,12 +29,14 @@ impl<T> UncheckedOptionExt<T> for Option<T> { unsafe fn unchecked_unwrap(self) -> T { match self { Some(x) => x, - None => unreachable() + None => unreachable_unchecked(), } } unsafe fn unchecked_unwrap_none(self) { - if self.is_some() { unreachable() } + if self.is_some() { + unreachable_unchecked() + } } } @@ -61,14 +44,14 @@ impl<T, E> UncheckedResultExt<T, E> for Result<T, E> { unsafe fn unchecked_unwrap_ok(self) -> T { match self { Ok(x) => x, - Err(_) => unreachable() + Err(_) => unreachable_unchecked(), } } unsafe fn unchecked_unwrap_err(self) -> E { match self { - Ok(_) => unreachable(), - Err(e) => e + Ok(_) => unreachable_unchecked(), + Err(e) => e, } } -}
\ No newline at end of file +} |