summaryrefslogtreecommitdiff
path: root/src/frame/compress.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/frame/compress.rs')
-rw-r--r--src/frame/compress.rs471
1 files changed, 471 insertions, 0 deletions
diff --git a/src/frame/compress.rs b/src/frame/compress.rs
new file mode 100644
index 0000000..cee32eb
--- /dev/null
+++ b/src/frame/compress.rs
@@ -0,0 +1,471 @@
+use std::{
+ fmt,
+ hash::Hasher,
+ io::{self, Write},
+};
+use twox_hash::XxHash32;
+
+use crate::{
+ block::{
+ compress::compress_internal,
+ hashtable::{HashTable, HashTable4K},
+ },
+ sink::vec_sink_for_compression,
+};
+
+use super::Error;
+use super::{
+ header::{BlockInfo, BlockMode, FrameInfo, BLOCK_INFO_SIZE, MAX_FRAME_INFO_SIZE},
+ BlockSize,
+};
+use crate::block::WINDOW_SIZE;
+
+/// A writer for compressing a LZ4 stream.
+///
+/// This `FrameEncoder` wraps any other writer that implements `io::Write`.
+/// Bytes written to this writer are compressed using the [LZ4 frame
+/// format](https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md).
+///
+/// Writes are buffered automatically, so there's no need to wrap the given
+/// writer in a `std::io::BufWriter`.
+///
+/// To ensure a well formed stream the encoder must be finalized by calling
+/// either the [`finish()`], [`try_finish()`], or [`auto_finish()`] methods.
+///
+/// [`finish()`]: Self::finish
+/// [`try_finish()`]: Self::try_finish
+/// [`auto_finish()`]: Self::auto_finish
+///
+/// # Example 1
+/// Serializing json values into a compressed file.
+///
+/// ```no_run
+/// let compressed_file = std::fs::File::create("datafile").unwrap();
+/// let mut compressor = lz4_flex::frame::FrameEncoder::new(compressed_file);
+/// serde_json::to_writer(&mut compressor, &serde_json::json!({ "an": "object" })).unwrap();
+/// compressor.finish().unwrap();
+/// ```
+///
+/// # Example 2
+/// Serializing multiple json values into a compressed file using linked blocks.
+///
+/// ```no_run
+/// let compressed_file = std::fs::File::create("datafile").unwrap();
+/// let mut frame_info = lz4_flex::frame::FrameInfo::new();
+/// frame_info.block_mode = lz4_flex::frame::BlockMode::Linked;
+/// let mut compressor = lz4_flex::frame::FrameEncoder::with_frame_info(frame_info, compressed_file);
+/// for i in 0..10u64 {
+/// serde_json::to_writer(&mut compressor, &serde_json::json!({ "i": i })).unwrap();
+/// }
+/// compressor.finish().unwrap();
+/// ```
pub struct FrameEncoder<W: io::Write> {
    /// Our buffer of uncompressed bytes.
    src: Vec<u8>,
    /// Index into src: starting point of bytes not yet compressed
    src_start: usize,
    /// Index into src: end point of bytes not yet compressed
    src_end: usize,
    /// Index into src: starting point of external dictionary (applicable in Linked block mode)
    ext_dict_offset: usize,
    /// Length of external dictionary
    ext_dict_len: usize,
    /// Counter of bytes already compressed to the compression_table
    /// _Not_ the same as `content_len` as this is reset every 2GB
    /// (see the repositioning logic in `write_block`).
    src_stream_offset: usize,
    /// Encoder table
    compression_table: HashTable4K,
    /// The underlying writer.
    w: W,
    /// Xxhash32 used when content checksum is enabled.
    content_hasher: XxHash32,
    /// Number of bytes compressed
    content_len: u64,
    /// The compressed bytes buffer. Bytes are compressed from src (usually)
    /// to dst before being written to w.
    dst: Vec<u8>,
    /// Whether we have an open frame in the output.
    is_frame_open: bool,
    /// Whether data was already written to (and a frame closed in) the output.
    /// Used by `try_finish` to decide if an empty frame must still be emitted.
    data_to_frame_written: bool,
    /// The frame information to be used in this encoder.
    frame_info: FrameInfo,
}
+
+impl<W: io::Write> FrameEncoder<W> {
    /// (Re)reserves capacity for `src`/`dst` based on the configured block
    /// size and block mode. Called whenever a frame is begun (`begin_frame`).
    fn init(&mut self) {
        let max_block_size = self.frame_info.block_size.get_size();
        let src_size = if self.frame_info.block_mode == BlockMode::Linked {
            // In linked mode we consume the input (bumping src_start) but leave the
            // beginning of src to be used as a prefix in subsequent blocks.
            // That is at least until we have at least `max_block_size + WINDOW_SIZE`
            // bytes in src, then we setup an ext_dict with the last WINDOW_SIZE bytes
            // and the input goes to the beginning of src again.
            // Since we always want to be able to write a full block (up to max_block_size)
            // we need a buffer with at least `max_block_size * 2 + WINDOW_SIZE` bytes.
            max_block_size * 2 + WINDOW_SIZE
        } else {
            max_block_size
        };
        // Since this method is called potentially multiple times, don't reserve _additional_
        // capacity if not required.
        self.src
            .reserve(src_size.saturating_sub(self.src.capacity()));
        // dst must be able to hold the worst-case compressed size of one block.
        self.dst.reserve(
            crate::block::compress::get_maximum_output_size(max_block_size)
                .saturating_sub(self.dst.capacity()),
        );
    }
+
    /// Returns a wrapper around `self` that will finish the stream on drop.
    ///
    /// # Note
    /// Errors on drop get silently ignored. If you want to handle errors then use [`finish()`] or
    /// [`try_finish()`] instead.
    ///
    /// [`finish()`]: Self::finish
    /// [`try_finish()`]: Self::try_finish
    pub fn auto_finish(self) -> AutoFinishEncoder<W> {
        AutoFinishEncoder {
            // Wrapped in `Some` so the encoder can be taken out in `Drop`.
            encoder: Some(self),
        }
    }
+
+ /// Creates a new Encoder with the specified FrameInfo.
+ pub fn with_frame_info(frame_info: FrameInfo, wtr: W) -> Self {
+ FrameEncoder {
+ src: Vec::new(),
+ w: wtr,
+ // 16 KB hash table for matches, same as the reference implementation.
+ compression_table: HashTable4K::new(),
+ content_hasher: XxHash32::with_seed(0),
+ content_len: 0,
+ dst: Vec::new(),
+ is_frame_open: false,
+ data_to_frame_written: false,
+ frame_info,
+ src_start: 0,
+ src_end: 0,
+ ext_dict_offset: 0,
+ ext_dict_len: 0,
+ src_stream_offset: 0,
+ }
+ }
+
+ /// Creates a new Encoder with the default settings.
+ pub fn new(wtr: W) -> Self {
+ Self::with_frame_info(Default::default(), wtr)
+ }
+
    /// The frame information used by this Encoder.
    // NOTE(review): takes `&mut self` although only a shared borrow is returned;
    // relaxing to `&self` would be backward compatible — confirm before changing.
    pub fn frame_info(&mut self) -> &FrameInfo {
        &self.frame_info
    }
+
    /// Consumes this encoder, flushing internal buffer and writing stream terminator.
    /// Returns the underlying writer on success.
    pub fn finish(mut self) -> Result<W, Error> {
        self.try_finish()?;
        Ok(self.w)
    }
+
+ /// Attempt to finish this output stream, flushing internal buffer and writing stream
+ /// terminator.
+ pub fn try_finish(&mut self) -> Result<(), Error> {
+ match self.flush() {
+ Ok(()) => {
+ // Empty input special case
+ // https://github.com/ouch-org/ouch/pull/163#discussion_r1108965151
+ if !self.is_frame_open && !self.data_to_frame_written {
+ self.begin_frame(0)?;
+ }
+ self.end_frame()?;
+ self.data_to_frame_written = true;
+ Ok(())
+ }
+ Err(err) => Err(err.into()),
+ }
+ }
+
    /// Returns the underlying writer _without_ flushing the stream.
    /// This may leave the output in an unfinished state: any bytes still
    /// buffered in `src` are discarded and no end marker is written.
    pub fn into_inner(self) -> W {
        self.w
    }
+
    /// Gets a shared reference to the underlying writer in this encoder.
    pub fn get_ref(&self) -> &W {
        &self.w
    }
+
    /// Gets a mutable reference to the underlying writer in this encoder.
    ///
    /// Note that mutating the output/input state of the stream may corrupt
    /// this encoder, so care must be taken when using this method.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.w
    }
