aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cast/protocol/castv2/streaming_examples/rpc.json1
-rw-r--r--cast/protocol/castv2/streaming_schema.json30
-rw-r--r--cast/standalone_receiver/BUILD.gn2
-rw-r--r--cast/standalone_receiver/simple_remoting_receiver.cc107
-rw-r--r--cast/standalone_receiver/simple_remoting_receiver.h52
-rw-r--r--cast/standalone_receiver/streaming_playback_controller.cc56
-rw-r--r--cast/standalone_receiver/streaming_playback_controller.h5
-rw-r--r--cast/standalone_sender/BUILD.gn2
-rw-r--r--cast/standalone_sender/looping_file_cast_agent.cc31
-rw-r--r--cast/standalone_sender/looping_file_cast_agent.h21
-rw-r--r--cast/standalone_sender/remoting_sender.cc95
-rw-r--r--cast/standalone_sender/remoting_sender.h67
-rw-r--r--cast/streaming/constants.h2
-rw-r--r--cast/streaming/offer_messages.h2
-rw-r--r--cast/streaming/receiver_message.cc12
-rw-r--r--cast/streaming/receiver_session.cc20
-rw-r--r--cast/streaming/receiver_session.h9
-rw-r--r--cast/streaming/receiver_session_unittest.cc129
-rw-r--r--cast/streaming/rpc_messenger.cc3
-rw-r--r--cast/streaming/rpc_messenger_unittest.cc8
-rw-r--r--cast/streaming/sender_session.cc35
-rw-r--r--cast/streaming/sender_session.h24
-rw-r--r--cast/streaming/sender_session_unittest.cc5
-rw-r--r--cast/streaming/session_messenger.cc14
24 files changed, 650 insertions, 82 deletions
diff --git a/cast/protocol/castv2/streaming_examples/rpc.json b/cast/protocol/castv2/streaming_examples/rpc.json
index 6ebfc0e0..880ca641 100644
--- a/cast/protocol/castv2/streaming_examples/rpc.json
+++ b/cast/protocol/castv2/streaming_examples/rpc.json
@@ -1,5 +1,4 @@
{
- "seqNum": 12345,
"sessionId": 735189,
"type": "RPC",
"result": "ok",
diff --git a/cast/protocol/castv2/streaming_schema.json b/cast/protocol/castv2/streaming_schema.json
index e659e7bf..7044c8ea 100644
--- a/cast/protocol/castv2/streaming_schema.json
+++ b/cast/protocol/castv2/streaming_schema.json
@@ -121,10 +121,7 @@
"maxBitRate": {"type": "integer", "minimum": 300000},
"maxDelay": {"$ref": "#/definitions/delay"}
},
- "required": [
- "maxDimensions",
- "maxBitRate"
- ]
+ "required": ["maxDimensions", "maxBitRate"]
},
"constraints": {
"properties": {
@@ -220,15 +217,36 @@
]
}
},
- "required": ["type", "seqNum"],
+ "required": ["type"],
"allOf": [
{
"if": {
- "properties": {"type": {"enum": ["ANSWER", "CAPABILITIES_RESPONSE", "STATUS_RESPONSE"]}}
+ "properties": {
+ "type": {
+ "enum": ["ANSWER", "CAPABILITIES_RESPONSE", "STATUS_RESPONSE"]
+ }
+ }
},
"then": {"required": ["result"]}
},
{
+ "if": {
+ "properties": {
+ "type": {
+ "enum": [
+ "OFFER",
+ "ANSWER",
+ "GET_CAPABILITIES",
+ "CAPABILITIES_RESPONSE",
+ "GET_STATUS",
+ "STATUS_RESPONSE"
+ ]
+ }
+ }
+ },
+ "then": {"required": ["seqNum"]}
+ },
+ {
"if": {"properties": {"type": {"const": "OFFER"}}},
"then": {"required": ["offer"]}
},
diff --git a/cast/standalone_receiver/BUILD.gn b/cast/standalone_receiver/BUILD.gn
index f1b5fd31..956a1a71 100644
--- a/cast/standalone_receiver/BUILD.gn
+++ b/cast/standalone_receiver/BUILD.gn
@@ -16,6 +16,8 @@ if (!build_with_chromium) {
"cast_service.h",
"mirroring_application.cc",
"mirroring_application.h",
+ "simple_remoting_receiver.cc",
+ "simple_remoting_receiver.h",
"streaming_playback_controller.cc",
"streaming_playback_controller.h",
]
diff --git a/cast/standalone_receiver/simple_remoting_receiver.cc b/cast/standalone_receiver/simple_remoting_receiver.cc
new file mode 100644
index 00000000..119be44f
--- /dev/null
+++ b/cast/standalone_receiver/simple_remoting_receiver.cc
@@ -0,0 +1,107 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "cast/standalone_receiver/simple_remoting_receiver.h"
+
+#include <utility>
+
+#include "cast/streaming/message_fields.h"
+#include "cast/streaming/remoting.pb.h"
+
+namespace openscreen {
+namespace cast {
+
+namespace {
+
+VideoCodec ParseProtoCodec(VideoDecoderConfig::Codec value) {
+ switch (value) {
+ case VideoDecoderConfig_Codec_kCodecHEVC:
+ return VideoCodec::kHevc;
+
+ case VideoDecoderConfig_Codec_kCodecH264:
+ return VideoCodec::kH264;
+
+ case VideoDecoderConfig_Codec_kCodecVP8:
+ return VideoCodec::kVp8;
+
+ case VideoDecoderConfig_Codec_kCodecVP9:
+ return VideoCodec::kVp9;
+
+ default:
+ return VideoCodec::kNotSpecified;
+ }
+}
+
+AudioCodec ParseProtoCodec(AudioDecoderConfig::Codec value) {
+ switch (value) {
+ case AudioDecoderConfig_Codec_kCodecAAC:
+ return AudioCodec::kAac;
+
+ case AudioDecoderConfig_Codec_kCodecOpus:
+ return AudioCodec::kOpus;
+
+ default:
+ return AudioCodec::kNotSpecified;
+ }
+}
+
+} // namespace
+
+SimpleRemotingReceiver::SimpleRemotingReceiver(RpcMessenger* messenger)
+ : messenger_(messenger) {
+ messenger_->RegisterMessageReceiverCallback(
+ RpcMessenger::kFirstHandle, [this](std::unique_ptr<RpcMessage> message) {
+ this->OnInitializeCallbackMessage(std::move(message));
+ });
+}
+
+SimpleRemotingReceiver::~SimpleRemotingReceiver() {
+ messenger_->UnregisterMessageReceiverCallback(RpcMessenger::kFirstHandle);
+}
+
+void SimpleRemotingReceiver::SendInitializeMessage(
+ SimpleRemotingReceiver::InitializeCallback initialize_cb) {
+ initialize_cb_ = std::move(initialize_cb);
+
+ OSP_DVLOG
+ << "Indicating to the sender we are ready for remoting initialization.";
+ openscreen::cast::RpcMessage rpc;
+ rpc.set_handle(RpcMessenger::kAcquireRendererHandle);
+ rpc.set_proc(openscreen::cast::RpcMessage::RPC_DS_INITIALIZE);
+
+ // The initialize message contains the handle to be used for sending the
+ // initialization callback message.
+ rpc.set_integer_value(RpcMessenger::kFirstHandle);
+ messenger_->SendMessageToRemote(rpc);
+}
+
+void SimpleRemotingReceiver::OnInitializeCallbackMessage(
+ std::unique_ptr<RpcMessage> message) {
+ OSP_DCHECK(message->proc() == RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
+ if (!initialize_cb_) {
+ OSP_DLOG_INFO << "Received an initialization callback message but no "
+ "callback was set.";
+ return;
+ }
+
+ const DemuxerStreamInitializeCallback& callback_message =
+ message->demuxerstream_initializecb_rpc();
+ const auto audio_codec =
+ callback_message.has_audio_decoder_config()
+ ? ParseProtoCodec(callback_message.audio_decoder_config().codec())
+ : AudioCodec::kNotSpecified;
+ const auto video_codec =
+ callback_message.has_video_decoder_config()
+ ? ParseProtoCodec(callback_message.video_decoder_config().codec())
+ : VideoCodec::kNotSpecified;
+
+ OSP_DLOG_INFO << "Initializing remoting with audio codec "
+ << CodecToString(audio_codec) << " and video codec "
+ << CodecToString(video_codec);
+ initialize_cb_(audio_codec, video_codec);
+ initialize_cb_ = nullptr;
+}
+
+} // namespace cast
+} // namespace openscreen
diff --git a/cast/standalone_receiver/simple_remoting_receiver.h b/cast/standalone_receiver/simple_remoting_receiver.h
new file mode 100644
index 00000000..52fe131f
--- /dev/null
+++ b/cast/standalone_receiver/simple_remoting_receiver.h
@@ -0,0 +1,52 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_
+#define CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_
+
+#include <functional>
+#include <memory>
+
+#include "cast/streaming/constants.h"
+#include "cast/streaming/rpc_messenger.h"
+
+namespace openscreen {
+namespace cast {
+
+// This class behaves like a pared-down version of Chrome's DemuxerStreamAdapter
+// (see
+// https://source.chromium.org/chromium/chromium/src/+/main:/media/remoting/demuxer_stream_adapter.h
+// ). Instead of providing a full adapter implementation, it just provides a
+// callback register that can be used to notify a component when the
+// RemotingProvider sends an initialization message with audio and video codec
+// information.
+//
+// Due to the sheer complexity of remoting, we don't have a fully functional
+// implementation of remoting in the standalone_* components, instead Chrome is
+// the reference implementation and we have these simple classes to exercise
+// the public APIs.
+class SimpleRemotingReceiver {
+ public:
+ explicit SimpleRemotingReceiver(RpcMessenger* messenger);
+ ~SimpleRemotingReceiver();
+
+ // The flow here closely mirrors the remoting.proto. The standalone receiver
+ // indicates it is ready for initialization by calling
+ // |SendInitializeMessage|, then this class sends an initialize message to the
+ // sender. The sender then replies with an initialization message containing
+ // configurations, which is passed to |initialize_cb|.
+ using InitializeCallback = std::function<void(AudioCodec, VideoCodec)>;
+ void SendInitializeMessage(InitializeCallback initialize_cb);
+
+ private:
+ void OnInitializeCallbackMessage(std::unique_ptr<RpcMessage> message);
+
+ RpcMessenger* messenger_;
+ InitializeCallback initialize_cb_;
+};
+
+} // namespace cast
+} // namespace openscreen
+
+#endif // CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_
diff --git a/cast/standalone_receiver/streaming_playback_controller.cc b/cast/standalone_receiver/streaming_playback_controller.cc
index 2a39ab8e..46ee0bbb 100644
--- a/cast/standalone_receiver/streaming_playback_controller.cc
+++ b/cast/standalone_receiver/streaming_playback_controller.cc
@@ -57,6 +57,42 @@ void StreamingPlaybackController::OnNegotiated(
const ReceiverSession* session,
ReceiverSession::ConfiguredReceivers receivers) {
TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneReceiver);
+ Initialize(receivers);
+}
+
+void StreamingPlaybackController::OnRemotingNegotiated(
+ const ReceiverSession* session,
+ ReceiverSession::RemotingNegotiation negotiation) {
+ remoting_receiver_ =
+ std::make_unique<SimpleRemotingReceiver>(negotiation.messenger);
+ remoting_receiver_->SendInitializeMessage(
+ [this, receivers = negotiation.receivers](AudioCodec audio_codec,
+ VideoCodec video_codec) {
+ // The configurations in |negotiation| do not have the actual codecs,
+ // only REMOTE_AUDIO and REMOTE_VIDEO. Once we receive the
+ // initialization callback method, we can override with the actual
+ // codecs here.
+ auto mutable_receivers = receivers;
+ mutable_receivers.audio_config.codec = audio_codec;
+ mutable_receivers.video_config.codec = video_codec;
+ Initialize(mutable_receivers);
+ });
+}
+
+void StreamingPlaybackController::OnReceiversDestroying(
+ const ReceiverSession* session,
+ ReceiversDestroyingReason reason) {
+ audio_player_.reset();
+ video_player_.reset();
+}
+
+void StreamingPlaybackController::OnError(const ReceiverSession* session,
+ Error error) {
+ client_->OnPlaybackError(this, error);
+}
+
+void StreamingPlaybackController::Initialize(
+ ReceiverSession::ConfiguredReceivers receivers) {
#if defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS)
if (receivers.audio_receiver) {
audio_player_ = std::make_unique<SDLAudioPlayer>(
@@ -83,25 +119,5 @@ void StreamingPlaybackController::OnNegotiated(
#endif // defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS)
}
-void StreamingPlaybackController::OnRemotingNegotiated(
- const ReceiverSession* session,
- ReceiverSession::RemotingNegotiation negotiation) {
- // TODO(issuetracker.google.com/190078859): need ability to initialize
- // remoting codecs.
- OSP_UNIMPLEMENTED();
-}
-
-void StreamingPlaybackController::OnReceiversDestroying(
- const ReceiverSession* session,
- ReceiversDestroyingReason reason) {
- audio_player_.reset();
- video_player_.reset();
-}
-
-void StreamingPlaybackController::OnError(const ReceiverSession* session,
- Error error) {
- client_->OnPlaybackError(this, error);
-}
-
} // namespace cast
} // namespace openscreen
diff --git a/cast/standalone_receiver/streaming_playback_controller.h b/cast/standalone_receiver/streaming_playback_controller.h
index ea288046..65a233e5 100644
--- a/cast/standalone_receiver/streaming_playback_controller.h
+++ b/cast/standalone_receiver/streaming_playback_controller.h
@@ -7,6 +7,7 @@
#include <memory>
+#include "cast/standalone_receiver/simple_remoting_receiver.h"
#include "cast/streaming/receiver_session.h"
#include "platform/impl/task_runner.h"
@@ -46,6 +47,8 @@ class StreamingPlaybackController final : public ReceiverSession::Client {
TaskRunner* const task_runner_;
StreamingPlaybackController::Client* client_;
+ void Initialize(ReceiverSession::ConfiguredReceivers receivers);
+
#if defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS)
// NOTE: member ordering is important, since the sub systems must be
// first-constructed, last-destroyed. Make sure any new SDL related
@@ -62,6 +65,8 @@ class StreamingPlaybackController final : public ReceiverSession::Client {
std::unique_ptr<DummyPlayer> audio_player_;
std::unique_ptr<DummyPlayer> video_player_;
#endif // defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS)
+
+ std::unique_ptr<SimpleRemotingReceiver> remoting_receiver_;
};
} // namespace cast
diff --git a/cast/standalone_sender/BUILD.gn b/cast/standalone_sender/BUILD.gn
index afa73d7d..780e0609 100644
--- a/cast/standalone_sender/BUILD.gn
+++ b/cast/standalone_sender/BUILD.gn
@@ -49,6 +49,8 @@ if (!build_with_chromium) {
"looping_file_sender.h",
"receiver_chooser.cc",
"receiver_chooser.h",
+ "remoting_sender.cc",
+ "remoting_sender.h",
"simulated_capturer.cc",
"simulated_capturer.h",
"streaming_opus_encoder.cc",
diff --git a/cast/standalone_sender/looping_file_cast_agent.cc b/cast/standalone_sender/looping_file_cast_agent.cc
index ce9826de..bfb970bf 100644
--- a/cast/standalone_sender/looping_file_cast_agent.cc
+++ b/cast/standalone_sender/looping_file_cast_agent.cc
@@ -270,6 +270,10 @@ void LoopingFileCastAgent::CreateAndStartSession() {
OSP_VLOG << "Starting session negotiation.";
Error negotiation_error;
if (connection_settings_->use_remoting) {
+ remoting_sender_ = std::make_unique<RemotingSender>(
+ current_session_->rpc_messenger(), AudioCodec::kOpus, VideoCodec::kVp8,
+ [this]() { OnRemotingReceiverReady(); });
+
negotiation_error =
current_session_->NegotiateRemoting(audio_config, video_config);
} else {
@@ -298,17 +302,17 @@ void LoopingFileCastAgent::OnNegotiated(
void LoopingFileCastAgent::OnRemotingNegotiated(
const SenderSession* session,
SenderSession::RemotingNegotiation negotiation) {
- // TODO(jophba): this needs to be hashed out as part of
- // figuring out the embedder workflow.
if (negotiation.senders.audio_sender == nullptr &&
negotiation.senders.video_sender == nullptr) {
OSP_LOG_ERROR << "Missing both audio and video, so exiting...";
return;
}
- file_sender_ = std::make_unique<LoopingFileSender>(
- environment_.get(), connection_settings_.value(), session,
- std::move(negotiation.senders), [this]() { shutdown_callback_(); });
+ current_negotiation_ =
+ std::make_unique<SenderSession::RemotingNegotiation>(negotiation);
+ if (is_ready_for_remoting_) {
+ StartRemotingSenders();
+ }
}
void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) {
@@ -316,6 +320,23 @@ void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) {
Shutdown();
}
+void LoopingFileCastAgent::OnRemotingReceiverReady() {
+ is_ready_for_remoting_ = true;
+ if (current_negotiation_) {
+ StartRemotingSenders();
+ }
+}
+
+void LoopingFileCastAgent::StartRemotingSenders() {
+ OSP_DCHECK(current_negotiation_);
+ file_sender_ = std::make_unique<LoopingFileSender>(
+ environment_.get(), connection_settings_.value(), current_session_.get(),
+ std::move(current_negotiation_->senders),
+ [this]() { shutdown_callback_(); });
+ current_negotiation_.reset();
+ is_ready_for_remoting_ = false;
+}
+
void LoopingFileCastAgent::Shutdown() {
TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
diff --git a/cast/standalone_sender/looping_file_cast_agent.h b/cast/standalone_sender/looping_file_cast_agent.h
index cff0f6ba..895d1324 100644
--- a/cast/standalone_sender/looping_file_cast_agent.h
+++ b/cast/standalone_sender/looping_file_cast_agent.h
@@ -21,6 +21,7 @@
#include "cast/sender/public/sender_socket_factory.h"
#include "cast/standalone_sender/connection_settings.h"
#include "cast/standalone_sender/looping_file_sender.h"
+#include "cast/standalone_sender/remoting_sender.h"
#include "cast/streaming/environment.h"
#include "cast/streaming/sender_session.h"
#include "platform/api/scoped_wake_lock.h"
@@ -134,6 +135,15 @@ class LoopingFileCastAgent final
SenderSession::RemotingNegotiation negotiation) override;
void OnError(const SenderSession* session, Error error) override;
+ // Callback for when the RemotingSender indicates that the receiver
+ // is ready.
+ void OnRemotingReceiverReady();
+
+ // Starts the remoting sender. This may occur when remoting is "ready" if the
+ // session is already negotiated, or upon session negotiation if the receiver
+ // is already ready.
+ void StartRemotingSenders();
+
// Helper for stopping the current session, and/or unwinding a remote
// connection request (pre-session). This ensures LoopingFileCastAgent is in a
// terminal shutdown state.
@@ -168,6 +178,17 @@ class LoopingFileCastAgent final
std::unique_ptr<Environment> environment_;
std::unique_ptr<SenderSession> current_session_;
std::unique_ptr<LoopingFileSender> file_sender_;
+
+ // Remoting specific member variables.
+ std::unique_ptr<RemotingSender> remoting_sender_;
+
+ // Set when remoting is successfully negotiated. However, remoting streams
+ // won't start until |is_ready_for_remoting_| is true.
+ std::unique_ptr<SenderSession::RemotingNegotiation> current_negotiation_;
+
+ // Set to true when the remoting receiver is ready. However, remoting streams
+ // won't start until remoting is successfully negotiated.
+ bool is_ready_for_remoting_ = false;
};
} // namespace cast
diff --git a/cast/standalone_sender/remoting_sender.cc b/cast/standalone_sender/remoting_sender.cc
new file mode 100644
index 00000000..1566f0a7
--- /dev/null
+++ b/cast/standalone_sender/remoting_sender.cc
@@ -0,0 +1,95 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "cast/standalone_sender/remoting_sender.h"
+
+#include <utility>
+
+#include "cast/streaming/message_fields.h"
+
+namespace openscreen {
+namespace cast {
+
+namespace {
+
+VideoDecoderConfig::Codec ToProtoCodec(VideoCodec value) {
+ switch (value) {
+ case VideoCodec::kHevc:
+ return VideoDecoderConfig_Codec_kCodecHEVC;
+ case VideoCodec::kH264:
+ return VideoDecoderConfig_Codec_kCodecH264;
+ case VideoCodec::kVp8:
+ return VideoDecoderConfig_Codec_kCodecVP8;
+ case VideoCodec::kVp9:
+ return VideoDecoderConfig_Codec_kCodecVP9;
+ default:
+ return VideoDecoderConfig_Codec_kUnknownVideoCodec;
+ }
+}
+
+AudioDecoderConfig::Codec ToProtoCodec(AudioCodec value) {
+ switch (value) {
+ case AudioCodec::kAac:
+ return AudioDecoderConfig_Codec_kCodecAAC;
+ case AudioCodec::kOpus:
+ return AudioDecoderConfig_Codec_kCodecOpus;
+ default:
+ return AudioDecoderConfig_Codec_kUnknownAudioCodec;
+ }
+}
+
+} // namespace
+
+RemotingSender::RemotingSender(RpcMessenger* messenger,
+ AudioCodec audio_codec,
+ VideoCodec video_codec,
+ ReadyCallback ready_cb)
+ : messenger_(messenger),
+ audio_codec_(audio_codec),
+ video_codec_(video_codec),
+ ready_cb_(std::move(ready_cb)) {
+ messenger_->RegisterMessageReceiverCallback(
+ RpcMessenger::kAcquireRendererHandle,
+ [this](std::unique_ptr<RpcMessage> message) {
+ OSP_DCHECK(message);
+ this->OnInitializeMessage(*message);
+ });
+}
+
+RemotingSender::~RemotingSender() {
+ messenger_->UnregisterMessageReceiverCallback(
+ RpcMessenger::kAcquireRendererHandle);
+}
+
+void RemotingSender::OnInitializeMessage(const RpcMessage& message) {
+ receiver_handle_ = message.integer_value();
+
+ RpcMessage callback_message;
+ callback_message.set_handle(receiver_handle_);
+ callback_message.set_proc(RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
+
+ auto* callback_body =
+ callback_message.mutable_demuxerstream_initializecb_rpc();
+
+ // In Chrome, separate calls are used for the audio and video configs, but
+ // for simplicity's sake we combine them here.
+ callback_body->mutable_audio_decoder_config()->set_codec(
+ ToProtoCodec(audio_codec_));
+ callback_body->mutable_video_decoder_config()->set_codec(
+ ToProtoCodec(video_codec_));
+
+ OSP_DLOG_INFO << "Initializing receiver handle " << receiver_handle_
+ << " with audio codec " << CodecToString(audio_codec_)
+ << " and video codec " << CodecToString(video_codec_);
+ messenger_->SendMessageToRemote(callback_message);
+
+ if (ready_cb_) {
+ ready_cb_();
+ } else {
+ OSP_DLOG_INFO << "Received a ready message, but no ready callback.";
+ }
+}
+
+} // namespace cast
+} // namespace openscreen
diff --git a/cast/standalone_sender/remoting_sender.h b/cast/standalone_sender/remoting_sender.h
new file mode 100644
index 00000000..d331009b
--- /dev/null
+++ b/cast/standalone_sender/remoting_sender.h
@@ -0,0 +1,67 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CAST_STANDALONE_SENDER_REMOTING_SENDER_H_
+#define CAST_STANDALONE_SENDER_REMOTING_SENDER_H_
+
+#include <memory>
+
+#include "cast/streaming/constants.h"
+#include "cast/streaming/rpc_messenger.h"
+
+namespace openscreen {
+namespace cast {
+
+// This class behaves like a pared-down version of Chrome's StreamProvider (see
+// https://source.chromium.org/chromium/chromium/src/+/main:media/remoting/stream_provider.h
+// ). Instead of fully managing a media::DemuxerStream however, it just provides
+// an RPC initialization routine that notifies the standalone receiver's
+// SimpleRemotingReceiver instance (if configured) that initialization has been
+// complete and what codecs were selected.
+//
+// Due to the sheer complexity of remoting, we don't have a fully functional
+// implementation of remoting in the standalone_* components, instead Chrome is
+// the reference implementation and we have these simple classes to exercise
+// the public APIs.
+class RemotingSender {
+ public:
+ using ReadyCallback = std::function<void()>;
+ RemotingSender(RpcMessenger* messenger,
+ AudioCodec audio_codec,
+ VideoCodec video_codec,
+ ReadyCallback ready_cb);
+ ~RemotingSender();
+
+ private:
+ // When the receiver indicates that it is ready for initialization, it will
+ // The receiver sends us an "initialization" message that we respond to
+ // here with an "initialization callback" message that contains codec
+ // information.
+ void OnInitializeMessage(const RpcMessage& message);
+
+ // The messenger is the only caller of OnInitializeMessage, so there are no
+ // lifetime concerns. However, if this class outlives |messenger_|, it will
+ // no longer receive initialization messages.
+ RpcMessenger* messenger_;
+
+ // Unlike in Chrome, here we should know the video and audio codecs before any
+ // of the remoting code gets set up, and for simplicity's sake we can only
+ // populate the AudioDecoderConfig and VideoDecoderConfig objects with the
+ // codecs and use the rest of the fields as-is from the OFFER/ANSWER exchange.
+ const AudioCodec audio_codec_;
+ const VideoCodec video_codec_;
+
+ // The callback method to be called once we get the initialization message
+ // from the receiver.
+ ReadyCallback ready_cb_;
+
+ // The initialization message from the receiver contains the handle the
+ // callback should go to.
+ RpcMessenger::Handle receiver_handle_ = RpcMessenger::kInvalidHandle;
+};
+
+} // namespace cast
+} // namespace openscreen
+
+#endif // CAST_STANDALONE_SENDER_REMOTING_SENDER_H_
diff --git a/cast/streaming/constants.h b/cast/streaming/constants.h
index c640796d..668b6bca 100644
--- a/cast/streaming/constants.h
+++ b/cast/streaming/constants.h
@@ -111,6 +111,8 @@ constexpr int kSupportedRemotingVersion = 2;
enum class AudioCodec { kAac, kOpus, kNotSpecified };
enum class VideoCodec { kH264, kVp8, kHevc, kVp9, kNotSpecified };
+enum class CastMode : uint8_t { kMirroring, kRemoting };
+
} // namespace cast
} // namespace openscreen
diff --git a/cast/streaming/offer_messages.h b/cast/streaming/offer_messages.h
index c4899eec..c2be5bfa 100644
--- a/cast/streaming/offer_messages.h
+++ b/cast/streaming/offer_messages.h
@@ -97,8 +97,6 @@ struct VideoStream {
std::string error_recovery_mode = {};
};
-enum class CastMode : uint8_t { kMirroring, kRemoting };
-
struct Offer {
// TODO(jophba): remove deprecated declaration in a separate patch.
static ErrorOr<Offer> Parse(const Json::Value& root);
diff --git a/cast/streaming/receiver_message.cc b/cast/streaming/receiver_message.cc
index e1af610c..4fb1a8a6 100644
--- a/cast/streaming/receiver_message.cc
+++ b/cast/streaming/receiver_message.cc
@@ -128,10 +128,8 @@ Json::Value ReceiverCapability::ToJson() const {
// static
ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) {
ReceiverMessage message;
- if (!value ||
- !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) {
- return Error(Error::Code::kJsonParseError,
- "Failed to parse sequence number");
+ if (!value) {
+ return Error(Error::Code::kJsonParseError, "Invalid message body");
}
std::string result;
@@ -184,6 +182,12 @@ ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) {
break;
}
+ if (message.type != ReceiverMessage::Type::kRpc &&
+ !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) {
+ message.sequence_number = -1;
+ message.valid = false;
+ }
+
return message;
}
diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc
index 36a47bca..056bbabf 100644
--- a/cast/streaming/receiver_session.cc
+++ b/cast/streaming/receiver_session.cc
@@ -240,6 +240,10 @@ ReceiverSession::ReceiverSession(Client* const client,
[this](SenderMessage message) {
OnCapabilitiesRequest(std::move(message));
});
+ messenger_.SetHandler(SenderMessage::Type::kRpc,
+ [this](SenderMessage message) {
+ this->OnRpcMessage(std::move(message));
+ });
environment_->SetSocketSubscriber(this);
}
@@ -354,6 +358,21 @@ void ReceiverSession::OnCapabilitiesRequest(SenderMessage message) {
}
}
+void ReceiverSession::OnRpcMessage(SenderMessage message) {
+ if (!message.valid) {
+ OSP_DLOG_WARN
+ << "Bad RPC message. This may or may not represent a serious problem.";
+ return;
+ }
+
+ const auto& body = absl::get<std::vector<uint8_t>>(message.body);
+ if (!rpc_messenger_) {
+ OSP_DLOG_INFO << "Received an RPC message without having a messenger.";
+ return;
+ }
+ rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size());
+}
+
void ReceiverSession::SelectStreams(const Offer& offer,
SessionProperties* properties) {
if (offer.cast_mode == CastMode::kMirroring) {
@@ -465,6 +484,7 @@ void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) {
client_->OnReceiversDestroying(this, reason);
current_audio_receiver_.reset();
current_video_receiver_.reset();
+ rpc_messenger_.reset();
}
}
diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h
index fe4f9481..d928da5e 100644
--- a/cast/streaming/receiver_session.h
+++ b/cast/streaming/receiver_session.h
@@ -67,10 +67,16 @@ class ReceiverSession final : public Environment::SocketSubscriber {
// once we get a remoting request from a Sender.
struct RemotingNegotiation {
// The configured receivers set to be used for handling audio and
- // video streams.
+ // video streams. Unlike in the general streaming case, when we are remoting
+ // we don't know the codec and other information about the stream until
+ // the sender provices that information through the
+ // DemuxerStreamInitializeCallback RPC method.
ConfiguredReceivers receivers;
// The RPC messenger to be used for subscribing to remoting proto messages.
+ // Unlike the SenderSession API, the RPC messenger is negotiation specific.
+ // The messenger is torn down when |OnReceiversDestroying| is called, and
+ // is owned by the ReceiverSession.
RpcMessenger* messenger;
};
@@ -303,6 +309,7 @@ class ReceiverSession final : public Environment::SocketSubscriber {
// Specific message type handler methods.
void OnOffer(SenderMessage message);
void OnCapabilitiesRequest(SenderMessage message);
+ void OnRpcMessage(SenderMessage message);
// Selects streams from an offer based on its configuration, and sets
// them in the session properties.
diff --git a/cast/streaming/receiver_session_unittest.cc b/cast/streaming/receiver_session_unittest.cc
index 25b567ee..b8d64afa 100644
--- a/cast/streaming/receiver_session_unittest.cc
+++ b/cast/streaming/receiver_session_unittest.cc
@@ -94,6 +94,48 @@ constexpr char kValidOfferMessage[] = R"({
}
})";
+constexpr char kValidRemotingOfferMessage[] = R"({
+ "type": "OFFER",
+ "seqNum": 419,
+ "offer": {
+ "castMode": "remoting",
+ "supportedStreams": [
+ {
+ "index": 31339,
+ "type": "video_source",
+ "codecName": "REMOTE_VIDEO",
+ "rtpProfile": "cast",
+ "rtpPayloadType": 127,
+ "ssrc": 19088745,
+ "maxFrameRate": "60000/1000",
+ "timeBase": "1/90000",
+ "maxBitRate": 5432101,
+ "aesKey": "040d756791711fd3adb939066e6d8690",
+ "aesIvMask": "9ff0f022a959150e70a2d05a6c184aed",
+ "resolutions": [
+ {
+ "width": 1920,
+ "height":1080
+ }
+ ]
+ },
+ {
+ "index": 31340,
+ "type": "audio_source",
+ "codecName": "REMOTE_AUDIO",
+ "rtpProfile": "cast",
+ "rtpPayloadType": 97,
+ "ssrc": 19088747,
+ "bitRate": 125000,
+ "timeBase": "1/48000",
+ "channels": 2,
+ "aesKey": "51027e4e2347cbcb49d57ef10177aebc",
+ "aesIvMask": "7f12a19be62a36c04ae4116caaeff6d1"
+ }
+ ]
+ }
+})";
+
constexpr char kNoAudioOfferMessage[] = R"({
"type": "OFFER",
"seqNum": 1337,
@@ -244,6 +286,12 @@ constexpr char kGetCapabilitiesMessage[] = R"({
"type": "GET_CAPABILITIES"
})";
+constexpr char kRpcMessage[] = R"({
+ "rpc" : "CGQQnBiCGQgSAggMGgIIBg==",
+ "seqNum" : 2,
+ "type" : "RPC"
+})";
+
class FakeClient : public ReceiverSession::Client {
public:
MOCK_METHOD(void,
@@ -251,6 +299,10 @@ class FakeClient : public ReceiverSession::Client {
(const ReceiverSession*, ReceiverSession::ConfiguredReceivers),
(override));
MOCK_METHOD(void,
+ OnRemotingNegotiated,
+ (const ReceiverSession*, ReceiverSession::RemotingNegotiation),
+ (override));
+ MOCK_METHOD(void,
OnReceiversDestroying,
(const ReceiverSession*, ReceiversDestroyingReason),
(override));
@@ -752,6 +804,83 @@ TEST_F(ReceiverSessionTest, ReturnsCapabilitiesWithRemotingPreferences) {
MediaCapability::k4k));
}
+TEST_F(ReceiverSessionTest, CanNegotiateRemoting) {
+ ReceiverSession::Preferences preferences;
+ preferences.remoting =
+ std::make_unique<ReceiverSession::RemotingPreferences>();
+ preferences.remoting->supports_chrome_audio_codecs = true;
+ preferences.remoting->supports_4k = true;
+ SetUpWithPreferences(std::move(preferences));
+
+ InSequence s;
+ EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _))
+ .WillOnce([](const ReceiverSession* session_,
+ ReceiverSession::RemotingNegotiation negotiation) {
+ const auto& cr = negotiation.receivers;
+ EXPECT_TRUE(cr.audio_receiver);
+ EXPECT_EQ(cr.audio_receiver->config().sender_ssrc, 19088747u);
+ EXPECT_EQ(cr.audio_receiver->config().receiver_ssrc, 19088748u);
+ EXPECT_EQ(cr.audio_receiver->config().channels, 2);
+ EXPECT_EQ(cr.audio_receiver->config().rtp_timebase, 48000);
+ EXPECT_EQ(cr.audio_config.codec, AudioCodec::kNotSpecified);
+
+ EXPECT_TRUE(cr.video_receiver);
+ EXPECT_EQ(cr.video_receiver->config().sender_ssrc, 19088745u);
+ EXPECT_EQ(cr.video_receiver->config().receiver_ssrc, 19088746u);
+ EXPECT_EQ(cr.video_receiver->config().channels, 1);
+ EXPECT_EQ(cr.video_receiver->config().rtp_timebase, 90000);
+ EXPECT_EQ(cr.video_config.codec, VideoCodec::kNotSpecified);
+ });
+ EXPECT_CALL(client_,
+ OnReceiversDestroying(session_.get(),
+ ReceiverSession::Client::kEndOfSession));
+
+ message_port_->ReceiveMessage(kValidRemotingOfferMessage);
+}
+
+TEST_F(ReceiverSessionTest, HandlesRpcMessage) {
+ ReceiverSession::Preferences preferences;
+ preferences.remoting =
+ std::make_unique<ReceiverSession::RemotingPreferences>();
+ preferences.remoting->supports_chrome_audio_codecs = true;
+ preferences.remoting->supports_4k = true;
+ SetUpWithPreferences(std::move(preferences));
+
+ message_port_->ReceiveMessage(kRpcMessage);
+ const auto& messages = message_port_->posted_messages();
+ // Nothing should happen yet, the session doesn't have a messenger.
+ ASSERT_EQ(0u, messages.size());
+
+ // We don't need to fully test that the subscription model on the RpcMessenger
+ // works, but we do want to test that the ReceiverSession has properly wired
+ // the RpcMessenger up to the backing SessionMessenger and can properly
+ // handle received RPC messages.
+ InSequence s;
+ bool received_initialize_message = false;
+ EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _))
+ .WillOnce([this, &received_initialize_message](
+ const ReceiverSession* session_,
+ ReceiverSession::RemotingNegotiation negotiation) mutable {
+ negotiation.messenger->RegisterMessageReceiverCallback(
+ 100, [&received_initialize_message](
+ std::unique_ptr<RpcMessage> message) mutable {
+ ASSERT_EQ(100, message->handle());
+ ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE_CALLBACK,
+ message->proc());
+ ASSERT_EQ(0, message->integer_value());
+ received_initialize_message = true;
+ });
+
+ message_port_->ReceiveMessage(kRpcMessage);
+ });
+ EXPECT_CALL(client_,
+ OnReceiversDestroying(session_.get(),
+ ReceiverSession::Client::kEndOfSession));
+
+ message_port_->ReceiveMessage(kValidRemotingOfferMessage);
+ ASSERT_TRUE(received_initialize_message);
+}
+
TEST_F(ReceiverSessionTest, VideoLimitsIsSupersetOf) {
ReceiverSession::VideoLimits first{};
ReceiverSession::VideoLimits second = first;
diff --git a/cast/streaming/rpc_messenger.cc b/cast/streaming/rpc_messenger.cc
index 54b2e279..63176f7c 100644
--- a/cast/streaming/rpc_messenger.cc
+++ b/cast/streaming/rpc_messenger.cc
@@ -72,7 +72,8 @@ void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message,
std::size_t message_len) {
auto rpc = std::make_unique<RpcMessage>();
if (!rpc->ParseFromArray(message, message_len)) {
- OSP_LOG_WARN << "Failed to parse RPC message from remote: " << message;
+ OSP_DLOG_WARN << "Failed to parse RPC message from remote: \"" << message
+ << "\"";
return;
}
OSP_DVLOG << "Received RPC message: " << *rpc;
diff --git a/cast/streaming/rpc_messenger_unittest.cc b/cast/streaming/rpc_messenger_unittest.cc
index e0627542..758a9036 100644
--- a/cast/streaming/rpc_messenger_unittest.cc
+++ b/cast/streaming/rpc_messenger_unittest.cc
@@ -125,16 +125,16 @@ TEST_F(RpcMessengerTest, ProcessMessageWithRegisteredHandle) {
// Send message for RPC messenger to process.
RpcMessage sent_rpc;
sent_rpc.set_handle(fake_messenger_->handle());
- sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME);
- sent_rpc.set_double_value(3.4);
+ sent_rpc.set_proc(RpcMessage::RPC_DS_INITIALIZE);
+ sent_rpc.set_integer_value(4004);
ProcessMessage(sent_rpc);
// Checks if received message is identical to the one sent earlier.
ASSERT_EQ(1, fake_messenger_->received_count());
const RpcMessage& received_rpc = fake_messenger_->received_rpc();
ASSERT_EQ(fake_messenger_->handle(), received_rpc.handle());
- ASSERT_EQ(RpcMessage::RPC_R_SETVOLUME, received_rpc.proc());
- ASSERT_EQ(3.4, received_rpc.double_value());
+ ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE, received_rpc.proc());
+ ASSERT_EQ(4004, received_rpc.integer_value());
}
TEST_F(RpcMessengerTest, ProcessMessageWithUnregisteredHandle) {
diff --git a/cast/streaming/sender_session.cc b/cast/streaming/sender_session.cc
index bbb3972f..f59f0d70 100644
--- a/cast/streaming/sender_session.cc
+++ b/cast/streaming/sender_session.cc
@@ -210,6 +210,9 @@ SenderSession::SenderSession(Configuration config)
config_.client->OnError(this, error);
},
config_.environment->task_runner()),
+ rpc_messenger_([this](std::vector<uint8_t> message) {
+ SendRpcMessage(std::move(message));
+ }),
packet_router_(config_.environment) {
OSP_DCHECK(config_.client);
OSP_DCHECK(config_.environment);
@@ -265,7 +268,6 @@ void SenderSession::ResetState() {
current_negotiation_.reset();
current_audio_sender_.reset();
current_video_sender_.reset();
- rpc_messenger_.reset();
}
Error SenderSession::StartNegotiation(
@@ -299,7 +301,7 @@ void SenderSession::OnAnswer(ReceiverMessage message) {
return;
}
- state_ = State::kMirroring;
+ state_ = State::kStreaming;
config_.client->OnNegotiated(
this, std::move(senders),
capture_recommendations::GetRecommendations(answer));
@@ -361,27 +363,12 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) {
"Failed to negotiate a remoting session."));
return;
}
- rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) {
- Error error = this->messenger_.SendOutboundMessage(SenderMessage{
- SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true,
- std::move(message)});
-
- if (!error.ok()) {
- OSP_LOG_WARN << "Failed to send RPC message: " << error;
- }
- });
config_.client->OnRemotingNegotiated(
- this, RemotingNegotiation{std::move(senders), ToCapabilities(caps),
- rpc_messenger_.get()});
+ this, RemotingNegotiation{std::move(senders), ToCapabilities(caps)});
}
void SenderSession::OnRpcMessage(ReceiverMessage message) {
- if (!rpc_messenger_) {
- OSP_LOG_INFO << "Received an RPC message without having an RPCMessenger.";
- return;
- }
-
if (!message.valid) {
HandleErrorMessage(
message,
@@ -390,7 +377,7 @@ void SenderSession::OnRpcMessage(ReceiverMessage message) {
}
const auto& body = absl::get<std::vector<uint8_t>>(message.body);
- rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size());
+ rpc_messenger_.ProcessMessageFromRemote(body.data(), body.size());
}
void SenderSession::HandleErrorMessage(ReceiverMessage message,
@@ -492,5 +479,15 @@ SenderSession::ConfiguredSenders SenderSession::SpawnSenders(
return senders;
}
+void SenderSession::SendRpcMessage(std::vector<uint8_t> message_body) {
+ Error error = this->messenger_.SendOutboundMessage(SenderMessage{
+ SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true,
+ std::move(message_body)});
+
+ if (!error.ok()) {
+ OSP_LOG_WARN << "Failed to send RPC message: " << error;
+ }
+}
+
} // namespace cast
} // namespace openscreen
diff --git a/cast/streaming/sender_session.h b/cast/streaming/sender_session.h
index 7995181f..f7fbb97a 100644
--- a/cast/streaming/sender_session.h
+++ b/cast/streaming/sender_session.h
@@ -58,9 +58,6 @@ class SenderSession final {
// about legacy devices, such as pre-1.27 Earth receivers should do
// a version check when using these capabilities to offer remoting.
RemotingCapabilities capabilities;
-
- // The RPC messenger to be used for subscribing to remoting proto messages.
- RpcMessenger* messenger;
};
// The embedder should provide a client for handling negotiation events.
@@ -158,6 +155,11 @@ class SenderSession final {
// feedback. Embedders may use this information to throttle capture devices.
int GetEstimatedNetworkBandwidth() const;
+ // The RPC messenger for this session. NOTE: RPC messages may come at
+ // any time from the receiver, so subscriptions to RPC remoting messages
+ // should be done before calling |NegotiateRemoting|.
+ RpcMessenger* rpc_messenger() { return &rpc_messenger_; }
+
private:
// We store the current negotiation, so that when we get an answer from the
// receiver we can line up the selected streams with the original
@@ -183,7 +185,7 @@ class SenderSession final {
kIdle,
// Currently mirroring content to a receiver.
- kMirroring,
+ kStreaming,
// Currently remoting content to a receiver.
kRemoting
@@ -225,6 +227,9 @@ class SenderSession final {
// Spawn a set of configured senders from the currently stored negotiation.
ConfiguredSenders SpawnSenders(const Answer& answer);
+ // Used by the RPC messenger to send outbound messages.
+ void SendRpcMessage(std::vector<uint8_t> message_body);
+
// This session's configuration.
Configuration config_;
@@ -233,6 +238,10 @@ class SenderSession final {
// cast/protocol/castv2/streaming_schema.json.
SenderSessionMessenger messenger_;
+ // The RPC messenger, which uses the session messager for sending RPC messages
+ // and handles subscriptions to RPC messages.
+ RpcMessenger rpc_messenger_;
+
// The packet router used for RTP/RTCP messaging across all senders.
SenderPacketRouter packet_router_;
@@ -246,7 +255,7 @@ class SenderSession final {
std::unique_ptr<InProcessNegotiation> current_negotiation_;
// The current state of the session. Note that the state is intentionally
- // limited. |kMirroring| or |kRemoting| means that we are either starting
+ // limited. |kStreaming| or |kRemoting| means that we are either starting
// a negotiation or actively sending to a receiver.
State state_ = State::kIdle;
@@ -254,11 +263,6 @@ class SenderSession final {
// senders used for this session. Either or both may be nullptr.
std::unique_ptr<Sender> current_audio_sender_;
std::unique_ptr<Sender> current_video_sender_;
-
- // If remoting, we store the RpcMessenger used by the embedder to send RPC
- // messages from the remoting protobuf specification. For more information,
- // see //cast/streaming/remoting.proto.
- std::unique_ptr<RpcMessenger> rpc_messenger_;
}; // namespace cast
} // namespace cast
diff --git a/cast/streaming/sender_session_unittest.cc b/cast/streaming/sender_session_unittest.cc
index 36b1bae8..bfe4cfab 100644
--- a/cast/streaming/sender_session_unittest.cc
+++ b/cast/streaming/sender_session_unittest.cc
@@ -545,8 +545,9 @@ TEST_F(SenderSessionTest, SuccessfulRemotingNegotiationYieldsValidObject) {
// The messenger is tested elsewhere, but we can sanity check that we got a valid
// one here.
- EXPECT_TRUE(negotiation.messenger);
- const RpcMessenger::Handle handle = negotiation.messenger->GetUniqueHandle();
+ EXPECT_TRUE(session_->rpc_messenger());
+ const RpcMessenger::Handle handle =
+ session_->rpc_messenger()->GetUniqueHandle();
EXPECT_NE(RpcMessenger::kInvalidHandle, handle);
}
diff --git a/cast/streaming/session_messenger.cc b/cast/streaming/session_messenger.cc
index 34c4c02f..389d87bb 100644
--- a/cast/streaming/session_messenger.cc
+++ b/cast/streaming/session_messenger.cc
@@ -146,13 +146,6 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id,
return;
}
- int sequence_number;
- if (!json::TryParseInt(message_body.value()[kSequenceNumber],
- &sequence_number)) {
- OSP_DLOG_WARN << "Received a message without a sequence number";
- return;
- }
-
// If the message is valid JSON and we don't understand it, there are two
// options: (1) it's an unknown type, or (2) the receiver filled out the
// message incorrectly. In the first case we can drop it, it's likely just
@@ -173,6 +166,13 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id,
OSP_DLOG_INFO << "Received RTP message but no callback, dropping";
}
} else {
+ int sequence_number;
+ if (!json::TryParseInt(message_body.value()[kSequenceNumber],
+ &sequence_number)) {
+ OSP_DLOG_WARN << "Received a message without a sequence number";
+ return;
+ }
+
auto it = awaiting_replies_.find(sequence_number);
if (it == awaiting_replies_.end()) {
OSP_DLOG_WARN << "Received a reply I wasn't waiting for: "