aboutsummaryrefslogtreecommitdiff
path: root/cast
diff options
context:
space:
mode:
authorJordan Bayles <jophba@chromium.org>2021-07-01 15:13:23 -0700
committerOpenscreen LUCI CQ <openscreen-scoped@luci-project-accounts.iam.gserviceaccount.com>2021-07-01 22:52:43 +0000
commit2e2730fe0190f72dadd631176d2f380b4a6186a8 (patch)
tree16948b153573e25eafe4dfc986de016013faeed3 /cast
parentf2d9d267c1363c42d6eef6f9190033af98fb236b (diff)
downloadopenscreen-2e2730fe0190f72dadd631176d2f380b4a6186a8.tar.gz
[Cast Streaming] Add codec negotiation over RPC
This patch adds support for negotiating codec information over the RPC messenger class for both the standalone sender and receiver. Although a full remoting implementation in the standalone implementations is not a goal and not particularly practical, it is important that we exercise the public remoting APIs and get basic usage and flows handled in our library. To that end, this patch specifically adds support for sending RPC_DS_INITIALIZE and RPC_DS_INITIALIZE_CALLBACK messages, simulating the process of setting up a remoting media stream and demuxer in Chrome. NOTE: this does not enable "true" remoting--the standalone sender is still decoding and reencoding the video. For "true" remoting support we need the ability to unpack raw frames (probably exclusively from a WebM container that holds VP8 frames) and pass them directly. In future patches, we can also consider adding additional features to the standalone implementation, such as play/pause and volume control methods. Bug: b/190078859 Change-Id: I49c2acb537aa3c647b8c3a53c9545f5eb3893982 Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/2973373 Commit-Queue: Jordan Bayles <jophba@chromium.org> Reviewed-by: mark a. foltz <mfoltz@chromium.org>
Diffstat (limited to 'cast')
-rw-r--r--cast/protocol/castv2/streaming_examples/rpc.json1
-rw-r--r--cast/protocol/castv2/streaming_schema.json30
-rw-r--r--cast/standalone_receiver/BUILD.gn2
-rw-r--r--cast/standalone_receiver/simple_remoting_receiver.cc107
-rw-r--r--cast/standalone_receiver/simple_remoting_receiver.h52
-rw-r--r--cast/standalone_receiver/streaming_playback_controller.cc56
-rw-r--r--cast/standalone_receiver/streaming_playback_controller.h5
-rw-r--r--cast/standalone_sender/BUILD.gn2
-rw-r--r--cast/standalone_sender/looping_file_cast_agent.cc31
-rw-r--r--cast/standalone_sender/looping_file_cast_agent.h21
-rw-r--r--cast/standalone_sender/remoting_sender.cc95
-rw-r--r--cast/standalone_sender/remoting_sender.h67
-rw-r--r--cast/streaming/constants.h2
-rw-r--r--cast/streaming/offer_messages.h2
-rw-r--r--cast/streaming/receiver_message.cc12
-rw-r--r--cast/streaming/receiver_session.cc20
-rw-r--r--cast/streaming/receiver_session.h9
-rw-r--r--cast/streaming/receiver_session_unittest.cc129
-rw-r--r--cast/streaming/rpc_messenger.cc3
-rw-r--r--cast/streaming/rpc_messenger_unittest.cc8
-rw-r--r--cast/streaming/sender_session.cc35
-rw-r--r--cast/streaming/sender_session.h24
-rw-r--r--cast/streaming/sender_session_unittest.cc5
-rw-r--r--cast/streaming/session_messenger.cc14
24 files changed, 650 insertions, 82 deletions
diff --git a/cast/protocol/castv2/streaming_examples/rpc.json b/cast/protocol/castv2/streaming_examples/rpc.json
index 6ebfc0e0..880ca641 100644
--- a/cast/protocol/castv2/streaming_examples/rpc.json
+++ b/cast/protocol/castv2/streaming_examples/rpc.json
@@ -1,5 +1,4 @@
{
- "seqNum": 12345,
"sessionId": 735189,
"type": "RPC",
"result": "ok",
diff --git a/cast/protocol/castv2/streaming_schema.json b/cast/protocol/castv2/streaming_schema.json
index e659e7bf..7044c8ea 100644
--- a/cast/protocol/castv2/streaming_schema.json
+++ b/cast/protocol/castv2/streaming_schema.json
@@ -121,10 +121,7 @@
"maxBitRate": {"type": "integer", "minimum": 300000},
"maxDelay": {"$ref": "#/definitions/delay"}
},
- "required": [
- "maxDimensions",
- "maxBitRate"
- ]
+ "required": ["maxDimensions", "maxBitRate"]
},
"constraints": {
"properties": {
@@ -220,15 +217,36 @@
]
}
},
- "required": ["type", "seqNum"],
+ "required": ["type"],
"allOf": [
{
"if": {
- "properties": {"type": {"enum": ["ANSWER", "CAPABILITIES_RESPONSE", "STATUS_RESPONSE"]}}
+ "properties": {
+ "type": {
+ "enum": ["ANSWER", "CAPABILITIES_RESPONSE", "STATUS_RESPONSE"]
+ }
+ }
},
"then": {"required": ["result"]}
},
{
+ "if": {
+ "properties": {
+ "type": {
+ "enum": [
+ "OFFER",
+ "ANSWER",
+ "GET_CAPABILITIES",
+ "CAPABILITIES_RESPONSE",
+ "GET_STATUS",
+ "STATUS_RESPONSE"
+ ]
+ }
+ }
+ },
+ "then": {"required": ["seqNum"]}
+ },
+ {
"if": {"properties": {"type": {"const": "OFFER"}}},
"then": {"required": ["offer"]}
},
diff --git a/cast/standalone_receiver/BUILD.gn b/cast/standalone_receiver/BUILD.gn
index f1b5fd31..956a1a71 100644
--- a/cast/standalone_receiver/BUILD.gn
+++ b/cast/standalone_receiver/BUILD.gn
@@ -16,6 +16,8 @@ if (!build_with_chromium) {
"cast_service.h",
"mirroring_application.cc",
"mirroring_application.h",
+ "simple_remoting_receiver.cc",
+ "simple_remoting_receiver.h",
"streaming_playback_controller.cc",
"streaming_playback_controller.h",
]
diff --git a/cast/standalone_receiver/simple_remoting_receiver.cc b/cast/standalone_receiver/simple_remoting_receiver.cc
new file mode 100644
index 00000000..119be44f
--- /dev/null
+++ b/cast/standalone_receiver/simple_remoting_receiver.cc
@@ -0,0 +1,107 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "cast/standalone_receiver/simple_remoting_receiver.h"
+
+#include <utility>
+
+#include "cast/streaming/message_fields.h"
+#include "cast/streaming/remoting.pb.h"
+
+namespace openscreen {
+namespace cast {
+
+namespace {
+
+VideoCodec ParseProtoCodec(VideoDecoderConfig::Codec value) {
+ switch (value) {
+ case VideoDecoderConfig_Codec_kCodecHEVC:
+ return VideoCodec::kHevc;
+
+ case VideoDecoderConfig_Codec_kCodecH264:
+ return VideoCodec::kH264;
+
+ case VideoDecoderConfig_Codec_kCodecVP8:
+ return VideoCodec::kVp8;
+
+ case VideoDecoderConfig_Codec_kCodecVP9:
+ return VideoCodec::kVp9;
+
+ default:
+ return VideoCodec::kNotSpecified;
+ }
+}
+
+AudioCodec ParseProtoCodec(AudioDecoderConfig::Codec value) {
+ switch (value) {
+ case AudioDecoderConfig_Codec_kCodecAAC:
+ return AudioCodec::kAac;
+
+ case AudioDecoderConfig_Codec_kCodecOpus:
+ return AudioCodec::kOpus;
+
+ default:
+ return AudioCodec::kNotSpecified;
+ }
+}
+
+} // namespace
+
+SimpleRemotingReceiver::SimpleRemotingReceiver(RpcMessenger* messenger)
+ : messenger_(messenger) {
+ messenger_->RegisterMessageReceiverCallback(
+ RpcMessenger::kFirstHandle, [this](std::unique_ptr<RpcMessage> message) {
+ this->OnInitializeCallbackMessage(std::move(message));
+ });
+}
+
+SimpleRemotingReceiver::~SimpleRemotingReceiver() {
+ messenger_->UnregisterMessageReceiverCallback(RpcMessenger::kFirstHandle);
+}
+
+void SimpleRemotingReceiver::SendInitializeMessage(
+ SimpleRemotingReceiver::InitializeCallback initialize_cb) {
+ initialize_cb_ = std::move(initialize_cb);
+
+ OSP_DVLOG
+ << "Indicating to the sender we are ready for remoting initialization.";
+ openscreen::cast::RpcMessage rpc;
+ rpc.set_handle(RpcMessenger::kAcquireRendererHandle);
+ rpc.set_proc(openscreen::cast::RpcMessage::RPC_DS_INITIALIZE);
+
+ // The initialize message contains the handle to be used for sending the
+ // initialization callback message.
+ rpc.set_integer_value(RpcMessenger::kFirstHandle);
+ messenger_->SendMessageToRemote(rpc);
+}
+
+void SimpleRemotingReceiver::OnInitializeCallbackMessage(
+ std::unique_ptr<RpcMessage> message) {
+ OSP_DCHECK(message->proc() == RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
+ if (!initialize_cb_) {
+ OSP_DLOG_INFO << "Received an initialization callback message but no "
+ "callback was set.";
+ return;
+ }
+
+ const DemuxerStreamInitializeCallback& callback_message =
+ message->demuxerstream_initializecb_rpc();
+ const auto audio_codec =
+ callback_message.has_audio_decoder_config()
+ ? ParseProtoCodec(callback_message.audio_decoder_config().codec())
+ : AudioCodec::kNotSpecified;
+ const auto video_codec =
+ callback_message.has_video_decoder_config()
+ ? ParseProtoCodec(callback_message.video_decoder_config().codec())
+ : VideoCodec::kNotSpecified;
+
+ OSP_DLOG_INFO << "Initializing remoting with audio codec "
+ << CodecToString(audio_codec) << " and video codec "
+ << CodecToString(video_codec);
+ initialize_cb_(audio_codec, video_codec);
+ initialize_cb_ = nullptr;
+}
+
+} // namespace cast
+} // namespace openscreen
diff --git a/cast/standalone_receiver/simple_remoting_receiver.h b/cast/standalone_receiver/simple_remoting_receiver.h
new file mode 100644
index 00000000..52fe131f
--- /dev/null
+++ b/cast/standalone_receiver/simple_remoting_receiver.h
@@ -0,0 +1,52 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_
+#define CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_
+
+#include <functional>
+#include <memory>
+
+#include "cast/streaming/constants.h"
+#include "cast/streaming/rpc_messenger.h"
+
+namespace openscreen {
+namespace cast {
+
+// This class behaves like a pared-down version of Chrome's DemuxerStreamAdapter
+// (see
+// https://source.chromium.org/chromium/chromium/src/+/main:/media/remoting/demuxer_stream_adapter.h
+// ). Instead of providing a full adapter implementation, it just provides a
+// callback register that can be used to notify a component when the
+// RemotingProvider sends an initialization message with audio and video codec
+// information.
+//
+// Due to the sheer complexity of remoting, we don't have a fully functional
+// implementation of remoting in the standalone_* components, instead Chrome is
+// the reference implementation and we have these simple classes to exercise
+// the public APIs.
+class SimpleRemotingReceiver {
+ public:
+ explicit SimpleRemotingReceiver(RpcMessenger* messenger);
+ ~SimpleRemotingReceiver();
+
+ // The flow here closely mirrors the remoting.proto. The standalone receiver
+ // indicates it is ready for initialization by calling
+ // |SendInitializeMessage|, then this class sends an initialize message to the
+ // sender. The sender then replies with an initialization message containing
+ // configurations, which is passed to |initialize_cb|.
+ using InitializeCallback = std::function<void(AudioCodec, VideoCodec)>;
+ void SendInitializeMessage(InitializeCallback initialize_cb);
+
+ private:
+ void OnInitializeCallbackMessage(std::unique_ptr<RpcMessage> message);
+
+ RpcMessenger* messenger_;
+ InitializeCallback initialize_cb_;
+};
+
+} // namespace cast
+} // namespace openscreen
+
+#endif // CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_
diff --git a/cast/standalone_receiver/streaming_playback_controller.cc b/cast/standalone_receiver/streaming_playback_controller.cc
index 2a39ab8e..46ee0bbb 100644
--- a/cast/standalone_receiver/streaming_playback_controller.cc
+++ b/cast/standalone_receiver/streaming_playback_controller.cc
@@ -57,6 +57,42 @@ void StreamingPlaybackController::OnNegotiated(
const ReceiverSession* session,
ReceiverSession::ConfiguredReceivers receivers) {
TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneReceiver);
+ Initialize(receivers);
+}
+
+void StreamingPlaybackController::OnRemotingNegotiated(
+ const ReceiverSession* session,
+ ReceiverSession::RemotingNegotiation negotiation) {
+ remoting_receiver_ =
+ std::make_unique<SimpleRemotingReceiver>(negotiation.messenger);
+ remoting_receiver_->SendInitializeMessage(
+ [this, receivers = negotiation.receivers](AudioCodec audio_codec,
+ VideoCodec video_codec) {
+ // The configurations in |negotiation| do not have the actual codecs,
+ // only REMOTE_AUDIO and REMOTE_VIDEO. Once we receive the
+ // initialization callback method, we can override with the actual
+ // codecs here.
+ auto mutable_receivers = receivers;
+ mutable_receivers.audio_config.codec = audio_codec;
+ mutable_receivers.video_config.codec = video_codec;
+ Initialize(mutable_receivers);
+ });
+}
+
+void StreamingPlaybackController::OnReceiversDestroying(
+ const ReceiverSession* session,
+ ReceiversDestroyingReason reason) {
+ audio_player_.reset();
+ video_player_.reset();
+}
+
+void StreamingPlaybackController::OnError(const ReceiverSession* session,
+ Error error) {
+ client_->OnPlaybackError(this, error);
+}
+
+void StreamingPlaybackController::Initialize(
+ ReceiverSession::ConfiguredReceivers receivers) {
#if defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS)
if (receivers.audio_receiver) {
audio_player_ = std::make_unique<SDLAudioPlayer>(
@@ -83,25 +119,5 @@ void StreamingPlaybackController::OnNegotiated(
#endif // defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS)
}
-void StreamingPlaybackController::OnRemotingNegotiated(
- const ReceiverSession* session,
- ReceiverSession::RemotingNegotiation negotiation) {
- // TODO(issuetracker.google.com/190078859): need ability to initialize
- // remoting codecs.
- OSP_UNIMPLEMENTED();
-}
-
-void StreamingPlaybackController::OnReceiversDestroying(
- const ReceiverSession* session,
- ReceiversDestroyingReason reason) {
- audio_player_.reset();
- video_player_.reset();
-}
-
-void StreamingPlaybackController::OnError(const ReceiverSession* session,
- Error error) {
- client_->OnPlaybackError(this, error);
-}
-
} // namespace cast
} // namespace openscreen
diff --git a/cast/standalone_receiver/streaming_playback_controller.h b/cast/standalone_receiver/streaming_playback_controller.h
index ea288046..65a233e5 100644
--- a/cast/standalone_receiver/streaming_playback_controller.h
+++ b/cast/standalone_receiver/streaming_playback_controller.h
@@ -7,6 +7,7 @@
#include <memory>
+#include "cast/standalone_receiver/simple_remoting_receiver.h"
#include "cast/streaming/receiver_session.h"
#include "platform/impl/task_runner.h"
@@ -46,6 +47,8 @@ class StreamingPlaybackController final : public ReceiverSession::Client {
TaskRunner* const task_runner_;
StreamingPlaybackController::Client* client_;
+ void Initialize(ReceiverSession::ConfiguredReceivers receivers);
+
#if defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS)
// NOTE: member ordering is important, since the sub systems must be
// first-constructed, last-destroyed. Make sure any new SDL related
@@ -62,6 +65,8 @@ class StreamingPlaybackController final : public ReceiverSession::Client {
std::unique_ptr<DummyPlayer> audio_player_;
std::unique_ptr<DummyPlayer> video_player_;
#endif // defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS)
+
+ std::unique_ptr<SimpleRemotingReceiver> remoting_receiver_;
};
} // namespace cast
diff --git a/cast/standalone_sender/BUILD.gn b/cast/standalone_sender/BUILD.gn
index afa73d7d..780e0609 100644
--- a/cast/standalone_sender/BUILD.gn
+++ b/cast/standalone_sender/BUILD.gn
@@ -49,6 +49,8 @@ if (!build_with_chromium) {
"looping_file_sender.h",
"receiver_chooser.cc",
"receiver_chooser.h",
+ "remoting_sender.cc",
+ "remoting_sender.h",
"simulated_capturer.cc",
"simulated_capturer.h",
"streaming_opus_encoder.cc",
diff --git a/cast/standalone_sender/looping_file_cast_agent.cc b/cast/standalone_sender/looping_file_cast_agent.cc
index ce9826de..bfb970bf 100644
--- a/cast/standalone_sender/looping_file_cast_agent.cc
+++ b/cast/standalone_sender/looping_file_cast_agent.cc
@@ -270,6 +270,10 @@ void LoopingFileCastAgent::CreateAndStartSession() {
OSP_VLOG << "Starting session negotiation.";
Error negotiation_error;
if (connection_settings_->use_remoting) {
+ remoting_sender_ = std::make_unique<RemotingSender>(
+ current_session_->rpc_messenger(), AudioCodec::kOpus, VideoCodec::kVp8,
+ [this]() { OnRemotingReceiverReady(); });
+
negotiation_error =
current_session_->NegotiateRemoting(audio_config, video_config);
} else {
@@ -298,17 +302,17 @@ void LoopingFileCastAgent::OnNegotiated(
void LoopingFileCastAgent::OnRemotingNegotiated(
const SenderSession* session,
SenderSession::RemotingNegotiation negotiation) {
- // TODO(jophba): this needs to be hashed out as part of
- // figuring out the embedder workflow.
if (negotiation.senders.audio_sender == nullptr &&
negotiation.senders.video_sender == nullptr) {
OSP_LOG_ERROR << "Missing both audio and video, so exiting...";
return;
}
- file_sender_ = std::make_unique<LoopingFileSender>(
- environment_.get(), connection_settings_.value(), session,
- std::move(negotiation.senders), [this]() { shutdown_callback_(); });
+ current_negotiation_ =
+ std::make_unique<SenderSession::RemotingNegotiation>(negotiation);
+ if (is_ready_for_remoting_) {
+ StartRemotingSenders();
+ }
}
void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) {
@@ -316,6 +320,23 @@ void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) {
Shutdown();
}
+void LoopingFileCastAgent::OnRemotingReceiverReady() {
+ is_ready_for_remoting_ = true;
+ if (current_negotiation_) {
+ StartRemotingSenders();
+ }
+}
+
+void LoopingFileCastAgent::StartRemotingSenders() {
+ OSP_DCHECK(current_negotiation_);
+ file_sender_ = std::make_unique<LoopingFileSender>(
+ environment_.get(), connection_settings_.value(), current_session_.get(),
+ std::move(current_negotiation_->senders),
+ [this]() { shutdown_callback_(); });
+ current_negotiation_.reset();
+ is_ready_for_remoting_ = false;
+}
+
void LoopingFileCastAgent::Shutdown() {
TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
diff --git a/cast/standalone_sender/looping_file_cast_agent.h b/cast/standalone_sender/looping_file_cast_agent.h
index cff0f6ba..895d1324 100644
--- a/cast/standalone_sender/looping_file_cast_agent.h
+++ b/cast/standalone_sender/looping_file_cast_agent.h
@@ -21,6 +21,7 @@
#include "cast/sender/public/sender_socket_factory.h"
#include "cast/standalone_sender/connection_settings.h"
#include "cast/standalone_sender/looping_file_sender.h"
+#include "cast/standalone_sender/remoting_sender.h"
#include "cast/streaming/environment.h"
#include "cast/streaming/sender_session.h"
#include "platform/api/scoped_wake_lock.h"
@@ -134,6 +135,15 @@ class LoopingFileCastAgent final
SenderSession::RemotingNegotiation negotiation) override;
void OnError(const SenderSession* session, Error error) override;
+ // Callback for when the RemotingSender indicates that the receiver
+ // is ready.
+ void OnRemotingReceiverReady();
+
+ // Starts the remoting sender. This may occur when remoting is "ready" if the
+ // session is already negotiated, or upon session negotiation if the receiver
+ // is already ready.
+ void StartRemotingSenders();
+
// Helper for stopping the current session, and/or unwinding a remote
// connection request (pre-session). This ensures LoopingFileCastAgent is in a
// terminal shutdown state.
@@ -168,6 +178,17 @@ class LoopingFileCastAgent final
std::unique_ptr<Environment> environment_;
std::unique_ptr<SenderSession> current_session_;
std::unique_ptr<LoopingFileSender> file_sender_;
+
+ // Remoting specific member variables.
+ std::unique_ptr<RemotingSender> remoting_sender_;
+
+ // Set when remoting is successfully negotiated. However, remoting streams
+ // won't start until |is_ready_for_remoting_| is true.
+ std::unique_ptr<SenderSession::RemotingNegotiation> current_negotiation_;
+
+ // Set to true when the remoting receiver is ready. However, remoting streams
+ // won't start until remoting is successfully negotiated.
+ bool is_ready_for_remoting_ = false;
};
} // namespace cast
diff --git a/cast/standalone_sender/remoting_sender.cc b/cast/standalone_sender/remoting_sender.cc
new file mode 100644
index 00000000..1566f0a7
--- /dev/null
+++ b/cast/standalone_sender/remoting_sender.cc
@@ -0,0 +1,95 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "cast/standalone_sender/remoting_sender.h"
+
+#include <utility>
+
+#include "cast/streaming/message_fields.h"
+
+namespace openscreen {
+namespace cast {
+
+namespace {
+
+VideoDecoderConfig::Codec ToProtoCodec(VideoCodec value) {
+ switch (value) {
+ case VideoCodec::kHevc:
+ return VideoDecoderConfig_Codec_kCodecHEVC;
+ case VideoCodec::kH264:
+ return VideoDecoderConfig_Codec_kCodecH264;
+ case VideoCodec::kVp8:
+ return VideoDecoderConfig_Codec_kCodecVP8;
+ case VideoCodec::kVp9:
+ return VideoDecoderConfig_Codec_kCodecVP9;
+ default:
+ return VideoDecoderConfig_Codec_kUnknownVideoCodec;
+ }
+}
+
+AudioDecoderConfig::Codec ToProtoCodec(AudioCodec value) {
+ switch (value) {
+ case AudioCodec::kAac:
+ return AudioDecoderConfig_Codec_kCodecAAC;
+ case AudioCodec::kOpus:
+ return AudioDecoderConfig_Codec_kCodecOpus;
+ default:
+ return AudioDecoderConfig_Codec_kUnknownAudioCodec;
+ }
+}
+
+} // namespace
+
+RemotingSender::RemotingSender(RpcMessenger* messenger,
+ AudioCodec audio_codec,
+ VideoCodec video_codec,
+ ReadyCallback ready_cb)
+ : messenger_(messenger),
+ audio_codec_(audio_codec),
+ video_codec_(video_codec),
+ ready_cb_(std::move(ready_cb)) {
+ messenger_->RegisterMessageReceiverCallback(
+ RpcMessenger::kAcquireRendererHandle,
+ [this](std::unique_ptr<RpcMessage> message) {
+ OSP_DCHECK(message);
+ this->OnInitializeMessage(*message);
+ });
+}
+
+RemotingSender::~RemotingSender() {
+ messenger_->UnregisterMessageReceiverCallback(
+ RpcMessenger::kAcquireRendererHandle);
+}
+
+void RemotingSender::OnInitializeMessage(const RpcMessage& message) {
+ receiver_handle_ = message.integer_value();
+
+ RpcMessage callback_message;
+ callback_message.set_handle(receiver_handle_);
+ callback_message.set_proc(RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
+
+ auto* callback_body =
+ callback_message.mutable_demuxerstream_initializecb_rpc();
+
+ // In Chrome, separate calls are used for the audio and video configs, but
+ // for simplicity's sake we combine them here.
+ callback_body->mutable_audio_decoder_config()->set_codec(
+ ToProtoCodec(audio_codec_));
+ callback_body->mutable_video_decoder_config()->set_codec(
+ ToProtoCodec(video_codec_));
+
+ OSP_DLOG_INFO << "Initializing receiver handle " << receiver_handle_
+ << " with audio codec " << CodecToString(audio_codec_)
+ << " and video codec " << CodecToString(video_codec_);
+ messenger_->SendMessageToRemote(callback_message);
+
+ if (ready_cb_) {
+ ready_cb_();
+ } else {
+ OSP_DLOG_INFO << "Received a ready message, but no ready callback.";
+ }
+}
+
+} // namespace cast
+} // namespace openscreen
diff --git a/cast/standalone_sender/remoting_sender.h b/cast/standalone_sender/remoting_sender.h
new file mode 100644
index 00000000..d331009b
--- /dev/null
+++ b/cast/standalone_sender/remoting_sender.h
@@ -0,0 +1,67 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CAST_STANDALONE_SENDER_REMOTING_SENDER_H_
+#define CAST_STANDALONE_SENDER_REMOTING_SENDER_H_
+
+#include <memory>
+
+#include "cast/streaming/constants.h"
+#include "cast/streaming/rpc_messenger.h"
+
+namespace openscreen {
+namespace cast {
+
+// This class behaves like a pared-down version of Chrome's StreamProvider (see
+// https://source.chromium.org/chromium/chromium/src/+/main:media/remoting/stream_provider.h
+// ). Instead of fully managing a media::DemuxerStream however, it just provides
+// an RPC initialization routine that notifies the standalone receiver's
+// SimpleRemotingReceiver instance (if configured) that initialization has been
+// complete and what codecs were selected.
+//
+// Due to the sheer complexity of remoting, we don't have a fully functional
+// implementation of remoting in the standalone_* components, instead Chrome is
+// the reference implementation and we have these simple classes to exercise
+// the public APIs.
+class RemotingSender {
+ public:
+ using ReadyCallback = std::function<void()>;
+ RemotingSender(RpcMessenger* messenger,
+ AudioCodec audio_codec,
+ VideoCodec video_codec,
+ ReadyCallback ready_cb);
+ ~RemotingSender();
+
+ private:
+ // When the receiver indicates that it is ready for initialization, it will
+ // The receiver sends us an "initialization" message that we respond to
+ // here with an "initialization callback" message that contains codec
+ // information.
+ void OnInitializeMessage(const RpcMessage& message);
+
+ // The messenger is the only caller of OnInitializeMessage, so there are no
+ // lifetime concerns. However, if this class outlives |messenger_|, it will
+ // no longer receive initialization messages.
+ RpcMessenger* messenger_;
+
+ // Unlike in Chrome, here we should know the video and audio codecs before any
+ // of the remoting code gets set up, and for simplicity's sake we can only
+ // populate the AudioDecoderConfig and VideoDecoderConfig objects with the
+ // codecs and use the rest of the fields as-is from the OFFER/ANSWER exchange.
+ const AudioCodec audio_codec_;
+ const VideoCodec video_codec_;
+
+ // The callback method to be called once we get the initialization message
+ // from the receiver.
+ ReadyCallback ready_cb_;
+
+ // The initialization message from the receiver contains the handle the
+ // callback should go to.
+ RpcMessenger::Handle receiver_handle_ = RpcMessenger::kInvalidHandle;
+};
+
+} // namespace cast
+} // namespace openscreen
+
+#endif // CAST_STANDALONE_SENDER_REMOTING_SENDER_H_
diff --git a/cast/streaming/constants.h b/cast/streaming/constants.h
index c640796d..668b6bca 100644
--- a/cast/streaming/constants.h
+++ b/cast/streaming/constants.h
@@ -111,6 +111,8 @@ constexpr int kSupportedRemotingVersion = 2;
enum class AudioCodec { kAac, kOpus, kNotSpecified };
enum class VideoCodec { kH264, kVp8, kHevc, kVp9, kNotSpecified };
+enum class CastMode : uint8_t { kMirroring, kRemoting };
+
} // namespace cast
} // namespace openscreen
diff --git a/cast/streaming/offer_messages.h b/cast/streaming/offer_messages.h
index c4899eec..c2be5bfa 100644
--- a/cast/streaming/offer_messages.h
+++ b/cast/streaming/offer_messages.h
@@ -97,8 +97,6 @@ struct VideoStream {
std::string error_recovery_mode = {};
};
-enum class CastMode : uint8_t { kMirroring, kRemoting };
-
struct Offer {
// TODO(jophba): remove deprecated declaration in a separate patch.
static ErrorOr<Offer> Parse(const Json::Value& root);
diff --git a/cast/streaming/receiver_message.cc b/cast/streaming/receiver_message.cc
index e1af610c..4fb1a8a6 100644
--- a/cast/streaming/receiver_message.cc
+++ b/cast/streaming/receiver_message.cc
@@ -128,10 +128,8 @@ Json::Value ReceiverCapability::ToJson() const {
// static
ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) {
ReceiverMessage message;
- if (!value ||
- !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) {
- return Error(Error::Code::kJsonParseError,
- "Failed to parse sequence number");
+ if (!value) {
+ return Error(Error::Code::kJsonParseError, "Invalid message body");
}
std::string result;
@@ -184,6 +182,12 @@ ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) {
break;
}
+ if (message.type != ReceiverMessage::Type::kRpc &&
+ !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) {
+ message.sequence_number = -1;
+ message.valid = false;
+ }
+
return message;
}
diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc
index 36a47bca..056bbabf 100644
--- a/cast/streaming/receiver_session.cc
+++ b/cast/streaming/receiver_session.cc
@@ -240,6 +240,10 @@ ReceiverSession::ReceiverSession(Client* const client,
[this](SenderMessage message) {
OnCapabilitiesRequest(std::move(message));
});
+ messenger_.SetHandler(SenderMessage::Type::kRpc,
+ [this](SenderMessage message) {
+ this->OnRpcMessage(std::move(message));
+ });
environment_->SetSocketSubscriber(this);
}
@@ -354,6 +358,21 @@ void ReceiverSession::OnCapabilitiesRequest(SenderMessage message) {
}
}
+void ReceiverSession::OnRpcMessage(SenderMessage message) {
+ if (!message.valid) {
+ OSP_DLOG_WARN
+ << "Bad RPC message. This may or may not represent a serious problem.";
+ return;
+ }
+
+ const auto& body = absl::get<std::vector<uint8_t>>(message.body);
+ if (!rpc_messenger_) {
+ OSP_DLOG_INFO << "Received an RPC message without having a messenger.";
+ return;
+ }
+ rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size());
+}
+
void ReceiverSession::SelectStreams(const Offer& offer,
SessionProperties* properties) {
if (offer.cast_mode == CastMode::kMirroring) {
@@ -465,6 +484,7 @@ void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) {
client_->OnReceiversDestroying(this, reason);
current_audio_receiver_.reset();
current_video_receiver_.reset();
+ rpc_messenger_.reset();
}
}
diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h
index fe4f9481..d928da5e 100644
--- a/cast/streaming/receiver_session.h
+++ b/cast/streaming/receiver_session.h
@@ -67,10 +67,16 @@ class ReceiverSession final : public Environment::SocketSubscriber {
// once we get a remoting request from a Sender.
struct RemotingNegotiation {
// The configured receivers set to be used for handling audio and
- // video streams.
+ // video streams. Unlike in the general streaming case, when we are remoting
+ // we don't know the codec and other information about the stream until
+ // the sender provices that information through the
+ // DemuxerStreamInitializeCallback RPC method.
ConfiguredReceivers receivers;
// The RPC messenger to be used for subscribing to remoting proto messages.
+ // Unlike the SenderSession API, the RPC messenger is negotiation specific.
+ // The messenger is torn down when |OnReceiversDestroying| is called, and
+ // is owned by the ReceiverSession.
RpcMessenger* messenger;
};
@@ -303,6 +309,7 @@ class ReceiverSession final : public Environment::SocketSubscriber {
// Specific message type handler methods.
void OnOffer(SenderMessage message);
void OnCapabilitiesRequest(SenderMessage message);
+ void OnRpcMessage(SenderMessage message);
// Selects streams from an offer based on its configuration, and sets
// them in the session properties.
diff --git a/cast/streaming/receiver_session_unittest.cc b/cast/streaming/receiver_session_unittest.cc
index 25b567ee..b8d64afa 100644
--- a/cast/streaming/receiver_session_unittest.cc
+++ b/cast/streaming/receiver_session_unittest.cc
@@ -94,6 +94,48 @@ constexpr char kValidOfferMessage[] = R"({
}
})";
+constexpr char kValidRemotingOfferMessage[] = R"({
+ "type": "OFFER",
+ "seqNum": 419,
+ "offer": {
+ "castMode": "remoting",
+ "supportedStreams": [
+ {
+ "index": 31339,
+ "type": "video_source",
+ "codecName": "REMOTE_VIDEO",
+ "rtpProfile": "cast",
+ "rtpPayloadType": 127,
+ "ssrc": 19088745,
+ "maxFrameRate": "60000/1000",
+ "timeBase": "1/90000",
+ "maxBitRate": 5432101,
+ "aesKey": "040d756791711fd3adb939066e6d8690",
+ "aesIvMask": "9ff0f022a959150e70a2d05a6c184aed",
+ "resolutions": [
+ {
+ "width": 1920,
+ "height":1080
+ }
+ ]
+ },
+ {
+ "index": 31340,
+ "type": "audio_source",
+ "codecName": "REMOTE_AUDIO",
+ "rtpProfile": "cast",
+ "rtpPayloadType": 97,
+ "ssrc": 19088747,
+ "bitRate": 125000,
+ "timeBase": "1/48000",
+ "channels": 2,
+ "aesKey": "51027e4e2347cbcb49d57ef10177aebc",
+ "aesIvMask": "7f12a19be62a36c04ae4116caaeff6d1"
+ }
+ ]
+ }
+})";
+
constexpr char kNoAudioOfferMessage[] = R"({
"type": "OFFER",
"seqNum": 1337,
@@ -244,6 +286,12 @@ constexpr char kGetCapabilitiesMessage[] = R"({
"type": "GET_CAPABILITIES"
})";
+constexpr char kRpcMessage[] = R"({
+ "rpc" : "CGQQnBiCGQgSAggMGgIIBg==",
+ "seqNum" : 2,
+ "type" : "RPC"
+})";
+
class FakeClient : public ReceiverSession::Client {
public:
MOCK_METHOD(void,
@@ -251,6 +299,10 @@ class FakeClient : public ReceiverSession::Client {
(const ReceiverSession*, ReceiverSession::ConfiguredReceivers),
(override));
MOCK_METHOD(void,
+ OnRemotingNegotiated,
+ (const ReceiverSession*, ReceiverSession::RemotingNegotiation),
+ (override));
+ MOCK_METHOD(void,
OnReceiversDestroying,
(const ReceiverSession*, ReceiversDestroyingReason),
(override));
@@ -752,6 +804,83 @@ TEST_F(ReceiverSessionTest, ReturnsCapabilitiesWithRemotingPreferences) {
MediaCapability::k4k));
}
+TEST_F(ReceiverSessionTest, CanNegotiateRemoting) {
+ ReceiverSession::Preferences preferences;
+ preferences.remoting =
+ std::make_unique<ReceiverSession::RemotingPreferences>();
+ preferences.remoting->supports_chrome_audio_codecs = true;
+ preferences.remoting->supports_4k = true;
+ SetUpWithPreferences(std::move(preferences));
+
+ InSequence s;
+ EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _))
+ .WillOnce([](const ReceiverSession* session_,
+ ReceiverSession::RemotingNegotiation negotiation) {
+ const auto& cr = negotiation.receivers;
+ EXPECT_TRUE(cr.audio_receiver);
+ EXPECT_EQ(cr.audio_receiver->config().sender_ssrc, 19088747u);
+ EXPECT_EQ(cr.audio_receiver->config().receiver_ssrc, 19088748u);
+ EXPECT_EQ(cr.audio_receiver->config().channels, 2);
+ EXPECT_EQ(cr.audio_receiver->config().rtp_timebase, 48000);
+ EXPECT_EQ(cr.audio_config.codec, AudioCodec::kNotSpecified);
+
+ EXPECT_TRUE(cr.video_receiver);
+ EXPECT_EQ(cr.video_receiver->config().sender_ssrc, 19088745u);
+ EXPECT_EQ(cr.video_receiver->config().receiver_ssrc, 19088746u);
+ EXPECT_EQ(cr.video_receiver->config().channels, 1);
+ EXPECT_EQ(cr.video_receiver->config().rtp_timebase, 90000);
+ EXPECT_EQ(cr.video_config.codec, VideoCodec::kNotSpecified);
+ });
+ EXPECT_CALL(client_,
+ OnReceiversDestroying(session_.get(),
+ ReceiverSession::Client::kEndOfSession));
+
+ message_port_->ReceiveMessage(kValidRemotingOfferMessage);
+}
+
+TEST_F(ReceiverSessionTest, HandlesRpcMessage) {
+ ReceiverSession::Preferences preferences;
+ preferences.remoting =
+ std::make_unique<ReceiverSession::RemotingPreferences>();
+ preferences.remoting->supports_chrome_audio_codecs = true;
+ preferences.remoting->supports_4k = true;
+ SetUpWithPreferences(std::move(preferences));
+
+ message_port_->ReceiveMessage(kRpcMessage);
+ const auto& messages = message_port_->posted_messages();
+ // Nothing should happen yet, the session doesn't have a messenger.
+ ASSERT_EQ(0u, messages.size());
+
+ // We don't need to fully test that the subscription model on the RpcMessenger
+ // works, but we do want to test that the ReceiverSession has properly wired
+ // the RpcMessenger up to the backing SessionMessenger and can properly
+ // handle received RPC messages.
+ InSequence s;
+ bool received_initialize_message = false;
+ EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _))
+ .WillOnce([this, &received_initialize_message](
+ const ReceiverSession* session_,
+ ReceiverSession::RemotingNegotiation negotiation) mutable {
+ negotiation.messenger->RegisterMessageReceiverCallback(
+ 100, [&received_initialize_message](
+ std::unique_ptr<RpcMessage> message) mutable {
+ ASSERT_EQ(100, message->handle());
+ ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE_CALLBACK,
+ message->proc());
+ ASSERT_EQ(0, message->integer_value());
+ received_initialize_message = true;
+ });
+
+ message_port_->ReceiveMessage(kRpcMessage);
+ });
+ EXPECT_CALL(client_,
+ OnReceiversDestroying(session_.get(),
+ ReceiverSession::Client::kEndOfSession));
+
+ message_port_->ReceiveMessage(kValidRemotingOfferMessage);
+ ASSERT_TRUE(received_initialize_message);
+}
+
TEST_F(ReceiverSessionTest, VideoLimitsIsSupersetOf) {
ReceiverSession::VideoLimits first{};
ReceiverSession::VideoLimits second = first;
diff --git a/cast/streaming/rpc_messenger.cc b/cast/streaming/rpc_messenger.cc
index 54b2e279..63176f7c 100644
--- a/cast/streaming/rpc_messenger.cc
+++ b/cast/streaming/rpc_messenger.cc
@@ -72,7 +72,8 @@ void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message,
std::size_t message_len) {
auto rpc = std::make_unique<RpcMessage>();
if (!rpc->ParseFromArray(message, message_len)) {
- OSP_LOG_WARN << "Failed to parse RPC message from remote: " << message;
+ OSP_DLOG_WARN << "Failed to parse RPC message from remote: \"" << message
+ << "\"";
return;
}
OSP_DVLOG << "Received RPC message: " << *rpc;
diff --git a/cast/streaming/rpc_messenger_unittest.cc b/cast/streaming/rpc_messenger_unittest.cc
index e0627542..758a9036 100644
--- a/cast/streaming/rpc_messenger_unittest.cc
+++ b/cast/streaming/rpc_messenger_unittest.cc
@@ -125,16 +125,16 @@ TEST_F(RpcMessengerTest, ProcessMessageWithRegisteredHandle) {
// Send message for RPC messenger to process.
RpcMessage sent_rpc;
sent_rpc.set_handle(fake_messenger_->handle());
- sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME);
- sent_rpc.set_double_value(3.4);
+ sent_rpc.set_proc(RpcMessage::RPC_DS_INITIALIZE);
+ sent_rpc.set_integer_value(4004);
ProcessMessage(sent_rpc);
// Checks if received message is identical to the one sent earlier.
ASSERT_EQ(1, fake_messenger_->received_count());
const RpcMessage& received_rpc = fake_messenger_->received_rpc();
ASSERT_EQ(fake_messenger_->handle(), received_rpc.handle());
- ASSERT_EQ(RpcMessage::RPC_R_SETVOLUME, received_rpc.proc());
- ASSERT_EQ(3.4, received_rpc.double_value());
+ ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE, received_rpc.proc());
+ ASSERT_EQ(4004, received_rpc.integer_value());
}
TEST_F(RpcMessengerTest, ProcessMessageWithUnregisteredHandle) {
diff --git a/cast/streaming/sender_session.cc b/cast/streaming/sender_session.cc
index bbb3972f..f59f0d70 100644
--- a/cast/streaming/sender_session.cc
+++ b/cast/streaming/sender_session.cc
@@ -210,6 +210,9 @@ SenderSession::SenderSession(Configuration config)
config_.client->OnError(this, error);
},
config_.environment->task_runner()),
+ rpc_messenger_([this](std::vector<uint8_t> message) {
+ SendRpcMessage(std::move(message));
+ }),
packet_router_(config_.environment) {
OSP_DCHECK(config_.client);
OSP_DCHECK(config_.environment);
@@ -265,7 +268,6 @@ void SenderSession::ResetState() {
current_negotiation_.reset();
current_audio_sender_.reset();
current_video_sender_.reset();
- rpc_messenger_.reset();
}
Error SenderSession::StartNegotiation(
@@ -299,7 +301,7 @@ void SenderSession::OnAnswer(ReceiverMessage message) {
return;
}
- state_ = State::kMirroring;
+ state_ = State::kStreaming;
config_.client->OnNegotiated(
this, std::move(senders),
capture_recommendations::GetRecommendations(answer));
@@ -361,27 +363,12 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) {
"Failed to negotiate a remoting session."));
return;
}
- rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) {
- Error error = this->messenger_.SendOutboundMessage(SenderMessage{
- SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true,
- std::move(message)});
-
- if (!error.ok()) {
- OSP_LOG_WARN << "Failed to send RPC message: " << error;
- }
- });
config_.client->OnRemotingNegotiated(
- this, RemotingNegotiation{std::move(senders), ToCapabilities(caps),
- rpc_messenger_.get()});
+ this, RemotingNegotiation{std::move(senders), ToCapabilities(caps)});
}
void SenderSession::OnRpcMessage(ReceiverMessage message) {
- if (!rpc_messenger_) {
- OSP_LOG_INFO << "Received an RPC message without having an RPCMessenger.";
- return;
- }
-
if (!message.valid) {
HandleErrorMessage(
message,
@@ -390,7 +377,7 @@ void SenderSession::OnRpcMessage(ReceiverMessage message) {
}
const auto& body = absl::get<std::vector<uint8_t>>(message.body);
- rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size());
+ rpc_messenger_.ProcessMessageFromRemote(body.data(), body.size());
}
void SenderSession::HandleErrorMessage(ReceiverMessage message,
@@ -492,5 +479,15 @@ SenderSession::ConfiguredSenders SenderSession::SpawnSenders(
return senders;
}
+void SenderSession::SendRpcMessage(std::vector<uint8_t> message_body) {
+ Error error = this->messenger_.SendOutboundMessage(SenderMessage{
+ SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true,
+ std::move(message_body)});
+
+ if (!error.ok()) {
+ OSP_LOG_WARN << "Failed to send RPC message: " << error;
+ }
+}
+
} // namespace cast
} // namespace openscreen
diff --git a/cast/streaming/sender_session.h b/cast/streaming/sender_session.h
index 7995181f..f7fbb97a 100644
--- a/cast/streaming/sender_session.h
+++ b/cast/streaming/sender_session.h
@@ -58,9 +58,6 @@ class SenderSession final {
// about legacy devices, such as pre-1.27 Earth receivers should do
// a version check when using these capabilities to offer remoting.
RemotingCapabilities capabilities;
-
- // The RPC messenger to be used for subscribing to remoting proto messages.
- RpcMessenger* messenger;
};
// The embedder should provide a client for handling negotiation events.
@@ -158,6 +155,11 @@ class SenderSession final {
// feedback. Embedders may use this information to throttle capture devices.
int GetEstimatedNetworkBandwidth() const;
+ // The RPC messenger for this session. NOTE: RPC messages may come at
+ // any time from the receiver, so subscriptions to RPC remoting messages
+ // should be done before calling |NegotiateRemoting|.
+ RpcMessenger* rpc_messenger() { return &rpc_messenger_; }
+
private:
// We store the current negotiation, so that when we get an answer from the
// receiver we can line up the selected streams with the original
@@ -183,7 +185,7 @@ class SenderSession final {
kIdle,
// Currently mirroring content to a receiver.
- kMirroring,
+ kStreaming,
// Currently remoting content to a receiver.
kRemoting
@@ -225,6 +227,9 @@ class SenderSession final {
// Spawn a set of configured senders from the currently stored negotiation.
ConfiguredSenders SpawnSenders(const Answer& answer);
+ // Used by the RPC messenger to send outbound messages.
+ void SendRpcMessage(std::vector<uint8_t> message_body);
+
// This session's configuration.
Configuration config_;
@@ -233,6 +238,10 @@ class SenderSession final {
// cast/protocol/castv2/streaming_schema.json.
SenderSessionMessenger messenger_;
+ // The RPC messenger, which uses the session messager for sending RPC messages
+ // and handles subscriptions to RPC messages.
+ RpcMessenger rpc_messenger_;
+
// The packet router used for RTP/RTCP messaging across all senders.
SenderPacketRouter packet_router_;
@@ -246,7 +255,7 @@ class SenderSession final {
std::unique_ptr<InProcessNegotiation> current_negotiation_;
// The current state of the session. Note that the state is intentionally
- // limited. |kMirroring| or |kRemoting| means that we are either starting
+ // limited. |kStreaming| or |kRemoting| means that we are either starting
// a negotiation or actively sending to a receiver.
State state_ = State::kIdle;
@@ -254,11 +263,6 @@ class SenderSession final {
// senders used for this session. Either or both may be nullptr.
std::unique_ptr<Sender> current_audio_sender_;
std::unique_ptr<Sender> current_video_sender_;
-
- // If remoting, we store the RpcMessenger used by the embedder to send RPC
- // messages from the remoting protobuf specification. For more information,
- // see //cast/streaming/remoting.proto.
- std::unique_ptr<RpcMessenger> rpc_messenger_;
}; // namespace cast
} // namespace cast
diff --git a/cast/streaming/sender_session_unittest.cc b/cast/streaming/sender_session_unittest.cc
index 36b1bae8..bfe4cfab 100644
--- a/cast/streaming/sender_session_unittest.cc
+++ b/cast/streaming/sender_session_unittest.cc
@@ -545,8 +545,9 @@ TEST_F(SenderSessionTest, SuccessfulRemotingNegotiationYieldsValidObject) {
// The messenger is tested elsewhere, but we can sanity check that we got a valid
// one here.
- EXPECT_TRUE(negotiation.messenger);
- const RpcMessenger::Handle handle = negotiation.messenger->GetUniqueHandle();
+ EXPECT_TRUE(session_->rpc_messenger());
+ const RpcMessenger::Handle handle =
+ session_->rpc_messenger()->GetUniqueHandle();
EXPECT_NE(RpcMessenger::kInvalidHandle, handle);
}
diff --git a/cast/streaming/session_messenger.cc b/cast/streaming/session_messenger.cc
index 34c4c02f..389d87bb 100644
--- a/cast/streaming/session_messenger.cc
+++ b/cast/streaming/session_messenger.cc
@@ -146,13 +146,6 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id,
return;
}
- int sequence_number;
- if (!json::TryParseInt(message_body.value()[kSequenceNumber],
- &sequence_number)) {
- OSP_DLOG_WARN << "Received a message without a sequence number";
- return;
- }
-
// If the message is valid JSON and we don't understand it, there are two
// options: (1) it's an unknown type, or (2) the receiver filled out the
// message incorrectly. In the first case we can drop it, it's likely just
@@ -173,6 +166,13 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id,
OSP_DLOG_INFO << "Received RTP message but no callback, dropping";
}
} else {
+ int sequence_number;
+ if (!json::TryParseInt(message_body.value()[kSequenceNumber],
+ &sequence_number)) {
+ OSP_DLOG_WARN << "Received a message without a sequence number";
+ return;
+ }
+
auto it = awaiting_replies_.find(sequence_number);
if (it == awaiting_replies_.end()) {
OSP_DLOG_WARN << "Received a reply I wasn't waiting for: "