diff options
Diffstat (limited to 'libfec')
-rw-r--r-- | libfec/fec_open.cpp | 7 | ||||
-rw-r--r-- | libfec/fec_private.h | 3 | ||||
-rw-r--r-- | libfec/fec_process.cpp | 63 | ||||
-rw-r--r-- | libfec/test/fec_unittest.cpp | 10 |
4 files changed, 32 insertions, 51 deletions
diff --git a/libfec/fec_open.cpp b/libfec/fec_open.cpp index 175207b8..6825942b 100644 --- a/libfec/fec_open.cpp +++ b/libfec/fec_open.cpp @@ -418,8 +418,6 @@ int fec_close(struct fec_handle *f) close(f->fd); } - pthread_mutex_destroy(&f->mutex); - reset_handle(f); delete f; @@ -528,11 +526,6 @@ int fec_open(struct fec_handle **handle, const char *path, int mode, int flags, f->ecc.rsn = FEC_RSM - roots; f->flags = flags; - if (unlikely(pthread_mutex_init(&f->mutex, NULL) != 0)) { - error("failed to create a mutex: %s", strerror(errno)); - return -1; - } - f->fd = TEMP_FAILURE_RETRY(open(path, mode | O_CLOEXEC)); if (f->fd == -1) { diff --git a/libfec/fec_private.h b/libfec/fec_private.h index 199f8017..0c633223 100644 --- a/libfec/fec_private.h +++ b/libfec/fec_private.h @@ -135,8 +135,7 @@ struct fec_handle { ecc_info ecc; int fd; int flags; /* additional flags passed to fec_open */ - int mode; /* mode for open(2) */ - pthread_mutex_t mutex; + int mode; /* mode for open(2) */ uint64_t errors; uint64_t data_size; uint64_t pos; diff --git a/libfec/fec_process.cpp b/libfec/fec_process.cpp index f11b8b2c..51290772 100644 --- a/libfec/fec_process.cpp +++ b/libfec/fec_process.cpp @@ -14,12 +14,13 @@ * limitations under the License. */ +#include <future> #include "fec_private.h" struct process_info { int id; - fec_handle *f; - uint8_t *buf; + fec_handle* f; + uint8_t* buf; size_t count; uint64_t offset; read_func func; @@ -28,21 +29,15 @@ struct process_info { }; /* thread function */ -static void * __process(void *cookie) -{ - process_info *p = static_cast<process_info *>(cookie); - - debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, - p->offset + p->count); +static process_info* __process(process_info* p) { + debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, p->offset + p->count); p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors); return p; } /* launches a maximum number of threads to process a read */ -ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset, - read_func func) -{ +ssize_t process(fec_handle* f, uint8_t* buf, size_t count, uint64_t offset, read_func func) { check(f); check(buf); check(func); @@ -60,30 +55,27 @@ ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset, } uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE; - size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE); - - size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE; - size_t max_threads = fec_div_round_up(count, count_per_thread); + size_t blocks = fec_div_round_up(offset + count - start, FEC_BLOCKSIZE); - if ((size_t)threads > max_threads) { - threads = (int)max_threads; + /* start at most one thread per block we're accessing */ + if ((size_t)threads > blocks) { + threads = (int)blocks; } + size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE; size_t left = count; uint64_t pos = offset; uint64_t end = start + count_per_thread; - debug("%d threads, %zu bytes per thread (total %zu)", threads, - count_per_thread, count); + debug("max %d threads, %zu bytes per thread (total %zu spanning %zu blocks)", threads, + count_per_thread, count, blocks); - std::vector<pthread_t> handles; + std::vector<std::future<process_info*>> handles; process_info info[threads]; ssize_t rc = 0; /* start threads to process queue */ - for (int i = 0; i < threads; ++i) { - check(left > 0); - + for (int i = 0; i < threads && left > 0; ++i) { info[i].id = i; info[i].f = f; info[i].buf = &buf[pos - offset]; @@ -97,32 +89,19 @@ ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset, info[i].count = left; } - pthread_t thread; - - if (pthread_create(&thread, NULL, __process, &info[i]) != 0) { - error("failed to create thread: %s", strerror(errno)); - rc = -1; - } else { - handles.push_back(thread); - } + handles.push_back(std::async(std::launch::async, __process, &info[i])); pos = end; - end += count_per_thread; + end += count_per_thread; left -= info[i].count; } - check(left == 0); - ssize_t nread = 0; /* wait for all threads to complete */ - for (auto thread : handles) { - process_info *p = NULL; - - if (pthread_join(thread, (void **)&p) != 0) { - error("failed to join thread: %s", strerror(errno)); - rc = -1; - } else if (!p || p->rc == -1) { + for (auto&& future : handles) { + process_info* p = future.get(); + if (!p || p->rc == -1) { rc = -1; } else { nread += p->rc; @@ -130,7 +109,7 @@ ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset, } } - if (rc == -1) { + if (left > 0 || rc == -1) { errno = EIO; return -1; } diff --git a/libfec/test/fec_unittest.cpp b/libfec/test/fec_unittest.cpp index 421eb501..1ced2d98 100644 --- a/libfec/test/fec_unittest.cpp +++ b/libfec/test/fec_unittest.cpp @@ -215,6 +215,16 @@ TEST_F(FecUnitTest, VerityImage_FecRead) { ASSERT_EQ(1024, fec_pread(handle, read_data.data(), 1024, corrupt_offset)); ASSERT_EQ(std::vector<uint8_t>(1024, 255), read_data); + + // Unaligned read that spans two blocks + ASSERT_EQ(678, fec_pread(handle, read_data.data(), 678, corrupt_offset - 123)); + ASSERT_EQ(std::vector<uint8_t>(123, 254), + std::vector<uint8_t>(read_data.begin(), read_data.begin() + 123)); + ASSERT_EQ(std::vector<uint8_t>(555, 255), + std::vector<uint8_t>(read_data.begin() + 123, read_data.begin() + 678)); + + std::vector<uint8_t> large_data(53388, 0); + ASSERT_EQ(53388, fec_pread(handle, large_data.data(), 53388, 385132)); } TEST_F(FecUnitTest, LoadAvbImage_HashtreeFooter) { |