diff options
Diffstat (limited to 'src/frame/compress.rs')
-rw-r--r-- | src/frame/compress.rs | 471 |
1 files changed, 471 insertions, 0 deletions
diff --git a/src/frame/compress.rs b/src/frame/compress.rs new file mode 100644 index 0000000..cee32eb --- /dev/null +++ b/src/frame/compress.rs @@ -0,0 +1,471 @@ +use std::{ + fmt, + hash::Hasher, + io::{self, Write}, +}; +use twox_hash::XxHash32; + +use crate::{ + block::{ + compress::compress_internal, + hashtable::{HashTable, HashTable4K}, + }, + sink::vec_sink_for_compression, +}; + +use super::Error; +use super::{ + header::{BlockInfo, BlockMode, FrameInfo, BLOCK_INFO_SIZE, MAX_FRAME_INFO_SIZE}, + BlockSize, +}; +use crate::block::WINDOW_SIZE; + +/// A writer for compressing a LZ4 stream. +/// +/// This `FrameEncoder` wraps any other writer that implements `io::Write`. +/// Bytes written to this writer are compressed using the [LZ4 frame +/// format](https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md). +/// +/// Writes are buffered automatically, so there's no need to wrap the given +/// writer in a `std::io::BufWriter`. +/// +/// To ensure a well formed stream the encoder must be finalized by calling +/// either the [`finish()`], [`try_finish()`], or [`auto_finish()`] methods. +/// +/// [`finish()`]: Self::finish +/// [`try_finish()`]: Self::try_finish +/// [`auto_finish()`]: Self::auto_finish +/// +/// # Example 1 +/// Serializing json values into a compressed file. +/// +/// ```no_run +/// let compressed_file = std::fs::File::create("datafile").unwrap(); +/// let mut compressor = lz4_flex::frame::FrameEncoder::new(compressed_file); +/// serde_json::to_writer(&mut compressor, &serde_json::json!({ "an": "object" })).unwrap(); +/// compressor.finish().unwrap(); +/// ``` +/// +/// # Example 2 +/// Serializing multiple json values into a compressed file using linked blocks. 
+/// +/// ```no_run +/// let compressed_file = std::fs::File::create("datafile").unwrap(); +/// let mut frame_info = lz4_flex::frame::FrameInfo::new(); +/// frame_info.block_mode = lz4_flex::frame::BlockMode::Linked; +/// let mut compressor = lz4_flex::frame::FrameEncoder::with_frame_info(frame_info, compressed_file); +/// for i in 0..10u64 { +/// serde_json::to_writer(&mut compressor, &serde_json::json!({ "i": i })).unwrap(); +/// } +/// compressor.finish().unwrap(); +/// ``` +pub struct FrameEncoder<W: io::Write> { + /// Our buffer of uncompressed bytes. + src: Vec<u8>, + /// Index into src: starting point of bytes not yet compressed + src_start: usize, + /// Index into src: end point of bytes not not yet compressed + src_end: usize, + /// Index into src: starting point of external dictionary (applicable in Linked block mode) + ext_dict_offset: usize, + /// Length of external dictionary + ext_dict_len: usize, + /// Counter of bytes already compressed to the compression_table + /// _Not_ the same as `content_len` as this is reset every to 2GB. + src_stream_offset: usize, + /// Encoder table + compression_table: HashTable4K, + /// The underlying writer. + w: W, + /// Xxhash32 used when content checksum is enabled. + content_hasher: XxHash32, + /// Number of bytes compressed + content_len: u64, + /// The compressed bytes buffer. Bytes are compressed from src (usually) + /// to dst before being written to w. + dst: Vec<u8>, + /// Whether we have an open frame in the output. + is_frame_open: bool, + /// Whether we have an frame closed in the output. + data_to_frame_written: bool, + /// The frame information to be used in this encoder. 
+ frame_info: FrameInfo, +} + +impl<W: io::Write> FrameEncoder<W> { + fn init(&mut self) { + let max_block_size = self.frame_info.block_size.get_size(); + let src_size = if self.frame_info.block_mode == BlockMode::Linked { + // In linked mode we consume the input (bumping src_start) but leave the + // beginning of src to be used as a prefix in subsequent blocks. + // That is at least until we have at least `max_block_size + WINDOW_SIZE` + // bytes in src, then we setup an ext_dict with the last WINDOW_SIZE bytes + // and the input goes to the beginning of src again. + // Since we always want to be able to write a full block (up to max_block_size) + // we need a buffer with at least `max_block_size * 2 + WINDOW_SIZE` bytes. + max_block_size * 2 + WINDOW_SIZE + } else { + max_block_size + }; + // Since this method is called potentially multiple times, don't reserve _additional_ + // capacity if not required. + self.src + .reserve(src_size.saturating_sub(self.src.capacity())); + self.dst.reserve( + crate::block::compress::get_maximum_output_size(max_block_size) + .saturating_sub(self.dst.capacity()), + ); + } + + /// Returns a wrapper around `self` that will finish the stream on drop. + /// + /// # Note + /// Errors on drop get silently ignored. If you want to handle errors then use [`finish()`] or + /// [`try_finish()`] instead. + /// + /// [`finish()`]: Self::finish + /// [`try_finish()`]: Self::try_finish + pub fn auto_finish(self) -> AutoFinishEncoder<W> { + AutoFinishEncoder { + encoder: Some(self), + } + } + + /// Creates a new Encoder with the specified FrameInfo. + pub fn with_frame_info(frame_info: FrameInfo, wtr: W) -> Self { + FrameEncoder { + src: Vec::new(), + w: wtr, + // 16 KB hash table for matches, same as the reference implementation. 
+ compression_table: HashTable4K::new(), + content_hasher: XxHash32::with_seed(0), + content_len: 0, + dst: Vec::new(), + is_frame_open: false, + data_to_frame_written: false, + frame_info, + src_start: 0, + src_end: 0, + ext_dict_offset: 0, + ext_dict_len: 0, + src_stream_offset: 0, + } + } + + /// Creates a new Encoder with the default settings. + pub fn new(wtr: W) -> Self { + Self::with_frame_info(Default::default(), wtr) + } + + /// The frame information used by this Encoder. + pub fn frame_info(&mut self) -> &FrameInfo { + &self.frame_info + } + + /// Consumes this encoder, flushing internal buffer and writing stream terminator. + pub fn finish(mut self) -> Result<W, Error> { + self.try_finish()?; + Ok(self.w) + } + + /// Attempt to finish this output stream, flushing internal buffer and writing stream + /// terminator. + pub fn try_finish(&mut self) -> Result<(), Error> { + match self.flush() { + Ok(()) => { + // Empty input special case + // https://github.com/ouch-org/ouch/pull/163#discussion_r1108965151 + if !self.is_frame_open && !self.data_to_frame_written { + self.begin_frame(0)?; + } + self.end_frame()?; + self.data_to_frame_written = true; + Ok(()) + } + Err(err) => Err(err.into()), + } + } + + /// Returns the underlying writer _without_ flushing the stream. + /// This may leave the output in an unfinished state. + pub fn into_inner(self) -> W { + self.w + } + + /// Gets a reference to the underlying writer in this encoder. + pub fn get_ref(&self) -> &W { + &self.w + } + + /// Gets a reference to the underlying writer in this encoder. + /// + /// Note that mutating the output/input state of the stream may corrupt + /// this encoder, so care must be taken when using this method. + pub fn get_mut(&mut self) -> &mut W { + &mut self.w + } + + /// Closes the frame by writing the end marker. 
+ fn end_frame(&mut self) -> Result<(), Error> { + debug_assert!(self.is_frame_open); + self.is_frame_open = false; + if let Some(expected) = self.frame_info.content_size { + if expected != self.content_len { + return Err(Error::ContentLengthError { + expected, + actual: self.content_len, + }); + } + } + + let mut block_info_buffer = [0u8; BLOCK_INFO_SIZE]; + BlockInfo::EndMark.write(&mut block_info_buffer[..])?; + self.w.write_all(&block_info_buffer[..])?; + if self.frame_info.content_checksum { + let content_checksum = self.content_hasher.finish() as u32; + self.w.write_all(&content_checksum.to_le_bytes())?; + } + + Ok(()) + } + + /// Begin the frame by writing the frame header. + /// It'll also setup the encoder for compressing blocks for the the new frame. + fn begin_frame(&mut self, buf_len: usize) -> io::Result<()> { + self.is_frame_open = true; + if self.frame_info.block_size == BlockSize::Auto { + self.frame_info.block_size = BlockSize::from_buf_length(buf_len); + } + self.init(); + let mut frame_info_buffer = [0u8; MAX_FRAME_INFO_SIZE]; + let size = self.frame_info.write(&mut frame_info_buffer)?; + self.w.write_all(&frame_info_buffer[..size])?; + + if self.content_len != 0 { + // This is the second or later frame for this Encoder, + // reset compressor state for the new frame. + self.content_len = 0; + self.src_stream_offset = 0; + self.src.clear(); + self.src_start = 0; + self.src_end = 0; + self.ext_dict_len = 0; + self.content_hasher = XxHash32::with_seed(0); + self.compression_table.clear(); + } + Ok(()) + } + + /// Consumes the src contents between src_start and src_end, + /// which shouldn't exceed the max block size. 
    fn write_block(&mut self) -> io::Result<()> {
        debug_assert!(self.is_frame_open);
        let max_block_size = self.frame_info.block_size.get_size();
        debug_assert!(self.src_end - self.src_start <= max_block_size);

        // Reposition the compression table if we're anywhere near an overflowing hazard:
        // table offsets are u32-based, so rebase them before src_stream_offset grows
        // past ~2 GB.
        if self.src_stream_offset + max_block_size + WINDOW_SIZE >= u32::MAX as usize / 2 {
            self.compression_table
                .reposition((self.src_stream_offset - self.ext_dict_len) as _);
            self.src_stream_offset = self.ext_dict_len;
        }

        // input to the compressor, which may include a prefix when blocks are linked
        let input = &self.src[..self.src_end];
        // the contents of the block are between src_start and src_end
        let src = &input[self.src_start..];

        let dst_required_size = crate::block::compress::get_maximum_output_size(src.len());

        // Compress with or without an external dictionary (ext_dict is only ever
        // non-empty in Linked block mode, after the buffer has wrapped once).
        let compress_result = if self.ext_dict_len != 0 {
            debug_assert_eq!(self.frame_info.block_mode, BlockMode::Linked);
            compress_internal::<_, true, _>(
                input,
                self.src_start,
                &mut vec_sink_for_compression(&mut self.dst, 0, 0, dst_required_size),
                &mut self.compression_table,
                &self.src[self.ext_dict_offset..self.ext_dict_offset + self.ext_dict_len],
                self.src_stream_offset,
            )
        } else {
            compress_internal::<_, false, _>(
                input,
                self.src_start,
                &mut vec_sink_for_compression(&mut self.dst, 0, 0, dst_required_size),
                &mut self.compression_table,
                b"",
                self.src_stream_offset,
            )
        };

        // Store the block uncompressed if compression didn't actually shrink it.
        let (block_info, block_data) = match compress_result.map_err(Error::CompressionError)? {
            comp_len if comp_len < src.len() => {
                (BlockInfo::Compressed(comp_len as _), &self.dst[..comp_len])
            }
            _ => (BlockInfo::Uncompressed(src.len() as _), src),
        };

        // Write the (un)compressed block to the writer and the block checksum (if applicable).
        let mut block_info_buffer = [0u8; BLOCK_INFO_SIZE];
        block_info.write(&mut block_info_buffer[..])?;
        self.w.write_all(&block_info_buffer[..])?;
        self.w.write_all(block_data)?;
        if self.frame_info.block_checksums {
            // Block checksums are computed over the stored (possibly compressed) bytes.
            let mut block_hasher = XxHash32::with_seed(0);
            block_hasher.write(block_data);
            let block_checksum = block_hasher.finish() as u32;
            self.w.write_all(&block_checksum.to_le_bytes())?;
        }

        // Content checksum, if applicable (computed over the uncompressed bytes).
        if self.frame_info.content_checksum {
            self.content_hasher.write(src);
        }

        // Buffer and offsets maintenance
        self.content_len += src.len() as u64;
        self.src_start += src.len();
        debug_assert_eq!(self.src_start, self.src_end);
        if self.frame_info.block_mode == BlockMode::Linked {
            // In linked mode we consume the input (bumping src_start) but leave the
            // beginning of src to be used as a prefix in subsequent blocks.
            // That is at least until we have at least `max_block_size + WINDOW_SIZE`
            // bytes in src, then we setup an ext_dict with the last WINDOW_SIZE bytes
            // and the input goes to the beginning of src again.
            debug_assert_eq!(self.src.capacity(), max_block_size * 2 + WINDOW_SIZE);
            if self.src_start >= max_block_size + WINDOW_SIZE {
                // The ext_dict will become the last WINDOW_SIZE bytes
                self.ext_dict_offset = self.src_end - WINDOW_SIZE;
                self.ext_dict_len = WINDOW_SIZE;
                // Input goes in the beginning of the buffer again.
                self.src_stream_offset += self.src_end;
                self.src_start = 0;
                self.src_end = 0;
            } else if self.src_start + self.ext_dict_len > WINDOW_SIZE {
                // There's more than WINDOW_SIZE bytes of lookback adding the prefix and ext_dict.
                // Since we have a limited buffer we must shrink ext_dict in favor of the prefix,
                // so that we can fit up to max_block_size bytes between src_start and ext_dict
                // start.
                let delta = self
                    .ext_dict_len
                    .min(self.src_start + self.ext_dict_len - WINDOW_SIZE);
                self.ext_dict_offset += delta;
                self.ext_dict_len -= delta;
                debug_assert!(self.src_start + self.ext_dict_len >= WINDOW_SIZE)
            }
            debug_assert!(
                self.ext_dict_len == 0 || self.src_start + max_block_size <= self.ext_dict_offset
            );
        } else {
            // In independent block mode we consume the entire src buffer
            // which is sized equal to the frame max_block_size.
            debug_assert_eq!(self.ext_dict_len, 0);
            debug_assert_eq!(self.src.capacity(), max_block_size);
            self.src_start = 0;
            self.src_end = 0;
            // Advance stream offset so we don't have to reset the match dict
            // for the next block.
            self.src_stream_offset += src.len();
        }
        debug_assert!(self.src_start <= self.src_end);
        debug_assert!(self.src_start + max_block_size <= self.src.capacity());
        Ok(())
    }
}