+
    /// Closes the frame by writing the end marker (and the content checksum,
    /// if enabled). Errors if a declared `content_size` doesn't match the
    /// number of bytes actually compressed.
    fn end_frame(&mut self) -> Result<(), Error> {
        debug_assert!(self.is_frame_open);
        self.is_frame_open = false;
        // If the frame header promised a content size it must match exactly.
        if let Some(expected) = self.frame_info.content_size {
            if expected != self.content_len {
                return Err(Error::ContentLengthError {
                    expected,
                    actual: self.content_len,
                });
            }
        }

        // Write the EndMark terminating the frame's block sequence.
        let mut block_info_buffer = [0u8; BLOCK_INFO_SIZE];
        BlockInfo::EndMark.write(&mut block_info_buffer[..])?;
        self.w.write_all(&block_info_buffer[..])?;
        // The optional content checksum follows the EndMark.
        if self.frame_info.content_checksum {
            let content_checksum = self.content_hasher.finish() as u32;
            self.w.write_all(&content_checksum.to_le_bytes())?;
        }

        Ok(())
    }
+
    /// Begin the frame by writing the frame header.
    /// It'll also setup the encoder for compressing blocks for the new frame.
    fn begin_frame(&mut self, buf_len: usize) -> io::Result<()> {
        self.is_frame_open = true;
        // Pick a concrete block size based on the first write's length.
        if self.frame_info.block_size == BlockSize::Auto {
            self.frame_info.block_size = BlockSize::from_buf_length(buf_len);
        }
        self.init();
        let mut frame_info_buffer = [0u8; MAX_FRAME_INFO_SIZE];
        let size = self.frame_info.write(&mut frame_info_buffer)?;
        self.w.write_all(&frame_info_buffer[..size])?;

        if self.content_len != 0 {
            // This is the second or later frame for this Encoder,
            // reset compressor state for the new frame.
            self.content_len = 0;
            self.src_stream_offset = 0;
            self.src.clear();
            self.src_start = 0;
            self.src_end = 0;
            self.ext_dict_len = 0;
            self.content_hasher = XxHash32::with_seed(0);
            self.compression_table.clear();
        }
        Ok(())
    }
