aboutsummaryrefslogtreecommitdiff
path: root/pw_async
diff options
context:
space:
mode:
Diffstat (limited to 'pw_async')
-rw-r--r--pw_async/BUILD.bazel118
-rw-r--r--pw_async/BUILD.gn38
-rw-r--r--pw_async/CMakeLists.txt55
-rw-r--r--pw_async/OWNERS2
-rw-r--r--pw_async/backend.cmake21
-rw-r--r--pw_async/docs.rst37
-rw-r--r--pw_async/fake_dispatcher_fixture.gni1
-rw-r--r--pw_async/fake_dispatcher_test.cc374
-rw-r--r--pw_async/fake_dispatcher_test.gni2
-rw-r--r--pw_async/heap_dispatcher.cc66
-rw-r--r--pw_async/heap_dispatcher.gni38
-rw-r--r--pw_async/public/pw_async/context.h (renamed from pw_async/public/pw_async/internal/types.h)19
-rw-r--r--pw_async/public/pw_async/dispatcher.h71
-rw-r--r--pw_async/public/pw_async/fake_dispatcher.h40
-rw-r--r--pw_async/public/pw_async/fake_dispatcher_fixture.h17
-rw-r--r--pw_async/public/pw_async/function_dispatcher.h52
-rw-r--r--pw_async/public/pw_async/heap_dispatcher.h48
-rw-r--r--pw_async/public/pw_async/task.h21
-rw-r--r--pw_async/public/pw_async/task_function.h39
19 files changed, 849 insertions, 210 deletions
diff --git a/pw_async/BUILD.bazel b/pw_async/BUILD.bazel
index 2ae446d6b..4c1481100 100644
--- a/pw_async/BUILD.bazel
+++ b/pw_async/BUILD.bazel
@@ -12,14 +12,116 @@
# License for the specific language governing permissions and limitations under
# the License.
-filegroup(
- name = "pw_async_files",
- srcs = [
- "fake_dispatcher_test.cc",
+load(
+ "//pw_build:pigweed.bzl",
+ "pw_cc_facade",
+ "pw_cc_library",
+ "pw_cc_test",
+)
+
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+pw_cc_library(
+ name = "dispatcher",
+ hdrs = [
"public/pw_async/dispatcher.h",
- "public/pw_async/fake_dispatcher.h",
- "public/pw_async/fake_dispatcher_fixture.h",
- "public/pw_async/internal/types.h",
- "public/pw_async/task.h",
+ "public/pw_async/function_dispatcher.h",
+ ],
+ includes = ["public"],
+ deps = [
+ ":types",
+ "//pw_chrono:system_clock",
+ "//pw_function",
+ "//pw_status",
+ ],
+)
+
+pw_cc_facade(
+ name = "task_facade",
+ hdrs = ["public/pw_async/task.h"],
+ includes = ["public"],
+ deps = [
+ ":types",
+ "//pw_chrono:system_clock",
+ "//pw_function",
+ "//pw_status",
+ ],
+)
+
+pw_cc_library(
+ name = "task",
+ hdrs = ["public/pw_async/task.h"],
+ includes = ["public"],
+ deps = [
+ ":types",
+ "//pw_chrono:system_clock",
+ "//pw_function",
+ "//pw_status",
+ "@pigweed//targets:pw_async_task_backend",
+ ],
+)
+
+pw_cc_library(
+ name = "types",
+ hdrs = [
+ "public/pw_async/context.h",
+ "public/pw_async/task_function.h",
+ ],
+ includes = ["public"],
+ deps = [
+ "//pw_function",
+ "//pw_status",
+ ],
+)
+
+pw_cc_facade(
+ name = "fake_dispatcher_facade",
+ hdrs = ["public/pw_async/fake_dispatcher.h"],
+ includes = ["public"],
+ deps = [":dispatcher"],
+)
+
+pw_cc_library(
+ name = "fake_dispatcher",
+ hdrs = ["public/pw_async/fake_dispatcher.h"],
+ includes = ["public"],
+ deps = [
+ ":dispatcher",
+ "@pigweed//targets:pw_async_fake_dispatcher_backend",
+ ],
+)
+
+pw_cc_test(
+ name = "fake_dispatcher_test",
+ srcs = ["fake_dispatcher_test.cc"],
+ deps = [
+ ":fake_dispatcher",
+ "//pw_containers:vector",
+ "//pw_log",
+ "//pw_string:to_string",
+ "//pw_sync:timed_thread_notification",
+ "//pw_thread:thread",
+ ],
+)
+
+pw_cc_library(
+ name = "fake_dispatcher_fixture",
+ hdrs = ["public/pw_async/fake_dispatcher_fixture.h"],
+ includes = ["public"],
+ deps = [":fake_dispatcher"],
+)
+
+pw_cc_library(
+ name = "heap_dispatcher",
+ srcs = ["heap_dispatcher.cc"],
+ hdrs = ["public/pw_async/heap_dispatcher.h"],
+ includes = ["public"],
+ deps = [
+ ":dispatcher",
+ ":task",
+ ":types",
+ "//pw_result",
],
)
diff --git a/pw_async/BUILD.gn b/pw_async/BUILD.gn
index 483b9b5e6..59ccdd046 100644
--- a/pw_async/BUILD.gn
+++ b/pw_async/BUILD.gn
@@ -18,6 +18,7 @@ import("$dir_pw_async/async.gni")
import("$dir_pw_async/backend.gni")
import("$dir_pw_async/fake_dispatcher_fixture.gni")
import("$dir_pw_async/fake_dispatcher_test.gni")
+import("$dir_pw_async/heap_dispatcher.gni")
import("$dir_pw_build/facade.gni")
import("$dir_pw_build/target_types.gni")
import("$dir_pw_chrono/backend.gni")
@@ -30,10 +31,15 @@ config("public_include_path") {
pw_source_set("dispatcher") {
public_configs = [ ":public_include_path" ]
public_deps = [
+ ":types",
"$dir_pw_chrono:system_clock",
dir_pw_function,
+ dir_pw_status,
+ ]
+ public = [
+ "public/pw_async/dispatcher.h",
+ "public/pw_async/function_dispatcher.h",
]
- public = [ "public/pw_async/dispatcher.h" ]
visibility = [
":*",
"$dir_pw_async_basic:*",
@@ -44,13 +50,27 @@ pw_facade("task") {
backend = pw_async_TASK_BACKEND
public_configs = [ ":public_include_path" ]
public_deps = [
+ ":types",
"$dir_pw_chrono:system_clock",
dir_pw_function,
dir_pw_status,
]
+ public = [ "public/pw_async/task.h" ]
+ visibility = [
+ ":*",
+ "$dir_pw_async_basic:*",
+ ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY
+}
+
+pw_source_set("types") {
+ public_configs = [ ":public_include_path" ]
+ public_deps = [
+ dir_pw_function,
+ dir_pw_status,
+ ]
public = [
- "public/pw_async/internal/types.h",
- "public/pw_async/task.h",
+ "public/pw_async/context.h",
+ "public/pw_async/task_function.h",
]
visibility = [
":*",
@@ -77,6 +97,14 @@ fake_dispatcher_fixture("fake_dispatcher_fixture") {
] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY
}
+pw_async_heap_dispatcher_source_set("heap_dispatcher") {
+ task_backend = ":task"
+ visibility = [
+ ":*",
+ "$dir_pw_async_basic:*",
+ ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY
+}
+
pw_test_group("tests") {
}
@@ -85,10 +113,12 @@ pw_doc_group("docs") {
}
# Satisfy source_is_in_build_files presubmit step
-pw_source_set("fake_dispatcher_test") {
+pw_source_set("satisfy_presubmit") {
sources = [
"fake_dispatcher_test.cc",
+ "heap_dispatcher.cc",
"public/pw_async/fake_dispatcher_fixture.h",
+ "public/pw_async/heap_dispatcher.h",
]
visibility = []
}
diff --git a/pw_async/CMakeLists.txt b/pw_async/CMakeLists.txt
new file mode 100644
index 000000000..126a05854
--- /dev/null
+++ b/pw_async/CMakeLists.txt
@@ -0,0 +1,55 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+include($ENV{PW_ROOT}/pw_build/pigweed.cmake)
+include($ENV{PW_ROOT}/pw_async/backend.cmake)
+
+pw_add_library(pw_async.types INTERFACE
+ HEADERS
+ public/pw_async/context.h
+ public/pw_async/task_function.h
+ PUBLIC_INCLUDES
+ public
+ PUBLIC_DEPS
+ pw_function
+ pw_status
+)
+
+pw_add_facade(pw_async.task INTERFACE
+ BACKEND
+ pw_async.task_BACKEND
+ HEADERS
+ public/pw_async/task.h
+ PUBLIC_INCLUDES
+ public
+ PUBLIC_DEPS
+ pw_async.types
+ pw_function
+ pw_status
+)
+
+pw_add_facade(pw_async.dispatcher INTERFACE
+ BACKEND
+ pw_async.dispatcher_BACKEND
+ HEADERS
+ public/pw_async/dispatcher.h
+ public/pw_async/function_dispatcher.h
+ PUBLIC_INCLUDES
+ public
+ PUBLIC_DEPS
+ pw_async.types
+ pw_chrono.system_clock
+ pw_function
+ pw_status
+)
diff --git a/pw_async/OWNERS b/pw_async/OWNERS
new file mode 100644
index 000000000..95e4875d7
--- /dev/null
+++ b/pw_async/OWNERS
@@ -0,0 +1,2 @@
+benlawson@google.com
+cramertj@google.com
diff --git a/pw_async/backend.cmake b/pw_async/backend.cmake
new file mode 100644
index 000000000..b9ddc0287
--- /dev/null
+++ b/pw_async/backend.cmake
@@ -0,0 +1,21 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+include_guard(GLOBAL)
+
+include($ENV{PW_ROOT}/pw_build/pigweed.cmake)
+
+# Backend for the pw_async module.
+pw_add_backend_variable(pw_async.task_BACKEND)
+pw_add_backend_variable(pw_async.dispatcher_BACKEND)
diff --git a/pw_async/docs.rst b/pw_async/docs.rst
index e1a34e009..8ead20e04 100644
--- a/pw_async/docs.rst
+++ b/pw_async/docs.rst
@@ -33,6 +33,11 @@ Dispatcher API
Task API
==============
+.. doxygenstruct:: pw::async::Context
+ :members:
+
+.. doxygentypedef:: pw::async::TaskFunction
+
.. doxygenclass:: pw::async::Task
:members:
@@ -41,8 +46,17 @@ Facade API
Task
----
-The Task type represents a work item that is submitted to a Dispatcher. The Task
-facade enables Dispatcher backends to specify custom state and methods.
+The ``Task`` type represents a work item that can be submitted to and executed
+by a ``Dispatcher``.
+
+To run work on a ``Dispatcher`` event loop, a ``Task`` can be constructed from
+a function or lambda (see ``pw::async::TaskFunction``) and submitted to run
+using the ``pw::async::Dispatcher::Post`` method (and its siblings, ``PostAt``
+etc.).
+
+The ``Task`` facade enables backends to provide custom storage containers for
+``Task`` s, as well as to keep per- ``Task`` data alongside the ``TaskFunction``
+(such as ``next`` pointers for intrusive linked-lists of ``Task``).
The active Task backend is configured with the GN variable
``pw_async_TASK_BACKEND``. The specified target must define a class
@@ -50,6 +64,9 @@ The active Task backend is configured with the GN variable
that meets the interface requirements in ``public/pw_async/task.h``. Task will
then trivially wrap ``NativeTask``.
+The bazel build provides the ``pw_async_task_backend`` label flag to configure
+the active Task backend.
+
FakeDispatcher
--------------
The FakeDispatcher facade is a utility for simulating a real Dispatcher
@@ -58,13 +75,16 @@ code that uses Dispatcher. FakeDispatcher is a facade instead of a concrete
implementation because it depends on Task state for processing tasks, which
varies across Task backends.
-The active Task backend is configured with the GN variable
+The active FakeDispatcher backend is configured with the GN variable
``pw_async_FAKE_DISPATCHER_BACKEND``. The specified target must define a class
``pw::async::test::backend::NativeFakeDispatcher`` in the header
``pw_async_backend/fake_dispatcher.h`` that meets the interface requirements in
``public/pw_async/task.h``. FakeDispatcher will then trivially wrap
``NativeFakeDispatcher``.
+The bazel build provides the ``pw_async_fake_dispatcher_backend`` label flag to
+configure the FakeDispatcher backend.
+
Testing FakeDispatcher
^^^^^^^^^^^^^^^^^^^^^^
The GN template ``fake_dispatcher_tests`` in ``fake_dispatcher_tests.gni``
@@ -72,6 +92,16 @@ creates a test target that tests a FakeDispatcher backend. This enables
one test suite to be shared across FakeDispatcher backends and ensures
conformance.
+FunctionDispatcher
+------------------
+.. doxygenclass:: pw::async::FunctionDispatcher
+ :members:
+
+HeapDispatcher
+--------------
+.. doxygenclass:: pw::async::HeapDispatcher
+ :members:
+
Design
======
@@ -169,6 +199,5 @@ Roadmap
-------
- Stabilize Task cancellation API
- Utility for dynamically allocated Tasks
-- Bazel support
- CMake support
- Support for C++20 coroutines
diff --git a/pw_async/fake_dispatcher_fixture.gni b/pw_async/fake_dispatcher_fixture.gni
index a58be16ac..7e8fda760 100644
--- a/pw_async/fake_dispatcher_fixture.gni
+++ b/pw_async/fake_dispatcher_fixture.gni
@@ -28,6 +28,7 @@ template("fake_dispatcher_fixture") {
assert(defined(invoker.backend))
pw_source_set(target_name) {
+ testonly = pw_unit_test_TESTONLY
public = [ "$dir_pw_async/public/pw_async/fake_dispatcher_fixture.h" ]
public_deps = [
"$dir_pw_unit_test",
diff --git a/pw_async/fake_dispatcher_test.cc b/pw_async/fake_dispatcher_test.cc
index 3674f4cb0..3e5113aa1 100644
--- a/pw_async/fake_dispatcher_test.cc
+++ b/pw_async/fake_dispatcher_test.cc
@@ -14,176 +14,316 @@
#include "pw_async/fake_dispatcher.h"
#include "gtest/gtest.h"
-#include "pw_thread/thread.h"
-#include "pw_thread_stl/options.h"
+#include "pw_containers/vector.h"
+#include "pw_string/to_string.h"
#define ASSERT_OK(status) ASSERT_EQ(OkStatus(), status)
#define ASSERT_CANCELLED(status) ASSERT_EQ(Status::Cancelled(), status)
using namespace std::chrono_literals;
+struct CallCounts {
+ int ok = 0;
+ int cancelled = 0;
+ bool operator==(const CallCounts& other) const {
+ return ok == other.ok && cancelled == other.cancelled;
+ }
+};
+
+namespace pw {
+template <>
+StatusWithSize ToString<CallCounts>(const CallCounts& value,
+ span<char> buffer) {
+ return string::Format(buffer,
+ "CallCounts {.ok = %d, .cancelled = %d}",
+ value.ok,
+ value.cancelled);
+}
+} // namespace pw
+
namespace pw::async::test {
+namespace {
+
+struct CallCounter {
+ CallCounts counts;
+ auto fn() {
+ return [this](Context&, Status status) {
+ if (status.ok()) {
+ this->counts.ok++;
+ } else if (status.IsCancelled()) {
+ this->counts.cancelled++;
+ }
+ };
+ }
+};
-TEST(FakeDispatcher, PostTasks) {
+TEST(FakeDispatcher, UnpostedTasksDontRun) {
FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{});
+}
- int count = 0;
- auto inc_count = [&count]([[maybe_unused]] Context& c, Status status) {
- ASSERT_OK(status);
- ++count;
- };
+TEST(FakeDispatcher, PostedTaskRunsOnce) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
+}
- Task task(inc_count);
+TEST(FakeDispatcher, TaskPostedTwiceBeforeRunningRunsOnce) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
dispatcher.Post(task);
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
+}
- Task task2(inc_count);
- dispatcher.Post(task2);
+TEST(FakeDispatcher, TaskRepostedAfterRunningRunsTwice) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
+ dispatcher.Post(task);
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 2});
+}
- Task task3(inc_count);
- dispatcher.Post(task3);
+TEST(FakeDispatcher, TwoPostedTasksEachRunOnce) {
+ FakeDispatcher dispatcher;
+ CallCounter counter_1;
+ Task task_1(counter_1.fn());
+ CallCounter counter_2;
+ Task task_2(counter_2.fn());
+ dispatcher.Post(task_1);
+ dispatcher.Post(task_2);
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter_1.counts, CallCounts{.ok = 1});
+ EXPECT_EQ(counter_2.counts, CallCounts{.ok = 1});
+}
- // Should not run; RunUntilIdle() does not advance time.
- Task task4([&count]([[maybe_unused]] Context& c, Status status) {
- ASSERT_CANCELLED(status);
- ++count;
- });
- dispatcher.PostAfter(task4, 1ms);
+TEST(FakeDispatcher, PostedTasksRunInOrderForFairness) {
+ FakeDispatcher dispatcher;
+ pw::Vector<uint8_t, 3> task_run_order;
+ Task task_1([&task_run_order](auto...) { task_run_order.push_back(1); });
+ Task task_2([&task_run_order](auto...) { task_run_order.push_back(2); });
+ Task task_3([&task_run_order](auto...) { task_run_order.push_back(3); });
+ dispatcher.Post(task_1);
+ dispatcher.Post(task_2);
+ dispatcher.Post(task_3);
+ dispatcher.RunUntilIdle();
+ pw::Vector<uint8_t, 3> expected_run_order({1, 2, 3});
+ EXPECT_EQ(task_run_order, expected_run_order);
+}
+TEST(FakeDispatcher, RequestStopQueuesPreviouslyPostedTaskWithCancel) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
+ dispatcher.RequestStop();
dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{.cancelled = 1});
+}
+
+TEST(FakeDispatcher, RequestStopQueuesNewlyPostedTaskWithCancel) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
dispatcher.RequestStop();
+ dispatcher.Post(task);
dispatcher.RunUntilIdle();
- ASSERT_EQ(count, 4);
+ EXPECT_EQ(counter.counts, CallCounts{.cancelled = 1});
}
-// Lambdas can only capture one ptr worth of memory without allocating, so we
-// group the data we want to share between tasks and their containing tests
-// inside one struct.
-struct TaskPair {
- Task task_a;
- Task task_b;
- int count = 0;
-};
+TEST(FakeDispatcher, RunUntilIdleDoesNotRunFutureTask) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ // Should not run; RunUntilIdle() does not advance time.
+ Task task(counter.fn());
+ dispatcher.PostAfter(task, 1ms);
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{});
+}
-TEST(FakeDispatcher, DelayedTasks) {
+TEST(FakeDispatcher, PostAfterRunsTasksInSequence) {
FakeDispatcher dispatcher;
- TaskPair tp;
+ pw::Vector<uint8_t, 3> task_run_order;
+ Task task_1([&task_run_order](auto...) { task_run_order.push_back(1); });
+ Task task_2([&task_run_order](auto...) { task_run_order.push_back(2); });
+ Task task_3([&task_run_order](auto...) { task_run_order.push_back(3); });
+ dispatcher.PostAfter(task_1, 50ms);
+ dispatcher.PostAfter(task_2, 25ms);
+ dispatcher.PostAfter(task_3, 100ms);
+ dispatcher.RunFor(125ms);
+ pw::Vector<uint8_t, 3> expected_run_order({2, 1, 3});
+ EXPECT_EQ(task_run_order, expected_run_order);
+}
- Task task0([&tp]([[maybe_unused]] Context& c, Status status) {
- ASSERT_OK(status);
- tp.count = tp.count * 10 + 4;
- });
- dispatcher.PostAfter(task0, 200ms);
+TEST(FakeDispatcher, PostAfterWithEarlierTimeRunsSooner) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.PostAfter(task, 100ms);
+ dispatcher.PostAfter(task, 50ms);
+ dispatcher.RunFor(60ms);
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
+}
- Task task1([&tp]([[maybe_unused]] Context& c, Status status) {
- ASSERT_OK(status);
- tp.count = tp.count * 10 + 1;
- c.dispatcher->PostAfter(tp.task_a, 50ms);
- c.dispatcher->PostAfter(tp.task_b, 25ms);
- });
- dispatcher.PostAfter(task1, 100ms);
+TEST(FakeDispatcher, PostAfterWithLaterTimeRunsSooner) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.PostAfter(task, 50ms);
+ dispatcher.PostAfter(task, 100ms);
+ dispatcher.RunFor(60ms);
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
+}
- tp.task_a.set_function([&tp]([[maybe_unused]] Context& c, Status status) {
- ASSERT_OK(status);
- tp.count = tp.count * 10 + 3;
- });
- tp.task_b.set_function([&tp]([[maybe_unused]] Context& c, Status status) {
- ASSERT_OK(status);
- tp.count = tp.count * 10 + 2;
- });
+TEST(FakeDispatcher, PostThenPostAfterRunsImmediately) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
+ dispatcher.PostAfter(task, 50ms);
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
+}
- dispatcher.RunFor(200ms);
- dispatcher.RequestStop();
+TEST(FakeDispatcher, PostAfterThenPostRunsImmediately) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.PostAfter(task, 50ms);
+ dispatcher.Post(task);
dispatcher.RunUntilIdle();
- ASSERT_EQ(tp.count, 1234);
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
}
-TEST(FakeDispatcher, CancelTasks) {
+TEST(FakeDispatcher, CancelAfterPostStopsTaskFromRunning) {
FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
+ EXPECT_TRUE(dispatcher.Cancel(task));
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{});
+}
- auto shouldnt_run = []([[maybe_unused]] Context& c,
- [[maybe_unused]] Status status) { FAIL(); };
+TEST(FakeDispatcher, CancelAfterPostAfterStopsTaskFromRunning) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.PostAfter(task, 50ms);
+ EXPECT_TRUE(dispatcher.Cancel(task));
+ dispatcher.RunFor(60ms);
+ EXPECT_EQ(counter.counts, CallCounts{});
+}
- TaskPair tp;
- // This task gets canceled in cancel_task.
- tp.task_a.set_function(shouldnt_run);
- dispatcher.PostAfter(tp.task_a, 40ms);
+TEST(FakeDispatcher, CancelAfterPostAndPostAfterStopsTaskFromRunning) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
+ dispatcher.PostAfter(task, 50ms);
+ EXPECT_TRUE(dispatcher.Cancel(task));
+ dispatcher.RunFor(60ms);
+ EXPECT_EQ(counter.counts, CallCounts{});
+}
- // This task gets canceled immediately.
- Task task1(shouldnt_run);
- dispatcher.PostAfter(task1, 10ms);
- ASSERT_TRUE(dispatcher.Cancel(task1));
+TEST(FakeDispatcher, PostAgainAfterCancelRuns) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
+ EXPECT_TRUE(dispatcher.Cancel(task));
+ dispatcher.Post(task);
+ dispatcher.RunUntilIdle();
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
+}
- // This task cancels the first task.
- Task cancel_task([&tp](Context& c, Status status) {
- ASSERT_OK(status);
- ASSERT_TRUE(c.dispatcher->Cancel(tp.task_a));
- ++tp.count;
- });
- dispatcher.PostAfter(cancel_task, 20ms);
+TEST(FakeDispatcher, CancelWithoutPostReturnsFalse) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ EXPECT_FALSE(dispatcher.Cancel(task));
+}
- dispatcher.RunFor(50ms);
- dispatcher.RequestStop();
+TEST(FakeDispatcher, CancelAfterRunningReturnsFalse) {
+ FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task(counter.fn());
+ dispatcher.Post(task);
dispatcher.RunUntilIdle();
- ASSERT_EQ(tp.count, 1);
+ EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
+ EXPECT_FALSE(dispatcher.Cancel(task));
}
-// Test RequestStop() from inside task.
-TEST(FakeDispatcher, RequestStopInsideTask) {
+TEST(FakeDispatcher, CancelInsideOtherTaskCancelsTaskWithoutRunningIt) {
FakeDispatcher dispatcher;
- int count = 0;
- auto cancelled_cb = [&count]([[maybe_unused]] Context& c, Status status) {
- ASSERT_CANCELLED(status);
- ++count;
- };
+ CallCounter cancelled_task_counter;
+ Task cancelled_task(cancelled_task_counter.fn());
- // These tasks are never executed and cleaned up in RequestStop().
- Task task0(cancelled_cb), task1(cancelled_cb);
- dispatcher.PostAfter(task0, 20ms);
- dispatcher.PostAfter(task1, 21ms);
-
- Task stop_task([&count]([[maybe_unused]] Context& c, Status status) {
+ Task canceling_task([&cancelled_task](Context& c, Status status) {
ASSERT_OK(status);
- ++count;
- static_cast<FakeDispatcher*>(c.dispatcher)->RequestStop();
- static_cast<FakeDispatcher*>(c.dispatcher)->RunUntilIdle();
+ ASSERT_TRUE(c.dispatcher->Cancel(cancelled_task));
});
- dispatcher.Post(stop_task);
+ dispatcher.Post(canceling_task);
+ dispatcher.Post(cancelled_task);
dispatcher.RunUntilIdle();
- ASSERT_EQ(count, 3);
+
+ // NOTE: the cancelled task is *not* run with `Cancel`.
+ // This is likely to produce strange behavior, and this contract should
+ // be revisited and carefully documented.
+ EXPECT_EQ(cancelled_task_counter.counts, CallCounts{});
}
-TEST(FakeDispatcher, PeriodicTasks) {
+TEST(FakeDispatcher, CancelInsideCurrentTaskFails) {
FakeDispatcher dispatcher;
- int count = 0;
- Task periodic_task([&count]([[maybe_unused]] Context& c, Status status) {
+ Task self_cancel_task;
+ self_cancel_task.set_function([&self_cancel_task](Context& c, Status status) {
ASSERT_OK(status);
- ++count;
+ ASSERT_FALSE(c.dispatcher->Cancel(self_cancel_task));
});
- dispatcher.PostPeriodicAt(periodic_task, 20ms, dispatcher.now() + 50ms);
+ dispatcher.Post(self_cancel_task);
+ dispatcher.RunUntilIdle();
+}
+
+TEST(FakeDispatcher, RequestStopInsideOtherTaskCancelsOtherTask) {
+ FakeDispatcher dispatcher;
+
+ // This task is never executed and is cleaned up in RequestStop().
+ CallCounter task_counter;
+ Task task(task_counter.fn());
- // Cancel periodic task after it has run thrice, at +50ms, +70ms, and +90ms.
- Task cancel_task([&periodic_task](Context& c, Status status) {
+ int stop_count = 0;
+ Task stop_task([&stop_count]([[maybe_unused]] Context& c, Status status) {
ASSERT_OK(status);
- c.dispatcher->Cancel(periodic_task);
+ stop_count++;
+ static_cast<FakeDispatcher*>(c.dispatcher)->RequestStop();
});
- dispatcher.PostAfter(cancel_task, 100ms);
- dispatcher.RunFor(300ms);
- dispatcher.RequestStop();
+ dispatcher.Post(stop_task);
+ dispatcher.Post(task);
+
dispatcher.RunUntilIdle();
- ASSERT_EQ(count, 3);
+ EXPECT_EQ(stop_count, 1);
+ EXPECT_EQ(task_counter.counts, CallCounts{.cancelled = 1});
}
TEST(FakeDispatcher, TasksCancelledByDispatcherDestructor) {
- int count = 0;
- auto inc_count = [&count]([[maybe_unused]] Context& c, Status status) {
- ASSERT_CANCELLED(status);
- ++count;
- };
- Task task0(inc_count), task1(inc_count), task2(inc_count);
+ CallCounter counter;
+ Task task0(counter.fn()), task1(counter.fn()), task2(counter.fn());
{
FakeDispatcher dispatcher;
@@ -192,25 +332,21 @@ TEST(FakeDispatcher, TasksCancelledByDispatcherDestructor) {
dispatcher.PostAfter(task2, 10s);
}
- ASSERT_EQ(count, 3);
+ ASSERT_EQ(counter.counts, CallCounts{.cancelled = 3});
}
TEST(DispatcherBasic, TasksCancelledByRunFor) {
- int count = 0;
- auto inc_count = [&count]([[maybe_unused]] Context& c, Status status) {
- ASSERT_CANCELLED(status);
- ++count;
- };
- Task task0(inc_count), task1(inc_count), task2(inc_count);
-
FakeDispatcher dispatcher;
+ CallCounter counter;
+ Task task0(counter.fn()), task1(counter.fn()), task2(counter.fn());
dispatcher.PostAfter(task0, 10s);
dispatcher.PostAfter(task1, 10s);
dispatcher.PostAfter(task2, 10s);
dispatcher.RequestStop();
dispatcher.RunFor(5s);
- ASSERT_EQ(count, 3);
+ ASSERT_EQ(counter.counts, CallCounts{.cancelled = 3});
}
+} // namespace
} // namespace pw::async::test
diff --git a/pw_async/fake_dispatcher_test.gni b/pw_async/fake_dispatcher_test.gni
index 33a32ee33..0cc2995e4 100644
--- a/pw_async/fake_dispatcher_test.gni
+++ b/pw_async/fake_dispatcher_test.gni
@@ -33,6 +33,8 @@ template("fake_dispatcher_test") {
pw_sync_TIMED_THREAD_NOTIFICATION_BACKEND != "" &&
pw_thread_THREAD_BACKEND != ""
deps = [
+ "$dir_pw_containers:vector",
+ "$dir_pw_string:to_string",
"$dir_pw_sync:timed_thread_notification",
"$dir_pw_thread:thread",
dir_pw_log,
diff --git a/pw_async/heap_dispatcher.cc b/pw_async/heap_dispatcher.cc
new file mode 100644
index 000000000..e9f51a30b
--- /dev/null
+++ b/pw_async/heap_dispatcher.cc
@@ -0,0 +1,66 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_async/heap_dispatcher.h"
+
+#include "pw_async/task.h"
+#include "pw_result/result.h"
+
+namespace pw::async {
+
+namespace {
+
+// TODO: b/277793223 - Optimize to avoid double virtual indirection and double
+// allocation. In situations in which pw::Function is large enough and the
+// captures are small enough, we could eliminate this by reshaping the task as
+// just a pw::Function.
+struct TaskAndFunction {
+ static Result<TaskAndFunction*> New(TaskFunction&& task) {
+ // std::nothrow causes new to return a nullptr on failure instead of
+ // throwing.
+ TaskAndFunction* t = new (std::nothrow) TaskAndFunction();
+ if (!t) {
+ return Status::ResourceExhausted();
+ }
+ t->func = std::move(task);
+
+ // Closure captures must not include references, as that would be UB due to
+ // the `delete` at the end of the function. See
+ // https://reviews.llvm.org/D48239.
+ t->task.set_function([t](Context& ctx, Status status) {
+ t->func(ctx, status);
+
+ // Delete must appear at the very end of this closure to avoid
+ // use-after-free of captures or Context.task.
+ delete t;
+ });
+
+ return t;
+ }
+ Task task;
+ TaskFunction func;
+};
+} // namespace
+
+Status HeapDispatcher::PostAt(TaskFunction&& task_func,
+ chrono::SystemClock::time_point time) {
+ Result<TaskAndFunction*> result = TaskAndFunction::New(std::move(task_func));
+ if (!result.ok()) {
+ return result.status();
+ }
+ dispatcher_.PostAt((*result)->task, time);
+ return Status();
+}
+
+} // namespace pw::async
diff --git a/pw_async/heap_dispatcher.gni b/pw_async/heap_dispatcher.gni
new file mode 100644
index 000000000..674a4fc3a
--- /dev/null
+++ b/pw_async/heap_dispatcher.gni
@@ -0,0 +1,38 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import("//build_overrides/pigweed.gni")
+import("$dir_pw_build/target_types.gni")
+
+# Creates a HeapDispatcher source set for a specified Task backend.
+#
+# Parameters
+#
+# task_backend (required)
+# [target] The Task backend.
+template("pw_async_heap_dispatcher_source_set") {
+ assert(defined(invoker.task_backend))
+
+ pw_source_set(target_name) {
+ public = [ "$dir_pw_async/public/pw_async/heap_dispatcher.h" ]
+ sources = [ "$dir_pw_async/heap_dispatcher.cc" ]
+ deps = [ "$dir_pw_result" ]
+ public_deps = [
+ "$dir_pw_async:dispatcher",
+ "$dir_pw_async:types",
+ invoker.task_backend,
+ ]
+ forward_variables_from(invoker, [ "visibility" ])
+ }
+}
diff --git a/pw_async/public/pw_async/internal/types.h b/pw_async/public/pw_async/context.h
index c8b7ea287..845d11374 100644
--- a/pw_async/public/pw_async/internal/types.h
+++ b/pw_async/public/pw_async/context.h
@@ -21,25 +21,12 @@ namespace pw::async {
class Dispatcher;
class Task;
+/// Contextual information provided by a `Dispatcher` to a running task.
struct Context {
+ /// The `Dispatcher` running the current `Task`.
Dispatcher* dispatcher;
+ /// The current ``Task`` being executed.
Task* task;
};
-// A TaskFunction is a unit of work that is wrapped by a Task and executed on a
-// Dispatcher.
-//
-// TaskFunctions take a `Context` as their first argument. Before executing a
-// Task, the Dispatcher sets the pointer to itself and to the Task in `Context`.
-//
-// TaskFunctions take a `Status` as their second argument. When a Task is
-// running as normal, |status| == PW_STATUS_OK. If a Task will not be able to
-// run as scheduled, the Dispatcher will still invoke the TaskFunction with
-// |status| == PW_STATUS_CANCELLED. This provides an opportunity to reclaim
-// resources held by the Task.
-//
-// A Task will not run as scheduled if, for example, it is still waiting when
-// the Dispatcher shuts down.
-using TaskFunction = Function<void(Context&, Status)>;
-
} // namespace pw::async
diff --git a/pw_async/public/pw_async/dispatcher.h b/pw_async/public/pw_async/dispatcher.h
index 8faaccd67..fb0dfdd13 100644
--- a/pw_async/public/pw_async/dispatcher.h
+++ b/pw_async/public/pw_async/dispatcher.h
@@ -19,39 +19,66 @@ namespace pw::async {
class Task;
-/// Asynchronous Dispatcher abstract class. A default implementation is provided
-/// in pw_async_basic.
+/// Abstract base class for an asynchronous dispatcher loop.
///
-/// Dispatcher implements VirtualSystemClock so the Dispatcher's time can be
-/// injected into other modules under test. This is useful for consistently
-/// simulating time when using FakeDispatcher (rather than using
-/// chrono::SimulatedSystemClock separately).
+/// `Dispatcher`s run many short, non-blocking units of work on a single thread.
+/// This approach has a number of advantages compared with executing concurrent
+/// tasks on separate threads:
+///
+/// - `Dispatcher`s can make more efficient use of system resources, since they
+/// don't need to maintain separate thread stacks.
+/// - `Dispatcher`s can run on systems without thread support, such as no-RTOS
+/// embedded environments.
+/// - `Dispatcher`s allow tasks to communicate with one another without the
+/// synchronization overhead of locks, atomics, fences, or `volatile`.
+///
+/// Thread support: `Dispatcher` methods may be safely invoked from any thread,
+/// but the resulting tasks will always execute on a single thread. Whether
+/// or not methods may be invoked from interrupt context is
+/// implementation-defined.
+///
+/// `VirtualSystemClock`: `Dispatcher` implements `VirtualSystemClock` in order
+/// to provide a consistent source of (possibly mocked) time information to
+/// tasks.
+///
+/// A simple default dispatcher implementation is provided by `pw_async_basic`.
class Dispatcher : public chrono::VirtualSystemClock {
public:
~Dispatcher() override = default;
- /// Post caller owned |task|.
- virtual void Post(Task& task) = 0;
+ /// Post caller-owned |task| to be run on the dispatch loop.
+ ///
+ /// Posted tasks execute in the order they are posted. This ensures that
+ /// tasks can re-post themselves and yield in order to allow other tasks the
+ /// opportunity to execute.
+ ///
+ /// A given |task| must only be posted to a single `Dispatcher`.
+ virtual void Post(Task& task) { PostAt(task, now()); }
/// Post caller owned |task| to be run after |delay|.
- virtual void PostAfter(Task& task, chrono::SystemClock::duration delay) = 0;
+ ///
+ /// If |task| was already posted to run at an earlier time (before |delay|
+ /// would expire), |task| must be run at the earlier time, and |task|
+ /// *may* also be run at the later time.
+ virtual void PostAfter(Task& task, chrono::SystemClock::duration delay) {
+ PostAt(task, now() + delay);
+ }
/// Post caller owned |task| to be run at |time|.
+ ///
+ /// If |task| was already posted to run before |time|,
+ /// |task| must be run at the earlier time, and |task| *may* also be run at
+ /// the later time.
virtual void PostAt(Task& task, chrono::SystemClock::time_point time) = 0;
- /// Post caller owned |task| to be run immediately then rerun at a regular
- /// |interval|.
- virtual void PostPeriodic(Task& task,
- chrono::SystemClock::duration interval) = 0;
- /// Post caller owned |task| to be run at |time| then rerun at a regular
- /// |interval|. |interval| must not be zero.
- virtual void PostPeriodicAt(Task& task,
- chrono::SystemClock::duration interval,
- chrono::SystemClock::time_point time) = 0;
-
- /// Returns true if |task| is succesfully canceled.
- /// If cancelation fails, the task may be running or completed.
- /// Periodic tasks may be posted once more after they are canceled.
+ /// Prevent a `Post`ed task from starting.
+ ///
+ /// Returns:
+ /// true: the task was successfully canceled and will not be run by the
+ /// dispatcher until `Post`ed again.
+ /// false: the task could not be cancelled because it either was not
+ /// posted, already ran, or is currently running on the `Dispatcher`
+ /// thread.
virtual bool Cancel(Task& task) = 0;
};
diff --git a/pw_async/public/pw_async/fake_dispatcher.h b/pw_async/public/pw_async/fake_dispatcher.h
index 43931d032..a1b85ae74 100644
--- a/pw_async/public/pw_async/fake_dispatcher.h
+++ b/pw_async/public/pw_async/fake_dispatcher.h
@@ -18,30 +18,39 @@
namespace pw::async::test {
-/// FakeDispatcher is a facade for an implementation of Dispatcher that is used
-/// in unit tests. FakeDispatcher uses simulated time. RunUntil() and RunFor()
-/// advance time immediately, and now() returns the current simulated time.
+/// `FakeDispatcher` is a `Dispatcher` implementation for use in unit tests.
///
-/// To support various Task backends, FakeDispatcher wraps a
-/// backend::NativeFakeDispatcher that implements standard FakeDispatcher
-/// behavior using backend::NativeTask objects.
+/// Threading: `FakeDispatcher` is *NOT* thread-safe and, unlike other
+/// `Dispatcher` implementations. This means that tasks must not be posted from
+/// multiple threads at once, and tasks cannot be posted from other threads
+/// while the dispatcher is executing.
+///
+/// Time: `FakeDispatcher` uses simulated time. `RunUntil()` and `RunFor()`
+/// advance time immediately, and `now()` returns the current simulated time.
+///
+/// To support various `Task` backends, `FakeDispatcher` wraps a
+/// `backend::NativeFakeDispatcher` that implements standard `FakeDispatcher`
+/// behavior using `backend::NativeTask` objects.
class FakeDispatcher final : public Dispatcher {
public:
FakeDispatcher() : native_dispatcher_(*this) {}
/// Execute all runnable tasks and return without advancing simulated time.
- void RunUntilIdle() { native_dispatcher_.RunUntilIdle(); }
+ /// Returns true iff any tasks were invoked during the run.
+ bool RunUntilIdle() { return native_dispatcher_.RunUntilIdle(); }
/// Run the dispatcher until Now() has reached `end_time`, executing all tasks
/// that come due before then.
- void RunUntil(chrono::SystemClock::time_point end_time) {
- native_dispatcher_.RunUntil(end_time);
+ /// Returns true iff any tasks were invoked during the run.
+ bool RunUntil(chrono::SystemClock::time_point end_time) {
+ return native_dispatcher_.RunUntil(end_time);
}
/// Run the Dispatcher until `duration` has elapsed, executing all tasks that
/// come due in that period.
- void RunFor(chrono::SystemClock::duration duration) {
- native_dispatcher_.RunFor(duration);
+ /// Returns true iff any tasks were invoked during the run.
+ bool RunFor(chrono::SystemClock::duration duration) {
+ return native_dispatcher_.RunFor(duration);
}
/// Stop processing tasks. After calling RequestStop(), the next time the
@@ -57,15 +66,6 @@ class FakeDispatcher final : public Dispatcher {
void PostAt(Task& task, chrono::SystemClock::time_point time) override {
native_dispatcher_.PostAt(task, time);
}
- void PostPeriodic(Task& task,
- chrono::SystemClock::duration interval) override {
- native_dispatcher_.PostPeriodic(task, interval);
- }
- void PostPeriodicAt(Task& task,
- chrono::SystemClock::duration interval,
- chrono::SystemClock::time_point start_time) override {
- native_dispatcher_.PostPeriodicAt(task, interval, start_time);
- }
bool Cancel(Task& task) override { return native_dispatcher_.Cancel(task); }
// VirtualSystemClock overrides:
diff --git a/pw_async/public/pw_async/fake_dispatcher_fixture.h b/pw_async/public/pw_async/fake_dispatcher_fixture.h
index b32efeb92..b8c02c6a6 100644
--- a/pw_async/public/pw_async/fake_dispatcher_fixture.h
+++ b/pw_async/public/pw_async/fake_dispatcher_fixture.h
@@ -28,11 +28,11 @@ namespace pw::async::test {
/// MyClass obj(dispatcher());
///
/// obj.ScheduleSomeTasks();
-/// RunUntilIdle();
+/// EXPECT_TRUE(RunUntilIdle());
/// EXPECT_TRUE(some condition);
///
/// obj.ScheduleTaskToRunIn30Seconds();
-/// RunFor(30s);
+/// EXPECT_TRUE(RunFor(30s));
/// EXPECT_TRUE(task ran);
/// }
/// @endcode
@@ -45,18 +45,21 @@ class FakeDispatcherFixture : public ::testing::Test {
chrono::SystemClock::time_point now() { return dispatcher_.now(); }
/// Dispatches all tasks with due times up until `now()`.
- void RunUntilIdle() { dispatcher_.RunUntilIdle(); }
+ /// Returns true iff any tasks were invoked during the run.
+ bool RunUntilIdle() { return dispatcher_.RunUntilIdle(); }
/// Dispatches all tasks with due times up to `end_time`, progressively
/// advancing the fake clock.
- void RunUntil(chrono::SystemClock::time_point end_time) {
- dispatcher_.RunUntil(end_time);
+ /// Returns true iff any tasks were invoked during the run.
+ bool RunUntil(chrono::SystemClock::time_point end_time) {
+ return dispatcher_.RunUntil(end_time);
}
/// Dispatches all tasks with due times up to `now() + duration`,
/// progressively advancing the fake clock.
- void RunFor(chrono::SystemClock::duration duration) {
- dispatcher_.RunFor(duration);
+ /// Returns true iff any tasks were invoked during the run.
+ bool RunFor(chrono::SystemClock::duration duration) {
+ return dispatcher_.RunFor(duration);
}
private:
diff --git a/pw_async/public/pw_async/function_dispatcher.h b/pw_async/public/pw_async/function_dispatcher.h
new file mode 100644
index 000000000..e3316e10e
--- /dev/null
+++ b/pw_async/public/pw_async/function_dispatcher.h
@@ -0,0 +1,52 @@
+// Copyright 2022 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <utility>
+
+#include "pw_async/dispatcher.h"
+#include "pw_async/task_function.h"
+#include "pw_status/status.h"
+
+namespace pw::async {
+
+/// FunctionDispatcher extends Dispatcher with Post*() methods that take a
+/// TaskFunction instead of a Task. This implies that Tasks are allocated or
+/// are taken from a Task pool. Tasks are owned and managed by the Dispatcher.
+class FunctionDispatcher : public Dispatcher {
+ public:
+ ~FunctionDispatcher() override = default;
+
+ // Prevent hiding of overloaded virtual methods.
+ using Dispatcher::Post;
+ using Dispatcher::PostAfter;
+ using Dispatcher::PostAt;
+
+ /// Post dispatcher owned |task_func| function.
+ virtual Status Post(TaskFunction&& task_func) {
+ return PostAt(std::move(task_func), now());
+ }
+
+ /// Post dispatcher owned |task_func| function to be run after |delay|.
+ virtual Status PostAfter(TaskFunction&& task_func,
+ chrono::SystemClock::duration delay) {
+ return PostAt(std::move(task_func), now() + delay);
+ }
+
+ /// Post dispatcher owned |task_func| function to be run at |time|.
+ virtual Status PostAt(TaskFunction&& task_func,
+ chrono::SystemClock::time_point time) = 0;
+};
+
+} // namespace pw::async
diff --git a/pw_async/public/pw_async/heap_dispatcher.h b/pw_async/public/pw_async/heap_dispatcher.h
new file mode 100644
index 000000000..199aa9059
--- /dev/null
+++ b/pw_async/public/pw_async/heap_dispatcher.h
@@ -0,0 +1,48 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_async/function_dispatcher.h"
+
+namespace pw::async {
+
+/// HeapDispatcher wraps an existing Dispatcher and allocates Task objects on
+/// the heap before posting them to the existing Dispatcher. After Tasks run,
+/// they are automatically freed.
+class HeapDispatcher final : public FunctionDispatcher {
+ public:
+ HeapDispatcher(Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
+ ~HeapDispatcher() override = default;
+
+ // FunctionDispatcher overrides:
+ Status PostAt(TaskFunction&& task_func,
+ chrono::SystemClock::time_point time) override;
+
+ // Dispatcher overrides:
+ inline void PostAt(Task& task,
+ chrono::SystemClock::time_point time) override {
+ return dispatcher_.PostAt(task, time);
+ }
+ inline bool Cancel(Task& task) override { return dispatcher_.Cancel(task); }
+
+ // VirtualSystemClock overrides:
+ inline chrono::SystemClock::time_point now() override {
+ return dispatcher_.now();
+ }
+
+ private:
+ Dispatcher& dispatcher_;
+};
+
+} // namespace pw::async
diff --git a/pw_async/public/pw_async/task.h b/pw_async/public/pw_async/task.h
index c1fa8e665..82618f5cf 100644
--- a/pw_async/public/pw_async/task.h
+++ b/pw_async/public/pw_async/task.h
@@ -15,7 +15,8 @@
#include <optional>
-#include "pw_async/internal/types.h"
+#include "pw_async/context.h"
+#include "pw_async/task_function.h"
#include "pw_async_backend/task.h"
namespace pw::async {
@@ -24,16 +25,16 @@ namespace test {
class FakeDispatcher;
}
-/// A Task represents a unit of work (TaskFunction) that can be executed on a
-/// Dispatcher. To support various Dispatcher backends, it wraps a
+/// A `Task` represents a unit of work (`TaskFunction`) that can be executed on
+/// a `Dispatcher`. To support various `Dispatcher` backends, it wraps a
/// `backend::NativeTask`, which contains backend-specific state and methods.
class Task final {
public:
- /// The default constructor creates a Task without a function.
- /// `set_function()` must be called before posting the Task.
+ /// The default constructor creates a `Task` without a function.
+ /// `set_function()` must be called before posting the `Task`.
Task() : native_type_(*this) {}
- /// Constructs a Task that calls `f` when executed on a Dispatcher.
+ /// Constructs a Task that calls `f` when executed on a `Dispatcher`.
explicit Task(TaskFunction&& f) : native_type_(*this, std::move(f)) {}
Task(const Task&) = delete;
@@ -41,8 +42,8 @@ class Task final {
Task(Task&&) = delete;
Task& operator=(Task&&) = delete;
- /// Configure the TaskFunction after construction. This MUST NOT be called
- /// while this Task is pending in a Dispatcher.
+ /// Configure the `TaskFunction` after construction. This MUST NOT be called
+ /// while this `Task` is pending in a `Dispatcher`.
void set_function(TaskFunction&& f) {
native_type_.set_function(std::move(f));
}
@@ -50,8 +51,8 @@ class Task final {
/// Executes this task.
void operator()(Context& ctx, Status status) { native_type_(ctx, status); }
- /// Returns the inner NativeTask containing backend-specific state. Only
- /// Dispatcher backends or non-portable code should call these methods!
+ /// Returns the inner `NativeTask` containing backend-specific state. Only
+ /// `Dispatcher` backends or non-portable code should call these methods!
backend::NativeTask& native_type() { return native_type_; }
const backend::NativeTask& native_type() const { return native_type_; }
diff --git a/pw_async/public/pw_async/task_function.h b/pw_async/public/pw_async/task_function.h
new file mode 100644
index 000000000..3c4f4706e
--- /dev/null
+++ b/pw_async/public/pw_async/task_function.h
@@ -0,0 +1,39 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_async/context.h"
+#include "pw_function/function.h"
+#include "pw_status/status.h"
+
+namespace pw::async {
+
+/// A `TaskFunction` is a unit of work that is wrapped by a Task and executed on
+/// a `Dispatcher`.
+///
+/// `TaskFunction`s take a `Context` as their first argument. Before executing a
+/// `Task`, the `Dispatcher` sets the pointer to itself and to the `Task` in
+/// `Context`.
+///
+/// `TaskFunction`s take a `Status` as their second argument. When a Task is
+/// running as normal, |status| is `PW_STATUS_OK`. If a `Task` will not be able
+/// to run as scheduled, the `Dispatcher` will still invoke the `TaskFunction`
+/// with |status| `PW_STATUS_CANCELLED`. This provides an opportunity to reclaim
+/// resources held by the Task.
+///
+/// A `Task` will not run as scheduled if, for example, it is still waiting when
+/// the `Dispatcher` shuts down.
+using TaskFunction = Function<void(Context&, Status)>;
+
+} // namespace pw::async