aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbtolsch <btolsch@chromium.org>2021-03-31 17:28:06 -0700
committerCommit Bot <commit-bot@chromium.org>2021-04-01 05:39:53 +0000
commit207f3b2b5814bbbe2530b3d0f8fb4da1665a02ce (patch)
treed7db89ed9d5966e338ace0a8c3b21b719ece81b1
parentfc203961115e4e9a0e2216c7abc6f46191e99105 (diff)
downloadopenscreen-207f3b2b5814bbbe2530b3d0f8fb4da1665a02ce.tar.gz
Remove OperationLoop
This change removes OperationLoop because it's only used in one place and is arguably confusing. OperationLoop mixes sleep() and select() usage, with separate timeouts, which has resulted in some "tuning" bugs in the past. Since there's no benefit to the sleep portion and it is also what causes the confusion, this change removes it and thing directly uses SocketHandleWaiter in PlatformClientPosix. Bug: None Change-Id: I9f62c9253bee7d29b26d1838d34c1399f4d472b5 Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/2798852 Commit-Queue: Jordan Bayles <jophba@chromium.org> Reviewed-by: Jordan Bayles <jophba@chromium.org>
-rw-r--r--cast/common/discovery/e2e_test/tests.cc2
-rw-r--r--cast/standalone_receiver/main.cc4
-rw-r--r--cast/standalone_sender/main.cc4
-rw-r--r--cast/test/cast_socket_e2e_test.cc3
-rw-r--r--osp/demo/osp_demo.cc3
-rw-r--r--osp/impl/discovery/mdns/mdns_demo.cc3
-rw-r--r--platform/impl/platform_client_posix.cc55
-rw-r--r--platform/impl/platform_client_posix.h23
-rw-r--r--util/BUILD.gn3
-rw-r--r--util/operation_loop.cc57
-rw-r--r--util/operation_loop.h74
-rw-r--r--util/operation_loop_unittest.cc42
12 files changed, 33 insertions, 240 deletions
diff --git a/cast/common/discovery/e2e_test/tests.cc b/cast/common/discovery/e2e_test/tests.cc
index 9a02053b..7c294418 100644
--- a/cast/common/discovery/e2e_test/tests.cc
+++ b/cast/common/discovery/e2e_test/tests.cc
@@ -143,7 +143,7 @@ class DiscoveryE2ETest : public testing::Test {
// Sleep to let any packets clear off the network before further tests.
std::this_thread::sleep_for(milliseconds(500));
- PlatformClientPosix::Create(milliseconds(50), milliseconds(50));
+ PlatformClientPosix::Create(milliseconds(50));
task_runner_ = PlatformClientPosix::GetInstance()->GetTaskRunner();
}
diff --git a/cast/standalone_receiver/main.cc b/cast/standalone_receiver/main.cc
index 9faee412..9e305c8b 100644
--- a/cast/standalone_receiver/main.cc
+++ b/cast/standalone_receiver/main.cc
@@ -230,9 +230,7 @@ int RunStandaloneReceiver(int argc, char* argv[]) {
}
auto* const task_runner = new TaskRunnerImpl(&Clock::now);
- // Cast has high networking demands--network operation timing and timeout must
- // be kept extremely small.
- PlatformClientPosix::Create(microseconds(50), microseconds(50),
+ PlatformClientPosix::Create(milliseconds(50),
std::unique_ptr<TaskRunnerImpl>(task_runner));
RunCastService(task_runner, interface, std::move(creds.value()),
friendly_name, model_name, discovery_enabled);
diff --git a/cast/standalone_sender/main.cc b/cast/standalone_sender/main.cc
index 7479adc1..75b50553 100644
--- a/cast/standalone_sender/main.cc
+++ b/cast/standalone_sender/main.cc
@@ -174,9 +174,7 @@ int StandaloneSenderMain(int argc, char* argv[]) {
#endif
auto* const task_runner = new TaskRunnerImpl(&Clock::now);
- // Cast has high networking demands--network operation timing and timeout must
- // be kept extremely small.
- PlatformClientPosix::Create(microseconds(50), microseconds(50),
+ PlatformClientPosix::Create(milliseconds(50),
std::unique_ptr<TaskRunnerImpl>(task_runner));
IPEndpoint remote_endpoint = ParseAsEndpoint(iface_or_endpoint);
diff --git a/cast/test/cast_socket_e2e_test.cc b/cast/test/cast_socket_e2e_test.cc
index 1f3273fa..446a880c 100644
--- a/cast/test/cast_socket_e2e_test.cc
+++ b/cast/test/cast_socket_e2e_test.cc
@@ -136,8 +136,7 @@ class ReceiverSocketsClient
class CastSocketE2ETest : public ::testing::Test {
public:
void SetUp() override {
- PlatformClientPosix::Create(std::chrono::milliseconds(10),
- std::chrono::milliseconds(0));
+ PlatformClientPosix::Create(std::chrono::milliseconds(10));
task_runner_ = PlatformClientPosix::GetInstance()->GetTaskRunner();
sender_router_ = MakeSerialDelete<VirtualConnectionRouter>(task_runner_);
diff --git a/osp/demo/osp_demo.cc b/osp/demo/osp_demo.cc
index 443536f3..952a925f 100644
--- a/osp/demo/osp_demo.cc
+++ b/osp/demo/osp_demo.cc
@@ -644,8 +644,7 @@ int main(int argc, char** argv) {
// TODO(jophba): Mac on Mojave hangs on this command forever.
openscreen::SetLogFifoOrDie(log_filename);
- PlatformClientPosix::Create(std::chrono::milliseconds(50),
- std::chrono::milliseconds(50));
+ PlatformClientPosix::Create(std::chrono::milliseconds(50));
if (is_receiver_demo) {
OSP_LOG_INFO << "Running publisher demo...";
diff --git a/osp/impl/discovery/mdns/mdns_demo.cc b/osp/impl/discovery/mdns/mdns_demo.cc
index 62860995..1fc1513e 100644
--- a/osp/impl/discovery/mdns/mdns_demo.cc
+++ b/osp/impl/discovery/mdns/mdns_demo.cc
@@ -361,8 +361,7 @@ int main(int argc, char** argv) {
openscreen::osp::ServiceMap services;
openscreen::osp::g_services = &services;
- PlatformClientPosix::Create(std::chrono::milliseconds(50),
- std::chrono::milliseconds(50));
+ PlatformClientPosix::Create(std::chrono::milliseconds(50));
openscreen::osp::BrowseDemo(
PlatformClientPosix::GetInstance()->GetTaskRunner(), labels[0], labels[1],
diff --git a/platform/impl/platform_client_posix.cc b/platform/impl/platform_client_posix.cc
index d1415258..328b8bfa 100644
--- a/platform/impl/platform_client_posix.cc
+++ b/platform/impl/platform_client_posix.cc
@@ -5,6 +5,7 @@
#include "platform/impl/platform_client_posix.h"
#include <functional>
+#include <utility>
#include <vector>
#include "platform/impl/udp_socket_reader_posix.h"
@@ -16,18 +17,14 @@ PlatformClientPosix* PlatformClientPosix::instance_ = nullptr;
// static
void PlatformClientPosix::Create(Clock::duration networking_operation_timeout,
- Clock::duration networking_loop_interval,
std::unique_ptr<TaskRunnerImpl> task_runner) {
SetInstance(new PlatformClientPosix(networking_operation_timeout,
- networking_loop_interval,
std::move(task_runner)));
}
// static
-void PlatformClientPosix::Create(Clock::duration networking_operation_timeout,
- Clock::duration networking_loop_interval) {
- SetInstance(new PlatformClientPosix(networking_operation_timeout,
- networking_loop_interval));
+void PlatformClientPosix::Create(Clock::duration networking_operation_timeout) {
+ SetInstance(new PlatformClientPosix(networking_operation_timeout));
}
// static
@@ -67,7 +64,7 @@ PlatformClientPosix::~PlatformClientPosix() {
}
OSP_DVLOG << "Shutting down network operations...";
- networking_loop_.RequestStopSoon();
+ networking_loop_running_.store(false);
networking_loop_thread_.join();
OSP_DVLOG << "\tNetwork operation shutdown complete!";
}
@@ -79,27 +76,21 @@ void PlatformClientPosix::SetInstance(PlatformClientPosix* instance) {
}
PlatformClientPosix::PlatformClientPosix(
- Clock::duration networking_operation_timeout,
- Clock::duration networking_loop_interval)
- : networking_loop_(networking_operations(),
- networking_operation_timeout,
- networking_loop_interval),
- task_runner_(new TaskRunnerImpl(Clock::now)),
- networking_loop_thread_(&OperationLoop::RunUntilStopped,
- &networking_loop_),
+ Clock::duration networking_operation_timeout)
+ : task_runner_(new TaskRunnerImpl(Clock::now)),
+ networking_loop_timeout_(networking_operation_timeout),
+ networking_loop_thread_(&PlatformClientPosix::RunNetworkLoopUntilStopped,
+ this),
task_runner_thread_(
std::thread(&TaskRunnerImpl::RunUntilStopped, task_runner_.get())) {}
PlatformClientPosix::PlatformClientPosix(
Clock::duration networking_operation_timeout,
- Clock::duration networking_loop_interval,
std::unique_ptr<TaskRunnerImpl> task_runner)
- : networking_loop_(networking_operations(),
- networking_operation_timeout,
- networking_loop_interval),
- task_runner_(std::move(task_runner)),
- networking_loop_thread_(&OperationLoop::RunUntilStopped,
- &networking_loop_) {}
+ : task_runner_(std::move(task_runner)),
+ networking_loop_timeout_(networking_operation_timeout),
+ networking_loop_thread_(&PlatformClientPosix::RunNetworkLoopUntilStopped,
+ this) {}
SocketHandleWaiterPosix* PlatformClientPosix::socket_handle_waiter() {
std::call_once(waiter_initialization_, [this]() {
@@ -109,20 +100,14 @@ SocketHandleWaiterPosix* PlatformClientPosix::socket_handle_waiter() {
return waiter_.get();
}
-void PlatformClientPosix::PerformSocketHandleWaiterActions(
- Clock::duration timeout) {
- if (!waiter_created_.load()) {
- return;
+void PlatformClientPosix::RunNetworkLoopUntilStopped() {
+ while (networking_loop_running_.load()) {
+ if (!waiter_created_.load()) {
+ std::this_thread::sleep_for(networking_loop_timeout_);
+ continue;
+ }
+ socket_handle_waiter()->ProcessHandles(networking_loop_timeout_);
}
-
- socket_handle_waiter()->ProcessHandles(timeout);
-}
-
-std::vector<std::function<void(Clock::duration)>>
-PlatformClientPosix::networking_operations() {
- return {[this](Clock::duration timeout) {
- PerformSocketHandleWaiterActions(timeout);
- }};
}
} // namespace openscreen
diff --git a/platform/impl/platform_client_posix.h b/platform/impl/platform_client_posix.h
index 9f086f58..bb320f55 100644
--- a/platform/impl/platform_client_posix.h
+++ b/platform/impl/platform_client_posix.h
@@ -17,7 +17,6 @@
#include "platform/impl/socket_handle_waiter_posix.h"
#include "platform/impl/task_runner.h"
#include "platform/impl/tls_data_router_posix.h"
-#include "util/operation_loop.h"
namespace openscreen {
@@ -50,13 +49,11 @@ class PlatformClientPosix {
//
// |task_runner| is a client-provided TaskRunner implementation.
static void Create(Clock::duration networking_operation_timeout,
- Clock::duration networking_loop_interval,
std::unique_ptr<TaskRunnerImpl> task_runner);
// Initializes the platform implementation and creates a new TaskRunner (which
// starts a new thread).
- static void Create(Clock::duration networking_operation_timeout,
- Clock::duration networking_loop_interval);
+ static void Create(Clock::duration networking_operation_timeout);
// Shuts down and deletes the PlatformClient instance currently stored as a
// singleton. This method is expected to be called before program exit. After
@@ -85,25 +82,15 @@ class PlatformClientPosix {
static void SetInstance(PlatformClientPosix* client);
private:
- PlatformClientPosix(Clock::duration networking_operation_timeout,
- Clock::duration networking_loop_interval);
+ explicit PlatformClientPosix(Clock::duration networking_operation_timeout);
PlatformClientPosix(Clock::duration networking_operation_timeout,
- Clock::duration networking_loop_interval,
std::unique_ptr<TaskRunnerImpl> task_runner);
// This method is thread-safe.
SocketHandleWaiterPosix* socket_handle_waiter();
- // Helper functions to use when creating and calling the OperationLoop used
- // for the networking thread.
- void PerformSocketHandleWaiterActions(Clock::duration timeout);
- void PerformTlsDataRouterActions(Clock::duration timeout);
- std::vector<std::function<void(Clock::duration)>> networking_operations();
-
- // Instance objects with threads are created at object-creation time.
- // NOTE: Delayed instantiation of networking_loop_ may be useful in future.
- OperationLoop networking_loop_;
+ void RunNetworkLoopUntilStopped();
std::unique_ptr<TaskRunnerImpl> task_runner_;
@@ -111,6 +98,10 @@ class PlatformClientPosix {
std::atomic_bool waiter_created_{false};
std::atomic_bool tls_data_router_created_{false};
+ // Parameters for networking loop.
+ std::atomic_bool networking_loop_running_{true};
+ Clock::duration networking_loop_timeout_;
+
// Flags used to ensure that initialization of below instance objects occurs
// only once across all threads.
std::once_flag waiter_initialization_;
diff --git a/util/BUILD.gn b/util/BUILD.gn
index d379fde7..3f97e09e 100644
--- a/util/BUILD.gn
+++ b/util/BUILD.gn
@@ -51,8 +51,6 @@ source_set("util") {
"json/json_serialization.h",
"json/json_value.cc",
"json/json_value.h",
- "operation_loop.cc",
- "operation_loop.h",
"osp_logging.h",
"saturate_cast.h",
"simple_fraction.cc",
@@ -112,7 +110,6 @@ source_set("unittests") {
"json/json_helpers_unittest.cc",
"json/json_serialization_unittest.cc",
"json/json_value_unittest.cc",
- "operation_loop_unittest.cc",
"saturate_cast_unittest.cc",
"simple_fraction_unittest.cc",
"stringprintf_unittest.cc",
diff --git a/util/operation_loop.cc b/util/operation_loop.cc
deleted file mode 100644
index a20f1447..00000000
--- a/util/operation_loop.cc
+++ /dev/null
@@ -1,57 +0,0 @@
-// Copyright 2019 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-#include "util/operation_loop.h"
-
-#include <algorithm>
-
-#include "util/osp_logging.h"
-
-namespace openscreen {
-
-OperationLoop::OperationLoop(std::vector<OperationWithTimeout> operations,
- Clock::duration timeout,
- Clock::duration min_loop_execution_time)
- : perform_all_operations_min_execution_time_(min_loop_execution_time),
- operation_timeout_(timeout),
- operations_(operations) {
- OSP_DCHECK(operations_.size());
-}
-
-void OperationLoop::RunUntilStopped() {
- OSP_CHECK(!is_running_.exchange(true));
-
- while (is_running_.load()) {
- PerformAllOperations();
- }
-}
-
-void OperationLoop::RequestStopSoon() {
- {
- std::unique_lock<std::mutex> lock(wait_mutex_);
- is_running_.store(false);
- }
-
- perform_all_operations_waiter_.notify_all();
-}
-
-void OperationLoop::PerformAllOperations() {
- auto start_time = Clock::now();
-
- for (OperationWithTimeout operation : operations_) {
- if (is_running_.load()) {
- operation(operation_timeout_);
- }
- }
-
- const auto duration = Clock::now() - start_time;
- const auto remaining_duration =
- perform_all_operations_min_execution_time_ - duration;
- if (remaining_duration > Clock::duration{0} && is_running_.load()) {
- std::unique_lock<std::mutex> lock(wait_mutex_);
- perform_all_operations_waiter_.wait_for(
- lock, remaining_duration, [this]() { return !is_running_.load(); });
- }
-}
-
-} // namespace openscreen
diff --git a/util/operation_loop.h b/util/operation_loop.h
deleted file mode 100644
index ddb7846f..00000000
--- a/util/operation_loop.h
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2019 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef UTIL_OPERATION_LOOP_H_
-#define UTIL_OPERATION_LOOP_H_
-
-#include <atomic>
-#include <condition_variable>
-#include <functional>
-#include <vector>
-
-#include "platform/api/time.h"
-#include "platform/base/macros.h"
-
-namespace openscreen {
-
-class OperationLoop {
- public:
- using OperationWithTimeout = std::function<void(Clock::duration)>;
-
- // Creates a new OperationLoop from a variable number of operations. The
- // provided functions will be called repeatedly, at a minimum interval equal
- // to min_loop_execution_time, and are expected to exit after the time period
- // provided to their call has passed. This is because some operations may not
- // be safe to be interrupted from this class.
- // NOTE: If n operations are provided with operation timeout T, each iteration
- // of the operation loop may take as long as n * T, and will not exit after
- // min_loop_execution_time has elapsed. In order to avoid this behavior, the
- // caller can set min_loop_execution_time = n * T.
- //
- // operations = Functions to execute repeatedly. All functions are expected to
- // be valid the duration of this object's lifetime.
- // timeout = Timeout for each individual function above.
- // min_loop_execution_time = Minimum time that OperationLoop should wait
- // before successive calls to members of the
- // provided operations vector.
- OperationLoop(std::vector<OperationWithTimeout> operations,
- Clock::duration timeout,
- Clock::duration min_loop_execution_time);
-
- // Runs the PerformAllOperations function in a loop until the below
- // RequestStopSoon function is called.
- void RunUntilStopped();
-
- // Signals for the RunUntilStopped loop to cease running.
- void RequestStopSoon();
-
- OSP_DISALLOW_COPY_AND_ASSIGN(OperationLoop);
-
- private:
- // Performs all operations which have been provided to this instance.
- void PerformAllOperations();
-
- const Clock::duration perform_all_operations_min_execution_time_;
-
- const Clock::duration operation_timeout_;
-
- // Used to wait in PerformAllOperations() if not enough time has elapsed.
- std::condition_variable perform_all_operations_waiter_;
-
- // Mutex used by the above condition_variable.
- std::mutex wait_mutex_;
-
- // Represents whether this instance is currently "running".
- std::atomic_bool is_running_{false};
-
- // Operations currently being run by this object.
- const std::vector<OperationWithTimeout> operations_;
-};
-
-} // namespace openscreen
-
-#endif // UTIL_OPERATION_LOOP_H_
diff --git a/util/operation_loop_unittest.cc b/util/operation_loop_unittest.cc
deleted file mode 100644
index 4b4ea5df..00000000
--- a/util/operation_loop_unittest.cc
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2019 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "util/operation_loop.h"
-
-#include <atomic>
-#include <chrono>
-#include <thread>
-
-#include "gtest/gtest.h"
-
-namespace openscreen {
-
-TEST(OperationsLoopTest, PerformAllOperationsWaits) {
- constexpr Clock::duration kTimeout{0};
- constexpr Clock::duration kMinRuntime{500};
-
- const auto start_time = Clock::now();
- std::atomic<Clock::time_point> last_run{start_time};
- std::atomic<Clock::time_point> current_run{start_time};
- std::function<void(Clock::duration)> test_function =
- [last = &last_run, current = &current_run](Clock::duration timeout) {
- last->store(current->load());
- current->store(Clock::now());
- };
- OperationLoop loop({test_function}, kTimeout, kMinRuntime);
-
- std::thread run_loop([&loop]() { loop.RunUntilStopped(); });
-
- while (last_run.load() == start_time) {
- }
-
- loop.RequestStopSoon();
- run_loop.join();
-
- EXPECT_GE(
- std::chrono::nanoseconds(current_run.load() - last_run.load()).count(),
- std::chrono::nanoseconds(kMinRuntime).count());
-}
-
-} // namespace openscreen