+
    /// Consumes the src contents between src_start and src_end,
    /// which shouldn't exceed the max block size.
    fn write_block(&mut self) -> io::Result<()> {
        debug_assert!(self.is_frame_open);
        let max_block_size = self.frame_info.block_size.get_size();
        debug_assert!(self.src_end - self.src_start <= max_block_size);

        // Reposition the compression table if we're anywhere near an overflowing hazard
        // (the table stores offsets relative to src_stream_offset as u32-ish values).
        if self.src_stream_offset + max_block_size + WINDOW_SIZE >= u32::MAX as usize / 2 {
            self.compression_table
                .reposition((self.src_stream_offset - self.ext_dict_len) as _);
            self.src_stream_offset = self.ext_dict_len;
        }

        // input to the compressor, which may include a prefix when blocks are linked
        let input = &self.src[..self.src_end];
        // the contents of the block are between src_start and src_end
        let src = &input[self.src_start..];

        let dst_required_size = crate::block::compress::get_maximum_output_size(src.len());

        // The `true`/`false` const parameter selects the ext-dict-aware code path.
        let compress_result = if self.ext_dict_len != 0 {
            debug_assert_eq!(self.frame_info.block_mode, BlockMode::Linked);
            compress_internal::<_, true, _>(
                input,
                self.src_start,
                &mut vec_sink_for_compression(&mut self.dst, 0, 0, dst_required_size),
                &mut self.compression_table,
                &self.src[self.ext_dict_offset..self.ext_dict_offset + self.ext_dict_len],
                self.src_stream_offset,
            )
        } else {
            compress_internal::<_, false, _>(
                input,
                self.src_start,
                &mut vec_sink_for_compression(&mut self.dst, 0, 0, dst_required_size),
                &mut self.compression_table,
                b"",
                self.src_stream_offset,
            )
        };

        // Store the block uncompressed when compression didn't actually shrink it.
        let (block_info, block_data) = match compress_result.map_err(Error::CompressionError)? {
            comp_len if comp_len < src.len() => {
                (BlockInfo::Compressed(comp_len as _), &self.dst[..comp_len])
            }
            _ => (BlockInfo::Uncompressed(src.len() as _), src),
        };

        // Write the (un)compressed block to the writer and the block checksum (if applicable).
        let mut block_info_buffer = [0u8; BLOCK_INFO_SIZE];
        block_info.write(&mut block_info_buffer[..])?;
        self.w.write_all(&block_info_buffer[..])?;
        self.w.write_all(block_data)?;
        if self.frame_info.block_checksums {
            // The block checksum covers the block bytes exactly as stored in the frame.
            let mut block_hasher = XxHash32::with_seed(0);
            block_hasher.write(block_data);
            let block_checksum = block_hasher.finish() as u32;
            self.w.write_all(&block_checksum.to_le_bytes())?;
        }

        // Content checksum, if applicable, covers the _uncompressed_ data.
        if self.frame_info.content_checksum {
            self.content_hasher.write(src);
        }

        // Buffer and offsets maintenance
        self.content_len += src.len() as u64;
        self.src_start += src.len();
        debug_assert_eq!(self.src_start, self.src_end);
        if self.frame_info.block_mode == BlockMode::Linked {
            // In linked mode we consume the input (bumping src_start) but leave the
            // beginning of src to be used as a prefix in subsequent blocks.
            // That is at least until we have at least `max_block_size + WINDOW_SIZE`
            // bytes in src, then we setup an ext_dict with the last WINDOW_SIZE bytes
            // and the input goes to the beginning of src again.
            debug_assert_eq!(self.src.capacity(), max_block_size * 2 + WINDOW_SIZE);
            if self.src_start >= max_block_size + WINDOW_SIZE {
                // The ext_dict will become the last WINDOW_SIZE bytes
                self.ext_dict_offset = self.src_end - WINDOW_SIZE;
                self.ext_dict_len = WINDOW_SIZE;
                // Input goes in the beginning of the buffer again.
                self.src_stream_offset += self.src_end;
                self.src_start = 0;
                self.src_end = 0;
            } else if self.src_start + self.ext_dict_len > WINDOW_SIZE {
                // There's more than WINDOW_SIZE bytes of lookback adding the prefix and ext_dict.
                // Since we have a limited buffer we must shrink ext_dict in favor of the prefix,
                // so that we can fit up to max_block_size bytes between dst_start and ext_dict
                // start.
                let delta = self
                    .ext_dict_len
                    .min(self.src_start + self.ext_dict_len - WINDOW_SIZE);
                self.ext_dict_offset += delta;
                self.ext_dict_len -= delta;
                debug_assert!(self.src_start + self.ext_dict_len >= WINDOW_SIZE)
            }
            debug_assert!(
                self.ext_dict_len == 0 || self.src_start + max_block_size <= self.ext_dict_offset
            );
        } else {
            // In independent block mode we consume the entire src buffer
            // which is sized equal to the frame max_block_size.
            debug_assert_eq!(self.ext_dict_len, 0);
            debug_assert_eq!(self.src.capacity(), max_block_size);
            self.src_start = 0;
            self.src_end = 0;
            // Advance stream offset so we don't have to reset the match dict
            // for the next block.
            self.src_stream_offset += src.len();
        }
        debug_assert!(self.src_start <= self.src_end);
        debug_assert!(self.src_start + max_block_size <= self.src.capacity());
        Ok(())
    }
