aboutsummaryrefslogtreecommitdiff
path: root/pw_work_queue
diff options
context:
space:
mode:
authorEwout van Bekkum <ewout@google.com>2023-03-24 06:11:02 +0000
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2023-03-24 06:11:02 +0000
commitf4dd9366fb797dd3e909f669d12c0a61478c9c1d (patch)
tree96be58d1777492aee7e148b94c8a4ccc34c48bed /pw_work_queue
parent995e5ad13f6af02367284d1578028654251b8dd6 (diff)
downloadpigweed-f4dd9366fb797dd3e909f669d12c0a61478c9c1d.tar.gz
pw_work_queue: Migrate to pw::InlineQueue
Change-Id: I53f5aaf0da1f03a9a0295534fad3b66098d75236 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/134890 Pigweed-Auto-Submit: Ewout van Bekkum <ewout@google.com> Commit-Queue: Ewout van Bekkum <ewout@google.com> Reviewed-by: Wyatt Hepler <hepler@google.com>
Diffstat (limited to 'pw_work_queue')
-rw-r--r--pw_work_queue/BUILD.bazel2
-rw-r--r--pw_work_queue/BUILD.gn6
-rw-r--r--pw_work_queue/CMakeLists.txt2
-rw-r--r--pw_work_queue/public/pw_work_queue/internal/circular_buffer.h80
-rw-r--r--pw_work_queue/public/pw_work_queue/work_queue.h19
-rw-r--r--pw_work_queue/work_queue.cc15
6 files changed, 22 insertions, 102 deletions
diff --git a/pw_work_queue/BUILD.bazel b/pw_work_queue/BUILD.bazel
index ff8855c1d..ec5813b39 100644
--- a/pw_work_queue/BUILD.bazel
+++ b/pw_work_queue/BUILD.bazel
@@ -30,11 +30,11 @@ pw_cc_library(
name = "pw_work_queue",
srcs = ["work_queue.cc"],
hdrs = [
- "public/pw_work_queue/internal/circular_buffer.h",
"public/pw_work_queue/work_queue.h",
],
includes = ["public"],
deps = [
+ "//pw_containers:inline_queue",
"//pw_function",
"//pw_metric:metric",
"//pw_status",
diff --git a/pw_work_queue/BUILD.gn b/pw_work_queue/BUILD.gn
index a84e30a08..265005bdf 100644
--- a/pw_work_queue/BUILD.gn
+++ b/pw_work_queue/BUILD.gn
@@ -27,11 +27,9 @@ config("public_include_path") {
pw_source_set("pw_work_queue") {
public_configs = [ ":public_include_path" ]
- public = [
- "public/pw_work_queue/internal/circular_buffer.h",
- "public/pw_work_queue/work_queue.h",
- ]
+ public = [ "public/pw_work_queue/work_queue.h" ]
public_deps = [
+ "$dir_pw_containers:inline_queue",
"$dir_pw_sync:interrupt_spin_lock",
"$dir_pw_sync:lock_annotations",
"$dir_pw_sync:thread_notification",
diff --git a/pw_work_queue/CMakeLists.txt b/pw_work_queue/CMakeLists.txt
index 32100a474..44ba6d359 100644
--- a/pw_work_queue/CMakeLists.txt
+++ b/pw_work_queue/CMakeLists.txt
@@ -16,11 +16,11 @@ include($ENV{PW_ROOT}/pw_build/pigweed.cmake)
pw_add_library(pw_work_queue STATIC
HEADERS
- public/pw_work_queue/internal/circular_buffer.h
public/pw_work_queue/work_queue.h
PUBLIC_INCLUDES
public
PUBLIC_DEPS
+ pw_containers.inline_queue
pw_sync.interrupt_spin_lock
pw_sync.lock_annotations
pw_sync.thread_notification
diff --git a/pw_work_queue/public/pw_work_queue/internal/circular_buffer.h b/pw_work_queue/public/pw_work_queue/internal/circular_buffer.h
deleted file mode 100644
index c2f4dc5dd..000000000
--- a/pw_work_queue/public/pw_work_queue/internal/circular_buffer.h
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright 2021 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-
-#pragma once
-
-#include <cstdint>
-#include <optional>
-
-#include "pw_assert/assert.h"
-#include "pw_span/span.h"
-
-namespace pw::work_queue::internal {
-
-// TODO(hepler): Replace this with a std::deque like container.
-template <typename T>
-class CircularBuffer {
- public:
- explicit constexpr CircularBuffer(span<T> buffer)
- : buffer_(buffer), head_(0), tail_(0), count_(0) {}
-
- bool empty() const { return count_ == 0; }
- bool full() const { return count_ == buffer_.size(); }
- size_t size() const { return count_; }
- size_t capacity() const { return buffer_.size(); }
-
- template <typename Ty>
- bool Push(Ty&& value) {
- PW_DASSERT(tail_ < buffer_.size());
-
- if (full()) {
- return false;
- }
-
- buffer_[tail_] = std::forward<Ty>(value);
- IncrementWithWrap(tail_);
- ++count_;
- return true;
- }
-
- std::optional<T> Pop() {
- PW_DASSERT(head_ < buffer_.size());
-
- if (empty()) {
- return std::nullopt;
- }
-
- T entry = std::move(buffer_[head_]);
- IncrementWithWrap(head_);
- --count_;
- return entry;
- }
-
- private:
- void IncrementWithWrap(size_t& index) const {
- index++;
- // Note: branch is faster than mod (%) on common embedded architectures.
- if (index == buffer_.size()) {
- index = 0;
- }
- }
-
- span<T> buffer_;
-
- size_t head_;
- size_t tail_;
- size_t count_;
-};
-
-} // namespace pw::work_queue::internal
diff --git a/pw_work_queue/public/pw_work_queue/work_queue.h b/pw_work_queue/public/pw_work_queue/work_queue.h
index 7afb74e02..8d6691a4d 100644
--- a/pw_work_queue/public/pw_work_queue/work_queue.h
+++ b/pw_work_queue/public/pw_work_queue/work_queue.h
@@ -17,6 +17,7 @@
#include <array>
#include <cstdint>
+#include "pw_containers/inline_queue.h"
#include "pw_function/function.h"
#include "pw_metric/metric.h"
#include "pw_span/span.h"
@@ -25,7 +26,6 @@
#include "pw_sync/lock_annotations.h"
#include "pw_sync/thread_notification.h"
#include "pw_thread/thread_core.h"
-#include "pw_work_queue/internal/circular_buffer.h"
namespace pw::work_queue {
@@ -38,8 +38,10 @@ using WorkItem = Function<void()>;
class WorkQueue : public thread::ThreadCore {
public:
// Note: the ThreadNotification prevents this from being constexpr.
- explicit WorkQueue(span<WorkItem> queue_storage)
- : stop_requested_(false), circular_buffer_(queue_storage) {}
+ WorkQueue(InlineQueue<WorkItem>& queue, size_t queue_capacity)
+ : stop_requested_(false), queue_(queue) {
+ min_queue_remaining_.Set(static_cast<uint32_t>(queue_capacity));
+ }
// Enqueues a work_item for execution by the work queue thread.
//
@@ -78,7 +80,7 @@ class WorkQueue : public thread::ThreadCore {
sync::InterruptSpinLock lock_;
bool stop_requested_ PW_GUARDED_BY(lock_);
- internal::CircularBuffer<WorkItem> circular_buffer_ PW_GUARDED_BY(lock_);
+ InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_);
sync::ThreadNotification work_notification_;
// TODO(ewout): The group and/or its name token should be passed as a ctor
@@ -90,19 +92,16 @@ class WorkQueue : public thread::ThreadCore {
// metrics work as intended.
PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue");
PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u);
- PW_METRIC(metrics_,
- min_queue_remaining_,
- "min_queue_remaining",
- static_cast<uint32_t>(circular_buffer_.capacity()));
+ PW_METRIC(metrics_, min_queue_remaining_, "min_queue_remaining", 0u);
};
template <size_t kWorkQueueEntries>
class WorkQueueWithBuffer : public WorkQueue {
public:
- constexpr WorkQueueWithBuffer() : WorkQueue(queue_storage_) {}
+ constexpr WorkQueueWithBuffer() : WorkQueue(queue_, kWorkQueueEntries) {}
private:
- std::array<WorkItem, kWorkQueueEntries> queue_storage_;
+ InlineQueue<WorkItem, kWorkQueueEntries> queue_;
};
} // namespace pw::work_queue
diff --git a/pw_work_queue/work_queue.cc b/pw_work_queue/work_queue.cc
index 62be12381..c9d5e7fb0 100644
--- a/pw_work_queue/work_queue.cc
+++ b/pw_work_queue/work_queue.cc
@@ -37,8 +37,11 @@ void WorkQueue::Run() {
std::optional<WorkItem> possible_work_item;
{
std::lock_guard lock(lock_);
- possible_work_item = circular_buffer_.Pop();
- work_remaining = !circular_buffer_.empty();
+ if (!queue_.empty()) {
+ possible_work_item.emplace(std::move(queue_.front()));
+ queue_.pop();
+ }
+ work_remaining = !queue_.empty();
stop_requested = stop_requested_;
}
if (!possible_work_item.has_value()) {
@@ -69,18 +72,18 @@ Status WorkQueue::InternalPushWork(WorkItem&& work_item) {
return Status::FailedPrecondition();
}
- if (circular_buffer_.full()) {
+ if (queue_.full()) {
return Status::ResourceExhausted();
}
- circular_buffer_.Push(std::move(work_item));
+ queue_.emplace(std::move(work_item));
// Update the watermarks for the queue.
- const uint32_t queue_entries = circular_buffer_.size();
+ const uint32_t queue_entries = queue_.size();
if (queue_entries > max_queue_used_.value()) {
max_queue_used_.Set(queue_entries);
}
- const uint32_t queue_remaining = circular_buffer_.capacity() - queue_entries;
+ const uint32_t queue_remaining = queue_.capacity() - queue_entries;
if (queue_remaining < min_queue_remaining_.value()) {
min_queue_remaining_.Set(queue_entries);
}