aboutsummaryrefslogtreecommitdiff
path: root/src/bytes.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bytes.rs')
-rw-r--r--src/bytes.rs106
1 files changed, 68 insertions, 38 deletions
diff --git a/src/bytes.rs b/src/bytes.rs
index 79a09f3..b1b35ea 100644
--- a/src/bytes.rs
+++ b/src/bytes.rs
@@ -10,16 +10,23 @@ use crate::loom::sync::atomic::AtomicMut;
use crate::loom::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use crate::Buf;
-/// A reference counted contiguous slice of memory.
+/// A cheaply cloneable and sliceable chunk of contiguous memory.
///
/// `Bytes` is an efficient container for storing and operating on contiguous
/// slices of memory. It is intended for use primarily in networking code, but
/// could have applications elsewhere as well.
///
/// `Bytes` values facilitate zero-copy network programming by allowing multiple
-/// `Bytes` objects to point to the same underlying memory. This is managed by
-/// using a reference count to track when the memory is no longer needed and can
-/// be freed.
+/// `Bytes` objects to point to the same underlying memory.
+///
+/// `Bytes` does not have a single implementation. It is an interface, whose
+/// exact behavior is implemented through dynamic dispatch in several underlying
+/// implementations of `Bytes`.
+///
+/// All `Bytes` implementations must fulfill the following requirements:
+/// - They are cheaply cloneable and thereby shareable between an unlimited amount
+/// of components, for example by modifying a reference count.
+/// - Instances can be sliced to refer to a subset of the the original buffer.
///
/// ```
/// use bytes::Bytes;
@@ -41,17 +48,33 @@ use crate::Buf;
/// to track information about which segment of the underlying memory the
/// `Bytes` handle has access to.
///
-/// `Bytes` keeps both a pointer to the shared `Arc` containing the full memory
+/// `Bytes` keeps both a pointer to the shared state containing the full memory
/// slice and a pointer to the start of the region visible by the handle.
/// `Bytes` also tracks the length of its view into the memory.
///
/// # Sharing
///
-/// The memory itself is reference counted, and multiple `Bytes` objects may
-/// point to the same region. Each `Bytes` handle point to different sections within
-/// the memory region, and `Bytes` handle may or may not have overlapping views
+/// `Bytes` contains a vtable, which allows implementations of `Bytes` to define
+/// how sharing/cloneing is implemented in detail.
+/// When `Bytes::clone()` is called, `Bytes` will call the vtable function for
+/// cloning the backing storage in order to share it behind between multiple
+/// `Bytes` instances.
+///
+/// For `Bytes` implementations which refer to constant memory (e.g. created
+/// via `Bytes::from_static()`) the cloning implementation will be a no-op.
+///
+/// For `Bytes` implementations which point to a reference counted shared storage
+/// (e.g. an `Arc<[u8]>`), sharing will be implemented by increasing the
+/// the reference count.
+///
+/// Due to this mechanism, multiple `Bytes` instances may point to the same
+/// shared memory region.
+/// Each `Bytes` instance can point to different sections within that
+/// memory region, and `Bytes` instances may or may not have overlapping views
/// into the memory.
///
+/// The following diagram visualizes a scenario where 2 `Bytes` instances make
+/// use of an `Arc`-based backing storage, and provide access to different views:
///
/// ```text
///
@@ -175,7 +198,7 @@ impl Bytes {
self.len == 0
}
- ///Creates `Bytes` instance from slice, by copying it.
+ /// Creates `Bytes` instance from slice, by copying it.
pub fn copy_from_slice(data: &[u8]) -> Self {
data.to_vec().into()
}
@@ -214,7 +237,7 @@ impl Bytes {
};
let end = match range.end_bound() {
- Bound::Included(&n) => n + 1,
+ Bound::Included(&n) => n.checked_add(1).expect("out of range"),
Bound::Excluded(&n) => n,
Bound::Unbounded => len,
};
@@ -507,7 +530,7 @@ impl Buf for Bytes {
}
#[inline]
- fn bytes(&self) -> &[u8] {
+ fn chunk(&self) -> &[u8] {
self.as_slice()
}
@@ -525,8 +548,14 @@ impl Buf for Bytes {
}
}
- fn to_bytes(&mut self) -> crate::Bytes {
- core::mem::replace(self, Bytes::new())
+ fn copy_to_bytes(&mut self, len: usize) -> crate::Bytes {
+ if len == self.remaining() {
+ core::mem::replace(self, Bytes::new())
+ } else {
+ let ret = self.slice(..len);
+ self.advance(len);
+ ret
+ }
}
}
@@ -777,8 +806,7 @@ impl From<Vec<u8>> for Bytes {
let slice = vec.into_boxed_slice();
let len = slice.len();
- let ptr = slice.as_ptr();
- drop(Box::into_raw(slice));
+ let ptr = Box::into_raw(slice) as *mut u8;
if ptr as usize & 0x1 == 0 {
let data = ptr as usize | KIND_VEC;
@@ -994,33 +1022,35 @@ unsafe fn shallow_clone_vec(
// `Release` is used synchronize with other threads that
// will load the `arc` field.
//
- // If the `compare_and_swap` fails, then the thread lost the
+ // If the `compare_exchange` fails, then the thread lost the
// race to promote the buffer to shared. The `Acquire`
- // ordering will synchronize with the `compare_and_swap`
+ // ordering will synchronize with the `compare_exchange`
// that happened in the other thread and the `Shared`
// pointed to by `actual` will be visible.
- let actual = atom.compare_and_swap(ptr as _, shared as _, Ordering::AcqRel);
-
- if actual as usize == ptr as usize {
- // The upgrade was successful, the new handle can be
- // returned.
- return Bytes {
- ptr: offset,
- len,
- data: AtomicPtr::new(shared as _),
- vtable: &SHARED_VTABLE,
- };
+ match atom.compare_exchange(ptr as _, shared as _, Ordering::AcqRel, Ordering::Acquire) {
+ Ok(actual) => {
+ debug_assert!(actual as usize == ptr as usize);
+ // The upgrade was successful, the new handle can be
+ // returned.
+ Bytes {
+ ptr: offset,
+ len,
+ data: AtomicPtr::new(shared as _),
+ vtable: &SHARED_VTABLE,
+ }
+ }
+ Err(actual) => {
+ // The upgrade failed, a concurrent clone happened. Release
+ // the allocation that was made in this thread, it will not
+ // be needed.
+ let shared = Box::from_raw(shared);
+ mem::forget(*shared);
+
+ // Buffer already promoted to shared storage, so increment ref
+ // count.
+ shallow_clone_arc(actual as _, offset, len)
+ }
}
-
- // The upgrade failed, a concurrent clone happened. Release
- // the allocation that was made in this thread, it will not
- // be needed.
- let shared = Box::from_raw(shared);
- mem::forget(*shared);
-
- // Buffer already promoted to shared storage, so increment ref
- // count.
- shallow_clone_arc(actual as _, offset, len)
}
unsafe fn release_shared(ptr: *mut Shared) {