diff options
author | Daniel Zheng <zhengdaniel@google.com> | 2024-02-12 16:19:37 -0800 |
---|---|---|
committer | Daniel Zheng <zhengdaniel@google.com> | 2024-02-22 21:38:47 -0800 |
commit | bee3f962fce9415214c52f79d5b92efcd5f20f25 (patch) | |
tree | dcac32f706172f55bc2b26db35390e5c9fefc0a7 /fs_mgr | |
parent | df09f80cf43981ef0d96826ee5b879864029184c (diff) | |
download | core-bee3f962fce9415214c52f79d5b92efcd5f20f25.tar.gz |
libsnapshot: stride compression
Alternate dispatching blocks between threads rather than splitting the
data beforehand and then sending it to the threads, in order to ensure
that single-threading and multithreading chunk the data at the same
locations.
Without this change, the resulting op count and data section of the COW
will differ between --enable-threading and --disable-threading at
runtime, which is a result we don't want.
Test: th
Change-Id: I3ed8add0552745a281fce2aa7f1d1d32eb547e63
Diffstat (limited to 'fs_mgr')
-rw-r--r-- | fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp | 52 |
1 files changed, 29 insertions, 23 deletions
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp index 30c5135a2..de2e52833 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp @@ -603,41 +603,47 @@ std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompres std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression( const size_t num_blocks, const void* data, CowOperationType type) { const size_t num_threads = num_compress_threads_; - const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads); const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); + // We will alternate which thread to send compress work to. E.g. alternate between T1 and T2 + // until all blocks are processed std::vector<CompressedBuffer> compressed_vec; - // Submit the blocks per thread. The retrieval of - // compressed buffers has to be done in the same order. - // We should not poll for completed buffers in a different order as the - // buffers are tightly coupled with block ordering. - for (size_t i = 0; i < num_threads; i++) { - CompressWorker* worker = compress_threads_[i].get(); - auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread); - // Enqueue the blocks to be compressed for each thread. 
- while (blocks_in_batch) { - CompressedBuffer buffer; - - const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type); - size_t num_blocks = compression_factor / header_.block_size; - - buffer.compression_factor = compression_factor; - worker->EnqueueCompressBlocks(iter, compression_factor, 1); - compressed_vec.push_back(std::move(buffer)); - blocks_in_batch -= num_blocks; - iter += compression_factor; - } + int iteration = 0; + int blocks_to_compress = static_cast<int>(num_blocks); + while (blocks_to_compress) { + CompressedBuffer buffer; + CompressWorker* worker = compress_threads_[iteration % num_threads].get(); + + const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type); + size_t num_blocks = compression_factor / header_.block_size; + + worker->EnqueueCompressBlocks(iter, compression_factor, 1); + buffer.compression_factor = compression_factor; + compressed_vec.push_back(std::move(buffer)); + + iteration++; + iter += compression_factor; + blocks_to_compress -= num_blocks; } - // Fetch compressed buffers from the threads std::vector<std::vector<uint8_t>> compressed_buf; + std::vector<std::vector<std::vector<uint8_t>>> worker_buffers(num_threads); compressed_buf.clear(); for (size_t i = 0; i < num_threads; i++) { CompressWorker* worker = compress_threads_[i].get(); - if (!worker->GetCompressedBuffers(&compressed_buf)) { + if (!worker->GetCompressedBuffers(&worker_buffers[i])) { return {}; } } + // compressed_vec | CB 1 | CB 2 | CB 3 | CB 4 | <-compressed buffers + // t1 t2 t1 t2 <- processed by these threads + // Ordering is important here. We need to retrieve the compressed data in the same order we + // processed it and assume that that we submit data beginning with the first thread and then + // round robin the consecutive data calls. 
We need to Fetch compressed buffers from the threads + // via the same ordering + for (size_t i = 0; i < compressed_vec.size(); i++) { + compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]); + } if (compressed_vec.size() != compressed_buf.size()) { LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size() |