+}
+
impl<W: io::Write> io::Write for FrameEncoder<W> {
    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
        // Lazily open the frame on the first non-empty write, so an Auto
        // block size can be chosen from the first buffer's length.
        if !self.is_frame_open && !buf.is_empty() {
            self.begin_frame(buf.len())?;
        }
        let buf_len = buf.len();
        while !buf.is_empty() {
            let src_filled = self.src_end - self.src_start;
            let max_fill_len = self.frame_info.block_size.get_size() - src_filled;
            if max_fill_len == 0 {
                // make space by writing next block
                self.write_block()?;
                debug_assert_eq!(self.src_end, self.src_start);
                continue;
            }

            // Copy as much of the input as fits in the current block.
            let fill_len = max_fill_len.min(buf.len());
            vec_copy_overwriting(&mut self.src, self.src_end, &buf[..fill_len]);
            buf = &buf[fill_len..];
            self.src_end += fill_len;
        }
        // The entire input is always consumed (buffered or compressed).
        Ok(buf_len)
    }

    // NOTE(review): this flushes the encoder's own buffer as a block but does
    // not call `flush` on the underlying writer — confirm this is intentional.
    fn flush(&mut self) -> io::Result<()> {
        if self.src_start != self.src_end {
            self.write_block()?;
        }
        Ok(())
    }
}
+
/// A wrapper around an [`FrameEncoder<W>`] that finishes the stream on drop.
///
/// This can be created by the [`auto_finish()`] method on the [`FrameEncoder<W>`].
///
/// # Note
/// Errors on drop get silently ignored. If you want to handle errors then use [`finish()`] or
/// [`try_finish()`] instead.
///
/// [`finish()`]: FrameEncoder::finish
/// [`try_finish()`]: FrameEncoder::try_finish
/// [`auto_finish()`]: FrameEncoder::auto_finish
pub struct AutoFinishEncoder<W: Write> {
    // We wrap this in an option to take it during drop.
    // Invariant: always `Some` until `drop` runs.
    encoder: Option<FrameEncoder<W>>,
}
+
+impl<W: io::Write> Drop for AutoFinishEncoder<W> {
+ fn drop(&mut self) {
+ if let Some(mut encoder) = self.encoder.take() {
+ let _ = encoder.try_finish();
+ }
+ }
+}
+
impl<W: Write> Write for AutoFinishEncoder<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `encoder` is only `None` during `drop`, so this unwrap can't fail here.
        self.encoder.as_mut().unwrap().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        // `encoder` is only `None` during `drop`, so this unwrap can't fail here.
        self.encoder.as_mut().unwrap().flush()
    }
}
+
// Manual Debug impl: the (potentially large) src/dst buffers are elided as
// "[...]" to keep the output readable.
impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("FrameEncoder")
            .field("w", &self.w)
            .field("frame_info", &self.frame_info)
            .field("is_frame_open", &self.is_frame_open)
            .field("content_hasher", &self.content_hasher)
            .field("content_len", &self.content_len)
            .field("dst", &"[...]")
            .field("src", &"[...]")
            .field("src_start", &self.src_start)
            .field("src_end", &self.src_end)
            .field("ext_dict_offset", &self.ext_dict_offset)
            .field("ext_dict_len", &self.ext_dict_len)
            .field("src_stream_offset", &self.src_stream_offset)
            .finish()
    }
}
+
/// Copy `src` into `target` starting at `target_start`, overwriting any
/// existing bytes in that range and extending `target` with the remainder.
///
/// Requires `target_start + src.len() <= target.capacity()` so the extend
/// never reallocates (debug-asserted).
#[inline]
fn vec_copy_overwriting(target: &mut Vec<u8>, target_start: usize, src: &[u8]) {
    debug_assert!(target_start + src.len() <= target.capacity());

    // Split the source into the part that overwrites already-initialized
    // bytes and the part appended past the current length. This fills the
    // ring buffer without ever zero-initializing it.
    let in_place = src.len().min(target.len() - target_start);
    let (head, tail) = src.split_at(in_place);
    target[target_start..][..in_place].copy_from_slice(head);
    target.extend_from_slice(tail);
}