aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordan Bayles <jophba@chromium.org>2021-06-10 21:06:54 -0700
committerOpenscreen LUCI CQ <openscreen-scoped@luci-project-accounts.iam.gserviceaccount.com>2021-06-12 03:43:45 +0000
commita6c92a803725d46f4aa6ef46bf66a5ccd1f69961 (patch)
treeeab1300a2a162db7b731ba3fde6ead51bdd2af0c
parent0f37d8d302ee7da5c2ed5eafb93898709b9a0d9c (diff)
downloadopenscreen-a6c92a803725d46f4aa6ef46bf66a5ccd1f69961.tar.gz
[Cast Streaming] RpcBroker -> RpcMessenger
This patch renames Open Screen's RpcBroker to RpcMessenger, in order to avoid confusion when discussing this implementation versus Chrome's RpcBroker implementation--which will be going away soon. Change-Id: I23127074db3f7d807c6f70c26c7b596b6edb503a Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/2953900 Reviewed-by: Ryan Keane <rwkeane@google.com> Commit-Queue: Jordan Bayles <jophba@chromium.org>
-rw-r--r--cast/streaming/BUILD.gn6
-rw-r--r--cast/streaming/receiver_session.cc4
-rw-r--r--cast/streaming/receiver_session.h10
-rw-r--r--cast/streaming/rpc_messenger.cc (renamed from cast/streaming/rpc_broker.cc)28
-rw-r--r--cast/streaming/rpc_messenger.h (renamed from cast/streaming/rpc_broker.h)26
-rw-r--r--cast/streaming/rpc_messenger_unittest.cc (renamed from cast/streaming/rpc_broker_unittest.cc)54
-rw-r--r--cast/streaming/sender_session.cc12
-rw-r--r--cast/streaming/sender_session.h13
-rw-r--r--cast/streaming/sender_session_unittest.cc8
9 files changed, 81 insertions, 80 deletions
diff --git a/cast/streaming/BUILD.gn b/cast/streaming/BUILD.gn
index baacb905..985cd1e2 100644
--- a/cast/streaming/BUILD.gn
+++ b/cast/streaming/BUILD.gn
@@ -59,8 +59,8 @@ source_set("common") {
"packet_util.h",
"receiver_message.cc",
"receiver_message.h",
- "rpc_broker.cc",
- "rpc_broker.h",
+ "rpc_messenger.cc",
+ "rpc_messenger.h",
"rtcp_common.cc",
"rtcp_common.h",
"rtcp_session.cc",
@@ -193,7 +193,7 @@ source_set("unittests") {
"packet_util_unittest.cc",
"receiver_session_unittest.cc",
"receiver_unittest.cc",
- "rpc_broker_unittest.cc",
+ "rpc_messenger_unittest.cc",
"rtcp_common_unittest.cc",
"rtp_packet_parser_unittest.cc",
"rtp_packetizer_unittest.cc",
diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc
index a49acbca..02c57e60 100644
--- a/cast/streaming/receiver_session.cc
+++ b/cast/streaming/receiver_session.cc
@@ -291,7 +291,7 @@ void ReceiverSession::InitializeSession(const SessionProperties& properties) {
client_->OnNegotiated(this, std::move(receivers));
} else {
// TODO(jophba): cleanup sequence number usage.
- broker_ = std::make_unique<RpcBroker>([this](std::vector<uint8_t> message) {
+ rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) {
Error error = this->messenger_.SendMessage(
ReceiverMessage{ReceiverMessage::Type::kRpc, -1, true /* valid */,
std::move(message)});
@@ -301,7 +301,7 @@ void ReceiverSession::InitializeSession(const SessionProperties& properties) {
}
});
client_->OnRemotingNegotiated(
- this, RemotingNegotiation{std::move(receivers), broker_.get()});
+ this, RemotingNegotiation{std::move(receivers), rpc_messenger_.get()});
}
const Error result = messenger_.SendMessage(ReceiverMessage{
ReceiverMessage::Type::kAnswer, properties.sequence_number,
diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h
index 0f9f00cf..c27d64ab 100644
--- a/cast/streaming/receiver_session.h
+++ b/cast/streaming/receiver_session.h
@@ -16,7 +16,7 @@
#include "cast/streaming/offer_messages.h"
#include "cast/streaming/receiver_packet_router.h"
#include "cast/streaming/resolution.h"
-#include "cast/streaming/rpc_broker.h"
+#include "cast/streaming/rpc_messenger.h"
#include "cast/streaming/sender_message.h"
#include "cast/streaming/session_config.h"
#include "cast/streaming/session_messenger.h"
@@ -68,8 +68,8 @@ class ReceiverSession final : public Environment::SocketSubscriber {
struct RemotingNegotiation {
ConfiguredReceivers receivers;
- // The RPC broker to be used for subscribing to remoting proto messages.
- RpcBroker* broker;
+ // The RPC messenger to be used for subscribing to remoting proto messages.
+ RpcMessenger* messenger;
};
// The embedder should provide a client for handling connections.
@@ -335,9 +335,9 @@ class ReceiverSession final : public Environment::SocketSubscriber {
std::unique_ptr<Receiver> current_audio_receiver_;
std::unique_ptr<Receiver> current_video_receiver_;
- // If remoting, we store the RpcBroker used by the embedder to send RPC
+ // If remoting, we store the RpcMessenger used by the embedder to send RPC
// messages from the remoting protobuf specification.
- std::unique_ptr<RpcBroker> broker_;
+ std::unique_ptr<RpcMessenger> rpc_messenger_;
};
} // namespace cast
diff --git a/cast/streaming/rpc_broker.cc b/cast/streaming/rpc_messenger.cc
index 58379d79..54b2e279 100644
--- a/cast/streaming/rpc_broker.cc
+++ b/cast/streaming/rpc_messenger.cc
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "cast/streaming/rpc_broker.h"
+#include "cast/streaming/rpc_messenger.h"
#include <memory>
#include <string>
@@ -37,25 +37,25 @@ std::ostream& operator<<(std::ostream& out, const RpcMessage& message) {
} // namespace
-constexpr RpcBroker::Handle RpcBroker::kInvalidHandle;
-constexpr RpcBroker::Handle RpcBroker::kAcquireRendererHandle;
-constexpr RpcBroker::Handle RpcBroker::kAcquireDemuxerHandle;
-constexpr RpcBroker::Handle RpcBroker::kFirstHandle;
+constexpr RpcMessenger::Handle RpcMessenger::kInvalidHandle;
+constexpr RpcMessenger::Handle RpcMessenger::kAcquireRendererHandle;
+constexpr RpcMessenger::Handle RpcMessenger::kAcquireDemuxerHandle;
+constexpr RpcMessenger::Handle RpcMessenger::kFirstHandle;
-RpcBroker::RpcBroker(SendMessageCallback send_message_cb)
+RpcMessenger::RpcMessenger(SendMessageCallback send_message_cb)
: next_handle_(kFirstHandle),
send_message_cb_(std::move(send_message_cb)) {}
-RpcBroker::~RpcBroker() {
+RpcMessenger::~RpcMessenger() {
receive_callbacks_.clear();
}
-RpcBroker::Handle RpcBroker::GetUniqueHandle() {
+RpcMessenger::Handle RpcMessenger::GetUniqueHandle() {
return next_handle_++;
}
-void RpcBroker::RegisterMessageReceiverCallback(
- RpcBroker::Handle handle,
+void RpcMessenger::RegisterMessageReceiverCallback(
+ RpcMessenger::Handle handle,
ReceiveMessageCallback callback) {
OSP_DCHECK(receive_callbacks_.find(handle) == receive_callbacks_.end())
<< "must deregister before re-registering";
@@ -63,12 +63,12 @@ void RpcBroker::RegisterMessageReceiverCallback(
receive_callbacks_.emplace_back(handle, std::move(callback));
}
-void RpcBroker::UnregisterMessageReceiverCallback(RpcBroker::Handle handle) {
+void RpcMessenger::UnregisterMessageReceiverCallback(RpcMessenger::Handle handle) {
OSP_DVLOG << "unregistering handle: " << handle;
receive_callbacks_.erase_key(handle);
}
-void RpcBroker::ProcessMessageFromRemote(const uint8_t* message,
+void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message,
std::size_t message_len) {
auto rpc = std::make_unique<RpcMessage>();
if (!rpc->ParseFromArray(message, message_len)) {
@@ -86,14 +86,14 @@ void RpcBroker::ProcessMessageFromRemote(const uint8_t* message,
entry->second(std::move(rpc));
}
-void RpcBroker::SendMessageToRemote(const RpcMessage& rpc) {
+void RpcMessenger::SendMessageToRemote(const RpcMessage& rpc) {
OSP_DVLOG << "Sending RPC message: " << rpc;
std::vector<uint8_t> message(rpc.ByteSizeLong());
rpc.SerializeToArray(message.data(), message.size());
send_message_cb_(std::move(message));
}
-bool RpcBroker::IsRegisteredForTesting(RpcBroker::Handle handle) {
+bool RpcMessenger::IsRegisteredForTesting(RpcMessenger::Handle handle) {
return receive_callbacks_.find(handle) != receive_callbacks_.end();
}
diff --git a/cast/streaming/rpc_broker.h b/cast/streaming/rpc_messenger.h
index 3cccec83..c997a7b9 100644
--- a/cast/streaming/rpc_broker.h
+++ b/cast/streaming/rpc_messenger.h
@@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#ifndef CAST_STREAMING_RPC_BROKER_H_
-#define CAST_STREAMING_RPC_BROKER_H_
+#ifndef CAST_STREAMING_RPC_MESSENGER_H_
+#define CAST_STREAMING_RPC_MESSENGER_H_
#include <memory>
#include <string>
@@ -24,23 +24,23 @@ namespace cast {
// value in the existing RPC communication channel using the special handles
// |kAcquire*Handle|.
//
-// NOTE: RpcBroker doesn't actually send RPC messages to the remote. The session
+// NOTE: RpcMessenger doesn't actually send RPC messages to the remote. The session
// messenger needs to set SendMessageCallback, and call ProcessMessageFromRemote
-// as appropriate. The RpcBroker then distributes each RPC message to the
+// as appropriate. The RpcMessenger then distributes each RPC message to the
// subscribed component.
-class RpcBroker {
+class RpcMessenger {
public:
using Handle = int;
using ReceiveMessageCallback =
std::function<void(std::unique_ptr<RpcMessage>)>;
using SendMessageCallback = std::function<void(std::vector<uint8_t>)>;
- explicit RpcBroker(SendMessageCallback send_message_cb);
- RpcBroker(const RpcBroker&) = delete;
- RpcBroker(RpcBroker&&) noexcept;
- RpcBroker& operator=(const RpcBroker&) = delete;
- RpcBroker& operator=(RpcBroker&&);
- ~RpcBroker();
+ explicit RpcMessenger(SendMessageCallback send_message_cb);
+ RpcMessenger(const RpcMessenger&) = delete;
+ RpcMessenger(RpcMessenger&&) noexcept;
+ RpcMessenger& operator=(const RpcMessenger&) = delete;
+ RpcMessenger& operator=(RpcMessenger&&);
+ ~RpcMessenger();
// Get unique handle value for RPC message handles.
Handle GetUniqueHandle();
@@ -68,7 +68,7 @@ class RpcBroker {
// Checks if the handle is registered for receiving messages. Test-only.
bool IsRegisteredForTesting(Handle handle);
- // Consumers of RPCBroker may set the send message callback post-hoc
+ // Consumers of RPCMessenger may set the send message callback post-hoc
// in order to simulate different scenarios.
void set_send_message_cb_for_testing(SendMessageCallback cb) {
send_message_cb_ = std::move(cb);
@@ -99,4 +99,4 @@ class RpcBroker {
} // namespace cast
} // namespace openscreen
-#endif // CAST_STREAMING_RPC_BROKER_H_
+#endif // CAST_STREAMING_RPC_MESSENGER_H_
diff --git a/cast/streaming/rpc_broker_unittest.cc b/cast/streaming/rpc_messenger_unittest.cc
index a837285c..e0627542 100644
--- a/cast/streaming/rpc_broker_unittest.cc
+++ b/cast/streaming/rpc_messenger_unittest.cc
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "cast/streaming/rpc_broker.h"
+#include "cast/streaming/rpc_messenger.h"
#include <memory>
#include <string>
@@ -39,8 +39,8 @@ class FakeMessenger {
int sent_count() const { return sent_count_; }
const RpcMessage& sent_rpc() const { return sent_rpc_; }
- void set_handle(RpcBroker::Handle handle) { handle_ = handle; }
- RpcBroker::Handle handle() { return handle_; }
+ void set_handle(RpcMessenger::Handle handle) { handle_ = handle; }
+ RpcMessenger::Handle handle() { return handle_; }
private:
std::unique_ptr<RpcMessage> received_rpc_;
@@ -49,25 +49,25 @@ class FakeMessenger {
RpcMessage sent_rpc_;
int sent_count_ = 0;
- RpcBroker::Handle handle_ = -1;
+ RpcMessenger::Handle handle_ = -1;
};
} // namespace
-class RpcBrokerTest : public testing::Test {
+class RpcMessengerTest : public testing::Test {
protected:
void SetUp() override {
fake_messenger_ = std::make_unique<FakeMessenger>();
ASSERT_FALSE(fake_messenger_->received_count());
- rpc_broker_ = std::make_unique<RpcBroker>(
+ rpc_messenger_ = std::make_unique<RpcMessenger>(
[p = fake_messenger_.get()](std::vector<uint8_t> message) {
p->OnSentRpc(message);
});
- const auto handle = rpc_broker_->GetUniqueHandle();
+ const auto handle = rpc_messenger_->GetUniqueHandle();
fake_messenger_->set_handle(handle);
- rpc_broker_->RegisterMessageReceiverCallback(
+ rpc_messenger_->RegisterMessageReceiverCallback(
handle,
[p = fake_messenger_.get()](std::unique_ptr<RpcMessage> message) {
p->OnReceivedRpc(std::move(message));
@@ -77,41 +77,41 @@ class RpcBrokerTest : public testing::Test {
void ProcessMessage(const RpcMessage& rpc) {
std::vector<uint8_t> message(rpc.ByteSizeLong());
rpc.SerializeToArray(message.data(), message.size());
- rpc_broker_->ProcessMessageFromRemote(message.data(), message.size());
+ rpc_messenger_->ProcessMessageFromRemote(message.data(), message.size());
}
std::unique_ptr<FakeMessenger> fake_messenger_;
- std::unique_ptr<RpcBroker> rpc_broker_;
+ std::unique_ptr<RpcMessenger> rpc_messenger_;
};
-TEST_F(RpcBrokerTest, TestProcessMessageFromRemoteRegistered) {
+TEST_F(RpcMessengerTest, TestProcessMessageFromRemoteRegistered) {
RpcMessage rpc;
rpc.set_handle(fake_messenger_->handle());
ProcessMessage(rpc);
ASSERT_EQ(1, fake_messenger_->received_count());
}
-TEST_F(RpcBrokerTest, TestProcessMessageFromRemoteUnregistered) {
+TEST_F(RpcMessengerTest, TestProcessMessageFromRemoteUnregistered) {
RpcMessage rpc;
- rpc_broker_->UnregisterMessageReceiverCallback(fake_messenger_->handle());
+ rpc_messenger_->UnregisterMessageReceiverCallback(fake_messenger_->handle());
ProcessMessage(rpc);
ASSERT_EQ(0, fake_messenger_->received_count());
}
-TEST_F(RpcBrokerTest, CanSendMultipleMessages) {
+TEST_F(RpcMessengerTest, CanSendMultipleMessages) {
for (int i = 0; i < 10; ++i) {
- rpc_broker_->SendMessageToRemote(RpcMessage{});
+ rpc_messenger_->SendMessageToRemote(RpcMessage{});
}
EXPECT_EQ(10, fake_messenger_->sent_count());
}
-TEST_F(RpcBrokerTest, SendMessageCallback) {
- // Send message for RPC broker to process.
+TEST_F(RpcMessengerTest, SendMessageCallback) {
+ // Send message for RPC messenger to process.
RpcMessage sent_rpc;
sent_rpc.set_handle(fake_messenger_->handle());
sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME);
sent_rpc.set_double_value(2.3);
- rpc_broker_->SendMessageToRemote(sent_rpc);
+ rpc_messenger_->SendMessageToRemote(sent_rpc);
// Check if received message is identical to the one sent earlier.
ASSERT_EQ(1, fake_messenger_->sent_count());
@@ -121,8 +121,8 @@ TEST_F(RpcBrokerTest, SendMessageCallback) {
ASSERT_EQ(2.3, message.double_value());
}
-TEST_F(RpcBrokerTest, ProcessMessageWithRegisteredHandle) {
- // Send message for RPC broker to process.
+TEST_F(RpcMessengerTest, ProcessMessageWithRegisteredHandle) {
+ // Send message for RPC messenger to process.
RpcMessage sent_rpc;
sent_rpc.set_handle(fake_messenger_->handle());
sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME);
@@ -137,10 +137,10 @@ TEST_F(RpcBrokerTest, ProcessMessageWithRegisteredHandle) {
ASSERT_EQ(3.4, received_rpc.double_value());
}
-TEST_F(RpcBrokerTest, ProcessMessageWithUnregisteredHandle) {
- // Send message for RPC broker to process.
+TEST_F(RpcMessengerTest, ProcessMessageWithUnregisteredHandle) {
+ // Send message for RPC messenger to process.
RpcMessage sent_rpc;
- RpcBroker::Handle different_handle = fake_messenger_->handle() + 1;
+ RpcMessenger::Handle different_handle = fake_messenger_->handle() + 1;
sent_rpc.set_handle(different_handle);
sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME);
sent_rpc.set_double_value(4.5);
@@ -150,12 +150,12 @@ TEST_F(RpcBrokerTest, ProcessMessageWithUnregisteredHandle) {
ASSERT_EQ(0, fake_messenger_->received_count());
}
-TEST_F(RpcBrokerTest, Registration) {
+TEST_F(RpcMessengerTest, Registration) {
const auto handle = fake_messenger_->handle();
- ASSERT_TRUE(rpc_broker_->IsRegisteredForTesting(handle));
+ ASSERT_TRUE(rpc_messenger_->IsRegisteredForTesting(handle));
- rpc_broker_->UnregisterMessageReceiverCallback(handle);
- ASSERT_FALSE(rpc_broker_->IsRegisteredForTesting(handle));
+ rpc_messenger_->UnregisterMessageReceiverCallback(handle);
+ ASSERT_FALSE(rpc_messenger_->IsRegisteredForTesting(handle));
}
} // namespace cast
diff --git a/cast/streaming/sender_session.cc b/cast/streaming/sender_session.cc
index ca5ce027..bbb3972f 100644
--- a/cast/streaming/sender_session.cc
+++ b/cast/streaming/sender_session.cc
@@ -265,7 +265,7 @@ void SenderSession::ResetState() {
current_negotiation_.reset();
current_audio_sender_.reset();
current_video_sender_.reset();
- broker_.reset();
+ rpc_messenger_.reset();
}
Error SenderSession::StartNegotiation(
@@ -361,7 +361,7 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) {
"Failed to negotiate a remoting session."));
return;
}
- broker_ = std::make_unique<RpcBroker>([this](std::vector<uint8_t> message) {
+ rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) {
Error error = this->messenger_.SendOutboundMessage(SenderMessage{
SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true,
std::move(message)});
@@ -373,12 +373,12 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) {
config_.client->OnRemotingNegotiated(
this, RemotingNegotiation{std::move(senders), ToCapabilities(caps),
- broker_.get()});
+ rpc_messenger_.get()});
}
void SenderSession::OnRpcMessage(ReceiverMessage message) {
- if (!broker_) {
- OSP_LOG_INFO << "Received an RPC message without having an RPCBroker.";
+ if (!rpc_messenger_) {
+ OSP_LOG_INFO << "Received an RPC message without having an RPCMessenger.";
return;
}
@@ -390,7 +390,7 @@ void SenderSession::OnRpcMessage(ReceiverMessage message) {
}
const auto& body = absl::get<std::vector<uint8_t>>(message.body);
- broker_->ProcessMessageFromRemote(body.data(), body.size());
+ rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size());
}
void SenderSession::HandleErrorMessage(ReceiverMessage message,
diff --git a/cast/streaming/sender_session.h b/cast/streaming/sender_session.h
index d95c61aa..7995181f 100644
--- a/cast/streaming/sender_session.h
+++ b/cast/streaming/sender_session.h
@@ -16,7 +16,7 @@
#include "cast/streaming/capture_recommendations.h"
#include "cast/streaming/offer_messages.h"
#include "cast/streaming/remoting_capabilities.h"
-#include "cast/streaming/rpc_broker.h"
+#include "cast/streaming/rpc_messenger.h"
#include "cast/streaming/sender.h"
#include "cast/streaming/sender_packet_router.h"
#include "cast/streaming/session_config.h"
@@ -59,8 +59,8 @@ class SenderSession final {
// a version check when using these capabilities to offer remoting.
RemotingCapabilities capabilities;
- // The RPC broker to be used for subscribing to remoting proto messages.
- RpcBroker* broker;
+ // The RPC messenger to be used for subscribing to remoting proto messages.
+ RpcMessenger* messenger;
};
// The embedder should provide a client for handling negotiation events.
@@ -255,9 +255,10 @@ class SenderSession final {
std::unique_ptr<Sender> current_audio_sender_;
std::unique_ptr<Sender> current_video_sender_;
- // If remoting, we store the RpcBroker used by the embedder to send RPC
- // messages from the remoting protobuf specification.
- std::unique_ptr<RpcBroker> broker_;
+ // If remoting, we store the RpcMessenger used by the embedder to send RPC
+ // messages from the remoting protobuf specification. For more information,
+ // see //cast/streaming/remoting.proto.
+ std::unique_ptr<RpcMessenger> rpc_messenger_;
}; // namespace cast
} // namespace cast
diff --git a/cast/streaming/sender_session_unittest.cc b/cast/streaming/sender_session_unittest.cc
index c5231633..36b1bae8 100644
--- a/cast/streaming/sender_session_unittest.cc
+++ b/cast/streaming/sender_session_unittest.cc
@@ -543,11 +543,11 @@ TEST_F(SenderSessionTest, SuccessfulRemotingNegotiationYieldsValidObject) {
EXPECT_THAT(negotiation.capabilities.video,
testing::ElementsAre(VideoCapability::kVp8));
- // The broker is tested elsewhere, but we can sanity check that we got a valid
+ // The messenger is tested elsewhere, but we can sanity check that we got a valid
// one here.
- EXPECT_TRUE(negotiation.broker);
- const RpcBroker::Handle handle = negotiation.broker->GetUniqueHandle();
- EXPECT_NE(RpcBroker::kInvalidHandle, handle);
+ EXPECT_TRUE(negotiation.messenger);
+ const RpcMessenger::Handle handle = negotiation.messenger->GetUniqueHandle();
+ EXPECT_NE(RpcMessenger::kInvalidHandle, handle);
}
} // namespace cast