diff options
author | Ryan Keane <rwkeane@google.com> | 2019-10-04 14:34:38 -0700 |
---|---|---|
committer | Commit Bot <commit-bot@chromium.org> | 2019-10-04 21:43:52 +0000 |
commit | ecca5d148a59121159b8fb436a9b0b47fd2b79ad (patch) | |
tree | b2ca8d9b040a70f977f6ed078fbe676415206cd4 | |
parent | 05eb540d5e18b4e80a1b4ebf801ec50ed9bdae88 (diff) | |
download | openscreen-ecca5d148a59121159b8fb436a9b0b47fd2b79ad.tar.gz |
Unify Networking Thread: Class Definiton
We have multiple classes that must execute wait loops on the networking
thread:
- SocketHandleWaiter (to watch for new TlsConnections and for
UdpSocket data)
- TlsDataRouter (to watch for TlsConnection data to read + write)
This class provides a mechanism to integrate these two classes into a
single wait loop
Change-Id: Ic242f298bb0b07594c7ec7a745777f0f4e9a92bc
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1834106
Commit-Queue: Ryan Keane <rwkeane@google.com>
Reviewed-by: Max Yakimakha <yakimakha@chromium.org>
Reviewed-by: mark a. foltz <mfoltz@chromium.org>
-rw-r--r-- | util/BUILD.gn | 3 | ||||
-rw-r--r-- | util/operation_loop.cc | 57 | ||||
-rw-r--r-- | util/operation_loop.h | 76 | ||||
-rw-r--r-- | util/operation_loop_unittest.cc | 42 |
4 files changed, 178 insertions, 0 deletions
diff --git a/util/BUILD.gn b/util/BUILD.gn index b653a856..1c4c6625 100644 --- a/util/BUILD.gn +++ b/util/BUILD.gn @@ -23,6 +23,8 @@ source_set("util") { "json/json_reader.h", "json/json_writer.cc", "json/json_writer.h", + "operation_loop.cc", + "operation_loop.h", "saturate_cast.h", "serial_delete_ptr.h", "std_util.h", @@ -52,6 +54,7 @@ source_set("unittests") { "integer_division_unittest.cc", "json/json_reader_unittest.cc", "json/json_writer_unittest.cc", + "operation_loop_unittest.cc", "saturate_cast_unittest.cc", "serial_delete_ptr_unittest.cc", "yet_another_bit_vector_unittest.cc", diff --git a/util/operation_loop.cc b/util/operation_loop.cc new file mode 100644 index 00000000..f35b39d3 --- /dev/null +++ b/util/operation_loop.cc @@ -0,0 +1,57 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +#include "util/operation_loop.h" + +#include <algorithm> + +#include "platform/api/logging.h" + +namespace openscreen { + +OperationLoop::OperationLoop(std::vector<OperationWithTimeout> operations, + Clock::duration timeout, + Clock::duration min_loop_execution_time) + : perform_all_operations_min_execution_time_(min_loop_execution_time), + operation_timeout_(timeout), + operations_(operations) { + OSP_DCHECK(operations_.size()); +} + +void OperationLoop::RunUntilStopped() { + OSP_CHECK(!is_running_.exchange(true)); + + while (is_running_.load()) { + PerformAllOperations(); + } +} + +void OperationLoop::RequestStopSoon() { + { + std::unique_lock<std::mutex> lock(wait_mutex_); + is_running_.store(false); + } + + perform_all_operations_waiter_.notify_all(); +} + +void OperationLoop::PerformAllOperations() { + auto start_time = Clock::now(); + + for (OperationWithTimeout operation : operations_) { + if (is_running_.load()) { + operation(operation_timeout_); + } + } + + const auto duration = Clock::now() - start_time; + const auto remaining_duration = + perform_all_operations_min_execution_time_ - duration; + if (remaining_duration > Clock::duration{0} && is_running_.load()) { + std::unique_lock<std::mutex> lock(wait_mutex_); + perform_all_operations_waiter_.wait_for( + lock, remaining_duration, [this]() { return !is_running_.load(); }); + } +} + +} // namespace openscreen diff --git a/util/operation_loop.h b/util/operation_loop.h new file mode 100644 index 00000000..155fe078 --- /dev/null +++ b/util/operation_loop.h @@ -0,0 +1,76 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef UTIL_OPERATION_LOOP_H_ +#define UTIL_OPERATION_LOOP_H_ + +#include <atomic> +#include <condition_variable> +#include <functional> +#include <vector> + +#include "platform/api/time.h" +#include "platform/base/macros.h" + +namespace openscreen { + +using Clock = platform::Clock; + +class OperationLoop { + public: + using OperationWithTimeout = std::function<void(Clock::duration)>; + + // Creates a new OperationLoop from a variable number of operations. The + // provided functions will be called repeatedly, at a minimum interval equal + // to min_loop_execution_time, and are expected to exit after the time period + // provided to their call has passed. This is because some operations may not + // be safe to be interrupted from this class. + // NOTE: If n operations are provided with operation timeout T, each iteration + // of the operation loop may take as long as n * T, and will not exit after + // min_loop_execution_time has elapsed. In order to avoid this behavior, the + // caller can set min_loop_execution_time = n * T. + // + // operations = Functions to execute repeatedly. All functions are expected to + // be valid the duration of this object's lifetime. + // timeout = Timeout for each individual function above. + // min_loop_execution_time = Minimum time that OperationLoop should wait + // before successive calls to members of the + // provided operations vector. + OperationLoop(std::vector<OperationWithTimeout> operations, + Clock::duration timeout, + Clock::duration min_loop_execution_time); + + // Runs the PerformAllOperations function in a loop until the below + // RequestStopSoon function is called. + void RunUntilStopped(); + + // Signals for the RunUntilStopped loop to cease running. + void RequestStopSoon(); + + OSP_DISALLOW_COPY_AND_ASSIGN(OperationLoop); + + private: + // Performs all operations which have been provided to this instance. + void PerformAllOperations(); + + const Clock::duration perform_all_operations_min_execution_time_; + + const Clock::duration operation_timeout_; + + // Used to wait in PerformAllOperations() if not enough time has elapsed. + std::condition_variable perform_all_operations_waiter_; + + // Mutex used by the above condition_variable. + std::mutex wait_mutex_; + + // Represents whether this instance is currently "running". + std::atomic_bool is_running_{false}; + + // Operations currently being run by this object. + const std::vector<OperationWithTimeout> operations_; +}; + +} // namespace openscreen + +#endif // UTIL_OPERATION_LOOP_H_ diff --git a/util/operation_loop_unittest.cc b/util/operation_loop_unittest.cc new file mode 100644 index 00000000..4b4ea5df --- /dev/null +++ b/util/operation_loop_unittest.cc @@ -0,0 +1,42 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "util/operation_loop.h" + +#include <atomic> +#include <chrono> +#include <thread> + +#include "gtest/gtest.h" + +namespace openscreen { + +TEST(OperationsLoopTest, PerformAllOperationsWaits) { + constexpr Clock::duration kTimeout{0}; + constexpr Clock::duration kMinRuntime{500}; + + const auto start_time = Clock::now(); + std::atomic<Clock::time_point> last_run{start_time}; + std::atomic<Clock::time_point> current_run{start_time}; + std::function<void(Clock::duration)> test_function = + [last = &last_run, current = ¤t_run](Clock::duration timeout) { + last->store(current->load()); + current->store(Clock::now()); + }; + OperationLoop loop({test_function}, kTimeout, kMinRuntime); + + std::thread run_loop([&loop]() { loop.RunUntilStopped(); }); + + while (last_run.load() == start_time) { + } + + loop.RequestStopSoon(); + run_loop.join(); + + EXPECT_GE( + std::chrono::nanoseconds(current_run.load() - last_run.load()).count(), + std::chrono::nanoseconds(kMinRuntime).count()); +} + +} // namespace openscreen |