aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyan Keane <rwkeane@google.com>2019-10-04 14:34:38 -0700
committerCommit Bot <commit-bot@chromium.org>2019-10-04 21:43:52 +0000
commitecca5d148a59121159b8fb436a9b0b47fd2b79ad (patch)
treeb2ca8d9b040a70f977f6ed078fbe676415206cd4
parent05eb540d5e18b4e80a1b4ebf801ec50ed9bdae88 (diff)
downloadopenscreen-ecca5d148a59121159b8fb436a9b0b47fd2b79ad.tar.gz
Unify Networking Thread: Class Definiton
We have multiple classes that must execute wait loops on the networking thread: - SocketHandleWaiter (to watch for new TlsConnections and for UdpSocket data) - TlsDataRouter (to watch for TlsConnection data to read + write) This class provides a mechanism to integrate these two classes into a single wait loop Change-Id: Ic242f298bb0b07594c7ec7a745777f0f4e9a92bc Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1834106 Commit-Queue: Ryan Keane <rwkeane@google.com> Reviewed-by: Max Yakimakha <yakimakha@chromium.org> Reviewed-by: mark a. foltz <mfoltz@chromium.org>
-rw-r--r--util/BUILD.gn3
-rw-r--r--util/operation_loop.cc57
-rw-r--r--util/operation_loop.h76
-rw-r--r--util/operation_loop_unittest.cc42
4 files changed, 178 insertions, 0 deletions
diff --git a/util/BUILD.gn b/util/BUILD.gn
index b653a856..1c4c6625 100644
--- a/util/BUILD.gn
+++ b/util/BUILD.gn
@@ -23,6 +23,8 @@ source_set("util") {
"json/json_reader.h",
"json/json_writer.cc",
"json/json_writer.h",
+ "operation_loop.cc",
+ "operation_loop.h",
"saturate_cast.h",
"serial_delete_ptr.h",
"std_util.h",
@@ -52,6 +54,7 @@ source_set("unittests") {
"integer_division_unittest.cc",
"json/json_reader_unittest.cc",
"json/json_writer_unittest.cc",
+ "operation_loop_unittest.cc",
"saturate_cast_unittest.cc",
"serial_delete_ptr_unittest.cc",
"yet_another_bit_vector_unittest.cc",
diff --git a/util/operation_loop.cc b/util/operation_loop.cc
new file mode 100644
index 00000000..f35b39d3
--- /dev/null
+++ b/util/operation_loop.cc
@@ -0,0 +1,57 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "util/operation_loop.h"
+
+#include <algorithm>
+
+#include "platform/api/logging.h"
+
+namespace openscreen {
+
+OperationLoop::OperationLoop(std::vector<OperationWithTimeout> operations,
+ Clock::duration timeout,
+ Clock::duration min_loop_execution_time)
+ : perform_all_operations_min_execution_time_(min_loop_execution_time),
+ operation_timeout_(timeout),
+ operations_(operations) {
+ OSP_DCHECK(operations_.size());
+}
+
+void OperationLoop::RunUntilStopped() {
+ OSP_CHECK(!is_running_.exchange(true));
+
+ while (is_running_.load()) {
+ PerformAllOperations();
+ }
+}
+
+void OperationLoop::RequestStopSoon() {
+ {
+ std::unique_lock<std::mutex> lock(wait_mutex_);
+ is_running_.store(false);
+ }
+
+ perform_all_operations_waiter_.notify_all();
+}
+
+void OperationLoop::PerformAllOperations() {
+ auto start_time = Clock::now();
+
+ for (OperationWithTimeout operation : operations_) {
+ if (is_running_.load()) {
+ operation(operation_timeout_);
+ }
+ }
+
+ const auto duration = Clock::now() - start_time;
+ const auto remaining_duration =
+ perform_all_operations_min_execution_time_ - duration;
+ if (remaining_duration > Clock::duration{0} && is_running_.load()) {
+ std::unique_lock<std::mutex> lock(wait_mutex_);
+ perform_all_operations_waiter_.wait_for(
+ lock, remaining_duration, [this]() { return !is_running_.load(); });
+ }
+}
+
+} // namespace openscreen
diff --git a/util/operation_loop.h b/util/operation_loop.h
new file mode 100644
index 00000000..155fe078
--- /dev/null
+++ b/util/operation_loop.h
@@ -0,0 +1,76 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef UTIL_OPERATION_LOOP_H_
+#define UTIL_OPERATION_LOOP_H_
+
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <vector>
+
+#include "platform/api/time.h"
+#include "platform/base/macros.h"
+
+namespace openscreen {
+
+using Clock = platform::Clock;
+
+class OperationLoop {
+ public:
+ using OperationWithTimeout = std::function<void(Clock::duration)>;
+
+ // Creates a new OperationLoop from a variable number of operations. The
+ // provided functions will be called repeatedly, at a minimum interval equal
+ // to min_loop_execution_time, and are expected to exit after the time period
+ // provided to their call has passed. This is because some operations may not
+ // be safe to be interrupted from this class.
+ // NOTE: If n operations are provided with operation timeout T, each iteration
+ // of the operation loop may take as long as n * T, and will not exit after
+ // min_loop_execution_time has elapsed. In order to avoid this behavior, the
+ // caller can set min_loop_execution_time = n * T.
+ //
+ // operations = Functions to execute repeatedly. All functions are expected to
+ // be valid the duration of this object's lifetime.
+ // timeout = Timeout for each individual function above.
+ // min_loop_execution_time = Minimum time that OperationLoop should wait
+ // before successive calls to members of the
+ // provided operations vector.
+ OperationLoop(std::vector<OperationWithTimeout> operations,
+ Clock::duration timeout,
+ Clock::duration min_loop_execution_time);
+
+ // Runs the PerformAllOperations function in a loop until the below
+ // RequestStopSoon function is called.
+ void RunUntilStopped();
+
+ // Signals for the RunUntilStopped loop to cease running.
+ void RequestStopSoon();
+
+ OSP_DISALLOW_COPY_AND_ASSIGN(OperationLoop);
+
+ private:
+ // Performs all operations which have been provided to this instance.
+ void PerformAllOperations();
+
+ const Clock::duration perform_all_operations_min_execution_time_;
+
+ const Clock::duration operation_timeout_;
+
+ // Used to wait in PerformAllOperations() if not enough time has elapsed.
+ std::condition_variable perform_all_operations_waiter_;
+
+ // Mutex used by the above condition_variable.
+ std::mutex wait_mutex_;
+
+ // Represents whether this instance is currently "running".
+ std::atomic_bool is_running_{false};
+
+ // Operations currently being run by this object.
+ const std::vector<OperationWithTimeout> operations_;
+};
+
+} // namespace openscreen
+
+#endif // UTIL_OPERATION_LOOP_H_
diff --git a/util/operation_loop_unittest.cc b/util/operation_loop_unittest.cc
new file mode 100644
index 00000000..4b4ea5df
--- /dev/null
+++ b/util/operation_loop_unittest.cc
@@ -0,0 +1,42 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "util/operation_loop.h"
+
+#include <atomic>
+#include <chrono>
+#include <thread>
+
+#include "gtest/gtest.h"
+
+namespace openscreen {
+
+TEST(OperationsLoopTest, PerformAllOperationsWaits) {
+ constexpr Clock::duration kTimeout{0};
+ constexpr Clock::duration kMinRuntime{500};
+
+ const auto start_time = Clock::now();
+ std::atomic<Clock::time_point> last_run{start_time};
+ std::atomic<Clock::time_point> current_run{start_time};
+ std::function<void(Clock::duration)> test_function =
+ [last = &last_run, current = &current_run](Clock::duration timeout) {
+ last->store(current->load());
+ current->store(Clock::now());
+ };
+ OperationLoop loop({test_function}, kTimeout, kMinRuntime);
+
+ std::thread run_loop([&loop]() { loop.RunUntilStopped(); });
+
+ while (last_run.load() == start_time) {
+ }
+
+ loop.RequestStopSoon();
+ run_loop.join();
+
+ EXPECT_GE(
+ std::chrono::nanoseconds(current_run.load() - last_run.load()).count(),
+ std::chrono::nanoseconds(kMinRuntime).count());
+}
+
+} // namespace openscreen