diff options
Diffstat (limited to 'cast/streaming')
-rw-r--r-- | cast/streaming/constants.h | 2 | ||||
-rw-r--r-- | cast/streaming/offer_messages.h | 2 | ||||
-rw-r--r-- | cast/streaming/receiver_message.cc | 12 | ||||
-rw-r--r-- | cast/streaming/receiver_session.cc | 20 | ||||
-rw-r--r-- | cast/streaming/receiver_session.h | 9 | ||||
-rw-r--r-- | cast/streaming/receiver_session_unittest.cc | 129 | ||||
-rw-r--r-- | cast/streaming/rpc_messenger.cc | 3 | ||||
-rw-r--r-- | cast/streaming/rpc_messenger_unittest.cc | 8 | ||||
-rw-r--r-- | cast/streaming/sender_session.cc | 35 | ||||
-rw-r--r-- | cast/streaming/sender_session.h | 24 | ||||
-rw-r--r-- | cast/streaming/sender_session_unittest.cc | 5 | ||||
-rw-r--r-- | cast/streaming/session_messenger.cc | 14 |
12 files changed, 213 insertions, 50 deletions
diff --git a/cast/streaming/constants.h b/cast/streaming/constants.h index c640796d..668b6bca 100644 --- a/cast/streaming/constants.h +++ b/cast/streaming/constants.h @@ -111,6 +111,8 @@ constexpr int kSupportedRemotingVersion = 2; enum class AudioCodec { kAac, kOpus, kNotSpecified }; enum class VideoCodec { kH264, kVp8, kHevc, kVp9, kNotSpecified }; +enum class CastMode : uint8_t { kMirroring, kRemoting }; + } // namespace cast } // namespace openscreen diff --git a/cast/streaming/offer_messages.h b/cast/streaming/offer_messages.h index c4899eec..c2be5bfa 100644 --- a/cast/streaming/offer_messages.h +++ b/cast/streaming/offer_messages.h @@ -97,8 +97,6 @@ struct VideoStream { std::string error_recovery_mode = {}; }; -enum class CastMode : uint8_t { kMirroring, kRemoting }; - struct Offer { // TODO(jophba): remove deprecated declaration in a separate patch. static ErrorOr<Offer> Parse(const Json::Value& root); diff --git a/cast/streaming/receiver_message.cc b/cast/streaming/receiver_message.cc index e1af610c..4fb1a8a6 100644 --- a/cast/streaming/receiver_message.cc +++ b/cast/streaming/receiver_message.cc @@ -128,10 +128,8 @@ Json::Value ReceiverCapability::ToJson() const { // static ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) { ReceiverMessage message; - if (!value || - !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) { - return Error(Error::Code::kJsonParseError, - "Failed to parse sequence number"); + if (!value) { + return Error(Error::Code::kJsonParseError, "Invalid message body"); } std::string result; @@ -184,6 +182,12 @@ ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) { break; } + if (message.type != ReceiverMessage::Type::kRpc && + !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) { + message.sequence_number = -1; + message.valid = false; + } + return message; } diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc index 36a47bca..056bbabf 100644 --- a/cast/streaming/receiver_session.cc +++ b/cast/streaming/receiver_session.cc @@ -240,6 +240,10 @@ ReceiverSession::ReceiverSession(Client* const client, [this](SenderMessage message) { OnCapabilitiesRequest(std::move(message)); }); + messenger_.SetHandler(SenderMessage::Type::kRpc, + [this](SenderMessage message) { + this->OnRpcMessage(std::move(message)); + }); environment_->SetSocketSubscriber(this); } @@ -354,6 +358,21 @@ void ReceiverSession::OnCapabilitiesRequest(SenderMessage message) { } } +void ReceiverSession::OnRpcMessage(SenderMessage message) { + if (!message.valid) { + OSP_DLOG_WARN + << "Bad RPC message. This may or may not represent a serious problem."; + return; + } + + const auto& body = absl::get<std::vector<uint8_t>>(message.body); + if (!rpc_messenger_) { + OSP_DLOG_INFO << "Received an RPC message without having a messenger."; + return; + } + rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size()); +} + void ReceiverSession::SelectStreams(const Offer& offer, SessionProperties* properties) { if (offer.cast_mode == CastMode::kMirroring) { @@ -465,6 +484,7 @@ void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) { client_->OnReceiversDestroying(this, reason); current_audio_receiver_.reset(); current_video_receiver_.reset(); + rpc_messenger_.reset(); } } diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h index fe4f9481..d928da5e 100644 --- a/cast/streaming/receiver_session.h +++ b/cast/streaming/receiver_session.h @@ -67,10 +67,16 @@ class ReceiverSession final : public Environment::SocketSubscriber { // once we get a remoting request from a Sender. struct RemotingNegotiation { // The configured receivers set to be used for handling audio and - // video streams. + // video streams. Unlike in the general streaming case, when we are remoting + // we don't know the codec and other information about the stream until + // the sender provices that information through the + // DemuxerStreamInitializeCallback RPC method. ConfiguredReceivers receivers; // The RPC messenger to be used for subscribing to remoting proto messages. + // Unlike the SenderSession API, the RPC messenger is negotiation specific. + // The messenger is torn down when |OnReceiversDestroying| is called, and + // is owned by the ReceiverSession. RpcMessenger* messenger; }; @@ -303,6 +309,7 @@ class ReceiverSession final : public Environment::SocketSubscriber { // Specific message type handler methods. void OnOffer(SenderMessage message); void OnCapabilitiesRequest(SenderMessage message); + void OnRpcMessage(SenderMessage message); // Selects streams from an offer based on its configuration, and sets // them in the session properties. diff --git a/cast/streaming/receiver_session_unittest.cc b/cast/streaming/receiver_session_unittest.cc index 25b567ee..b8d64afa 100644 --- a/cast/streaming/receiver_session_unittest.cc +++ b/cast/streaming/receiver_session_unittest.cc @@ -94,6 +94,48 @@ constexpr char kValidOfferMessage[] = R"({ } })"; +constexpr char kValidRemotingOfferMessage[] = R"({ + "type": "OFFER", + "seqNum": 419, + "offer": { + "castMode": "remoting", + "supportedStreams": [ + { + "index": 31339, + "type": "video_source", + "codecName": "REMOTE_VIDEO", + "rtpProfile": "cast", + "rtpPayloadType": 127, + "ssrc": 19088745, + "maxFrameRate": "60000/1000", + "timeBase": "1/90000", + "maxBitRate": 5432101, + "aesKey": "040d756791711fd3adb939066e6d8690", + "aesIvMask": "9ff0f022a959150e70a2d05a6c184aed", + "resolutions": [ + { + "width": 1920, + "height":1080 + } + ] + }, + { + "index": 31340, + "type": "audio_source", + "codecName": "REMOTE_AUDIO", + "rtpProfile": "cast", + "rtpPayloadType": 97, + "ssrc": 19088747, + "bitRate": 125000, + "timeBase": "1/48000", + "channels": 2, + "aesKey": "51027e4e2347cbcb49d57ef10177aebc", + "aesIvMask": "7f12a19be62a36c04ae4116caaeff6d1" + } + ] + } +})"; + constexpr char kNoAudioOfferMessage[] = R"({ "type": "OFFER", "seqNum": 1337, @@ -244,6 +286,12 @@ constexpr char kGetCapabilitiesMessage[] = R"({ "type": "GET_CAPABILITIES" })"; +constexpr char kRpcMessage[] = R"({ + "rpc" : "CGQQnBiCGQgSAggMGgIIBg==", + "seqNum" : 2, + "type" : "RPC" +})"; + class FakeClient : public ReceiverSession::Client { public: MOCK_METHOD(void, @@ -251,6 +299,10 @@ class FakeClient : public ReceiverSession::Client { (const ReceiverSession*, ReceiverSession::ConfiguredReceivers), (override)); MOCK_METHOD(void, + OnRemotingNegotiated, + (const ReceiverSession*, ReceiverSession::RemotingNegotiation), + (override)); + MOCK_METHOD(void, OnReceiversDestroying, (const ReceiverSession*, ReceiversDestroyingReason), (override)); @@ -752,6 +804,83 @@ TEST_F(ReceiverSessionTest, ReturnsCapabilitiesWithRemotingPreferences) { MediaCapability::k4k)); } +TEST_F(ReceiverSessionTest, CanNegotiateRemoting) { + ReceiverSession::Preferences preferences; + preferences.remoting = + std::make_unique<ReceiverSession::RemotingPreferences>(); + preferences.remoting->supports_chrome_audio_codecs = true; + preferences.remoting->supports_4k = true; + SetUpWithPreferences(std::move(preferences)); + + InSequence s; + EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _)) + .WillOnce([](const ReceiverSession* session_, + ReceiverSession::RemotingNegotiation negotiation) { + const auto& cr = negotiation.receivers; + EXPECT_TRUE(cr.audio_receiver); + EXPECT_EQ(cr.audio_receiver->config().sender_ssrc, 19088747u); + EXPECT_EQ(cr.audio_receiver->config().receiver_ssrc, 19088748u); + EXPECT_EQ(cr.audio_receiver->config().channels, 2); + EXPECT_EQ(cr.audio_receiver->config().rtp_timebase, 48000); + EXPECT_EQ(cr.audio_config.codec, AudioCodec::kNotSpecified); + + EXPECT_TRUE(cr.video_receiver); + EXPECT_EQ(cr.video_receiver->config().sender_ssrc, 19088745u); + EXPECT_EQ(cr.video_receiver->config().receiver_ssrc, 19088746u); + EXPECT_EQ(cr.video_receiver->config().channels, 1); + EXPECT_EQ(cr.video_receiver->config().rtp_timebase, 90000); + EXPECT_EQ(cr.video_config.codec, VideoCodec::kNotSpecified); + }); + EXPECT_CALL(client_, + OnReceiversDestroying(session_.get(), + ReceiverSession::Client::kEndOfSession)); + + message_port_->ReceiveMessage(kValidRemotingOfferMessage); +} + +TEST_F(ReceiverSessionTest, HandlesRpcMessage) { + ReceiverSession::Preferences preferences; + preferences.remoting = + std::make_unique<ReceiverSession::RemotingPreferences>(); + preferences.remoting->supports_chrome_audio_codecs = true; + preferences.remoting->supports_4k = true; + SetUpWithPreferences(std::move(preferences)); + + message_port_->ReceiveMessage(kRpcMessage); + const auto& messages = message_port_->posted_messages(); + // Nothing should happen yet, the session doesn't have a messenger. + ASSERT_EQ(0u, messages.size()); + + // We don't need to fully test that the subscription model on the RpcMessenger + // works, but we do want to test that the ReceiverSession has properly wired + // the RpcMessenger up to the backing SessionMessenger and can properly + // handle received RPC messages. + InSequence s; + bool received_initialize_message = false; + EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _)) + .WillOnce([this, &received_initialize_message]( + const ReceiverSession* session_, + ReceiverSession::RemotingNegotiation negotiation) mutable { + negotiation.messenger->RegisterMessageReceiverCallback( + 100, [&received_initialize_message]( + std::unique_ptr<RpcMessage> message) mutable { + ASSERT_EQ(100, message->handle()); + ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE_CALLBACK, + message->proc()); + ASSERT_EQ(0, message->integer_value()); + received_initialize_message = true; + }); + + message_port_->ReceiveMessage(kRpcMessage); + }); + EXPECT_CALL(client_, + OnReceiversDestroying(session_.get(), + ReceiverSession::Client::kEndOfSession)); + + message_port_->ReceiveMessage(kValidRemotingOfferMessage); + ASSERT_TRUE(received_initialize_message); +} + TEST_F(ReceiverSessionTest, VideoLimitsIsSupersetOf) { ReceiverSession::VideoLimits first{}; ReceiverSession::VideoLimits second = first; diff --git a/cast/streaming/rpc_messenger.cc b/cast/streaming/rpc_messenger.cc index 54b2e279..63176f7c 100644 --- a/cast/streaming/rpc_messenger.cc +++ b/cast/streaming/rpc_messenger.cc @@ -72,7 +72,8 @@ void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message, std::size_t message_len) { auto rpc = std::make_unique<RpcMessage>(); if (!rpc->ParseFromArray(message, message_len)) { - OSP_LOG_WARN << "Failed to parse RPC message from remote: " << message; + OSP_DLOG_WARN << "Failed to parse RPC message from remote: \"" << message + << "\""; return; } OSP_DVLOG << "Received RPC message: " << *rpc; diff --git a/cast/streaming/rpc_messenger_unittest.cc b/cast/streaming/rpc_messenger_unittest.cc index e0627542..758a9036 100644 --- a/cast/streaming/rpc_messenger_unittest.cc +++ b/cast/streaming/rpc_messenger_unittest.cc @@ -125,16 +125,16 @@ TEST_F(RpcMessengerTest, ProcessMessageWithRegisteredHandle) { // Send message for RPC messenger to process. RpcMessage sent_rpc; sent_rpc.set_handle(fake_messenger_->handle()); - sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME); - sent_rpc.set_double_value(3.4); + sent_rpc.set_proc(RpcMessage::RPC_DS_INITIALIZE); + sent_rpc.set_integer_value(4004); ProcessMessage(sent_rpc); // Checks if received message is identical to the one sent earlier. ASSERT_EQ(1, fake_messenger_->received_count()); const RpcMessage& received_rpc = fake_messenger_->received_rpc(); ASSERT_EQ(fake_messenger_->handle(), received_rpc.handle()); - ASSERT_EQ(RpcMessage::RPC_R_SETVOLUME, received_rpc.proc()); - ASSERT_EQ(3.4, received_rpc.double_value()); + ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE, received_rpc.proc()); + ASSERT_EQ(4004, received_rpc.integer_value()); } TEST_F(RpcMessengerTest, ProcessMessageWithUnregisteredHandle) { diff --git a/cast/streaming/sender_session.cc b/cast/streaming/sender_session.cc index bbb3972f..f59f0d70 100644 --- a/cast/streaming/sender_session.cc +++ b/cast/streaming/sender_session.cc @@ -210,6 +210,9 @@ SenderSession::SenderSession(Configuration config) config_.client->OnError(this, error); }, config_.environment->task_runner()), + rpc_messenger_([this](std::vector<uint8_t> message) { + SendRpcMessage(std::move(message)); + }), packet_router_(config_.environment) { OSP_DCHECK(config_.client); OSP_DCHECK(config_.environment); @@ -265,7 +268,6 @@ void SenderSession::ResetState() { current_negotiation_.reset(); current_audio_sender_.reset(); current_video_sender_.reset(); - rpc_messenger_.reset(); } Error SenderSession::StartNegotiation( @@ -299,7 +301,7 @@ void SenderSession::OnAnswer(ReceiverMessage message) { return; } - state_ = State::kMirroring; + state_ = State::kStreaming; config_.client->OnNegotiated( this, std::move(senders), capture_recommendations::GetRecommendations(answer)); @@ -361,27 +363,12 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) { "Failed to negotiate a remoting session.")); return; } - rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) { - Error error = this->messenger_.SendOutboundMessage(SenderMessage{ - SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true, - std::move(message)}); - - if (!error.ok()) { - OSP_LOG_WARN << "Failed to send RPC message: " << error; - } - }); config_.client->OnRemotingNegotiated( - this, RemotingNegotiation{std::move(senders), ToCapabilities(caps), - rpc_messenger_.get()}); + this, RemotingNegotiation{std::move(senders), ToCapabilities(caps)}); } void SenderSession::OnRpcMessage(ReceiverMessage message) { - if (!rpc_messenger_) { - OSP_LOG_INFO << "Received an RPC message without having an RPCMessenger."; - return; - } - if (!message.valid) { HandleErrorMessage( message, @@ -390,7 +377,7 @@ void SenderSession::OnRpcMessage(ReceiverMessage message) { } const auto& body = absl::get<std::vector<uint8_t>>(message.body); - rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size()); + rpc_messenger_.ProcessMessageFromRemote(body.data(), body.size()); } void SenderSession::HandleErrorMessage(ReceiverMessage message, @@ -492,5 +479,15 @@ SenderSession::ConfiguredSenders SenderSession::SpawnSenders( return senders; } +void SenderSession::SendRpcMessage(std::vector<uint8_t> message_body) { + Error error = this->messenger_.SendOutboundMessage(SenderMessage{ + SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true, + std::move(message_body)}); + + if (!error.ok()) { + OSP_LOG_WARN << "Failed to send RPC message: " << error; + } +} + } // namespace cast } // namespace openscreen diff --git a/cast/streaming/sender_session.h b/cast/streaming/sender_session.h index 7995181f..f7fbb97a 100644 --- a/cast/streaming/sender_session.h +++ b/cast/streaming/sender_session.h @@ -58,9 +58,6 @@ class SenderSession final { // about legacy devices, such as pre-1.27 Earth receivers should do // a version check when using these capabilities to offer remoting. RemotingCapabilities capabilities; - - // The RPC messenger to be used for subscribing to remoting proto messages. - RpcMessenger* messenger; }; // The embedder should provide a client for handling negotiation events. @@ -158,6 +155,11 @@ class SenderSession final { // feedback. Embedders may use this information to throttle capture devices. int GetEstimatedNetworkBandwidth() const; + // The RPC messenger for this session. NOTE: RPC messages may come at + // any time from the receiver, so subscriptions to RPC remoting messages + // should be done before calling |NegotiateRemoting|. + RpcMessenger* rpc_messenger() { return &rpc_messenger_; } + private: // We store the current negotiation, so that when we get an answer from the // receiver we can line up the selected streams with the original @@ -183,7 +185,7 @@ class SenderSession final { kIdle, // Currently mirroring content to a receiver. - kMirroring, + kStreaming, // Currently remoting content to a receiver. kRemoting @@ -225,6 +227,9 @@ class SenderSession final { // Spawn a set of configured senders from the currently stored negotiation. ConfiguredSenders SpawnSenders(const Answer& answer); + // Used by the RPC messenger to send outbound messages. + void SendRpcMessage(std::vector<uint8_t> message_body); + // This session's configuration. Configuration config_; @@ -233,6 +238,10 @@ class SenderSession final { // cast/protocol/castv2/streaming_schema.json. SenderSessionMessenger messenger_; + // The RPC messenger, which uses the session messager for sending RPC messages + // and handles subscriptions to RPC messages. + RpcMessenger rpc_messenger_; + // The packet router used for RTP/RTCP messaging across all senders. SenderPacketRouter packet_router_; @@ -246,7 +255,7 @@ class SenderSession final { std::unique_ptr<InProcessNegotiation> current_negotiation_; // The current state of the session. Note that the state is intentionally - // limited. |kMirroring| or |kRemoting| means that we are either starting + // limited. |kStreaming| or |kRemoting| means that we are either starting // a negotiation or actively sending to a receiver. State state_ = State::kIdle; @@ -254,11 +263,6 @@ class SenderSession final { // senders used for this session. Either or both may be nullptr. std::unique_ptr<Sender> current_audio_sender_; std::unique_ptr<Sender> current_video_sender_; - - // If remoting, we store the RpcMessenger used by the embedder to send RPC - // messages from the remoting protobuf specification. For more information, - // see //cast/streaming/remoting.proto. - std::unique_ptr<RpcMessenger> rpc_messenger_; }; // namespace cast } // namespace cast diff --git a/cast/streaming/sender_session_unittest.cc b/cast/streaming/sender_session_unittest.cc index 36b1bae8..bfe4cfab 100644 --- a/cast/streaming/sender_session_unittest.cc +++ b/cast/streaming/sender_session_unittest.cc @@ -545,8 +545,9 @@ TEST_F(SenderSessionTest, SuccessfulRemotingNegotiationYieldsValidObject) { // The messenger is tested elsewhere, but we can sanity check that we got a valid // one here. - EXPECT_TRUE(negotiation.messenger); - const RpcMessenger::Handle handle = negotiation.messenger->GetUniqueHandle(); + EXPECT_TRUE(session_->rpc_messenger()); + const RpcMessenger::Handle handle = + session_->rpc_messenger()->GetUniqueHandle(); EXPECT_NE(RpcMessenger::kInvalidHandle, handle); } diff --git a/cast/streaming/session_messenger.cc b/cast/streaming/session_messenger.cc index 34c4c02f..389d87bb 100644 --- a/cast/streaming/session_messenger.cc +++ b/cast/streaming/session_messenger.cc @@ -146,13 +146,6 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id, return; } - int sequence_number; - if (!json::TryParseInt(message_body.value()[kSequenceNumber], - &sequence_number)) { - OSP_DLOG_WARN << "Received a message without a sequence number"; - return; - } - // If the message is valid JSON and we don't understand it, there are two // options: (1) it's an unknown type, or (2) the receiver filled out the // message incorrectly. In the first case we can drop it, it's likely just @@ -173,6 +166,13 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id, OSP_DLOG_INFO << "Received RTP message but no callback, dropping"; } } else { + int sequence_number; + if (!json::TryParseInt(message_body.value()[kSequenceNumber], + &sequence_number)) { + OSP_DLOG_WARN << "Received a message without a sequence number"; + return; + } + auto it = awaiting_replies_.find(sequence_number); if (it == awaiting_replies_.end()) { OSP_DLOG_WARN << "Received a reply I wasn't waiting for: " |