diff options
author | Jordan Bayles <jophba@chromium.org> | 2021-06-10 21:06:54 -0700 |
---|---|---|
committer | Openscreen LUCI CQ <openscreen-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2021-06-12 03:43:45 +0000 |
commit | a6c92a803725d46f4aa6ef46bf66a5ccd1f69961 (patch) | |
tree | eab1300a2a162db7b731ba3fde6ead51bdd2af0c /cast | |
parent | 0f37d8d302ee7da5c2ed5eafb93898709b9a0d9c (diff) | |
download | openscreen-a6c92a803725d46f4aa6ef46bf66a5ccd1f69961.tar.gz |
[Cast Streaming] RpcBroker -> RpcMessenger
This patch renames Open Screen's RpcBroker to RpcMessenger, in order to avoid
confusion when discussing this implementation versus Chrome's RpcBroker
implementation--which will be going away soon.
Change-Id: I23127074db3f7d807c6f70c26c7b596b6edb503a
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/2953900
Reviewed-by: Ryan Keane <rwkeane@google.com>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
Diffstat (limited to 'cast')
-rw-r--r-- | cast/streaming/BUILD.gn | 6 | ||||
-rw-r--r-- | cast/streaming/receiver_session.cc | 4 | ||||
-rw-r--r-- | cast/streaming/receiver_session.h | 10 | ||||
-rw-r--r-- | cast/streaming/rpc_messenger.cc (renamed from cast/streaming/rpc_broker.cc) | 28 | ||||
-rw-r--r-- | cast/streaming/rpc_messenger.h (renamed from cast/streaming/rpc_broker.h) | 26 | ||||
-rw-r--r-- | cast/streaming/rpc_messenger_unittest.cc (renamed from cast/streaming/rpc_broker_unittest.cc) | 54 | ||||
-rw-r--r-- | cast/streaming/sender_session.cc | 12 | ||||
-rw-r--r-- | cast/streaming/sender_session.h | 13 | ||||
-rw-r--r-- | cast/streaming/sender_session_unittest.cc | 8 |
9 files changed, 81 insertions, 80 deletions
diff --git a/cast/streaming/BUILD.gn b/cast/streaming/BUILD.gn index baacb905..985cd1e2 100644 --- a/cast/streaming/BUILD.gn +++ b/cast/streaming/BUILD.gn @@ -59,8 +59,8 @@ source_set("common") { "packet_util.h", "receiver_message.cc", "receiver_message.h", - "rpc_broker.cc", - "rpc_broker.h", + "rpc_messenger.cc", + "rpc_messenger.h", "rtcp_common.cc", "rtcp_common.h", "rtcp_session.cc", @@ -193,7 +193,7 @@ source_set("unittests") { "packet_util_unittest.cc", "receiver_session_unittest.cc", "receiver_unittest.cc", - "rpc_broker_unittest.cc", + "rpc_messenger_unittest.cc", "rtcp_common_unittest.cc", "rtp_packet_parser_unittest.cc", "rtp_packetizer_unittest.cc", diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc index a49acbca..02c57e60 100644 --- a/cast/streaming/receiver_session.cc +++ b/cast/streaming/receiver_session.cc @@ -291,7 +291,7 @@ void ReceiverSession::InitializeSession(const SessionProperties& properties) { client_->OnNegotiated(this, std::move(receivers)); } else { // TODO(jophba): cleanup sequence number usage. - broker_ = std::make_unique<RpcBroker>([this](std::vector<uint8_t> message) { + rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) { Error error = this->messenger_.SendMessage( ReceiverMessage{ReceiverMessage::Type::kRpc, -1, true /* valid */, std::move(message)}); @@ -301,7 +301,7 @@ void ReceiverSession::InitializeSession(const SessionProperties& properties) { } }); client_->OnRemotingNegotiated( - this, RemotingNegotiation{std::move(receivers), broker_.get()}); + this, RemotingNegotiation{std::move(receivers), rpc_messenger_.get()}); } const Error result = messenger_.SendMessage(ReceiverMessage{ ReceiverMessage::Type::kAnswer, properties.sequence_number, diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h index 0f9f00cf..c27d64ab 100644 --- a/cast/streaming/receiver_session.h +++ b/cast/streaming/receiver_session.h @@ -16,7 +16,7 @@ #include "cast/streaming/offer_messages.h" #include "cast/streaming/receiver_packet_router.h" #include "cast/streaming/resolution.h" -#include "cast/streaming/rpc_broker.h" +#include "cast/streaming/rpc_messenger.h" #include "cast/streaming/sender_message.h" #include "cast/streaming/session_config.h" #include "cast/streaming/session_messenger.h" @@ -68,8 +68,8 @@ class ReceiverSession final : public Environment::SocketSubscriber { struct RemotingNegotiation { ConfiguredReceivers receivers; - // The RPC broker to be used for subscribing to remoting proto messages. - RpcBroker* broker; + // The RPC messenger to be used for subscribing to remoting proto messages. + RpcMessenger* messenger; }; // The embedder should provide a client for handling connections. @@ -335,9 +335,9 @@ class ReceiverSession final : public Environment::SocketSubscriber { std::unique_ptr<Receiver> current_audio_receiver_; std::unique_ptr<Receiver> current_video_receiver_; - // If remoting, we store the RpcBroker used by the embedder to send RPC + // If remoting, we store the RpcMessenger used by the embedder to send RPC // messages from the remoting protobuf specification. - std::unique_ptr<RpcBroker> broker_; + std::unique_ptr<RpcMessenger> rpc_messenger_; }; } // namespace cast diff --git a/cast/streaming/rpc_broker.cc b/cast/streaming/rpc_messenger.cc index 58379d79..54b2e279 100644 --- a/cast/streaming/rpc_broker.cc +++ b/cast/streaming/rpc_messenger.cc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "cast/streaming/rpc_broker.h" +#include "cast/streaming/rpc_messenger.h" #include <memory> #include <string> @@ -37,25 +37,25 @@ std::ostream& operator<<(std::ostream& out, const RpcMessage& message) { } // namespace -constexpr RpcBroker::Handle RpcBroker::kInvalidHandle; -constexpr RpcBroker::Handle RpcBroker::kAcquireRendererHandle; -constexpr RpcBroker::Handle RpcBroker::kAcquireDemuxerHandle; -constexpr RpcBroker::Handle RpcBroker::kFirstHandle; +constexpr RpcMessenger::Handle RpcMessenger::kInvalidHandle; +constexpr RpcMessenger::Handle RpcMessenger::kAcquireRendererHandle; +constexpr RpcMessenger::Handle RpcMessenger::kAcquireDemuxerHandle; +constexpr RpcMessenger::Handle RpcMessenger::kFirstHandle; -RpcBroker::RpcBroker(SendMessageCallback send_message_cb) +RpcMessenger::RpcMessenger(SendMessageCallback send_message_cb) : next_handle_(kFirstHandle), send_message_cb_(std::move(send_message_cb)) {} -RpcBroker::~RpcBroker() { +RpcMessenger::~RpcMessenger() { receive_callbacks_.clear(); } -RpcBroker::Handle RpcBroker::GetUniqueHandle() { +RpcMessenger::Handle RpcMessenger::GetUniqueHandle() { return next_handle_++; } -void RpcBroker::RegisterMessageReceiverCallback( - RpcBroker::Handle handle, +void RpcMessenger::RegisterMessageReceiverCallback( + RpcMessenger::Handle handle, ReceiveMessageCallback callback) { OSP_DCHECK(receive_callbacks_.find(handle) == receive_callbacks_.end()) << "must deregister before re-registering"; @@ -63,12 +63,12 @@ void RpcBroker::RegisterMessageReceiverCallback( receive_callbacks_.emplace_back(handle, std::move(callback)); } -void RpcBroker::UnregisterMessageReceiverCallback(RpcBroker::Handle handle) { +void RpcMessenger::UnregisterMessageReceiverCallback(RpcMessenger::Handle handle) { OSP_DVLOG << "unregistering handle: " << handle; receive_callbacks_.erase_key(handle); } -void RpcBroker::ProcessMessageFromRemote(const uint8_t* message, +void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message, std::size_t message_len) { auto rpc = std::make_unique<RpcMessage>(); if (!rpc->ParseFromArray(message, message_len)) { @@ -86,14 +86,14 @@ void RpcBroker::ProcessMessageFromRemote(const uint8_t* message, entry->second(std::move(rpc)); } -void RpcBroker::SendMessageToRemote(const RpcMessage& rpc) { +void RpcMessenger::SendMessageToRemote(const RpcMessage& rpc) { OSP_DVLOG << "Sending RPC message: " << rpc; std::vector<uint8_t> message(rpc.ByteSizeLong()); rpc.SerializeToArray(message.data(), message.size()); send_message_cb_(std::move(message)); } -bool RpcBroker::IsRegisteredForTesting(RpcBroker::Handle handle) { +bool RpcMessenger::IsRegisteredForTesting(RpcMessenger::Handle handle) { return receive_callbacks_.find(handle) != receive_callbacks_.end(); } diff --git a/cast/streaming/rpc_broker.h b/cast/streaming/rpc_messenger.h index 3cccec83..c997a7b9 100644 --- a/cast/streaming/rpc_broker.h +++ b/cast/streaming/rpc_messenger.h @@ -2,8 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#ifndef CAST_STREAMING_RPC_BROKER_H_ -#define CAST_STREAMING_RPC_BROKER_H_ +#ifndef CAST_STREAMING_RPC_MESSENGER_H_ +#define CAST_STREAMING_RPC_MESSENGER_H_ #include <memory> #include <string> @@ -24,23 +24,23 @@ namespace cast { // value in the existing RPC communication channel using the special handles // |kAcquire*Handle|. // -// NOTE: RpcBroker doesn't actually send RPC messages to the remote. The session +// NOTE: RpcMessenger doesn't actually send RPC messages to the remote. The session // messenger needs to set SendMessageCallback, and call ProcessMessageFromRemote -// as appropriate. The RpcBroker then distributes each RPC message to the +// as appropriate. The RpcMessenger then distributes each RPC message to the // subscribed component. -class RpcBroker { +class RpcMessenger { public: using Handle = int; using ReceiveMessageCallback = std::function<void(std::unique_ptr<RpcMessage>)>; using SendMessageCallback = std::function<void(std::vector<uint8_t>)>; - explicit RpcBroker(SendMessageCallback send_message_cb); - RpcBroker(const RpcBroker&) = delete; - RpcBroker(RpcBroker&&) noexcept; - RpcBroker& operator=(const RpcBroker&) = delete; - RpcBroker& operator=(RpcBroker&&); - ~RpcBroker(); + explicit RpcMessenger(SendMessageCallback send_message_cb); + RpcMessenger(const RpcMessenger&) = delete; + RpcMessenger(RpcMessenger&&) noexcept; + RpcMessenger& operator=(const RpcMessenger&) = delete; + RpcMessenger& operator=(RpcMessenger&&); + ~RpcMessenger(); // Get unique handle value for RPC message handles. Handle GetUniqueHandle(); @@ -68,7 +68,7 @@ class RpcBroker { // Checks if the handle is registered for receiving messages. Test-only. bool IsRegisteredForTesting(Handle handle); - // Consumers of RPCBroker may set the send message callback post-hoc + // Consumers of RPCMessenger may set the send message callback post-hoc // in order to simulate different scenarios. void set_send_message_cb_for_testing(SendMessageCallback cb) { send_message_cb_ = std::move(cb); @@ -99,4 +99,4 @@ class RpcBroker { } // namespace cast } // namespace openscreen -#endif // CAST_STREAMING_RPC_BROKER_H_ +#endif // CAST_STREAMING_RPC_MESSENGER_H_ diff --git a/cast/streaming/rpc_broker_unittest.cc b/cast/streaming/rpc_messenger_unittest.cc index a837285c..e0627542 100644 --- a/cast/streaming/rpc_broker_unittest.cc +++ b/cast/streaming/rpc_messenger_unittest.cc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "cast/streaming/rpc_broker.h" +#include "cast/streaming/rpc_messenger.h" #include <memory> #include <string> @@ -39,8 +39,8 @@ class FakeMessenger { int sent_count() const { return sent_count_; } const RpcMessage& sent_rpc() const { return sent_rpc_; } - void set_handle(RpcBroker::Handle handle) { handle_ = handle; } - RpcBroker::Handle handle() { return handle_; } + void set_handle(RpcMessenger::Handle handle) { handle_ = handle; } + RpcMessenger::Handle handle() { return handle_; } private: std::unique_ptr<RpcMessage> received_rpc_; @@ -49,25 +49,25 @@ class FakeMessenger { RpcMessage sent_rpc_; int sent_count_ = 0; - RpcBroker::Handle handle_ = -1; + RpcMessenger::Handle handle_ = -1; }; } // namespace -class RpcBrokerTest : public testing::Test { +class RpcMessengerTest : public testing::Test { protected: void SetUp() override { fake_messenger_ = std::make_unique<FakeMessenger>(); ASSERT_FALSE(fake_messenger_->received_count()); - rpc_broker_ = std::make_unique<RpcBroker>( + rpc_messenger_ = std::make_unique<RpcMessenger>( [p = fake_messenger_.get()](std::vector<uint8_t> message) { p->OnSentRpc(message); }); - const auto handle = rpc_broker_->GetUniqueHandle(); + const auto handle = rpc_messenger_->GetUniqueHandle(); fake_messenger_->set_handle(handle); - rpc_broker_->RegisterMessageReceiverCallback( + rpc_messenger_->RegisterMessageReceiverCallback( handle, [p = fake_messenger_.get()](std::unique_ptr<RpcMessage> message) { p->OnReceivedRpc(std::move(message)); @@ -77,41 +77,41 @@ class RpcBrokerTest : public testing::Test { void ProcessMessage(const RpcMessage& rpc) { std::vector<uint8_t> message(rpc.ByteSizeLong()); rpc.SerializeToArray(message.data(), message.size()); - rpc_broker_->ProcessMessageFromRemote(message.data(), message.size()); + rpc_messenger_->ProcessMessageFromRemote(message.data(), message.size()); } std::unique_ptr<FakeMessenger> fake_messenger_; - std::unique_ptr<RpcBroker> rpc_broker_; + std::unique_ptr<RpcMessenger> rpc_messenger_; }; -TEST_F(RpcBrokerTest, TestProcessMessageFromRemoteRegistered) { +TEST_F(RpcMessengerTest, TestProcessMessageFromRemoteRegistered) { RpcMessage rpc; rpc.set_handle(fake_messenger_->handle()); ProcessMessage(rpc); ASSERT_EQ(1, fake_messenger_->received_count()); } -TEST_F(RpcBrokerTest, TestProcessMessageFromRemoteUnregistered) { +TEST_F(RpcMessengerTest, TestProcessMessageFromRemoteUnregistered) { RpcMessage rpc; - rpc_broker_->UnregisterMessageReceiverCallback(fake_messenger_->handle()); + rpc_messenger_->UnregisterMessageReceiverCallback(fake_messenger_->handle()); ProcessMessage(rpc); ASSERT_EQ(0, fake_messenger_->received_count()); } -TEST_F(RpcBrokerTest, CanSendMultipleMessages) { +TEST_F(RpcMessengerTest, CanSendMultipleMessages) { for (int i = 0; i < 10; ++i) { - rpc_broker_->SendMessageToRemote(RpcMessage{}); + rpc_messenger_->SendMessageToRemote(RpcMessage{}); } EXPECT_EQ(10, fake_messenger_->sent_count()); } -TEST_F(RpcBrokerTest, SendMessageCallback) { - // Send message for RPC broker to process. +TEST_F(RpcMessengerTest, SendMessageCallback) { + // Send message for RPC messenger to process. RpcMessage sent_rpc; sent_rpc.set_handle(fake_messenger_->handle()); sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME); sent_rpc.set_double_value(2.3); - rpc_broker_->SendMessageToRemote(sent_rpc); + rpc_messenger_->SendMessageToRemote(sent_rpc); // Check if received message is identical to the one sent earlier. ASSERT_EQ(1, fake_messenger_->sent_count()); @@ -121,8 +121,8 @@ TEST_F(RpcBrokerTest, SendMessageCallback) { ASSERT_EQ(2.3, message.double_value()); } -TEST_F(RpcBrokerTest, ProcessMessageWithRegisteredHandle) { - // Send message for RPC broker to process. +TEST_F(RpcMessengerTest, ProcessMessageWithRegisteredHandle) { + // Send message for RPC messenger to process. RpcMessage sent_rpc; sent_rpc.set_handle(fake_messenger_->handle()); sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME); @@ -137,10 +137,10 @@ TEST_F(RpcBrokerTest, ProcessMessageWithRegisteredHandle) { ASSERT_EQ(3.4, received_rpc.double_value()); } -TEST_F(RpcBrokerTest, ProcessMessageWithUnregisteredHandle) { - // Send message for RPC broker to process. +TEST_F(RpcMessengerTest, ProcessMessageWithUnregisteredHandle) { + // Send message for RPC messenger to process. RpcMessage sent_rpc; - RpcBroker::Handle different_handle = fake_messenger_->handle() + 1; + RpcMessenger::Handle different_handle = fake_messenger_->handle() + 1; sent_rpc.set_handle(different_handle); sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME); sent_rpc.set_double_value(4.5); @@ -150,12 +150,12 @@ TEST_F(RpcBrokerTest, ProcessMessageWithUnregisteredHandle) { ASSERT_EQ(0, fake_messenger_->received_count()); } -TEST_F(RpcBrokerTest, Registration) { +TEST_F(RpcMessengerTest, Registration) { const auto handle = fake_messenger_->handle(); - ASSERT_TRUE(rpc_broker_->IsRegisteredForTesting(handle)); + ASSERT_TRUE(rpc_messenger_->IsRegisteredForTesting(handle)); - rpc_broker_->UnregisterMessageReceiverCallback(handle); - ASSERT_FALSE(rpc_broker_->IsRegisteredForTesting(handle)); + rpc_messenger_->UnregisterMessageReceiverCallback(handle); + ASSERT_FALSE(rpc_messenger_->IsRegisteredForTesting(handle)); } } // namespace cast diff --git a/cast/streaming/sender_session.cc b/cast/streaming/sender_session.cc index ca5ce027..bbb3972f 100644 --- a/cast/streaming/sender_session.cc +++ b/cast/streaming/sender_session.cc @@ -265,7 +265,7 @@ void SenderSession::ResetState() { current_negotiation_.reset(); current_audio_sender_.reset(); current_video_sender_.reset(); - broker_.reset(); + rpc_messenger_.reset(); } Error SenderSession::StartNegotiation( @@ -361,7 +361,7 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) { "Failed to negotiate a remoting session.")); return; } - broker_ = std::make_unique<RpcBroker>([this](std::vector<uint8_t> message) { + rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) { Error error = this->messenger_.SendOutboundMessage(SenderMessage{ SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true, std::move(message)}); @@ -373,12 +373,12 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) { config_.client->OnRemotingNegotiated( this, RemotingNegotiation{std::move(senders), ToCapabilities(caps), - broker_.get()}); + rpc_messenger_.get()}); } void SenderSession::OnRpcMessage(ReceiverMessage message) { - if (!broker_) { - OSP_LOG_INFO << "Received an RPC message without having an RPCBroker."; + if (!rpc_messenger_) { + OSP_LOG_INFO << "Received an RPC message without having an RPCMessenger."; return; } @@ -390,7 +390,7 @@ void SenderSession::OnRpcMessage(ReceiverMessage message) { } const auto& body = absl::get<std::vector<uint8_t>>(message.body); - broker_->ProcessMessageFromRemote(body.data(), body.size()); + rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size()); } void SenderSession::HandleErrorMessage(ReceiverMessage message, diff --git a/cast/streaming/sender_session.h b/cast/streaming/sender_session.h index d95c61aa..7995181f 100644 --- a/cast/streaming/sender_session.h +++ b/cast/streaming/sender_session.h @@ -16,7 +16,7 @@ #include "cast/streaming/capture_recommendations.h" #include "cast/streaming/offer_messages.h" #include "cast/streaming/remoting_capabilities.h" -#include "cast/streaming/rpc_broker.h" +#include "cast/streaming/rpc_messenger.h" #include "cast/streaming/sender.h" #include "cast/streaming/sender_packet_router.h" #include "cast/streaming/session_config.h" @@ -59,8 +59,8 @@ class SenderSession final { // a version check when using these capabilities to offer remoting. RemotingCapabilities capabilities; - // The RPC broker to be used for subscribing to remoting proto messages. - RpcBroker* broker; + // The RPC messenger to be used for subscribing to remoting proto messages. + RpcMessenger* messenger; }; // The embedder should provide a client for handling negotiation events. @@ -255,9 +255,10 @@ class SenderSession final { std::unique_ptr<Sender> current_audio_sender_; std::unique_ptr<Sender> current_video_sender_; - // If remoting, we store the RpcBroker used by the embedder to send RPC - // messages from the remoting protobuf specification. - std::unique_ptr<RpcBroker> broker_; + // If remoting, we store the RpcMessenger used by the embedder to send RPC + // messages from the remoting protobuf specification. For more information, + // see //cast/streaming/remoting.proto. + std::unique_ptr<RpcMessenger> rpc_messenger_; }; // namespace cast } // namespace cast diff --git a/cast/streaming/sender_session_unittest.cc b/cast/streaming/sender_session_unittest.cc index c5231633..36b1bae8 100644 --- a/cast/streaming/sender_session_unittest.cc +++ b/cast/streaming/sender_session_unittest.cc @@ -543,11 +543,11 @@ TEST_F(SenderSessionTest, SuccessfulRemotingNegotiationYieldsValidObject) { EXPECT_THAT(negotiation.capabilities.video, testing::ElementsAre(VideoCapability::kVp8)); - // The broker is tested elsewhere, but we can sanity check that we got a valid + // The messenger is tested elsewhere, but we can sanity check that we got a valid // one here. - EXPECT_TRUE(negotiation.broker); - const RpcBroker::Handle handle = negotiation.broker->GetUniqueHandle(); - EXPECT_NE(RpcBroker::kInvalidHandle, handle); + EXPECT_TRUE(negotiation.messenger); + const RpcMessenger::Handle handle = negotiation.messenger->GetUniqueHandle(); + EXPECT_NE(RpcMessenger::kInvalidHandle, handle); } } // namespace cast |