diff options
24 files changed, 650 insertions, 82 deletions
diff --git a/cast/protocol/castv2/streaming_examples/rpc.json b/cast/protocol/castv2/streaming_examples/rpc.json index 6ebfc0e0..880ca641 100644 --- a/cast/protocol/castv2/streaming_examples/rpc.json +++ b/cast/protocol/castv2/streaming_examples/rpc.json @@ -1,5 +1,4 @@ { - "seqNum": 12345, "sessionId": 735189, "type": "RPC", "result": "ok", diff --git a/cast/protocol/castv2/streaming_schema.json b/cast/protocol/castv2/streaming_schema.json index e659e7bf..7044c8ea 100644 --- a/cast/protocol/castv2/streaming_schema.json +++ b/cast/protocol/castv2/streaming_schema.json @@ -121,10 +121,7 @@ "maxBitRate": {"type": "integer", "minimum": 300000}, "maxDelay": {"$ref": "#/definitions/delay"} }, - "required": [ - "maxDimensions", - "maxBitRate" - ] + "required": ["maxDimensions", "maxBitRate"] }, "constraints": { "properties": { @@ -220,15 +217,36 @@ ] } }, - "required": ["type", "seqNum"], + "required": ["type"], "allOf": [ { "if": { - "properties": {"type": {"enum": ["ANSWER", "CAPABILITIES_RESPONSE", "STATUS_RESPONSE"]}} + "properties": { + "type": { + "enum": ["ANSWER", "CAPABILITIES_RESPONSE", "STATUS_RESPONSE"] + } + } }, "then": {"required": ["result"]} }, { + "if": { + "properties": { + "type": { + "enum": [ + "OFFER", + "ANSWER", + "GET_CAPABILITIES", + "CAPABILITIES_RESPONSE", + "GET_STATUS", + "STATUS_RESPONSE" + ] + } + } + }, + "then": {"required": ["seqNum"]} + }, + { "if": {"properties": {"type": {"const": "OFFER"}}}, "then": {"required": ["offer"]} }, diff --git a/cast/standalone_receiver/BUILD.gn b/cast/standalone_receiver/BUILD.gn index f1b5fd31..956a1a71 100644 --- a/cast/standalone_receiver/BUILD.gn +++ b/cast/standalone_receiver/BUILD.gn @@ -16,6 +16,8 @@ if (!build_with_chromium) { "cast_service.h", "mirroring_application.cc", "mirroring_application.h", + "simple_remoting_receiver.cc", + "simple_remoting_receiver.h", "streaming_playback_controller.cc", "streaming_playback_controller.h", ] diff --git a/cast/standalone_receiver/simple_remoting_receiver.cc b/cast/standalone_receiver/simple_remoting_receiver.cc new file mode 100644 index 00000000..119be44f --- /dev/null +++ b/cast/standalone_receiver/simple_remoting_receiver.cc @@ -0,0 +1,107 @@ +// Copyright 2021 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "cast/standalone_receiver/simple_remoting_receiver.h" + +#include <utility> + +#include "cast/streaming/message_fields.h" +#include "cast/streaming/remoting.pb.h" + +namespace openscreen { +namespace cast { + +namespace { + +VideoCodec ParseProtoCodec(VideoDecoderConfig::Codec value) { + switch (value) { + case VideoDecoderConfig_Codec_kCodecHEVC: + return VideoCodec::kHevc; + + case VideoDecoderConfig_Codec_kCodecH264: + return VideoCodec::kH264; + + case VideoDecoderConfig_Codec_kCodecVP8: + return VideoCodec::kVp8; + + case VideoDecoderConfig_Codec_kCodecVP9: + return VideoCodec::kVp9; + + default: + return VideoCodec::kNotSpecified; + } +} + +AudioCodec ParseProtoCodec(AudioDecoderConfig::Codec value) { + switch (value) { + case AudioDecoderConfig_Codec_kCodecAAC: + return AudioCodec::kAac; + + case AudioDecoderConfig_Codec_kCodecOpus: + return AudioCodec::kOpus; + + default: + return AudioCodec::kNotSpecified; + } +} + +} // namespace + +SimpleRemotingReceiver::SimpleRemotingReceiver(RpcMessenger* messenger) + : messenger_(messenger) { + messenger_->RegisterMessageReceiverCallback( + RpcMessenger::kFirstHandle, [this](std::unique_ptr<RpcMessage> message) { + this->OnInitializeCallbackMessage(std::move(message)); + }); +} + +SimpleRemotingReceiver::~SimpleRemotingReceiver() { + messenger_->UnregisterMessageReceiverCallback(RpcMessenger::kFirstHandle); +} + +void SimpleRemotingReceiver::SendInitializeMessage( + SimpleRemotingReceiver::InitializeCallback initialize_cb) { + initialize_cb_ = std::move(initialize_cb); + + OSP_DVLOG + << "Indicating to the sender we are ready for remoting initialization."; + openscreen::cast::RpcMessage rpc; + rpc.set_handle(RpcMessenger::kAcquireRendererHandle); + rpc.set_proc(openscreen::cast::RpcMessage::RPC_DS_INITIALIZE); + + // The initialize message contains the handle to be used for sending the + // initialization callback message. + rpc.set_integer_value(RpcMessenger::kFirstHandle); + messenger_->SendMessageToRemote(rpc); +} + +void SimpleRemotingReceiver::OnInitializeCallbackMessage( + std::unique_ptr<RpcMessage> message) { + OSP_DCHECK(message->proc() == RpcMessage::RPC_DS_INITIALIZE_CALLBACK); + if (!initialize_cb_) { + OSP_DLOG_INFO << "Received an initialization callback message but no " + "callback was set."; + return; + } + + const DemuxerStreamInitializeCallback& callback_message = + message->demuxerstream_initializecb_rpc(); + const auto audio_codec = + callback_message.has_audio_decoder_config() + ? ParseProtoCodec(callback_message.audio_decoder_config().codec()) + : AudioCodec::kNotSpecified; + const auto video_codec = + callback_message.has_video_decoder_config() + ? ParseProtoCodec(callback_message.video_decoder_config().codec()) + : VideoCodec::kNotSpecified; + + OSP_DLOG_INFO << "Initializing remoting with audio codec " + << CodecToString(audio_codec) << " and video codec " + << CodecToString(video_codec); + initialize_cb_(audio_codec, video_codec); + initialize_cb_ = nullptr; +} + +} // namespace cast +} // namespace openscreen diff --git a/cast/standalone_receiver/simple_remoting_receiver.h b/cast/standalone_receiver/simple_remoting_receiver.h new file mode 100644 index 00000000..52fe131f --- /dev/null +++ b/cast/standalone_receiver/simple_remoting_receiver.h @@ -0,0 +1,52 @@ +// Copyright 2021 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_ +#define CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_ + +#include <functional> +#include <memory> + +#include "cast/streaming/constants.h" +#include "cast/streaming/rpc_messenger.h" + +namespace openscreen { +namespace cast { + +// This class behaves like a pared-down version of Chrome's DemuxerStreamAdapter +// (see +// https://source.chromium.org/chromium/chromium/src/+/main:/media/remoting/demuxer_stream_adapter.h +// ). Instead of providing a full adapter implementation, it just provides a +// callback register that can be used to notify a component when the +// RemotingProvider sends an initialization message with audio and video codec +// information. +// +// Due to the sheer complexity of remoting, we don't have a fully functional +// implementation of remoting in the standalone_* components, instead Chrome is +// the reference implementation and we have these simple classes to exercise +// the public APIs. +class SimpleRemotingReceiver { + public: + explicit SimpleRemotingReceiver(RpcMessenger* messenger); + ~SimpleRemotingReceiver(); + + // The flow here closely mirrors the remoting.proto. The standalone receiver + // indicates it is ready for initialization by calling + // |SendInitializeMessage|, then this class sends an initialize message to the + // sender. The sender then replies with an initialization message containing + // configurations, which is passed to |initialize_cb|. + using InitializeCallback = std::function<void(AudioCodec, VideoCodec)>; + void SendInitializeMessage(InitializeCallback initialize_cb); + + private: + void OnInitializeCallbackMessage(std::unique_ptr<RpcMessage> message); + + RpcMessenger* messenger_; + InitializeCallback initialize_cb_; +}; + +} // namespace cast +} // namespace openscreen + +#endif // CAST_STANDALONE_RECEIVER_SIMPLE_REMOTING_RECEIVER_H_ diff --git a/cast/standalone_receiver/streaming_playback_controller.cc b/cast/standalone_receiver/streaming_playback_controller.cc index 2a39ab8e..46ee0bbb 100644 --- a/cast/standalone_receiver/streaming_playback_controller.cc +++ b/cast/standalone_receiver/streaming_playback_controller.cc @@ -57,6 +57,42 @@ void StreamingPlaybackController::OnNegotiated( const ReceiverSession* session, ReceiverSession::ConfiguredReceivers receivers) { TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneReceiver); + Initialize(receivers); +} + +void StreamingPlaybackController::OnRemotingNegotiated( + const ReceiverSession* session, + ReceiverSession::RemotingNegotiation negotiation) { + remoting_receiver_ = + std::make_unique<SimpleRemotingReceiver>(negotiation.messenger); + remoting_receiver_->SendInitializeMessage( + [this, receivers = negotiation.receivers](AudioCodec audio_codec, + VideoCodec video_codec) { + // The configurations in |negotiation| do not have the actual codecs, + // only REMOTE_AUDIO and REMOTE_VIDEO. Once we receive the + // initialization callback method, we can override with the actual + // codecs here. + auto mutable_receivers = receivers; + mutable_receivers.audio_config.codec = audio_codec; + mutable_receivers.video_config.codec = video_codec; + Initialize(mutable_receivers); + }); +} + +void StreamingPlaybackController::OnReceiversDestroying( + const ReceiverSession* session, + ReceiversDestroyingReason reason) { + audio_player_.reset(); + video_player_.reset(); +} + +void StreamingPlaybackController::OnError(const ReceiverSession* session, + Error error) { + client_->OnPlaybackError(this, error); +} + +void StreamingPlaybackController::Initialize( + ReceiverSession::ConfiguredReceivers receivers) { #if defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS) if (receivers.audio_receiver) { audio_player_ = std::make_unique<SDLAudioPlayer>( @@ -83,25 +119,5 @@ void StreamingPlaybackController::OnNegotiated( #endif // defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS) } -void StreamingPlaybackController::OnRemotingNegotiated( - const ReceiverSession* session, - ReceiverSession::RemotingNegotiation negotiation) { - // TODO(issuetracker.google.com/190078859): need ability to initialize - // remoting codecs. - OSP_UNIMPLEMENTED(); -} - -void StreamingPlaybackController::OnReceiversDestroying( - const ReceiverSession* session, - ReceiversDestroyingReason reason) { - audio_player_.reset(); - video_player_.reset(); -} - -void StreamingPlaybackController::OnError(const ReceiverSession* session, - Error error) { - client_->OnPlaybackError(this, error); -} - } // namespace cast } // namespace openscreen diff --git a/cast/standalone_receiver/streaming_playback_controller.h b/cast/standalone_receiver/streaming_playback_controller.h index ea288046..65a233e5 100644 --- a/cast/standalone_receiver/streaming_playback_controller.h +++ b/cast/standalone_receiver/streaming_playback_controller.h @@ -7,6 +7,7 @@ #include <memory> +#include "cast/standalone_receiver/simple_remoting_receiver.h" #include "cast/streaming/receiver_session.h" #include "platform/impl/task_runner.h" @@ -46,6 +47,8 @@ class StreamingPlaybackController final : public ReceiverSession::Client { TaskRunner* const task_runner_; StreamingPlaybackController::Client* client_; + void Initialize(ReceiverSession::ConfiguredReceivers receivers); + #if defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS) // NOTE: member ordering is important, since the sub systems must be // first-constructed, last-destroyed. Make sure any new SDL related @@ -62,6 +65,8 @@ class StreamingPlaybackController final : public ReceiverSession::Client { std::unique_ptr<DummyPlayer> audio_player_; std::unique_ptr<DummyPlayer> video_player_; #endif // defined(CAST_STANDALONE_RECEIVER_HAVE_EXTERNAL_LIBS) + + std::unique_ptr<SimpleRemotingReceiver> remoting_receiver_; }; } // namespace cast diff --git a/cast/standalone_sender/BUILD.gn b/cast/standalone_sender/BUILD.gn index afa73d7d..780e0609 100644 --- a/cast/standalone_sender/BUILD.gn +++ b/cast/standalone_sender/BUILD.gn @@ -49,6 +49,8 @@ if (!build_with_chromium) { "looping_file_sender.h", "receiver_chooser.cc", "receiver_chooser.h", + "remoting_sender.cc", + "remoting_sender.h", "simulated_capturer.cc", "simulated_capturer.h", "streaming_opus_encoder.cc", diff --git a/cast/standalone_sender/looping_file_cast_agent.cc b/cast/standalone_sender/looping_file_cast_agent.cc index ce9826de..bfb970bf 100644 --- a/cast/standalone_sender/looping_file_cast_agent.cc +++ b/cast/standalone_sender/looping_file_cast_agent.cc @@ -270,6 +270,10 @@ void LoopingFileCastAgent::CreateAndStartSession() { OSP_VLOG << "Starting session negotiation."; Error negotiation_error; if (connection_settings_->use_remoting) { + remoting_sender_ = std::make_unique<RemotingSender>( + current_session_->rpc_messenger(), AudioCodec::kOpus, VideoCodec::kVp8, + [this]() { OnRemotingReceiverReady(); }); + negotiation_error = current_session_->NegotiateRemoting(audio_config, video_config); } else { @@ -298,17 +302,17 @@ void LoopingFileCastAgent::OnNegotiated( void LoopingFileCastAgent::OnRemotingNegotiated( const SenderSession* session, SenderSession::RemotingNegotiation negotiation) { - // TODO(jophba): this needs to be hashed out as part of - // figuring out the embedder workflow. if (negotiation.senders.audio_sender == nullptr && negotiation.senders.video_sender == nullptr) { OSP_LOG_ERROR << "Missing both audio and video, so exiting..."; return; } - file_sender_ = std::make_unique<LoopingFileSender>( - environment_.get(), connection_settings_.value(), session, - std::move(negotiation.senders), [this]() { shutdown_callback_(); }); + current_negotiation_ = + std::make_unique<SenderSession::RemotingNegotiation>(negotiation); + if (is_ready_for_remoting_) { + StartRemotingSenders(); + } } void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) { @@ -316,6 +320,23 @@ void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) { Shutdown(); } +void LoopingFileCastAgent::OnRemotingReceiverReady() { + is_ready_for_remoting_ = true; + if (current_negotiation_) { + StartRemotingSenders(); + } +} + +void LoopingFileCastAgent::StartRemotingSenders() { + OSP_DCHECK(current_negotiation_); + file_sender_ = std::make_unique<LoopingFileSender>( + environment_.get(), connection_settings_.value(), current_session_.get(), + std::move(current_negotiation_->senders), + [this]() { shutdown_callback_(); }); + current_negotiation_.reset(); + is_ready_for_remoting_ = false; +} + void LoopingFileCastAgent::Shutdown() { TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender); diff --git a/cast/standalone_sender/looping_file_cast_agent.h b/cast/standalone_sender/looping_file_cast_agent.h index cff0f6ba..895d1324 100644 --- a/cast/standalone_sender/looping_file_cast_agent.h +++ b/cast/standalone_sender/looping_file_cast_agent.h @@ -21,6 +21,7 @@ #include "cast/sender/public/sender_socket_factory.h" #include "cast/standalone_sender/connection_settings.h" #include "cast/standalone_sender/looping_file_sender.h" +#include "cast/standalone_sender/remoting_sender.h" #include "cast/streaming/environment.h" #include "cast/streaming/sender_session.h" #include "platform/api/scoped_wake_lock.h" @@ -134,6 +135,15 @@ class LoopingFileCastAgent final SenderSession::RemotingNegotiation negotiation) override; void OnError(const SenderSession* session, Error error) override; + // Callback for when the RemotingSender indicates that the receiver + // is ready. + void OnRemotingReceiverReady(); + + // Starts the remoting sender. This may occur when remoting is "ready" if the + // session is already negotiated, or upon session negotiation if the receiver + // is already ready. + void StartRemotingSenders(); + // Helper for stopping the current session, and/or unwinding a remote // connection request (pre-session). This ensures LoopingFileCastAgent is in a // terminal shutdown state. @@ -168,6 +178,17 @@ class LoopingFileCastAgent final std::unique_ptr<Environment> environment_; std::unique_ptr<SenderSession> current_session_; std::unique_ptr<LoopingFileSender> file_sender_; + + // Remoting specific member variables. + std::unique_ptr<RemotingSender> remoting_sender_; + + // Set when remoting is successfully negotiated. However, remoting streams + // won't start until |is_ready_for_remoting_| is true. + std::unique_ptr<SenderSession::RemotingNegotiation> current_negotiation_; + + // Set to true when the remoting receiver is ready. However, remoting streams + // won't start until remoting is successfully negotiated. + bool is_ready_for_remoting_ = false; }; } // namespace cast diff --git a/cast/standalone_sender/remoting_sender.cc b/cast/standalone_sender/remoting_sender.cc new file mode 100644 index 00000000..1566f0a7 --- /dev/null +++ b/cast/standalone_sender/remoting_sender.cc @@ -0,0 +1,95 @@ +// Copyright 2021 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "cast/standalone_sender/remoting_sender.h" + +#include <utility> + +#include "cast/streaming/message_fields.h" + +namespace openscreen { +namespace cast { + +namespace { + +VideoDecoderConfig::Codec ToProtoCodec(VideoCodec value) { + switch (value) { + case VideoCodec::kHevc: + return VideoDecoderConfig_Codec_kCodecHEVC; + case VideoCodec::kH264: + return VideoDecoderConfig_Codec_kCodecH264; + case VideoCodec::kVp8: + return VideoDecoderConfig_Codec_kCodecVP8; + case VideoCodec::kVp9: + return VideoDecoderConfig_Codec_kCodecVP9; + default: + return VideoDecoderConfig_Codec_kUnknownVideoCodec; + } +} + +AudioDecoderConfig::Codec ToProtoCodec(AudioCodec value) { + switch (value) { + case AudioCodec::kAac: + return AudioDecoderConfig_Codec_kCodecAAC; + case AudioCodec::kOpus: + return AudioDecoderConfig_Codec_kCodecOpus; + default: + return AudioDecoderConfig_Codec_kUnknownAudioCodec; + } +} + +} // namespace + +RemotingSender::RemotingSender(RpcMessenger* messenger, + AudioCodec audio_codec, + VideoCodec video_codec, + ReadyCallback ready_cb) + : messenger_(messenger), + audio_codec_(audio_codec), + video_codec_(video_codec), + ready_cb_(std::move(ready_cb)) { + messenger_->RegisterMessageReceiverCallback( + RpcMessenger::kAcquireRendererHandle, + [this](std::unique_ptr<RpcMessage> message) { + OSP_DCHECK(message); + this->OnInitializeMessage(*message); + }); +} + +RemotingSender::~RemotingSender() { + messenger_->UnregisterMessageReceiverCallback( + RpcMessenger::kAcquireRendererHandle); +} + +void RemotingSender::OnInitializeMessage(const RpcMessage& message) { + receiver_handle_ = message.integer_value(); + + RpcMessage callback_message; + callback_message.set_handle(receiver_handle_); + callback_message.set_proc(RpcMessage::RPC_DS_INITIALIZE_CALLBACK); + + auto* callback_body = + callback_message.mutable_demuxerstream_initializecb_rpc(); + + // In Chrome, separate calls are used for the audio and video configs, but + // for simplicity's sake we combine them here. + callback_body->mutable_audio_decoder_config()->set_codec( + ToProtoCodec(audio_codec_)); + callback_body->mutable_video_decoder_config()->set_codec( + ToProtoCodec(video_codec_)); + + OSP_DLOG_INFO << "Initializing receiver handle " << receiver_handle_ + << " with audio codec " << CodecToString(audio_codec_) + << " and video codec " << CodecToString(video_codec_); + messenger_->SendMessageToRemote(callback_message); + + if (ready_cb_) { + ready_cb_(); + } else { + OSP_DLOG_INFO << "Received a ready message, but no ready callback."; + } +} + +} // namespace cast +} // namespace openscreen diff --git a/cast/standalone_sender/remoting_sender.h b/cast/standalone_sender/remoting_sender.h new file mode 100644 index 00000000..d331009b --- /dev/null +++ b/cast/standalone_sender/remoting_sender.h @@ -0,0 +1,67 @@ +// Copyright 2021 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CAST_STANDALONE_SENDER_REMOTING_SENDER_H_ +#define CAST_STANDALONE_SENDER_REMOTING_SENDER_H_ + +#include <memory> + +#include "cast/streaming/constants.h" +#include "cast/streaming/rpc_messenger.h" + +namespace openscreen { +namespace cast { + +// This class behaves like a pared-down version of Chrome's StreamProvider (see +// https://source.chromium.org/chromium/chromium/src/+/main:media/remoting/stream_provider.h +// ). Instead of fully managing a media::DemuxerStream however, it just provides +// an RPC initialization routine that notifies the standalone receiver's +// SimpleRemotingReceiver instance (if configured) that initialization has been +// complete and what codecs were selected. +// +// Due to the sheer complexity of remoting, we don't have a fully functional +// implementation of remoting in the standalone_* components, instead Chrome is +// the reference implementation and we have these simple classes to exercise +// the public APIs. +class RemotingSender { + public: + using ReadyCallback = std::function<void()>; + RemotingSender(RpcMessenger* messenger, + AudioCodec audio_codec, + VideoCodec video_codec, + ReadyCallback ready_cb); + ~RemotingSender(); + + private: + // When the receiver indicates that it is ready for initialization, it will + // The receiver sends us an "initialization" message that we respond to + // here with an "initialization callback" message that contains codec + // information. + void OnInitializeMessage(const RpcMessage& message); + + // The messenger is the only caller of OnInitializeMessage, so there are no + // lifetime concerns. However, if this class outlives |messenger_|, it will + // no longer receive initialization messages. + RpcMessenger* messenger_; + + // Unlike in Chrome, here we should know the video and audio codecs before any + // of the remoting code gets set up, and for simplicity's sake we can only + // populate the AudioDecoderConfig and VideoDecoderConfig objects with the + // codecs and use the rest of the fields as-is from the OFFER/ANSWER exchange. + const AudioCodec audio_codec_; + const VideoCodec video_codec_; + + // The callback method to be called once we get the initialization message + // from the receiver. + ReadyCallback ready_cb_; + + // The initialization message from the receiver contains the handle the + // callback should go to. + RpcMessenger::Handle receiver_handle_ = RpcMessenger::kInvalidHandle; +}; + +} // namespace cast +} // namespace openscreen + +#endif // CAST_STANDALONE_SENDER_REMOTING_SENDER_H_ diff --git a/cast/streaming/constants.h b/cast/streaming/constants.h index c640796d..668b6bca 100644 --- a/cast/streaming/constants.h +++ b/cast/streaming/constants.h @@ -111,6 +111,8 @@ constexpr int kSupportedRemotingVersion = 2; enum class AudioCodec { kAac, kOpus, kNotSpecified }; enum class VideoCodec { kH264, kVp8, kHevc, kVp9, kNotSpecified }; +enum class CastMode : uint8_t { kMirroring, kRemoting }; + } // namespace cast } // namespace openscreen diff --git a/cast/streaming/offer_messages.h b/cast/streaming/offer_messages.h index c4899eec..c2be5bfa 100644 --- a/cast/streaming/offer_messages.h +++ b/cast/streaming/offer_messages.h @@ -97,8 +97,6 @@ struct VideoStream { std::string error_recovery_mode = {}; }; -enum class CastMode : uint8_t { kMirroring, kRemoting }; - struct Offer { // TODO(jophba): remove deprecated declaration in a separate patch. static ErrorOr<Offer> Parse(const Json::Value& root); diff --git a/cast/streaming/receiver_message.cc b/cast/streaming/receiver_message.cc index e1af610c..4fb1a8a6 100644 --- a/cast/streaming/receiver_message.cc +++ b/cast/streaming/receiver_message.cc @@ -128,10 +128,8 @@ Json::Value ReceiverCapability::ToJson() const { // static ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) { ReceiverMessage message; - if (!value || - !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) { - return Error(Error::Code::kJsonParseError, - "Failed to parse sequence number"); + if (!value) { + return Error(Error::Code::kJsonParseError, "Invalid message body"); } std::string result; @@ -184,6 +182,12 @@ ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) { break; } + if (message.type != ReceiverMessage::Type::kRpc && + !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) { + message.sequence_number = -1; + message.valid = false; + } + return message; } diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc index 36a47bca..056bbabf 100644 --- a/cast/streaming/receiver_session.cc +++ b/cast/streaming/receiver_session.cc @@ -240,6 +240,10 @@ ReceiverSession::ReceiverSession(Client* const client, [this](SenderMessage message) { OnCapabilitiesRequest(std::move(message)); }); + messenger_.SetHandler(SenderMessage::Type::kRpc, + [this](SenderMessage message) { + this->OnRpcMessage(std::move(message)); + }); environment_->SetSocketSubscriber(this); } @@ -354,6 +358,21 @@ void ReceiverSession::OnCapabilitiesRequest(SenderMessage message) { } } +void ReceiverSession::OnRpcMessage(SenderMessage message) { + if (!message.valid) { + OSP_DLOG_WARN + << "Bad RPC message. This may or may not represent a serious problem."; + return; + } + + const auto& body = absl::get<std::vector<uint8_t>>(message.body); + if (!rpc_messenger_) { + OSP_DLOG_INFO << "Received an RPC message without having a messenger."; + return; + } + rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size()); +} + void ReceiverSession::SelectStreams(const Offer& offer, SessionProperties* properties) { if (offer.cast_mode == CastMode::kMirroring) { @@ -465,6 +484,7 @@ void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) { client_->OnReceiversDestroying(this, reason); current_audio_receiver_.reset(); current_video_receiver_.reset(); + rpc_messenger_.reset(); } } diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h index fe4f9481..d928da5e 100644 --- a/cast/streaming/receiver_session.h +++ b/cast/streaming/receiver_session.h @@ -67,10 +67,16 @@ class ReceiverSession final : public Environment::SocketSubscriber { // once we get a remoting request from a Sender. struct RemotingNegotiation { // The configured receivers set to be used for handling audio and - // video streams. + // video streams. Unlike in the general streaming case, when we are remoting + // we don't know the codec and other information about the stream until + // the sender provices that information through the + // DemuxerStreamInitializeCallback RPC method. ConfiguredReceivers receivers; // The RPC messenger to be used for subscribing to remoting proto messages. + // Unlike the SenderSession API, the RPC messenger is negotiation specific. + // The messenger is torn down when |OnReceiversDestroying| is called, and + // is owned by the ReceiverSession. RpcMessenger* messenger; }; @@ -303,6 +309,7 @@ class ReceiverSession final : public Environment::SocketSubscriber { // Specific message type handler methods. void OnOffer(SenderMessage message); void OnCapabilitiesRequest(SenderMessage message); + void OnRpcMessage(SenderMessage message); // Selects streams from an offer based on its configuration, and sets // them in the session properties. diff --git a/cast/streaming/receiver_session_unittest.cc b/cast/streaming/receiver_session_unittest.cc index 25b567ee..b8d64afa 100644 --- a/cast/streaming/receiver_session_unittest.cc +++ b/cast/streaming/receiver_session_unittest.cc @@ -94,6 +94,48 @@ constexpr char kValidOfferMessage[] = R"({ } })"; +constexpr char kValidRemotingOfferMessage[] = R"({ + "type": "OFFER", + "seqNum": 419, + "offer": { + "castMode": "remoting", + "supportedStreams": [ + { + "index": 31339, + "type": "video_source", + "codecName": "REMOTE_VIDEO", + "rtpProfile": "cast", + "rtpPayloadType": 127, + "ssrc": 19088745, + "maxFrameRate": "60000/1000", + "timeBase": "1/90000", + "maxBitRate": 5432101, + "aesKey": "040d756791711fd3adb939066e6d8690", + "aesIvMask": "9ff0f022a959150e70a2d05a6c184aed", + "resolutions": [ + { + "width": 1920, + "height":1080 + } + ] + }, + { + "index": 31340, + "type": "audio_source", + "codecName": "REMOTE_AUDIO", + "rtpProfile": "cast", + "rtpPayloadType": 97, + "ssrc": 19088747, + "bitRate": 125000, + "timeBase": "1/48000", + "channels": 2, + "aesKey": "51027e4e2347cbcb49d57ef10177aebc", + "aesIvMask": "7f12a19be62a36c04ae4116caaeff6d1" + } + ] + } +})"; + constexpr char kNoAudioOfferMessage[] = R"({ "type": "OFFER", "seqNum": 1337, @@ -244,6 +286,12 @@ constexpr char kGetCapabilitiesMessage[] = R"({ "type": "GET_CAPABILITIES" })"; +constexpr char kRpcMessage[] = R"({ + "rpc" : "CGQQnBiCGQgSAggMGgIIBg==", + "seqNum" : 2, + "type" : "RPC" +})"; + class FakeClient : public ReceiverSession::Client { public: MOCK_METHOD(void, @@ -251,6 +299,10 @@ class FakeClient : public ReceiverSession::Client { (const ReceiverSession*, ReceiverSession::ConfiguredReceivers), (override)); MOCK_METHOD(void, + OnRemotingNegotiated, + (const ReceiverSession*, ReceiverSession::RemotingNegotiation), + (override)); + MOCK_METHOD(void, OnReceiversDestroying, (const ReceiverSession*, ReceiversDestroyingReason), (override)); @@ -752,6 +804,83 @@ TEST_F(ReceiverSessionTest, ReturnsCapabilitiesWithRemotingPreferences) { MediaCapability::k4k)); } +TEST_F(ReceiverSessionTest, CanNegotiateRemoting) { + ReceiverSession::Preferences preferences; + preferences.remoting = + std::make_unique<ReceiverSession::RemotingPreferences>(); + preferences.remoting->supports_chrome_audio_codecs = true; + preferences.remoting->supports_4k = true; + SetUpWithPreferences(std::move(preferences)); + + InSequence s; + EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _)) + .WillOnce([](const ReceiverSession* session_, + ReceiverSession::RemotingNegotiation negotiation) { + const auto& cr = negotiation.receivers; + EXPECT_TRUE(cr.audio_receiver); + EXPECT_EQ(cr.audio_receiver->config().sender_ssrc, 19088747u); + EXPECT_EQ(cr.audio_receiver->config().receiver_ssrc, 19088748u); + EXPECT_EQ(cr.audio_receiver->config().channels, 2); + EXPECT_EQ(cr.audio_receiver->config().rtp_timebase, 48000); + EXPECT_EQ(cr.audio_config.codec, AudioCodec::kNotSpecified); + + EXPECT_TRUE(cr.video_receiver); + EXPECT_EQ(cr.video_receiver->config().sender_ssrc, 19088745u); + EXPECT_EQ(cr.video_receiver->config().receiver_ssrc, 19088746u); + EXPECT_EQ(cr.video_receiver->config().channels, 1); + EXPECT_EQ(cr.video_receiver->config().rtp_timebase, 90000); + EXPECT_EQ(cr.video_config.codec, VideoCodec::kNotSpecified); + }); + EXPECT_CALL(client_, + OnReceiversDestroying(session_.get(), + ReceiverSession::Client::kEndOfSession)); + + message_port_->ReceiveMessage(kValidRemotingOfferMessage); +} + +TEST_F(ReceiverSessionTest, HandlesRpcMessage) { + ReceiverSession::Preferences preferences; + preferences.remoting = + std::make_unique<ReceiverSession::RemotingPreferences>(); + preferences.remoting->supports_chrome_audio_codecs = true; + preferences.remoting->supports_4k = true; + SetUpWithPreferences(std::move(preferences)); + + message_port_->ReceiveMessage(kRpcMessage); + const auto& messages = message_port_->posted_messages(); + // Nothing should happen yet, the session doesn't have a messenger. + ASSERT_EQ(0u, messages.size()); + + // We don't need to fully test that the subscription model on the RpcMessenger + // works, but we do want to test that the ReceiverSession has properly wired + // the RpcMessenger up to the backing SessionMessenger and can properly + // handle received RPC messages. + InSequence s; + bool received_initialize_message = false; + EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _)) + .WillOnce([this, &received_initialize_message]( + const ReceiverSession* session_, + ReceiverSession::RemotingNegotiation negotiation) mutable { + negotiation.messenger->RegisterMessageReceiverCallback( + 100, [&received_initialize_message]( + std::unique_ptr<RpcMessage> message) mutable { + ASSERT_EQ(100, message->handle()); + ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE_CALLBACK, + message->proc()); + ASSERT_EQ(0, message->integer_value()); + received_initialize_message = true; + }); + + message_port_->ReceiveMessage(kRpcMessage); + }); + EXPECT_CALL(client_, + OnReceiversDestroying(session_.get(), + ReceiverSession::Client::kEndOfSession)); + + message_port_->ReceiveMessage(kValidRemotingOfferMessage); + ASSERT_TRUE(received_initialize_message); +} + TEST_F(ReceiverSessionTest, VideoLimitsIsSupersetOf) { ReceiverSession::VideoLimits first{}; ReceiverSession::VideoLimits second = first; diff --git a/cast/streaming/rpc_messenger.cc b/cast/streaming/rpc_messenger.cc index 54b2e279..63176f7c 100644 --- a/cast/streaming/rpc_messenger.cc +++ b/cast/streaming/rpc_messenger.cc @@ -72,7 +72,8 @@ void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message, std::size_t message_len) { auto rpc = std::make_unique<RpcMessage>(); if (!rpc->ParseFromArray(message, message_len)) { - OSP_LOG_WARN << "Failed to parse RPC message from remote: " << message; + OSP_DLOG_WARN << "Failed to parse RPC message from remote: \"" << message + << "\""; return; } OSP_DVLOG << "Received RPC message: " << *rpc; diff --git a/cast/streaming/rpc_messenger_unittest.cc b/cast/streaming/rpc_messenger_unittest.cc index e0627542..758a9036 100644 --- a/cast/streaming/rpc_messenger_unittest.cc +++ b/cast/streaming/rpc_messenger_unittest.cc @@ -125,16 +125,16 @@ TEST_F(RpcMessengerTest, ProcessMessageWithRegisteredHandle) { // Send message for RPC messenger to process. RpcMessage sent_rpc; sent_rpc.set_handle(fake_messenger_->handle()); - sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME); - sent_rpc.set_double_value(3.4); + sent_rpc.set_proc(RpcMessage::RPC_DS_INITIALIZE); + sent_rpc.set_integer_value(4004); ProcessMessage(sent_rpc); // Checks if received message is identical to the one sent earlier. ASSERT_EQ(1, fake_messenger_->received_count()); const RpcMessage& received_rpc = fake_messenger_->received_rpc(); ASSERT_EQ(fake_messenger_->handle(), received_rpc.handle()); - ASSERT_EQ(RpcMessage::RPC_R_SETVOLUME, received_rpc.proc()); - ASSERT_EQ(3.4, received_rpc.double_value()); + ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE, received_rpc.proc()); + ASSERT_EQ(4004, received_rpc.integer_value()); } TEST_F(RpcMessengerTest, ProcessMessageWithUnregisteredHandle) { diff --git a/cast/streaming/sender_session.cc b/cast/streaming/sender_session.cc index bbb3972f..f59f0d70 100644 --- a/cast/streaming/sender_session.cc +++ b/cast/streaming/sender_session.cc @@ -210,6 +210,9 @@ SenderSession::SenderSession(Configuration config) config_.client->OnError(this, error); }, config_.environment->task_runner()), + rpc_messenger_([this](std::vector<uint8_t> message) { + SendRpcMessage(std::move(message)); + }), packet_router_(config_.environment) { OSP_DCHECK(config_.client); OSP_DCHECK(config_.environment); @@ -265,7 +268,6 @@ void SenderSession::ResetState() { current_negotiation_.reset(); current_audio_sender_.reset(); current_video_sender_.reset(); - rpc_messenger_.reset(); } Error SenderSession::StartNegotiation( @@ -299,7 +301,7 @@ void SenderSession::OnAnswer(ReceiverMessage message) { return; } - state_ = State::kMirroring; + state_ = State::kStreaming; config_.client->OnNegotiated( this, std::move(senders), capture_recommendations::GetRecommendations(answer)); @@ -361,27 +363,12 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) { "Failed to negotiate a remoting session.")); return; } - rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) { - Error error = this->messenger_.SendOutboundMessage(SenderMessage{ - SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true, - std::move(message)}); - - if (!error.ok()) { - OSP_LOG_WARN << "Failed to send RPC message: " << error; - } - }); config_.client->OnRemotingNegotiated( - this, RemotingNegotiation{std::move(senders), ToCapabilities(caps), - rpc_messenger_.get()}); + this, RemotingNegotiation{std::move(senders), ToCapabilities(caps)}); } void SenderSession::OnRpcMessage(ReceiverMessage message) { - if (!rpc_messenger_) { - OSP_LOG_INFO << "Received an RPC message without having an RPCMessenger."; - return; - } - if (!message.valid) { HandleErrorMessage( message, @@ -390,7 +377,7 @@ void SenderSession::OnRpcMessage(ReceiverMessage message) { } const auto& body = absl::get<std::vector<uint8_t>>(message.body); - rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size()); + rpc_messenger_.ProcessMessageFromRemote(body.data(), body.size()); } void SenderSession::HandleErrorMessage(ReceiverMessage message, @@ -492,5 +479,15 @@ SenderSession::ConfiguredSenders SenderSession::SpawnSenders( return senders; } +void SenderSession::SendRpcMessage(std::vector<uint8_t> message_body) { + Error error = this->messenger_.SendOutboundMessage(SenderMessage{ + SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true, + std::move(message_body)}); + + if (!error.ok()) { + OSP_LOG_WARN << "Failed to send RPC message: " << error; + } +} + } // namespace cast } // namespace openscreen diff --git a/cast/streaming/sender_session.h b/cast/streaming/sender_session.h index 7995181f..f7fbb97a 100644 --- a/cast/streaming/sender_session.h +++ b/cast/streaming/sender_session.h @@ -58,9 +58,6 @@ class SenderSession final { // about legacy devices, such as pre-1.27 Earth receivers should do // a version check when using these capabilities to offer remoting. RemotingCapabilities capabilities; - - // The RPC messenger to be used for subscribing to remoting proto messages. - RpcMessenger* messenger; }; // The embedder should provide a client for handling negotiation events. @@ -158,6 +155,11 @@ class SenderSession final { // feedback. Embedders may use this information to throttle capture devices. int GetEstimatedNetworkBandwidth() const; + // The RPC messenger for this session. NOTE: RPC messages may come at + // any time from the receiver, so subscriptions to RPC remoting messages + // should be done before calling |NegotiateRemoting|. + RpcMessenger* rpc_messenger() { return &rpc_messenger_; } + private: // We store the current negotiation, so that when we get an answer from the // receiver we can line up the selected streams with the original @@ -183,7 +185,7 @@ class SenderSession final { kIdle, // Currently mirroring content to a receiver. - kMirroring, + kStreaming, // Currently remoting content to a receiver. kRemoting @@ -225,6 +227,9 @@ class SenderSession final { // Spawn a set of configured senders from the currently stored negotiation. ConfiguredSenders SpawnSenders(const Answer& answer); + // Used by the RPC messenger to send outbound messages. + void SendRpcMessage(std::vector<uint8_t> message_body); + // This session's configuration. Configuration config_; @@ -233,6 +238,10 @@ class SenderSession final { // cast/protocol/castv2/streaming_schema.json. SenderSessionMessenger messenger_; + // The RPC messenger, which uses the session messager for sending RPC messages + // and handles subscriptions to RPC messages. + RpcMessenger rpc_messenger_; + // The packet router used for RTP/RTCP messaging across all senders. SenderPacketRouter packet_router_; @@ -246,7 +255,7 @@ class SenderSession final { std::unique_ptr<InProcessNegotiation> current_negotiation_; // The current state of the session. Note that the state is intentionally - // limited. |kMirroring| or |kRemoting| means that we are either starting + // limited. |kStreaming| or |kRemoting| means that we are either starting // a negotiation or actively sending to a receiver. State state_ = State::kIdle; @@ -254,11 +263,6 @@ class SenderSession final { // senders used for this session. Either or both may be nullptr. std::unique_ptr<Sender> current_audio_sender_; std::unique_ptr<Sender> current_video_sender_; - - // If remoting, we store the RpcMessenger used by the embedder to send RPC - // messages from the remoting protobuf specification. For more information, - // see //cast/streaming/remoting.proto. - std::unique_ptr<RpcMessenger> rpc_messenger_; }; // namespace cast } // namespace cast diff --git a/cast/streaming/sender_session_unittest.cc b/cast/streaming/sender_session_unittest.cc index 36b1bae8..bfe4cfab 100644 --- a/cast/streaming/sender_session_unittest.cc +++ b/cast/streaming/sender_session_unittest.cc @@ -545,8 +545,9 @@ TEST_F(SenderSessionTest, SuccessfulRemotingNegotiationYieldsValidObject) { // The messenger is tested elsewhere, but we can sanity check that we got a valid // one here. - EXPECT_TRUE(negotiation.messenger); - const RpcMessenger::Handle handle = negotiation.messenger->GetUniqueHandle(); + EXPECT_TRUE(session_->rpc_messenger()); + const RpcMessenger::Handle handle = + session_->rpc_messenger()->GetUniqueHandle(); EXPECT_NE(RpcMessenger::kInvalidHandle, handle); } diff --git a/cast/streaming/session_messenger.cc b/cast/streaming/session_messenger.cc index 34c4c02f..389d87bb 100644 --- a/cast/streaming/session_messenger.cc +++ b/cast/streaming/session_messenger.cc @@ -146,13 +146,6 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id, return; } - int sequence_number; - if (!json::TryParseInt(message_body.value()[kSequenceNumber], - &sequence_number)) { - OSP_DLOG_WARN << "Received a message without a sequence number"; - return; - } - // If the message is valid JSON and we don't understand it, there are two // options: (1) it's an unknown type, or (2) the receiver filled out the // message incorrectly. In the first case we can drop it, it's likely just @@ -173,6 +166,13 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id, OSP_DLOG_INFO << "Received RTP message but no callback, dropping"; } } else { + int sequence_number; + if (!json::TryParseInt(message_body.value()[kSequenceNumber], + &sequence_number)) { + OSP_DLOG_WARN << "Received a message without a sequence number"; + return; + } + auto it = awaiting_replies_.find(sequence_number); if (it == awaiting_replies_.end()) { OSP_DLOG_WARN << "Received a reply I wasn't waiting for: " |