impl<W: io::Write> io::Write for FrameEncoder<W> {
    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
        // The frame header is written lazily, on the first non-empty write.
        if !self.is_frame_open && !buf.is_empty() {
            self.begin_frame(buf.len())?;
        }
        let buf_len = buf.len();
        while !buf.is_empty() {
            let src_filled = self.src_end - self.src_start;
            let max_fill_len = self.frame_info.block_size.get_size() - src_filled;
            if max_fill_len == 0 {
                // make space by writing next block
                self.write_block()?;
                debug_assert_eq!(self.src_end, self.src_start);
                continue;
            }

            let fill_len = max_fill_len.min(buf.len());
            vec_copy_overwriting(&mut self.src, self.src_end, &buf[..fill_len]);
            buf = &buf[fill_len..];
            self.src_end += fill_len;
        }
        // All input is always consumed (buffered or compressed out).
        Ok(buf_len)
    }

    fn flush(&mut self) -> io::Result<()> {
        // Note: flushes buffered input as a block but does NOT flush the inner writer.
        if self.src_start != self.src_end {
            self.write_block()?;
        }
        Ok(())
    }
}

/// A wrapper around an [`FrameEncoder<W>`] that finishes the stream on drop.
///
/// This can be created by the [`auto_finish()`] method on the [`FrameEncoder<W>`].
///
/// # Note
/// Errors on drop get silently ignored.
If you want to handle errors then use [`finish()`] or +/// [`try_finish()`] instead. +/// +/// [`finish()`]: FrameEncoder::finish +/// [`try_finish()`]: FrameEncoder::try_finish +/// [`auto_finish()`]: FrameEncoder::auto_finish +pub struct AutoFinishEncoder<W: Write> { + // We wrap this in an option to take it during drop. + encoder: Option<FrameEncoder<W>>, +} + +impl<W: io::Write> Drop for AutoFinishEncoder<W> { + fn drop(&mut self) { + if let Some(mut encoder) = self.encoder.take() { + let _ = encoder.try_finish(); + } + } +} + +impl<W: Write> Write for AutoFinishEncoder<W> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.encoder.as_mut().unwrap().write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.encoder.as_mut().unwrap().flush() + } +} + +impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FrameEncoder") + .field("w", &self.w) + .field("frame_info", &self.frame_info) + .field("is_frame_open", &self.is_frame_open) + .field("content_hasher", &self.content_hasher) + .field("content_len", &self.content_len) + .field("dst", &"[...]") + .field("src", &"[...]") + .field("src_start", &self.src_start) + .field("src_end", &self.src_end) + .field("ext_dict_offset", &self.ext_dict_offset) + .field("ext_dict_len", &self.ext_dict_len) + .field("src_stream_offset", &self.src_stream_offset) + .finish() + } +} + +/// Copy `src` into `target` starting from the `start` index, overwriting existing data if any. +#[inline] +fn vec_copy_overwriting(target: &mut Vec<u8>, target_start: usize, src: &[u8]) { + debug_assert!(target_start + src.len() <= target.capacity()); + + // By combining overwriting (copy_from_slice) and extending (extend_from_slice) + // we can fill the ring buffer without initializing it (eg. filling with 0). 
+ let overwrite_len = (target.len() - target_start).min(src.len()); + target[target_start..target_start + overwrite_len].copy_from_slice(&src[..overwrite_len]); + target.extend_from_slice(&src[overwrite_len..]); +} |