summaryrefslogtreecommitdiff
path: root/libfec
diff options
context:
space:
mode:
Diffstat (limited to 'libfec')
-rw-r--r--libfec/fec_open.cpp7
-rw-r--r--libfec/fec_private.h3
-rw-r--r--libfec/fec_process.cpp63
-rw-r--r--libfec/test/fec_unittest.cpp10
4 files changed, 32 insertions, 51 deletions
diff --git a/libfec/fec_open.cpp b/libfec/fec_open.cpp
index 175207b8..6825942b 100644
--- a/libfec/fec_open.cpp
+++ b/libfec/fec_open.cpp
@@ -418,8 +418,6 @@ int fec_close(struct fec_handle *f)
close(f->fd);
}
- pthread_mutex_destroy(&f->mutex);
-
reset_handle(f);
delete f;
@@ -528,11 +526,6 @@ int fec_open(struct fec_handle **handle, const char *path, int mode, int flags,
f->ecc.rsn = FEC_RSM - roots;
f->flags = flags;
- if (unlikely(pthread_mutex_init(&f->mutex, NULL) != 0)) {
- error("failed to create a mutex: %s", strerror(errno));
- return -1;
- }
-
f->fd = TEMP_FAILURE_RETRY(open(path, mode | O_CLOEXEC));
if (f->fd == -1) {
diff --git a/libfec/fec_private.h b/libfec/fec_private.h
index 199f8017..0c633223 100644
--- a/libfec/fec_private.h
+++ b/libfec/fec_private.h
@@ -135,8 +135,7 @@ struct fec_handle {
ecc_info ecc;
int fd;
int flags; /* additional flags passed to fec_open */
- int mode; /* mode for open(2) */
- pthread_mutex_t mutex;
+ int mode; /* mode for open(2) */
uint64_t errors;
uint64_t data_size;
uint64_t pos;
diff --git a/libfec/fec_process.cpp b/libfec/fec_process.cpp
index f11b8b2c..51290772 100644
--- a/libfec/fec_process.cpp
+++ b/libfec/fec_process.cpp
@@ -14,12 +14,13 @@
* limitations under the License.
*/
+#include <future>
#include "fec_private.h"
struct process_info {
int id;
- fec_handle *f;
- uint8_t *buf;
+ fec_handle* f;
+ uint8_t* buf;
size_t count;
uint64_t offset;
read_func func;
@@ -28,21 +29,15 @@ struct process_info {
};
/* thread function */
-static void * __process(void *cookie)
-{
- process_info *p = static_cast<process_info *>(cookie);
-
- debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset,
- p->offset + p->count);
+static process_info* __process(process_info* p) {
+ debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, p->offset + p->count);
p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
return p;
}
/* launches a maximum number of threads to process a read */
-ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
- read_func func)
-{
+ssize_t process(fec_handle* f, uint8_t* buf, size_t count, uint64_t offset, read_func func) {
check(f);
check(buf);
check(func);
@@ -60,30 +55,27 @@ ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
}
uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
- size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE);
-
- size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
- size_t max_threads = fec_div_round_up(count, count_per_thread);
+ size_t blocks = fec_div_round_up(offset + count - start, FEC_BLOCKSIZE);
- if ((size_t)threads > max_threads) {
- threads = (int)max_threads;
+ /* start at most one thread per block we're accessing */
+ if ((size_t)threads > blocks) {
+ threads = (int)blocks;
}
+ size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
size_t left = count;
uint64_t pos = offset;
uint64_t end = start + count_per_thread;
- debug("%d threads, %zu bytes per thread (total %zu)", threads,
- count_per_thread, count);
+ debug("max %d threads, %zu bytes per thread (total %zu spanning %zu blocks)", threads,
+ count_per_thread, count, blocks);
- std::vector<pthread_t> handles;
+ std::vector<std::future<process_info*>> handles;
process_info info[threads];
ssize_t rc = 0;
/* start threads to process queue */
- for (int i = 0; i < threads; ++i) {
- check(left > 0);
-
+ for (int i = 0; i < threads && left > 0; ++i) {
info[i].id = i;
info[i].f = f;
info[i].buf = &buf[pos - offset];
@@ -97,32 +89,19 @@ ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
info[i].count = left;
}
- pthread_t thread;
-
- if (pthread_create(&thread, NULL, __process, &info[i]) != 0) {
- error("failed to create thread: %s", strerror(errno));
- rc = -1;
- } else {
- handles.push_back(thread);
- }
+ handles.push_back(std::async(std::launch::async, __process, &info[i]));
pos = end;
- end += count_per_thread;
+ end += count_per_thread;
left -= info[i].count;
}
- check(left == 0);
-
ssize_t nread = 0;
/* wait for all threads to complete */
- for (auto thread : handles) {
- process_info *p = NULL;
-
- if (pthread_join(thread, (void **)&p) != 0) {
- error("failed to join thread: %s", strerror(errno));
- rc = -1;
- } else if (!p || p->rc == -1) {
+ for (auto&& future : handles) {
+ process_info* p = future.get();
+ if (!p || p->rc == -1) {
rc = -1;
} else {
nread += p->rc;
@@ -130,7 +109,7 @@ ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
}
}
- if (rc == -1) {
+ if (left > 0 || rc == -1) {
errno = EIO;
return -1;
}
diff --git a/libfec/test/fec_unittest.cpp b/libfec/test/fec_unittest.cpp
index 421eb501..1ced2d98 100644
--- a/libfec/test/fec_unittest.cpp
+++ b/libfec/test/fec_unittest.cpp
@@ -215,6 +215,16 @@ TEST_F(FecUnitTest, VerityImage_FecRead) {
ASSERT_EQ(1024, fec_pread(handle, read_data.data(), 1024, corrupt_offset));
ASSERT_EQ(std::vector<uint8_t>(1024, 255), read_data);
+
+ // Unaligned read that spans two blocks
+ ASSERT_EQ(678, fec_pread(handle, read_data.data(), 678, corrupt_offset - 123));
+ ASSERT_EQ(std::vector<uint8_t>(123, 254),
+ std::vector<uint8_t>(read_data.begin(), read_data.begin() + 123));
+ ASSERT_EQ(std::vector<uint8_t>(555, 255),
+ std::vector<uint8_t>(read_data.begin() + 123, read_data.begin() + 678));
+
+ std::vector<uint8_t> large_data(53388, 0);
+ ASSERT_EQ(53388, fec_pread(handle, large_data.data(), 53388, 385132));
}
TEST_F(FecUnitTest, LoadAvbImage_HashtreeFooter) {