diff options
Diffstat (limited to 'pw_async')
-rw-r--r-- | pw_async/BUILD.bazel | 118 | ||||
-rw-r--r-- | pw_async/BUILD.gn | 38 | ||||
-rw-r--r-- | pw_async/CMakeLists.txt | 55 | ||||
-rw-r--r-- | pw_async/OWNERS | 2 | ||||
-rw-r--r-- | pw_async/backend.cmake | 21 | ||||
-rw-r--r-- | pw_async/docs.rst | 37 | ||||
-rw-r--r-- | pw_async/fake_dispatcher_fixture.gni | 1 | ||||
-rw-r--r-- | pw_async/fake_dispatcher_test.cc | 374 | ||||
-rw-r--r-- | pw_async/fake_dispatcher_test.gni | 2 | ||||
-rw-r--r-- | pw_async/heap_dispatcher.cc | 66 | ||||
-rw-r--r-- | pw_async/heap_dispatcher.gni | 38 | ||||
-rw-r--r-- | pw_async/public/pw_async/context.h (renamed from pw_async/public/pw_async/internal/types.h) | 19 | ||||
-rw-r--r-- | pw_async/public/pw_async/dispatcher.h | 71 | ||||
-rw-r--r-- | pw_async/public/pw_async/fake_dispatcher.h | 40 | ||||
-rw-r--r-- | pw_async/public/pw_async/fake_dispatcher_fixture.h | 17 | ||||
-rw-r--r-- | pw_async/public/pw_async/function_dispatcher.h | 52 | ||||
-rw-r--r-- | pw_async/public/pw_async/heap_dispatcher.h | 48 | ||||
-rw-r--r-- | pw_async/public/pw_async/task.h | 21 | ||||
-rw-r--r-- | pw_async/public/pw_async/task_function.h | 39 |
19 files changed, 849 insertions, 210 deletions
diff --git a/pw_async/BUILD.bazel b/pw_async/BUILD.bazel index 2ae446d6b..4c1481100 100644 --- a/pw_async/BUILD.bazel +++ b/pw_async/BUILD.bazel @@ -12,14 +12,116 @@ # License for the specific language governing permissions and limitations under # the License. -filegroup( - name = "pw_async_files", - srcs = [ - "fake_dispatcher_test.cc", +load( + "//pw_build:pigweed.bzl", + "pw_cc_facade", + "pw_cc_library", + "pw_cc_test", +) + +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +pw_cc_library( + name = "dispatcher", + hdrs = [ "public/pw_async/dispatcher.h", - "public/pw_async/fake_dispatcher.h", - "public/pw_async/fake_dispatcher_fixture.h", - "public/pw_async/internal/types.h", - "public/pw_async/task.h", + "public/pw_async/function_dispatcher.h", + ], + includes = ["public"], + deps = [ + ":types", + "//pw_chrono:system_clock", + "//pw_function", + "//pw_status", + ], +) + +pw_cc_facade( + name = "task_facade", + hdrs = ["public/pw_async/task.h"], + includes = ["public"], + deps = [ + ":types", + "//pw_chrono:system_clock", + "//pw_function", + "//pw_status", + ], +) + +pw_cc_library( + name = "task", + hdrs = ["public/pw_async/task.h"], + includes = ["public"], + deps = [ + ":types", + "//pw_chrono:system_clock", + "//pw_function", + "//pw_status", + "@pigweed//targets:pw_async_task_backend", + ], +) + +pw_cc_library( + name = "types", + hdrs = [ + "public/pw_async/context.h", + "public/pw_async/task_function.h", + ], + includes = ["public"], + deps = [ + "//pw_function", + "//pw_status", + ], +) + +pw_cc_facade( + name = "fake_dispatcher_facade", + hdrs = ["public/pw_async/fake_dispatcher.h"], + includes = ["public"], + deps = [":dispatcher"], +) + +pw_cc_library( + name = "fake_dispatcher", + hdrs = ["public/pw_async/fake_dispatcher.h"], + includes = ["public"], + deps = [ + ":dispatcher", + "@pigweed//targets:pw_async_fake_dispatcher_backend", + ], +) + +pw_cc_test( + name = "fake_dispatcher_test", + srcs = ["fake_dispatcher_test.cc"], + deps = [ + ":fake_dispatcher", + "//pw_containers:vector", + "//pw_log", + "//pw_string:to_string", + "//pw_sync:timed_thread_notification", + "//pw_thread:thread", + ], +) + +pw_cc_library( + name = "fake_dispatcher_fixture", + hdrs = ["public/pw_async/fake_dispatcher_fixture.h"], + includes = ["public"], + deps = [":fake_dispatcher"], +) + +pw_cc_library( + name = "heap_dispatcher", + srcs = ["heap_dispatcher.cc"], + hdrs = ["public/pw_async/heap_dispatcher.h"], + includes = ["public"], + deps = [ + ":dispatcher", + ":task", + ":types", + "//pw_result", ], ) diff --git a/pw_async/BUILD.gn b/pw_async/BUILD.gn index 483b9b5e6..59ccdd046 100644 --- a/pw_async/BUILD.gn +++ b/pw_async/BUILD.gn @@ -18,6 +18,7 @@ import("$dir_pw_async/async.gni") import("$dir_pw_async/backend.gni") import("$dir_pw_async/fake_dispatcher_fixture.gni") import("$dir_pw_async/fake_dispatcher_test.gni") +import("$dir_pw_async/heap_dispatcher.gni") import("$dir_pw_build/facade.gni") import("$dir_pw_build/target_types.gni") import("$dir_pw_chrono/backend.gni") @@ -30,10 +31,15 @@ config("public_include_path") { pw_source_set("dispatcher") { public_configs = [ ":public_include_path" ] public_deps = [ + ":types", "$dir_pw_chrono:system_clock", dir_pw_function, + dir_pw_status, + ] + public = [ + "public/pw_async/dispatcher.h", + "public/pw_async/function_dispatcher.h", ] - public = [ "public/pw_async/dispatcher.h" ] visibility = [ ":*", "$dir_pw_async_basic:*", @@ -44,13 +50,27 @@ pw_facade("task") { backend = pw_async_TASK_BACKEND public_configs = [ ":public_include_path" ] public_deps = [ + ":types", "$dir_pw_chrono:system_clock", dir_pw_function, dir_pw_status, ] + public = [ "public/pw_async/task.h" ] + visibility = [ + ":*", + "$dir_pw_async_basic:*", + ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY +} + +pw_source_set("types") { + public_configs = [ ":public_include_path" ] + public_deps = [ + dir_pw_function, + dir_pw_status, + ] public = [ - "public/pw_async/internal/types.h", - "public/pw_async/task.h", + "public/pw_async/context.h", + "public/pw_async/task_function.h", ] visibility = [ ":*", @@ -77,6 +97,14 @@ fake_dispatcher_fixture("fake_dispatcher_fixture") { ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY } +pw_async_heap_dispatcher_source_set("heap_dispatcher") { + task_backend = ":task" + visibility = [ + ":*", + "$dir_pw_async_basic:*", + ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY +} + pw_test_group("tests") { } @@ -85,10 +113,12 @@ pw_doc_group("docs") { } # Satisfy source_is_in_build_files presubmit step -pw_source_set("fake_dispatcher_test") { +pw_source_set("satisfy_presubmit") { sources = [ "fake_dispatcher_test.cc", + "heap_dispatcher.cc", "public/pw_async/fake_dispatcher_fixture.h", + "public/pw_async/heap_dispatcher.h", ] visibility = [] } diff --git a/pw_async/CMakeLists.txt b/pw_async/CMakeLists.txt new file mode 100644 index 000000000..126a05854 --- /dev/null +++ b/pw_async/CMakeLists.txt @@ -0,0 +1,55 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +include($ENV{PW_ROOT}/pw_build/pigweed.cmake) +include($ENV{PW_ROOT}/pw_async/backend.cmake) + +pw_add_library(pw_async.types INTERFACE + HEADERS + public/pw_async/context.h + public/pw_async/task_function.h + PUBLIC_INCLUDES + public + PUBLIC_DEPS + pw_function + pw_status +) + +pw_add_facade(pw_async.task INTERFACE + BACKEND + pw_async.task_BACKEND + HEADERS + public/pw_async/task.h + PUBLIC_INCLUDES + public + PUBLIC_DEPS + pw_async.types + pw_function + pw_status +) + +pw_add_facade(pw_async.dispatcher INTERFACE + BACKEND + pw_async.dispatcher_BACKEND + HEADERS + public/pw_async/dispatcher.h + public/pw_async/function_dispatcher.h + PUBLIC_INCLUDES + public + PUBLIC_DEPS + pw_async.types + pw_chrono.system_clock + pw_function + pw_status +) diff --git a/pw_async/OWNERS b/pw_async/OWNERS new file mode 100644 index 000000000..95e4875d7 --- /dev/null +++ b/pw_async/OWNERS @@ -0,0 +1,2 @@ +benlawson@google.com +cramertj@google.com diff --git a/pw_async/backend.cmake b/pw_async/backend.cmake new file mode 100644 index 000000000..b9ddc0287 --- /dev/null +++ b/pw_async/backend.cmake @@ -0,0 +1,21 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +include_guard(GLOBAL) + +include($ENV{PW_ROOT}/pw_build/pigweed.cmake) + +# Backend for the pw_async module. +pw_add_backend_variable(pw_async.task_BACKEND) +pw_add_backend_variable(pw_async.dispatcher_BACKEND) diff --git a/pw_async/docs.rst b/pw_async/docs.rst index e1a34e009..8ead20e04 100644 --- a/pw_async/docs.rst +++ b/pw_async/docs.rst @@ -33,6 +33,11 @@ Dispatcher API Task API ============== +.. doxygenstruct:: pw::async::Context + :members: + +.. doxygentypedef:: pw::async::TaskFunction + .. doxygenclass:: pw::async::Task :members: @@ -41,8 +46,17 @@ Facade API Task ---- -The Task type represents a work item that is submitted to a Dispatcher. The Task -facade enables Dispatcher backends to specify custom state and methods. +The ``Task`` type represents a work item that can be submitted to and executed +by a ``Dispatcher``. + +To run work on a ``Dispatcher`` event loop, a ``Task`` can be constructed from +a function or lambda (see ``pw::async::TaskFunction``) and submitted to run +using the ``pw::async::Dispatcher::Post`` method (and its siblings, ``PostAt`` +etc.). + +The ``Task`` facade enables backends to provide custom storage containers for +``Task`` s, as well as to keep per- ``Task`` data alongside the ``TaskFunction`` +(such as ``next`` pointers for intrusive linked-lists of ``Task``). The active Task backend is configured with the GN variable ``pw_async_TASK_BACKEND``. The specified target must define a class @@ -50,6 +64,9 @@ The active Task backend is configured with the GN variable that meets the interface requirements in ``public/pw_async/task.h``. Task will then trivially wrap ``NativeTask``. +The bazel build provides the ``pw_async_task_backend`` label flag to configure +the active Task backend. + FakeDispatcher -------------- The FakeDispatcher facade is a utility for simulating a real Dispatcher @@ -58,13 +75,16 @@ code that uses Dispatcher. FakeDispatcher is a facade instead of a concrete implementation because it depends on Task state for processing tasks, which varies across Task backends. -The active Task backend is configured with the GN variable +The active FakeDispatcher backend is configured with the GN variable ``pw_async_FAKE_DISPATCHER_BACKEND``. The specified target must define a class ``pw::async::test::backend::NativeFakeDispatcher`` in the header ``pw_async_backend/fake_dispatcher.h`` that meets the interface requirements in ``public/pw_async/task.h``. FakeDispatcher will then trivially wrap ``NativeFakeDispatcher``. +The bazel build provides the ``pw_async_fake_dispatcher_backend`` label flag to +configure the FakeDispatcher backend. + Testing FakeDispatcher ^^^^^^^^^^^^^^^^^^^^^^ The GN template ``fake_dispatcher_tests`` in ``fake_dispatcher_tests.gni`` @@ -72,6 +92,16 @@ creates a test target that tests a FakeDispatcher backend. This enables one test suite to be shared across FakeDispatcher backends and ensures conformance. +FunctionDispatcher +------------------ +.. doxygenclass:: pw::async::FunctionDispatcher + :members: + +HeapDispatcher +-------------- +.. doxygenclass:: pw::async::HeapDispatcher + :members: + Design ====== @@ -169,6 +199,5 @@ Roadmap ------- - Stabilize Task cancellation API - Utility for dynamically allocated Tasks -- Bazel support - CMake support - Support for C++20 coroutines diff --git a/pw_async/fake_dispatcher_fixture.gni b/pw_async/fake_dispatcher_fixture.gni index a58be16ac..7e8fda760 100644 --- a/pw_async/fake_dispatcher_fixture.gni +++ b/pw_async/fake_dispatcher_fixture.gni @@ -28,6 +28,7 @@ template("fake_dispatcher_fixture") { assert(defined(invoker.backend)) pw_source_set(target_name) { + testonly = pw_unit_test_TESTONLY public = [ "$dir_pw_async/public/pw_async/fake_dispatcher_fixture.h" ] public_deps = [ "$dir_pw_unit_test", diff --git a/pw_async/fake_dispatcher_test.cc b/pw_async/fake_dispatcher_test.cc index 3674f4cb0..3e5113aa1 100644 --- a/pw_async/fake_dispatcher_test.cc +++ b/pw_async/fake_dispatcher_test.cc @@ -14,176 +14,316 @@ #include "pw_async/fake_dispatcher.h" #include "gtest/gtest.h" -#include "pw_thread/thread.h" -#include "pw_thread_stl/options.h" +#include "pw_containers/vector.h" +#include "pw_string/to_string.h" #define ASSERT_OK(status) ASSERT_EQ(OkStatus(), status) #define ASSERT_CANCELLED(status) ASSERT_EQ(Status::Cancelled(), status) using namespace std::chrono_literals; +struct CallCounts { + int ok = 0; + int cancelled = 0; + bool operator==(const CallCounts& other) const { + return ok == other.ok && cancelled == other.cancelled; + } +}; + +namespace pw { +template <> +StatusWithSize ToString<CallCounts>(const CallCounts& value, + span<char> buffer) { + return string::Format(buffer, + "CallCounts {.ok = %d, .cancelled = %d}", + value.ok, + value.cancelled); +} +} // namespace pw + namespace pw::async::test { +namespace { + +struct CallCounter { + CallCounts counts; + auto fn() { + return [this](Context&, Status status) { + if (status.ok()) { + this->counts.ok++; + } else if (status.IsCancelled()) { + this->counts.cancelled++; + } + }; + } +}; -TEST(FakeDispatcher, PostTasks) { +TEST(FakeDispatcher, UnpostedTasksDontRun) { FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{}); +} - int count = 0; - auto inc_count = [&count]([[maybe_unused]] Context& c, Status status) { - ASSERT_OK(status); - ++count; - }; +TEST(FakeDispatcher, PostedTaskRunsOnce) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); +} - Task task(inc_count); +TEST(FakeDispatcher, TaskPostedTwiceBeforeRunningRunsOnce) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); dispatcher.Post(task); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); +} - Task task2(inc_count); - dispatcher.Post(task2); +TEST(FakeDispatcher, TaskRepostedAfterRunningRunsTwice) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); + dispatcher.Post(task); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{.ok = 2}); +} - Task task3(inc_count); - dispatcher.Post(task3); +TEST(FakeDispatcher, TwoPostedTasksEachRunOnce) { + FakeDispatcher dispatcher; + CallCounter counter_1; + Task task_1(counter_1.fn()); + CallCounter counter_2; + Task task_2(counter_2.fn()); + dispatcher.Post(task_1); + dispatcher.Post(task_2); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter_1.counts, CallCounts{.ok = 1}); + EXPECT_EQ(counter_2.counts, CallCounts{.ok = 1}); +} - // Should not run; RunUntilIdle() does not advance time. - Task task4([&count]([[maybe_unused]] Context& c, Status status) { - ASSERT_CANCELLED(status); - ++count; - }); - dispatcher.PostAfter(task4, 1ms); +TEST(FakeDispatcher, PostedTasksRunInOrderForFairness) { + FakeDispatcher dispatcher; + pw::Vector<uint8_t, 3> task_run_order; + Task task_1([&task_run_order](auto...) { task_run_order.push_back(1); }); + Task task_2([&task_run_order](auto...) { task_run_order.push_back(2); }); + Task task_3([&task_run_order](auto...) { task_run_order.push_back(3); }); + dispatcher.Post(task_1); + dispatcher.Post(task_2); + dispatcher.Post(task_3); + dispatcher.RunUntilIdle(); + pw::Vector<uint8_t, 3> expected_run_order({1, 2, 3}); + EXPECT_EQ(task_run_order, expected_run_order); +} +TEST(FakeDispatcher, RequestStopQueuesPreviouslyPostedTaskWithCancel) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); + dispatcher.RequestStop(); dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{.cancelled = 1}); +} + +TEST(FakeDispatcher, RequestStopQueuesNewlyPostedTaskWithCancel) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); dispatcher.RequestStop(); + dispatcher.Post(task); dispatcher.RunUntilIdle(); - ASSERT_EQ(count, 4); + EXPECT_EQ(counter.counts, CallCounts{.cancelled = 1}); } -// Lambdas can only capture one ptr worth of memory without allocating, so we -// group the data we want to share between tasks and their containing tests -// inside one struct. -struct TaskPair { - Task task_a; - Task task_b; - int count = 0; -}; +TEST(FakeDispatcher, RunUntilIdleDoesNotRunFutureTask) { + FakeDispatcher dispatcher; + CallCounter counter; + // Should not run; RunUntilIdle() does not advance time. + Task task(counter.fn()); + dispatcher.PostAfter(task, 1ms); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{}); +} -TEST(FakeDispatcher, DelayedTasks) { +TEST(FakeDispatcher, PostAfterRunsTasksInSequence) { FakeDispatcher dispatcher; - TaskPair tp; + pw::Vector<uint8_t, 3> task_run_order; + Task task_1([&task_run_order](auto...) { task_run_order.push_back(1); }); + Task task_2([&task_run_order](auto...) { task_run_order.push_back(2); }); + Task task_3([&task_run_order](auto...) { task_run_order.push_back(3); }); + dispatcher.PostAfter(task_1, 50ms); + dispatcher.PostAfter(task_2, 25ms); + dispatcher.PostAfter(task_3, 100ms); + dispatcher.RunFor(125ms); + pw::Vector<uint8_t, 3> expected_run_order({2, 1, 3}); + EXPECT_EQ(task_run_order, expected_run_order); +} - Task task0([&tp]([[maybe_unused]] Context& c, Status status) { - ASSERT_OK(status); - tp.count = tp.count * 10 + 4; - }); - dispatcher.PostAfter(task0, 200ms); +TEST(FakeDispatcher, PostAfterWithEarlierTimeRunsSooner) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.PostAfter(task, 100ms); + dispatcher.PostAfter(task, 50ms); + dispatcher.RunFor(60ms); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); +} - Task task1([&tp]([[maybe_unused]] Context& c, Status status) { - ASSERT_OK(status); - tp.count = tp.count * 10 + 1; - c.dispatcher->PostAfter(tp.task_a, 50ms); - c.dispatcher->PostAfter(tp.task_b, 25ms); - }); - dispatcher.PostAfter(task1, 100ms); +TEST(FakeDispatcher, PostAfterWithLaterTimeRunsSooner) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.PostAfter(task, 50ms); + dispatcher.PostAfter(task, 100ms); + dispatcher.RunFor(60ms); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); +} - tp.task_a.set_function([&tp]([[maybe_unused]] Context& c, Status status) { - ASSERT_OK(status); - tp.count = tp.count * 10 + 3; - }); - tp.task_b.set_function([&tp]([[maybe_unused]] Context& c, Status status) { - ASSERT_OK(status); - tp.count = tp.count * 10 + 2; - }); +TEST(FakeDispatcher, PostThenPostAfterRunsImmediately) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); + dispatcher.PostAfter(task, 50ms); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); +} - dispatcher.RunFor(200ms); - dispatcher.RequestStop(); +TEST(FakeDispatcher, PostAfterThenPostRunsImmediately) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.PostAfter(task, 50ms); + dispatcher.Post(task); dispatcher.RunUntilIdle(); - ASSERT_EQ(tp.count, 1234); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); } -TEST(FakeDispatcher, CancelTasks) { +TEST(FakeDispatcher, CancelAfterPostStopsTaskFromRunning) { FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); + EXPECT_TRUE(dispatcher.Cancel(task)); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{}); +} - auto shouldnt_run = []([[maybe_unused]] Context& c, - [[maybe_unused]] Status status) { FAIL(); }; +TEST(FakeDispatcher, CancelAfterPostAfterStopsTaskFromRunning) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.PostAfter(task, 50ms); + EXPECT_TRUE(dispatcher.Cancel(task)); + dispatcher.RunFor(60ms); + EXPECT_EQ(counter.counts, CallCounts{}); +} - TaskPair tp; - // This task gets canceled in cancel_task. - tp.task_a.set_function(shouldnt_run); - dispatcher.PostAfter(tp.task_a, 40ms); +TEST(FakeDispatcher, CancelAfterPostAndPostAfterStopsTaskFromRunning) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); + dispatcher.PostAfter(task, 50ms); + EXPECT_TRUE(dispatcher.Cancel(task)); + dispatcher.RunFor(60ms); + EXPECT_EQ(counter.counts, CallCounts{}); +} - // This task gets canceled immediately. - Task task1(shouldnt_run); - dispatcher.PostAfter(task1, 10ms); - ASSERT_TRUE(dispatcher.Cancel(task1)); +TEST(FakeDispatcher, PostAgainAfterCancelRuns) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); + EXPECT_TRUE(dispatcher.Cancel(task)); + dispatcher.Post(task); + dispatcher.RunUntilIdle(); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); +} - // This task cancels the first task. - Task cancel_task([&tp](Context& c, Status status) { - ASSERT_OK(status); - ASSERT_TRUE(c.dispatcher->Cancel(tp.task_a)); - ++tp.count; - }); - dispatcher.PostAfter(cancel_task, 20ms); +TEST(FakeDispatcher, CancelWithoutPostReturnsFalse) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + EXPECT_FALSE(dispatcher.Cancel(task)); +} - dispatcher.RunFor(50ms); - dispatcher.RequestStop(); +TEST(FakeDispatcher, CancelAfterRunningReturnsFalse) { + FakeDispatcher dispatcher; + CallCounter counter; + Task task(counter.fn()); + dispatcher.Post(task); dispatcher.RunUntilIdle(); - ASSERT_EQ(tp.count, 1); + EXPECT_EQ(counter.counts, CallCounts{.ok = 1}); + EXPECT_FALSE(dispatcher.Cancel(task)); } -// Test RequestStop() from inside task. -TEST(FakeDispatcher, RequestStopInsideTask) { +TEST(FakeDispatcher, CancelInsideOtherTaskCancelsTaskWithoutRunningIt) { FakeDispatcher dispatcher; - int count = 0; - auto cancelled_cb = [&count]([[maybe_unused]] Context& c, Status status) { - ASSERT_CANCELLED(status); - ++count; - }; + CallCounter cancelled_task_counter; + Task cancelled_task(cancelled_task_counter.fn()); - // These tasks are never executed and cleaned up in RequestStop(). - Task task0(cancelled_cb), task1(cancelled_cb); - dispatcher.PostAfter(task0, 20ms); - dispatcher.PostAfter(task1, 21ms); - - Task stop_task([&count]([[maybe_unused]] Context& c, Status status) { + Task canceling_task([&cancelled_task](Context& c, Status status) { ASSERT_OK(status); - ++count; - static_cast<FakeDispatcher*>(c.dispatcher)->RequestStop(); - static_cast<FakeDispatcher*>(c.dispatcher)->RunUntilIdle(); + ASSERT_TRUE(c.dispatcher->Cancel(cancelled_task)); }); - dispatcher.Post(stop_task); + dispatcher.Post(canceling_task); + dispatcher.Post(cancelled_task); dispatcher.RunUntilIdle(); - ASSERT_EQ(count, 3); + + // NOTE: the cancelled task is *not* run with `Cancel`. + // This is likely to produce strange behavior, and this contract should + // be revisited and carefully documented. + EXPECT_EQ(cancelled_task_counter.counts, CallCounts{}); } -TEST(FakeDispatcher, PeriodicTasks) { +TEST(FakeDispatcher, CancelInsideCurrentTaskFails) { FakeDispatcher dispatcher; - int count = 0; - Task periodic_task([&count]([[maybe_unused]] Context& c, Status status) { + Task self_cancel_task; + self_cancel_task.set_function([&self_cancel_task](Context& c, Status status) { ASSERT_OK(status); - ++count; + ASSERT_FALSE(c.dispatcher->Cancel(self_cancel_task)); }); - dispatcher.PostPeriodicAt(periodic_task, 20ms, dispatcher.now() + 50ms); + dispatcher.Post(self_cancel_task); + dispatcher.RunUntilIdle(); +} + +TEST(FakeDispatcher, RequestStopInsideOtherTaskCancelsOtherTask) { + FakeDispatcher dispatcher; + + // This task is never executed and is cleaned up in RequestStop(). + CallCounter task_counter; + Task task(task_counter.fn()); - // Cancel periodic task after it has run thrice, at +50ms, +70ms, and +90ms. - Task cancel_task([&periodic_task](Context& c, Status status) { + int stop_count = 0; + Task stop_task([&stop_count]([[maybe_unused]] Context& c, Status status) { ASSERT_OK(status); - c.dispatcher->Cancel(periodic_task); + stop_count++; + static_cast<FakeDispatcher*>(c.dispatcher)->RequestStop(); }); - dispatcher.PostAfter(cancel_task, 100ms); - dispatcher.RunFor(300ms); - dispatcher.RequestStop(); + dispatcher.Post(stop_task); + dispatcher.Post(task); + dispatcher.RunUntilIdle(); - ASSERT_EQ(count, 3); + EXPECT_EQ(stop_count, 1); + EXPECT_EQ(task_counter.counts, CallCounts{.cancelled = 1}); } TEST(FakeDispatcher, TasksCancelledByDispatcherDestructor) { - int count = 0; - auto inc_count = [&count]([[maybe_unused]] Context& c, Status status) { - ASSERT_CANCELLED(status); - ++count; - }; - Task task0(inc_count), task1(inc_count), task2(inc_count); + CallCounter counter; + Task task0(counter.fn()), task1(counter.fn()), task2(counter.fn()); { FakeDispatcher dispatcher; @@ -192,25 +332,21 @@ TEST(FakeDispatcher, TasksCancelledByDispatcherDestructor) { dispatcher.PostAfter(task2, 10s); } - ASSERT_EQ(count, 3); + ASSERT_EQ(counter.counts, CallCounts{.cancelled = 3}); } TEST(DispatcherBasic, TasksCancelledByRunFor) { - int count = 0; - auto inc_count = [&count]([[maybe_unused]] Context& c, Status status) { - ASSERT_CANCELLED(status); - ++count; - }; - Task task0(inc_count), task1(inc_count), task2(inc_count); - FakeDispatcher dispatcher; + CallCounter counter; + Task task0(counter.fn()), task1(counter.fn()), task2(counter.fn()); dispatcher.PostAfter(task0, 10s); dispatcher.PostAfter(task1, 10s); dispatcher.PostAfter(task2, 10s); dispatcher.RequestStop(); dispatcher.RunFor(5s); - ASSERT_EQ(count, 3); + ASSERT_EQ(counter.counts, CallCounts{.cancelled = 3}); } +} // namespace } // namespace pw::async::test diff --git a/pw_async/fake_dispatcher_test.gni b/pw_async/fake_dispatcher_test.gni index 33a32ee33..0cc2995e4 100644 --- a/pw_async/fake_dispatcher_test.gni +++ b/pw_async/fake_dispatcher_test.gni @@ -33,6 +33,8 @@ template("fake_dispatcher_test") { pw_sync_TIMED_THREAD_NOTIFICATION_BACKEND != "" && pw_thread_THREAD_BACKEND != "" deps = [ + "$dir_pw_containers:vector", + "$dir_pw_string:to_string", "$dir_pw_sync:timed_thread_notification", "$dir_pw_thread:thread", dir_pw_log, diff --git a/pw_async/heap_dispatcher.cc b/pw_async/heap_dispatcher.cc new file mode 100644 index 000000000..e9f51a30b --- /dev/null +++ b/pw_async/heap_dispatcher.cc @@ -0,0 +1,66 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_async/heap_dispatcher.h" + +#include "pw_async/task.h" +#include "pw_result/result.h" + +namespace pw::async { + +namespace { + +// TODO: b/277793223 - Optimize to avoid double virtual indirection and double +// allocation. In situations in which pw::Function is large enough and the +// captures are small enough, we could eliminate this by reshaping the task as +// just a pw::Function. +struct TaskAndFunction { + static Result<TaskAndFunction*> New(TaskFunction&& task) { + // std::nothrow causes new to return a nullptr on failure instead of + // throwing. + TaskAndFunction* t = new (std::nothrow) TaskAndFunction(); + if (!t) { + return Status::ResourceExhausted(); + } + t->func = std::move(task); + + // Closure captures must not include references, as that would be UB due to + // the `delete` at the end of the function. See + // https://reviews.llvm.org/D48239. + t->task.set_function([t](Context& ctx, Status status) { + t->func(ctx, status); + + // Delete must appear at the very end of this closure to avoid + // use-after-free of captures or Context.task. + delete t; + }); + + return t; + } + Task task; + TaskFunction func; +}; +} // namespace + +Status HeapDispatcher::PostAt(TaskFunction&& task_func, + chrono::SystemClock::time_point time) { + Result<TaskAndFunction*> result = TaskAndFunction::New(std::move(task_func)); + if (!result.ok()) { + return result.status(); + } + dispatcher_.PostAt((*result)->task, time); + return Status(); +} + +} // namespace pw::async diff --git a/pw_async/heap_dispatcher.gni b/pw_async/heap_dispatcher.gni new file mode 100644 index 000000000..674a4fc3a --- /dev/null +++ b/pw_async/heap_dispatcher.gni @@ -0,0 +1,38 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import("//build_overrides/pigweed.gni") +import("$dir_pw_build/target_types.gni") + +# Creates a HeapDispatcher source set for a specified Task backend. +# +# Parameters +# +# task_backend (required) +# [target] The Task backend. +template("pw_async_heap_dispatcher_source_set") { + assert(defined(invoker.task_backend)) + + pw_source_set(target_name) { + public = [ "$dir_pw_async/public/pw_async/heap_dispatcher.h" ] + sources = [ "$dir_pw_async/heap_dispatcher.cc" ] + deps = [ "$dir_pw_result" ] + public_deps = [ + "$dir_pw_async:dispatcher", + "$dir_pw_async:types", + invoker.task_backend, + ] + forward_variables_from(invoker, [ "visibility" ]) + } +} diff --git a/pw_async/public/pw_async/internal/types.h b/pw_async/public/pw_async/context.h index c8b7ea287..845d11374 100644 --- a/pw_async/public/pw_async/internal/types.h +++ b/pw_async/public/pw_async/context.h @@ -21,25 +21,12 @@ namespace pw::async { class Dispatcher; class Task; +/// Contextual information provided by a `Dispatcher` to a running task. struct Context { + /// The `Dispatcher` running the current `Task`. Dispatcher* dispatcher; + /// The current ``Task`` being executed. Task* task; }; -// A TaskFunction is a unit of work that is wrapped by a Task and executed on a -// Dispatcher. -// -// TaskFunctions take a `Context` as their first argument. Before executing a -// Task, the Dispatcher sets the pointer to itself and to the Task in `Context`. -// -// TaskFunctions take a `Status` as their second argument. When a Task is -// running as normal, |status| == PW_STATUS_OK. If a Task will not be able to -// run as scheduled, the Dispatcher will still invoke the TaskFunction with -// |status| == PW_STATUS_CANCELLED. This provides an opportunity to reclaim -// resources held by the Task. -// -// A Task will not run as scheduled if, for example, it is still waiting when -// the Dispatcher shuts down. -using TaskFunction = Function<void(Context&, Status)>; - } // namespace pw::async diff --git a/pw_async/public/pw_async/dispatcher.h b/pw_async/public/pw_async/dispatcher.h index 8faaccd67..fb0dfdd13 100644 --- a/pw_async/public/pw_async/dispatcher.h +++ b/pw_async/public/pw_async/dispatcher.h @@ -19,39 +19,66 @@ namespace pw::async { class Task; -/// Asynchronous Dispatcher abstract class. A default implementation is provided -/// in pw_async_basic. +/// Abstract base class for an asynchronous dispatcher loop. /// -/// Dispatcher implements VirtualSystemClock so the Dispatcher's time can be -/// injected into other modules under test. This is useful for consistently -/// simulating time when using FakeDispatcher (rather than using -/// chrono::SimulatedSystemClock separately). +/// `Dispatcher`s run many short, non-blocking units of work on a single thread. +/// This approach has a number of advantages compared with executing concurrent +/// tasks on separate threads: +/// +/// - `Dispatcher`s can make more efficient use of system resources, since they +/// don't need to maintain separate thread stacks. +/// - `Dispatcher`s can run on systems without thread support, such as no-RTOS +/// embedded environments. +/// - `Dispatcher`s allow tasks to communicate with one another without the +/// synchronization overhead of locks, atomics, fences, or `volatile`. +/// +/// Thread support: `Dispatcher` methods may be safely invoked from any thread, +/// but the resulting tasks will always execute on a single thread. Whether +/// or not methods may be invoked from interrupt context is +/// implementation-defined. +/// +/// `VirtualSystemClock`: `Dispatcher` implements `VirtualSystemClock` in order +/// to provide a consistent source of (possibly mocked) time information to +/// tasks. +/// +/// A simple default dispatcher implementation is provided by `pw_async_basic`. class Dispatcher : public chrono::VirtualSystemClock { public: ~Dispatcher() override = default; - /// Post caller owned |task|. - virtual void Post(Task& task) = 0; + /// Post caller-owned |task| to be run on the dispatch loop. + /// + /// Posted tasks execute in the order they are posted. This ensures that + /// tasks can re-post themselves and yield in order to allow other tasks the + /// opportunity to execute. + /// + /// A given |task| must only be posted to a single `Dispatcher`. + virtual void Post(Task& task) { PostAt(task, now()); } /// Post caller owned |task| to be run after |delay|. - virtual void PostAfter(Task& task, chrono::SystemClock::duration delay) = 0; + /// + /// If |task| was already posted to run at an earlier time (before |delay| + /// would expire), |task| must be run at the earlier time, and |task| + /// *may* also be run at the later time. + virtual void PostAfter(Task& task, chrono::SystemClock::duration delay) { + PostAt(task, now() + delay); + } /// Post caller owned |task| to be run at |time|. + /// + /// If |task| was already posted to run before |time|, + /// |task| must be run at the earlier time, and |task| *may* also be run at + /// the later time. virtual void PostAt(Task& task, chrono::SystemClock::time_point time) = 0; - /// Post caller owned |task| to be run immediately then rerun at a regular - /// |interval|. - virtual void PostPeriodic(Task& task, - chrono::SystemClock::duration interval) = 0; - /// Post caller owned |task| to be run at |time| then rerun at a regular - /// |interval|. |interval| must not be zero. - virtual void PostPeriodicAt(Task& task, - chrono::SystemClock::duration interval, - chrono::SystemClock::time_point time) = 0; - - /// Returns true if |task| is succesfully canceled. - /// If cancelation fails, the task may be running or completed. - /// Periodic tasks may be posted once more after they are canceled. + /// Prevent a `Post`ed task from starting. + /// + /// Returns: + /// true: the task was successfully canceled and will not be run by the + /// dispatcher until `Post`ed again. + /// false: the task could not be cancelled because it either was not + /// posted, already ran, or is currently running on the `Dispatcher` + /// thread. virtual bool Cancel(Task& task) = 0; }; diff --git a/pw_async/public/pw_async/fake_dispatcher.h b/pw_async/public/pw_async/fake_dispatcher.h index 43931d032..a1b85ae74 100644 --- a/pw_async/public/pw_async/fake_dispatcher.h +++ b/pw_async/public/pw_async/fake_dispatcher.h @@ -18,30 +18,39 @@ namespace pw::async::test { -/// FakeDispatcher is a facade for an implementation of Dispatcher that is used -/// in unit tests. FakeDispatcher uses simulated time. RunUntil() and RunFor() -/// advance time immediately, and now() returns the current simulated time. +/// `FakeDispatcher` is a `Dispatcher` implementation for use in unit tests. /// -/// To support various Task backends, FakeDispatcher wraps a -/// backend::NativeFakeDispatcher that implements standard FakeDispatcher -/// behavior using backend::NativeTask objects. +/// Threading: `FakeDispatcher` is *NOT* thread-safe and, unlike other +/// `Dispatcher` implementations. This means that tasks must not be posted from +/// multiple threads at once, and tasks cannot be posted from other threads +/// while the dispatcher is executing. +/// +/// Time: `FakeDispatcher` uses simulated time. `RunUntil()` and `RunFor()` +/// advance time immediately, and `now()` returns the current simulated time. +/// +/// To support various `Task` backends, `FakeDispatcher` wraps a +/// `backend::NativeFakeDispatcher` that implements standard `FakeDispatcher` +/// behavior using `backend::NativeTask` objects. class FakeDispatcher final : public Dispatcher { public: FakeDispatcher() : native_dispatcher_(*this) {} /// Execute all runnable tasks and return without advancing simulated time. - void RunUntilIdle() { native_dispatcher_.RunUntilIdle(); } + /// Returns true iff any tasks were invoked during the run. + bool RunUntilIdle() { return native_dispatcher_.RunUntilIdle(); } /// Run the dispatcher until Now() has reached `end_time`, executing all tasks /// that come due before then. - void RunUntil(chrono::SystemClock::time_point end_time) { - native_dispatcher_.RunUntil(end_time); + /// Returns true iff any tasks were invoked during the run. + bool RunUntil(chrono::SystemClock::time_point end_time) { + return native_dispatcher_.RunUntil(end_time); } /// Run the Dispatcher until `duration` has elapsed, executing all tasks that /// come due in that period. - void RunFor(chrono::SystemClock::duration duration) { - native_dispatcher_.RunFor(duration); + /// Returns true iff any tasks were invoked during the run. + bool RunFor(chrono::SystemClock::duration duration) { + return native_dispatcher_.RunFor(duration); } /// Stop processing tasks. After calling RequestStop(), the next time the @@ -57,15 +66,6 @@ class FakeDispatcher final : public Dispatcher { void PostAt(Task& task, chrono::SystemClock::time_point time) override { native_dispatcher_.PostAt(task, time); } - void PostPeriodic(Task& task, - chrono::SystemClock::duration interval) override { - native_dispatcher_.PostPeriodic(task, interval); - } - void PostPeriodicAt(Task& task, - chrono::SystemClock::duration interval, - chrono::SystemClock::time_point start_time) override { - native_dispatcher_.PostPeriodicAt(task, interval, start_time); - } bool Cancel(Task& task) override { return native_dispatcher_.Cancel(task); } // VirtualSystemClock overrides: diff --git a/pw_async/public/pw_async/fake_dispatcher_fixture.h b/pw_async/public/pw_async/fake_dispatcher_fixture.h index b32efeb92..b8c02c6a6 100644 --- a/pw_async/public/pw_async/fake_dispatcher_fixture.h +++ b/pw_async/public/pw_async/fake_dispatcher_fixture.h @@ -28,11 +28,11 @@ namespace pw::async::test { /// MyClass obj(dispatcher()); /// /// obj.ScheduleSomeTasks(); -/// RunUntilIdle(); +/// EXPECT_TRUE(RunUntilIdle()); /// EXPECT_TRUE(some condition); /// /// obj.ScheduleTaskToRunIn30Seconds(); -/// RunFor(30s); +/// EXPECT_TRUE(RunFor(30s)); /// EXPECT_TRUE(task ran); /// } /// @endcode @@ -45,18 +45,21 @@ class FakeDispatcherFixture : public ::testing::Test { chrono::SystemClock::time_point now() { return dispatcher_.now(); } /// Dispatches all tasks with due times up until `now()`. - void RunUntilIdle() { dispatcher_.RunUntilIdle(); } + /// Returns true iff any tasks were invoked during the run. + bool RunUntilIdle() { return dispatcher_.RunUntilIdle(); } /// Dispatches all tasks with due times up to `end_time`, progressively /// advancing the fake clock. - void RunUntil(chrono::SystemClock::time_point end_time) { - dispatcher_.RunUntil(end_time); + /// Returns true iff any tasks were invoked during the run. + bool RunUntil(chrono::SystemClock::time_point end_time) { + return dispatcher_.RunUntil(end_time); } /// Dispatches all tasks with due times up to `now() + duration`, /// progressively advancing the fake clock. - void RunFor(chrono::SystemClock::duration duration) { - dispatcher_.RunFor(duration); + /// Returns true iff any tasks were invoked during the run. + bool RunFor(chrono::SystemClock::duration duration) { + return dispatcher_.RunFor(duration); } private: diff --git a/pw_async/public/pw_async/function_dispatcher.h b/pw_async/public/pw_async/function_dispatcher.h new file mode 100644 index 000000000..e3316e10e --- /dev/null +++ b/pw_async/public/pw_async/function_dispatcher.h @@ -0,0 +1,52 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include <utility> + +#include "pw_async/dispatcher.h" +#include "pw_async/task_function.h" +#include "pw_status/status.h" + +namespace pw::async { + +/// FunctionDispatcher extends Dispatcher with Post*() methods that take a +/// TaskFunction instead of a Task. This implies that Tasks are allocated or +/// are taken from a Task pool. Tasks are owned and managed by the Dispatcher. +class FunctionDispatcher : public Dispatcher { + public: + ~FunctionDispatcher() override = default; + + // Prevent hiding of overloaded virtual methods. + using Dispatcher::Post; + using Dispatcher::PostAfter; + using Dispatcher::PostAt; + + /// Post dispatcher owned |task_func| function. + virtual Status Post(TaskFunction&& task_func) { + return PostAt(std::move(task_func), now()); + } + + /// Post dispatcher owned |task_func| function to be run after |delay|. + virtual Status PostAfter(TaskFunction&& task_func, + chrono::SystemClock::duration delay) { + return PostAt(std::move(task_func), now() + delay); + } + + /// Post dispatcher owned |task_func| function to be run at |time|. + virtual Status PostAt(TaskFunction&& task_func, + chrono::SystemClock::time_point time) = 0; +}; + +} // namespace pw::async diff --git a/pw_async/public/pw_async/heap_dispatcher.h b/pw_async/public/pw_async/heap_dispatcher.h new file mode 100644 index 000000000..199aa9059 --- /dev/null +++ b/pw_async/public/pw_async/heap_dispatcher.h @@ -0,0 +1,48 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include "pw_async/function_dispatcher.h" + +namespace pw::async { + +/// HeapDispatcher wraps an existing Dispatcher and allocates Task objects on +/// the heap before posting them to the existing Dispatcher. After Tasks run, +/// they are automatically freed. +class HeapDispatcher final : public FunctionDispatcher { + public: + HeapDispatcher(Dispatcher& dispatcher) : dispatcher_(dispatcher) {} + ~HeapDispatcher() override = default; + + // FunctionDispatcher overrides: + Status PostAt(TaskFunction&& task_func, + chrono::SystemClock::time_point time) override; + + // Dispatcher overrides: + inline void PostAt(Task& task, + chrono::SystemClock::time_point time) override { + return dispatcher_.PostAt(task, time); + } + inline bool Cancel(Task& task) override { return dispatcher_.Cancel(task); } + + // VirtualSystemClock overrides: + inline chrono::SystemClock::time_point now() override { + return dispatcher_.now(); + } + + private: + Dispatcher& dispatcher_; +}; + +} // namespace pw::async diff --git a/pw_async/public/pw_async/task.h b/pw_async/public/pw_async/task.h index c1fa8e665..82618f5cf 100644 --- a/pw_async/public/pw_async/task.h +++ b/pw_async/public/pw_async/task.h @@ -15,7 +15,8 @@ #include <optional> -#include "pw_async/internal/types.h" +#include "pw_async/context.h" +#include "pw_async/task_function.h" #include "pw_async_backend/task.h" namespace pw::async { @@ -24,16 +25,16 @@ namespace test { class FakeDispatcher; } -/// A Task represents a unit of work (TaskFunction) that can be executed on a -/// Dispatcher. To support various Dispatcher backends, it wraps a +/// A `Task` represents a unit of work (`TaskFunction`) that can be executed on +/// a `Dispatcher`. To support various `Dispatcher` backends, it wraps a /// `backend::NativeTask`, which contains backend-specific state and methods. class Task final { public: - /// The default constructor creates a Task without a function. - /// `set_function()` must be called before posting the Task. + /// The default constructor creates a `Task` without a function. + /// `set_function()` must be called before posting the `Task`. Task() : native_type_(*this) {} - /// Constructs a Task that calls `f` when executed on a Dispatcher. + /// Constructs a Task that calls `f` when executed on a `Dispatcher`. explicit Task(TaskFunction&& f) : native_type_(*this, std::move(f)) {} Task(const Task&) = delete; @@ -41,8 +42,8 @@ class Task final { Task(Task&&) = delete; Task& operator=(Task&&) = delete; - /// Configure the TaskFunction after construction. This MUST NOT be called - /// while this Task is pending in a Dispatcher. + /// Configure the `TaskFunction` after construction. This MUST NOT be called + /// while this `Task` is pending in a `Dispatcher`. void set_function(TaskFunction&& f) { native_type_.set_function(std::move(f)); } @@ -50,8 +51,8 @@ class Task final { /// Executes this task. void operator()(Context& ctx, Status status) { native_type_(ctx, status); } - /// Returns the inner NativeTask containing backend-specific state. Only - /// Dispatcher backends or non-portable code should call these methods! + /// Returns the inner `NativeTask` containing backend-specific state. Only + /// `Dispatcher` backends or non-portable code should call these methods! backend::NativeTask& native_type() { return native_type_; } const backend::NativeTask& native_type() const { return native_type_; } diff --git a/pw_async/public/pw_async/task_function.h b/pw_async/public/pw_async/task_function.h new file mode 100644 index 000000000..3c4f4706e --- /dev/null +++ b/pw_async/public/pw_async/task_function.h @@ -0,0 +1,39 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include "pw_async/context.h" +#include "pw_function/function.h" +#include "pw_status/status.h" + +namespace pw::async { + +/// A `TaskFunction` is a unit of work that is wrapped by a Task and executed on +/// a `Dispatcher`. +/// +/// `TaskFunction`s take a `Context` as their first argument. Before executing a +/// `Task`, the `Dispatcher` sets the pointer to itself and to the `Task` in +/// `Context`. +/// +/// `TaskFunction`s take a `Status` as their second argument. When a Task is +/// running as normal, |status| is `PW_STATUS_OK`. If a `Task` will not be able +/// to run as scheduled, the `Dispatcher` will still invoke the `TaskFunction` +/// with |status| `PW_STATUS_CANCELLED`. This provides an opportunity to reclaim +/// resources held by the Task. +/// +/// A `Task` will not run as scheduled if, for example, it is still waiting when +/// the `Dispatcher` shuts down. +using TaskFunction = Function<void(Context&, Status)>; + +} // namespace pw::async |