diff options
author | Jordan Bayles <jophba@chromium.org> | 2021-06-10 21:06:54 -0700 |
---|---|---|
committer | Openscreen LUCI CQ <openscreen-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2021-06-12 03:43:45 +0000 |
commit | a6c92a803725d46f4aa6ef46bf66a5ccd1f69961 (patch) | |
tree | eab1300a2a162db7b731ba3fde6ead51bdd2af0c /cast/streaming/rpc_messenger.cc | |
parent | 0f37d8d302ee7da5c2ed5eafb93898709b9a0d9c (diff) | |
download | openscreen-a6c92a803725d46f4aa6ef46bf66a5ccd1f69961.tar.gz |
[Cast Streaming] RpcBroker -> RpcMessenger
This patch renames Open Screen's RpcBroker to RpcMessenger, in order to avoid
confusion when discussing this implementation versus Chrome's RpcBroker
implementation--which will be going away soon.
Change-Id: I23127074db3f7d807c6f70c26c7b596b6edb503a
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/2953900
Reviewed-by: Ryan Keane <rwkeane@google.com>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
Diffstat (limited to 'cast/streaming/rpc_messenger.cc')
-rw-r--r-- | cast/streaming/rpc_messenger.cc | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/cast/streaming/rpc_messenger.cc b/cast/streaming/rpc_messenger.cc new file mode 100644 index 00000000..54b2e279 --- /dev/null +++ b/cast/streaming/rpc_messenger.cc @@ -0,0 +1,101 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "cast/streaming/rpc_messenger.h" + +#include <memory> +#include <string> +#include <utility> + +#include "util/osp_logging.h" + +namespace openscreen { +namespace cast { + +namespace { + +std::ostream& operator<<(std::ostream& out, const RpcMessage& message) { + out << "handle=" << message.handle() << ", proc=" << message.proc(); + switch (message.rpc_oneof_case()) { + case RpcMessage::kIntegerValue: + return out << ", integer_value=" << message.integer_value(); + case RpcMessage::kInteger64Value: + return out << ", integer64_value=" << message.integer64_value(); + case RpcMessage::kDoubleValue: + return out << ", double_value=" << message.double_value(); + case RpcMessage::kBooleanValue: + return out << ", boolean_value=" << message.boolean_value(); + case RpcMessage::kStringValue: + return out << ", string_value=" << message.string_value(); + default: + return out << ", rpc_oneof=" << message.rpc_oneof_case(); + } + + OSP_NOTREACHED(); +} + +} // namespace + +constexpr RpcMessenger::Handle RpcMessenger::kInvalidHandle; +constexpr RpcMessenger::Handle RpcMessenger::kAcquireRendererHandle; +constexpr RpcMessenger::Handle RpcMessenger::kAcquireDemuxerHandle; +constexpr RpcMessenger::Handle RpcMessenger::kFirstHandle; + +RpcMessenger::RpcMessenger(SendMessageCallback send_message_cb) + : next_handle_(kFirstHandle), + send_message_cb_(std::move(send_message_cb)) {} + +RpcMessenger::~RpcMessenger() { + receive_callbacks_.clear(); +} + +RpcMessenger::Handle RpcMessenger::GetUniqueHandle() { + return next_handle_++; +} + +void RpcMessenger::RegisterMessageReceiverCallback( + RpcMessenger::Handle handle, + ReceiveMessageCallback callback) { + OSP_DCHECK(receive_callbacks_.find(handle) == receive_callbacks_.end()) + << "must deregister before re-registering"; + OSP_DVLOG << "registering handle: " << handle; + receive_callbacks_.emplace_back(handle, std::move(callback)); +} + +void RpcMessenger::UnregisterMessageReceiverCallback(RpcMessenger::Handle handle) { + OSP_DVLOG << "unregistering handle: " << handle; + receive_callbacks_.erase_key(handle); +} + +void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message, + std::size_t message_len) { + auto rpc = std::make_unique<RpcMessage>(); + if (!rpc->ParseFromArray(message, message_len)) { + OSP_LOG_WARN << "Failed to parse RPC message from remote: " << message; + return; + } + OSP_DVLOG << "Received RPC message: " << *rpc; + + const auto entry = receive_callbacks_.find(rpc->handle()); + if (entry == receive_callbacks_.end()) { + OSP_DVLOG << "Dropping message due to unregistered handle: " + << rpc->handle(); + return; + } + entry->second(std::move(rpc)); +} + +void RpcMessenger::SendMessageToRemote(const RpcMessage& rpc) { + OSP_DVLOG << "Sending RPC message: " << rpc; + std::vector<uint8_t> message(rpc.ByteSizeLong()); + rpc.SerializeToArray(message.data(), message.size()); + send_message_cb_(std::move(message)); +} + +bool RpcMessenger::IsRegisteredForTesting(RpcMessenger::Handle handle) { + return receive_callbacks_.find(handle) != receive_callbacks_.end(); +} + +} // namespace cast +} // namespace openscreen |