Diffstat (limited to 'src/bytes_mut.rs')
-rw-r--r--  src/bytes_mut.rs  330
1 file changed, 272 insertions, 58 deletions
diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs
index 147484d..70613b2 100644
--- a/src/bytes_mut.rs
+++ b/src/bytes_mut.rs
@@ -1,5 +1,5 @@
use core::iter::{FromIterator, Iterator};
-use core::mem::{self, ManuallyDrop};
+use core::mem::{self, ManuallyDrop, MaybeUninit};
use core::ops::{Deref, DerefMut};
use core::ptr::{self, NonNull};
use core::{cmp, fmt, hash, isize, slice, usize};
@@ -8,6 +8,7 @@ use alloc::{
borrow::{Borrow, BorrowMut},
boxed::Box,
string::String,
+ vec,
vec::Vec,
};
@@ -15,7 +16,7 @@ use crate::buf::{IntoIter, UninitSlice};
use crate::bytes::Vtable;
#[allow(unused)]
use crate::loom::sync::atomic::AtomicMut;
-use crate::loom::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
+use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::{Buf, BufMut, Bytes};
/// A unique reference to a contiguous slice of memory.
@@ -252,12 +253,28 @@ impl BytesMut {
let ptr = self.ptr.as_ptr();
let len = self.len;
- let data = AtomicPtr::new(self.data as _);
+ let data = AtomicPtr::new(self.data.cast());
mem::forget(self);
unsafe { Bytes::with_vtable(ptr, len, data, &SHARED_VTABLE) }
}
}
+ /// Creates a new `BytesMut` with `len` bytes, each initialized to zero.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use bytes::BytesMut;
+ ///
+ /// let zeros = BytesMut::zeroed(42);
+ ///
+ /// assert_eq!(zeros.len(), 42);
+ /// zeros.into_iter().for_each(|x| assert_eq!(x, 0));
+ /// ```
+ pub fn zeroed(len: usize) -> BytesMut {
+ BytesMut::from_vec(vec![0; len])
+ }
+
/// Splits the bytes into two at the given index.
///
/// Afterwards `self` contains elements `[0, at)`, and the returned
@@ -494,11 +511,20 @@ impl BytesMut {
/// reallocations. A call to `reserve` may result in an allocation.
///
/// Before allocating new buffer space, the function will attempt to reclaim
- /// space in the existing buffer. If the current handle references a small
- /// view in the original buffer and all other handles have been dropped,
- /// and the requested capacity is less than or equal to the existing
- /// buffer's capacity, then the current view will be copied to the front of
- /// the buffer and the handle will take ownership of the full buffer.
+ /// space in the existing buffer. If the current handle references a view
+ /// into a larger original buffer, and all other handles referencing part
+ /// of the same original buffer have been dropped, then the current view
+ /// can be copied/shifted to the front of the buffer and the handle can take
+ /// ownership of the full buffer, provided that the full buffer is large
+ /// enough to fit the requested additional capacity.
+ ///
+ /// This optimization will only happen if shifting the data from the current
+ /// view to the front of the buffer is not too expensive in terms of the
+ /// (amortized) time required. The precise condition is subject to change;
+ /// as of now, the length of the data being shifted needs to be at least as
+ /// large as the distance that it's shifted by. If the current view is empty
+ /// and the original buffer is large enough to fit the requested additional
+ /// capacity, then reallocations will never happen.
///
/// # Examples
///
@@ -562,17 +588,34 @@ impl BytesMut {
// space.
//
// Otherwise, since backed by a vector, use `Vec::reserve`
+ //
+ // We need to make sure that this optimization does not kill the
+ // amortized runtimes of BytesMut's operations.
unsafe {
let (off, prev) = self.get_vec_pos();
// Only reuse space if we can satisfy the requested additional space.
- if self.capacity() - self.len() + off >= additional {
- // There's space - reuse it
+ //
+ // Also check if the value of `off` suggests that enough bytes
+ // have been read to account for the overhead of shifting all
+ // the data (in an amortized analysis).
+ // Hence the condition `off >= self.len()`.
+ //
+ // This condition also already implies that the buffer is going
+ // to be (at least) half-empty in the end; so we do not break
+ // the (amortized) runtime with future resizes of the underlying
+ // `Vec`.
+ //
+ // [For more details check issue #524, and PR #525.]
+ if self.capacity() - self.len() + off >= additional && off >= self.len() {
+ // There's enough space, and it's not too much overhead:
+ // reuse the space!
//
// Just move the pointer back to the start after copying
// data back.
let base_ptr = self.ptr.as_ptr().offset(-(off as isize));
- ptr::copy(self.ptr.as_ptr(), base_ptr, self.len);
+ // Since `off >= self.len()`, the two regions don't overlap.
+ ptr::copy_nonoverlapping(self.ptr.as_ptr(), base_ptr, self.len);
self.ptr = vptr(base_ptr);
self.set_vec_pos(0, prev);
@@ -580,13 +623,14 @@ impl BytesMut {
// can gain capacity back.
self.cap += off;
} else {
- // No space - allocate more
+ // Not enough space, or reusing might be too much overhead:
+ // allocate more space!
let mut v =
ManuallyDrop::new(rebuild_vec(self.ptr.as_ptr(), self.len, self.cap, off));
v.reserve(additional);
// Update the info
- self.ptr = vptr(v.as_mut_ptr().offset(off as isize));
+ self.ptr = vptr(v.as_mut_ptr().add(off));
self.len = v.len() - off;
self.cap = v.capacity() - off;
}
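A minimal sketch of the `KIND_VEC` reuse path described above (illustrative, not taken from the crate's docs; the exact reuse condition is an internal detail and may change):

```rust
use bytes::{Buf, BufMut, BytesMut};

let mut buf = BytesMut::with_capacity(8);
buf.put_slice(b"abcdefgh");

// Consume the first 6 bytes; for a vec-backed `BytesMut` this only moves
// the internal pointer forward and records the offset.
buf.advance(6);
assert_eq!(&buf[..], b"gh");

// 2 bytes remain and the offset is 6, so `off >= len` holds: `reserve`
// copies "gh" back to the front of the existing buffer instead of
// allocating a new one.
buf.reserve(4);
assert!(buf.capacity() >= 6);
```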
@@ -596,7 +640,7 @@ impl BytesMut {
}
debug_assert_eq!(kind, KIND_ARC);
- let shared: *mut Shared = self.data as _;
+ let shared: *mut Shared = self.data;
// Reserving involves abandoning the currently shared buffer and
// allocating a new vector with the requested capacity.
@@ -619,29 +663,65 @@ impl BytesMut {
// sure that the vector has enough capacity.
let v = &mut (*shared).vec;
- if v.capacity() >= new_cap {
- // The capacity is sufficient, reclaim the buffer
- let ptr = v.as_mut_ptr();
+ let v_capacity = v.capacity();
+ let ptr = v.as_mut_ptr();
- ptr::copy(self.ptr.as_ptr(), ptr, len);
+ let offset = offset_from(self.ptr.as_ptr(), ptr);
+
+ // See the condition in the `kind == KIND_VEC` case above
+ // for more details.
+ if v_capacity >= new_cap + offset {
+ self.cap = new_cap;
+ // no copy is necessary
+ } else if v_capacity >= new_cap && offset >= len {
+ // The capacity is sufficient, and copying is not too much
+ // overhead: reclaim the buffer!
+
+ // `offset >= len` means: no overlap
+ ptr::copy_nonoverlapping(self.ptr.as_ptr(), ptr, len);
self.ptr = vptr(ptr);
self.cap = v.capacity();
+ } else {
+ // calculate offset
+ let off = (self.ptr.as_ptr() as usize) - (v.as_ptr() as usize);
- return;
- }
+ // new_cap is calculated in terms of `BytesMut`, not the underlying
+ // `Vec`, so it does not take the offset into account.
+ //
+ // Thus we have to manually add it here.
+ new_cap = new_cap.checked_add(off).expect("overflow");
- // The vector capacity is not sufficient. The reserve request is
- // asking for more than the initial buffer capacity. Allocate more
- // than requested if `new_cap` is not much bigger than the current
- // capacity.
- //
- // There are some situations, using `reserve_exact` that the
- // buffer capacity could be below `original_capacity`, so do a
- // check.
- let double = v.capacity().checked_shl(1).unwrap_or(new_cap);
+ // The vector capacity is not sufficient. The reserve request is
+ // asking for more than the initial buffer capacity. Allocate more
+ // than requested if `new_cap` is not much bigger than the current
+ // capacity.
+ //
+ // There are some situations, using `reserve_exact` that the
+ // buffer capacity could be below `original_capacity`, so do a
+ // check.
+ let double = v.capacity().checked_shl(1).unwrap_or(new_cap);
+
+ new_cap = cmp::max(double, new_cap);
- new_cap = cmp::max(cmp::max(double, new_cap), original_capacity);
+ // No space - allocate more
+ //
+ // The length field of `Shared::vec` is not used by the `BytesMut`;
+ // instead we use the `len` field in the `BytesMut` itself. However,
+ // `Vec::reserve` does not guarantee that data stored in the
+ // vector's unused capacity is copied over to a new allocation,
+ // so we must make sure none of the data we care about lives in
+ // that unused capacity before calling it.
+ debug_assert!(off + len <= v.capacity());
+ v.set_len(off + len);
+ v.reserve(new_cap - v.len());
+
+ // Update the info
+ self.ptr = vptr(v.as_mut_ptr().add(off));
+ self.cap = v.capacity() - off;
+ }
+
+ return;
} else {
new_cap = cmp::max(new_cap, original_capacity);
}
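The `KIND_ARC` branch follows the same idea once every other handle is gone. A hedged illustration of the documented behavior (my example, not the crate's):

```rust
use bytes::{BufMut, BytesMut};

let mut buf = BytesMut::with_capacity(64);
buf.put_slice(b"hello world");

// Splitting promotes the buffer to the shared (arc-backed) representation.
let other = buf.split();
assert!(buf.is_empty());

// Once `other` is dropped, `buf` is the only handle left, so `reserve`
// can reclaim the original 64-byte allocation instead of allocating.
drop(other);
buf.reserve(64);
assert!(buf.capacity() >= 64);
```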
@@ -659,7 +739,7 @@ impl BytesMut {
// Update self
let data = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC;
- self.data = data as _;
+ self.data = invalid_ptr(data);
self.ptr = vptr(v.as_mut_ptr());
self.len = v.len();
self.cap = v.capacity();
@@ -686,11 +766,11 @@ impl BytesMut {
self.reserve(cnt);
unsafe {
- let dst = self.uninit_slice();
+ let dst = self.spare_capacity_mut();
// Reserved above
debug_assert!(dst.len() >= cnt);
- ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr() as *mut u8, cnt);
+ ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
}
unsafe {
@@ -700,10 +780,11 @@ impl BytesMut {
/// Absorbs a `BytesMut` that was previously split off.
///
- /// If the two `BytesMut` objects were previously contiguous, i.e., if
- /// `other` was created by calling `split_off` on this `BytesMut`, then
- /// this is an `O(1)` operation that just decreases a reference
- /// count and sets a few indices. Otherwise this method degenerates to
+ /// If the two `BytesMut` objects were previously contiguous and not mutated
+ /// in a way that causes re-allocation, i.e., if `other` was created by
+ /// calling `split_off` on this `BytesMut`, then this is an `O(1)` operation
+ /// that just decreases a reference count and sets a few indices.
+ /// Otherwise this method degenerates to
/// `self.extend_from_slice(other.as_ref())`.
///
/// # Examples
@@ -754,7 +835,7 @@ impl BytesMut {
ptr,
len,
cap,
- data: data as *mut _,
+ data: invalid_ptr(data),
}
}
@@ -801,7 +882,7 @@ impl BytesMut {
// Updating the start of the view is setting `ptr` to point to the
// new start and updating the `len` field to reflect the new length
// of the view.
- self.ptr = vptr(self.ptr.as_ptr().offset(start as isize));
+ self.ptr = vptr(self.ptr.as_ptr().add(start));
if self.len >= start {
self.len -= start;
@@ -825,7 +906,7 @@ impl BytesMut {
return Ok(());
}
- let ptr = unsafe { self.ptr.as_ptr().offset(self.len as isize) };
+ let ptr = unsafe { self.ptr.as_ptr().add(self.len) };
if ptr == other.ptr.as_ptr()
&& self.kind() == KIND_ARC
&& other.kind() == KIND_ARC
@@ -875,7 +956,7 @@ impl BytesMut {
// always succeed.
debug_assert_eq!(shared as usize & KIND_MASK, KIND_ARC);
- self.data = shared as _;
+ self.data = shared;
}
/// Makes an exact shallow clone of `self`.
@@ -908,16 +989,45 @@ impl BytesMut {
debug_assert_eq!(self.kind(), KIND_VEC);
debug_assert!(pos <= MAX_VEC_POS);
- self.data = ((pos << VEC_POS_OFFSET) | (prev & NOT_VEC_POS_MASK)) as *mut _;
+ self.data = invalid_ptr((pos << VEC_POS_OFFSET) | (prev & NOT_VEC_POS_MASK));
}
+ /// Returns the remaining spare capacity of the buffer as a slice of `MaybeUninit<u8>`.
+ ///
+ /// The returned slice can be used to fill the buffer with data (e.g. by
+ /// reading from a file) before marking the data as initialized using the
+ /// [`set_len`] method.
+ ///
+ /// [`set_len`]: BytesMut::set_len
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use bytes::BytesMut;
+ ///
+ /// // Allocate buffer big enough for 10 bytes.
+ /// let mut buf = BytesMut::with_capacity(10);
+ ///
+ /// // Fill in the first 3 elements.
+ /// let uninit = buf.spare_capacity_mut();
+ /// uninit[0].write(0);
+ /// uninit[1].write(1);
+ /// uninit[2].write(2);
+ ///
+ /// // Mark the first 3 bytes of the buffer as being initialized.
+ /// unsafe {
+ /// buf.set_len(3);
+ /// }
+ ///
+ /// assert_eq!(&buf[..], &[0, 1, 2]);
+ /// ```
#[inline]
- fn uninit_slice(&mut self) -> &mut UninitSlice {
+ pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
unsafe {
- let ptr = self.ptr.as_ptr().offset(self.len as isize);
+ let ptr = self.ptr.as_ptr().add(self.len);
let len = self.cap - self.len;
- UninitSlice::from_raw_parts_mut(ptr, len)
+ slice::from_raw_parts_mut(ptr.cast(), len)
}
}
}
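Beyond the doc example above, the new method is meant to compose with `reserve` and `set_len` like this; `push_bytes` is a hypothetical helper for illustration only (`extend_from_slice` already does the same job):

```rust
use bytes::BytesMut;

fn push_bytes(buf: &mut BytesMut, data: &[u8]) {
    buf.reserve(data.len());
    // Write into the uninitialized tail, then commit the new length.
    for (dst, src) in buf.spare_capacity_mut().iter_mut().zip(data) {
        dst.write(*src);
    }
    // SAFETY: `reserve` guaranteed room for `data.len()` bytes and the loop
    // above initialized exactly that many.
    unsafe { buf.set_len(buf.len() + data.len()) };
}

let mut buf = BytesMut::new();
push_bytes(&mut buf, b"abc");
assert_eq!(&buf[..], b"abc");
```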
@@ -934,7 +1044,7 @@ impl Drop for BytesMut {
let _ = rebuild_vec(self.ptr.as_ptr(), self.len, self.cap, off);
}
} else if kind == KIND_ARC {
- unsafe { release_shared(self.data as _) };
+ unsafe { release_shared(self.data) };
}
}
}
@@ -991,7 +1101,7 @@ unsafe impl BufMut for BytesMut {
if self.capacity() == self.len() {
self.reserve(64);
}
- self.uninit_slice()
+ UninitSlice::from_slice(self.spare_capacity_mut())
}
// Specialize these methods so they can skip checking `remaining_mut`
@@ -1016,7 +1126,7 @@ unsafe impl BufMut for BytesMut {
fn put_bytes(&mut self, val: u8, cnt: usize) {
self.reserve(cnt);
unsafe {
- let dst = self.uninit_slice();
+ let dst = self.spare_capacity_mut();
// Reserved above
debug_assert!(dst.len() >= cnt);
@@ -1161,7 +1271,7 @@ impl<'a> IntoIterator for &'a BytesMut {
type IntoIter = core::slice::Iter<'a, u8>;
fn into_iter(self) -> Self::IntoIter {
- self.as_ref().into_iter()
+ self.as_ref().iter()
}
}
@@ -1190,7 +1300,18 @@ impl<'a> Extend<&'a u8> for BytesMut {
where
T: IntoIterator<Item = &'a u8>,
{
- self.extend(iter.into_iter().map(|b| *b))
+ self.extend(iter.into_iter().copied())
+ }
+}
+
+impl Extend<Bytes> for BytesMut {
+ fn extend<T>(&mut self, iter: T)
+ where
+ T: IntoIterator<Item = Bytes>,
+ {
+ for bytes in iter {
+ self.extend_from_slice(&bytes)
+ }
}
}
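A quick usage sketch for the new `Extend<Bytes>` impl (my example, not from the crate's docs):

```rust
use bytes::{Bytes, BytesMut};

let mut buf = BytesMut::with_capacity(16);
// Each `Bytes` is appended by slice, as in the impl above.
buf.extend(vec![Bytes::from_static(b"hello "), Bytes::from_static(b"world")]);
assert_eq!(&buf[..], b"hello world");
```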
@@ -1202,7 +1323,7 @@ impl FromIterator<u8> for BytesMut {
impl<'a> FromIterator<&'a u8> for BytesMut {
fn from_iter<T: IntoIterator<Item = &'a u8>>(into_iter: T) -> Self {
- BytesMut::from_iter(into_iter.into_iter().map(|b| *b))
+ BytesMut::from_iter(into_iter.into_iter().copied())
}
}
@@ -1243,10 +1364,13 @@ unsafe fn release_shared(ptr: *mut Shared) {
// > "acquire" operation before deleting the object.
//
// [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
- atomic::fence(Ordering::Acquire);
+ //
+ // Thread sanitizer does not support atomic fences. Use an atomic load
+ // instead.
+ (*ptr).ref_count.load(Ordering::Acquire);
// Drop the data
- Box::from_raw(ptr);
+ drop(Box::from_raw(ptr));
}
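The fence-to-load change is worth spelling out: an acquire load of the atomic that the `Release` decrements wrote to establishes the same happens-before edge as an acquire fence, and ThreadSanitizer can model it. A generic sketch of the pattern, with made-up names (not the crate's code):

```rust
use core::sync::atomic::{fence, AtomicUsize, Ordering};

/// Drops one reference and reports whether the caller was the last owner
/// and may now free the shared state. (Illustrative helper, not `bytes` code.)
fn release_ref(ref_count: &AtomicUsize) -> bool {
    if ref_count.fetch_sub(1, Ordering::Release) != 1 {
        return false;
    }
    // Textbook form: an acquire fence orders the upcoming deallocation
    // after every other handle's last use of the shared state.
    fence(Ordering::Acquire);
    // Equivalent form used in the diff above, which ThreadSanitizer can
    // model: `ref_count.load(Ordering::Acquire);`
    true
}
```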
impl Shared {
@@ -1392,7 +1516,7 @@ impl PartialOrd<BytesMut> for str {
impl PartialEq<Vec<u8>> for BytesMut {
fn eq(&self, other: &Vec<u8>) -> bool {
- *self == &other[..]
+ *self == other[..]
}
}
@@ -1416,7 +1540,7 @@ impl PartialOrd<BytesMut> for Vec<u8> {
impl PartialEq<String> for BytesMut {
fn eq(&self, other: &String) -> bool {
- *self == &other[..]
+ *self == other[..]
}
}
@@ -1482,13 +1606,51 @@ impl PartialOrd<BytesMut> for &str {
impl PartialEq<BytesMut> for Bytes {
fn eq(&self, other: &BytesMut) -> bool {
- &other[..] == &self[..]
+ other[..] == self[..]
}
}
impl PartialEq<Bytes> for BytesMut {
fn eq(&self, other: &Bytes) -> bool {
- &other[..] == &self[..]
+ other[..] == self[..]
+ }
+}
+
+impl From<BytesMut> for Vec<u8> {
+ fn from(mut bytes: BytesMut) -> Self {
+ let kind = bytes.kind();
+
+ let mut vec = if kind == KIND_VEC {
+ unsafe {
+ let (off, _) = bytes.get_vec_pos();
+ rebuild_vec(bytes.ptr.as_ptr(), bytes.len, bytes.cap, off)
+ }
+ } else if kind == KIND_ARC {
+ let shared = bytes.data as *mut Shared;
+
+ if unsafe { (*shared).is_unique() } {
+ let vec = mem::replace(unsafe { &mut (*shared).vec }, Vec::new());
+
+ unsafe { release_shared(shared) };
+
+ vec
+ } else {
+ return bytes.deref().to_vec();
+ }
+ } else {
+ return bytes.deref().to_vec();
+ };
+
+ let len = bytes.len;
+
+ unsafe {
+ ptr::copy(bytes.ptr.as_ptr(), vec.as_mut_ptr(), len);
+ vec.set_len(len);
+ }
+
+ mem::forget(bytes);
+
+ vec
}
}
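A usage sketch for the new `From<BytesMut> for Vec<u8>` impl, exercising both branches above (assertions are mine, not from the crate's tests):

```rust
use bytes::BytesMut;

let mut buf = BytesMut::from(&b"hello"[..]);
let tail = buf.split_off(2);

// `tail` still shares the allocation, so this conversion copies the bytes.
let head: Vec<u8> = buf.into();
assert_eq!(&head[..], b"he");

// `tail` is now the only handle, so the conversion can reuse the existing
// allocation instead of allocating a new vector.
let rest: Vec<u8> = tail.into();
assert_eq!(&rest[..], b"llo");
```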
@@ -1501,6 +1663,35 @@ fn vptr(ptr: *mut u8) -> NonNull<u8> {
}
}
+/// Returns a dangling pointer with the given address. This is used to store
+/// integer data in pointer fields.
+///
+/// It is equivalent to `addr as *mut T`, but that cast fails under Miri
+/// when strict provenance checking is enabled.
+#[inline]
+fn invalid_ptr<T>(addr: usize) -> *mut T {
+ let ptr = core::ptr::null_mut::<u8>().wrapping_add(addr);
+ debug_assert_eq!(ptr as usize, addr);
+ ptr.cast::<T>()
+}
+
+/// Precondition: dst >= original
+///
+/// This function is equivalent to:
+///
+/// ```rust,ignore
+/// dst.offset_from(original) as usize;
+/// ```
+///
+/// But because the minimum supported Rust version is 1.39 and `offset_from`
+/// was only stabilized in 1.47, we cannot use it.
+#[inline]
+fn offset_from(dst: *mut u8, original: *mut u8) -> usize {
+ debug_assert!(dst >= original);
+
+ dst as usize - original as usize
+}
+
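Both helpers are private to this module, so the following is purely illustrative of their contracts (it would sit next to them, not in user code):

```rust
// Illustrative only: exercises the contracts of the private helpers above.
fn helpers_sketch() {
    // `invalid_ptr` stores an integer tag in a pointer-typed field without
    // an int-to-ptr cast, so it stays valid under Miri's strict provenance.
    let tagged: *mut () = invalid_ptr(0b01);
    assert_eq!(tagged as usize, 0b01);

    // `offset_from` recovers the forward distance between two pointers into
    // the same allocation, under the precondition `dst >= original`.
    let mut storage = [0u8; 8];
    let base = storage.as_mut_ptr();
    let dst = unsafe { base.add(3) };
    assert_eq!(offset_from(dst, base), 3);
}
```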
unsafe fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize) -> Vec<u8> {
let ptr = ptr.offset(-(off as isize));
len += off;
@@ -1513,6 +1704,7 @@ unsafe fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize)
static SHARED_VTABLE: Vtable = Vtable {
clone: shared_v_clone,
+ to_vec: shared_v_to_vec,
drop: shared_v_drop,
};
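The new `to_vec` entry backs the `From<Bytes> for Vec<u8>` conversion introduced alongside this change (assuming that impl): when the shared handle turns out to be unique, the backing vector is taken over rather than copied. A sketch:

```rust
use bytes::{Bytes, BytesMut};

let mut buf = BytesMut::from(&b"hello world"[..]);
let tail = buf.split_off(5);     // promotes the buffer to the shared state
let head: Bytes = buf.freeze();  // `head` now uses SHARED_VTABLE
drop(tail);

// `head` is the only remaining handle, so `shared_v_to_vec` can take the
// backing vector and shrink it in place rather than copying to a new one.
let v: Vec<u8> = head.into();
assert_eq!(&v[..], b"hello");
```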
@@ -1520,10 +1712,32 @@ unsafe fn shared_v_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> By
let shared = data.load(Ordering::Relaxed) as *mut Shared;
increment_shared(shared);
- let data = AtomicPtr::new(shared as _);
+ let data = AtomicPtr::new(shared as *mut ());
Bytes::with_vtable(ptr, len, data, &SHARED_VTABLE)
}
+unsafe fn shared_v_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
+ let shared: *mut Shared = data.load(Ordering::Relaxed).cast();
+
+ if (*shared).is_unique() {
+ let shared = &mut *shared;
+
+ // Drop shared
+ let mut vec = mem::replace(&mut shared.vec, Vec::new());
+ release_shared(shared);
+
+ // Copy back buffer
+ ptr::copy(ptr, vec.as_mut_ptr(), len);
+ vec.set_len(len);
+
+ vec
+ } else {
+ let v = slice::from_raw_parts(ptr, len).to_vec();
+ release_shared(shared);
+ v
+ }
+}
+
unsafe fn shared_v_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
data.with_mut(|shared| {
release_shared(*shared as *mut Shared);