aboutsummaryrefslogtreecommitdiff
path: root/cast/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'cast/streaming')
-rw-r--r--cast/streaming/constants.h2
-rw-r--r--cast/streaming/offer_messages.h2
-rw-r--r--cast/streaming/receiver_message.cc12
-rw-r--r--cast/streaming/receiver_session.cc20
-rw-r--r--cast/streaming/receiver_session.h9
-rw-r--r--cast/streaming/receiver_session_unittest.cc129
-rw-r--r--cast/streaming/rpc_messenger.cc3
-rw-r--r--cast/streaming/rpc_messenger_unittest.cc8
-rw-r--r--cast/streaming/sender_session.cc35
-rw-r--r--cast/streaming/sender_session.h24
-rw-r--r--cast/streaming/sender_session_unittest.cc5
-rw-r--r--cast/streaming/session_messenger.cc14
12 files changed, 213 insertions, 50 deletions
diff --git a/cast/streaming/constants.h b/cast/streaming/constants.h
index c640796d..668b6bca 100644
--- a/cast/streaming/constants.h
+++ b/cast/streaming/constants.h
@@ -111,6 +111,8 @@ constexpr int kSupportedRemotingVersion = 2;
enum class AudioCodec { kAac, kOpus, kNotSpecified };
enum class VideoCodec { kH264, kVp8, kHevc, kVp9, kNotSpecified };
+enum class CastMode : uint8_t { kMirroring, kRemoting };
+
} // namespace cast
} // namespace openscreen
diff --git a/cast/streaming/offer_messages.h b/cast/streaming/offer_messages.h
index c4899eec..c2be5bfa 100644
--- a/cast/streaming/offer_messages.h
+++ b/cast/streaming/offer_messages.h
@@ -97,8 +97,6 @@ struct VideoStream {
std::string error_recovery_mode = {};
};
-enum class CastMode : uint8_t { kMirroring, kRemoting };
-
struct Offer {
// TODO(jophba): remove deprecated declaration in a separate patch.
static ErrorOr<Offer> Parse(const Json::Value& root);
diff --git a/cast/streaming/receiver_message.cc b/cast/streaming/receiver_message.cc
index e1af610c..4fb1a8a6 100644
--- a/cast/streaming/receiver_message.cc
+++ b/cast/streaming/receiver_message.cc
@@ -128,10 +128,8 @@ Json::Value ReceiverCapability::ToJson() const {
// static
ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) {
ReceiverMessage message;
- if (!value ||
- !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) {
- return Error(Error::Code::kJsonParseError,
- "Failed to parse sequence number");
+ if (!value) {
+ return Error(Error::Code::kJsonParseError, "Invalid message body");
}
std::string result;
@@ -184,6 +182,12 @@ ErrorOr<ReceiverMessage> ReceiverMessage::Parse(const Json::Value& value) {
break;
}
+ if (message.type != ReceiverMessage::Type::kRpc &&
+ !json::TryParseInt(value[kSequenceNumber], &(message.sequence_number))) {
+ message.sequence_number = -1;
+ message.valid = false;
+ }
+
return message;
}
diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc
index 36a47bca..056bbabf 100644
--- a/cast/streaming/receiver_session.cc
+++ b/cast/streaming/receiver_session.cc
@@ -240,6 +240,10 @@ ReceiverSession::ReceiverSession(Client* const client,
[this](SenderMessage message) {
OnCapabilitiesRequest(std::move(message));
});
+ messenger_.SetHandler(SenderMessage::Type::kRpc,
+ [this](SenderMessage message) {
+ this->OnRpcMessage(std::move(message));
+ });
environment_->SetSocketSubscriber(this);
}
@@ -354,6 +358,21 @@ void ReceiverSession::OnCapabilitiesRequest(SenderMessage message) {
}
}
+void ReceiverSession::OnRpcMessage(SenderMessage message) {
+ if (!message.valid) {
+ OSP_DLOG_WARN
+ << "Bad RPC message. This may or may not represent a serious problem.";
+ return;
+ }
+
+ const auto& body = absl::get<std::vector<uint8_t>>(message.body);
+ if (!rpc_messenger_) {
+ OSP_DLOG_INFO << "Received an RPC message without having a messenger.";
+ return;
+ }
+ rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size());
+}
+
void ReceiverSession::SelectStreams(const Offer& offer,
SessionProperties* properties) {
if (offer.cast_mode == CastMode::kMirroring) {
@@ -465,6 +484,7 @@ void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) {
client_->OnReceiversDestroying(this, reason);
current_audio_receiver_.reset();
current_video_receiver_.reset();
+ rpc_messenger_.reset();
}
}
diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h
index fe4f9481..d928da5e 100644
--- a/cast/streaming/receiver_session.h
+++ b/cast/streaming/receiver_session.h
@@ -67,10 +67,16 @@ class ReceiverSession final : public Environment::SocketSubscriber {
// once we get a remoting request from a Sender.
struct RemotingNegotiation {
// The configured receivers set to be used for handling audio and
- // video streams.
+ // video streams. Unlike in the general streaming case, when we are remoting
+ // we don't know the codec and other information about the stream until
+ // the sender provices that information through the
+ // DemuxerStreamInitializeCallback RPC method.
ConfiguredReceivers receivers;
// The RPC messenger to be used for subscribing to remoting proto messages.
+ // Unlike the SenderSession API, the RPC messenger is negotiation specific.
+ // The messenger is torn down when |OnReceiversDestroying| is called, and
+ // is owned by the ReceiverSession.
RpcMessenger* messenger;
};
@@ -303,6 +309,7 @@ class ReceiverSession final : public Environment::SocketSubscriber {
// Specific message type handler methods.
void OnOffer(SenderMessage message);
void OnCapabilitiesRequest(SenderMessage message);
+ void OnRpcMessage(SenderMessage message);
// Selects streams from an offer based on its configuration, and sets
// them in the session properties.
diff --git a/cast/streaming/receiver_session_unittest.cc b/cast/streaming/receiver_session_unittest.cc
index 25b567ee..b8d64afa 100644
--- a/cast/streaming/receiver_session_unittest.cc
+++ b/cast/streaming/receiver_session_unittest.cc
@@ -94,6 +94,48 @@ constexpr char kValidOfferMessage[] = R"({
}
})";
+constexpr char kValidRemotingOfferMessage[] = R"({
+ "type": "OFFER",
+ "seqNum": 419,
+ "offer": {
+ "castMode": "remoting",
+ "supportedStreams": [
+ {
+ "index": 31339,
+ "type": "video_source",
+ "codecName": "REMOTE_VIDEO",
+ "rtpProfile": "cast",
+ "rtpPayloadType": 127,
+ "ssrc": 19088745,
+ "maxFrameRate": "60000/1000",
+ "timeBase": "1/90000",
+ "maxBitRate": 5432101,
+ "aesKey": "040d756791711fd3adb939066e6d8690",
+ "aesIvMask": "9ff0f022a959150e70a2d05a6c184aed",
+ "resolutions": [
+ {
+ "width": 1920,
+ "height":1080
+ }
+ ]
+ },
+ {
+ "index": 31340,
+ "type": "audio_source",
+ "codecName": "REMOTE_AUDIO",
+ "rtpProfile": "cast",
+ "rtpPayloadType": 97,
+ "ssrc": 19088747,
+ "bitRate": 125000,
+ "timeBase": "1/48000",
+ "channels": 2,
+ "aesKey": "51027e4e2347cbcb49d57ef10177aebc",
+ "aesIvMask": "7f12a19be62a36c04ae4116caaeff6d1"
+ }
+ ]
+ }
+})";
+
constexpr char kNoAudioOfferMessage[] = R"({
"type": "OFFER",
"seqNum": 1337,
@@ -244,6 +286,12 @@ constexpr char kGetCapabilitiesMessage[] = R"({
"type": "GET_CAPABILITIES"
})";
+constexpr char kRpcMessage[] = R"({
+ "rpc" : "CGQQnBiCGQgSAggMGgIIBg==",
+ "seqNum" : 2,
+ "type" : "RPC"
+})";
+
class FakeClient : public ReceiverSession::Client {
public:
MOCK_METHOD(void,
@@ -251,6 +299,10 @@ class FakeClient : public ReceiverSession::Client {
(const ReceiverSession*, ReceiverSession::ConfiguredReceivers),
(override));
MOCK_METHOD(void,
+ OnRemotingNegotiated,
+ (const ReceiverSession*, ReceiverSession::RemotingNegotiation),
+ (override));
+ MOCK_METHOD(void,
OnReceiversDestroying,
(const ReceiverSession*, ReceiversDestroyingReason),
(override));
@@ -752,6 +804,83 @@ TEST_F(ReceiverSessionTest, ReturnsCapabilitiesWithRemotingPreferences) {
MediaCapability::k4k));
}
+TEST_F(ReceiverSessionTest, CanNegotiateRemoting) {
+ ReceiverSession::Preferences preferences;
+ preferences.remoting =
+ std::make_unique<ReceiverSession::RemotingPreferences>();
+ preferences.remoting->supports_chrome_audio_codecs = true;
+ preferences.remoting->supports_4k = true;
+ SetUpWithPreferences(std::move(preferences));
+
+ InSequence s;
+ EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _))
+ .WillOnce([](const ReceiverSession* session_,
+ ReceiverSession::RemotingNegotiation negotiation) {
+ const auto& cr = negotiation.receivers;
+ EXPECT_TRUE(cr.audio_receiver);
+ EXPECT_EQ(cr.audio_receiver->config().sender_ssrc, 19088747u);
+ EXPECT_EQ(cr.audio_receiver->config().receiver_ssrc, 19088748u);
+ EXPECT_EQ(cr.audio_receiver->config().channels, 2);
+ EXPECT_EQ(cr.audio_receiver->config().rtp_timebase, 48000);
+ EXPECT_EQ(cr.audio_config.codec, AudioCodec::kNotSpecified);
+
+ EXPECT_TRUE(cr.video_receiver);
+ EXPECT_EQ(cr.video_receiver->config().sender_ssrc, 19088745u);
+ EXPECT_EQ(cr.video_receiver->config().receiver_ssrc, 19088746u);
+ EXPECT_EQ(cr.video_receiver->config().channels, 1);
+ EXPECT_EQ(cr.video_receiver->config().rtp_timebase, 90000);
+ EXPECT_EQ(cr.video_config.codec, VideoCodec::kNotSpecified);
+ });
+ EXPECT_CALL(client_,
+ OnReceiversDestroying(session_.get(),
+ ReceiverSession::Client::kEndOfSession));
+
+ message_port_->ReceiveMessage(kValidRemotingOfferMessage);
+}
+
+TEST_F(ReceiverSessionTest, HandlesRpcMessage) {
+ ReceiverSession::Preferences preferences;
+ preferences.remoting =
+ std::make_unique<ReceiverSession::RemotingPreferences>();
+ preferences.remoting->supports_chrome_audio_codecs = true;
+ preferences.remoting->supports_4k = true;
+ SetUpWithPreferences(std::move(preferences));
+
+ message_port_->ReceiveMessage(kRpcMessage);
+ const auto& messages = message_port_->posted_messages();
+ // Nothing should happen yet, the session doesn't have a messenger.
+ ASSERT_EQ(0u, messages.size());
+
+ // We don't need to fully test that the subscription model on the RpcMessenger
+ // works, but we do want to test that the ReceiverSession has properly wired
+ // the RpcMessenger up to the backing SessionMessenger and can properly
+ // handle received RPC messages.
+ InSequence s;
+ bool received_initialize_message = false;
+ EXPECT_CALL(client_, OnRemotingNegotiated(session_.get(), _))
+ .WillOnce([this, &received_initialize_message](
+ const ReceiverSession* session_,
+ ReceiverSession::RemotingNegotiation negotiation) mutable {
+ negotiation.messenger->RegisterMessageReceiverCallback(
+ 100, [&received_initialize_message](
+ std::unique_ptr<RpcMessage> message) mutable {
+ ASSERT_EQ(100, message->handle());
+ ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE_CALLBACK,
+ message->proc());
+ ASSERT_EQ(0, message->integer_value());
+ received_initialize_message = true;
+ });
+
+ message_port_->ReceiveMessage(kRpcMessage);
+ });
+ EXPECT_CALL(client_,
+ OnReceiversDestroying(session_.get(),
+ ReceiverSession::Client::kEndOfSession));
+
+ message_port_->ReceiveMessage(kValidRemotingOfferMessage);
+ ASSERT_TRUE(received_initialize_message);
+}
+
TEST_F(ReceiverSessionTest, VideoLimitsIsSupersetOf) {
ReceiverSession::VideoLimits first{};
ReceiverSession::VideoLimits second = first;
diff --git a/cast/streaming/rpc_messenger.cc b/cast/streaming/rpc_messenger.cc
index 54b2e279..63176f7c 100644
--- a/cast/streaming/rpc_messenger.cc
+++ b/cast/streaming/rpc_messenger.cc
@@ -72,7 +72,8 @@ void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message,
std::size_t message_len) {
auto rpc = std::make_unique<RpcMessage>();
if (!rpc->ParseFromArray(message, message_len)) {
- OSP_LOG_WARN << "Failed to parse RPC message from remote: " << message;
+ OSP_DLOG_WARN << "Failed to parse RPC message from remote: \"" << message
+ << "\"";
return;
}
OSP_DVLOG << "Received RPC message: " << *rpc;
diff --git a/cast/streaming/rpc_messenger_unittest.cc b/cast/streaming/rpc_messenger_unittest.cc
index e0627542..758a9036 100644
--- a/cast/streaming/rpc_messenger_unittest.cc
+++ b/cast/streaming/rpc_messenger_unittest.cc
@@ -125,16 +125,16 @@ TEST_F(RpcMessengerTest, ProcessMessageWithRegisteredHandle) {
// Send message for RPC messenger to process.
RpcMessage sent_rpc;
sent_rpc.set_handle(fake_messenger_->handle());
- sent_rpc.set_proc(RpcMessage::RPC_R_SETVOLUME);
- sent_rpc.set_double_value(3.4);
+ sent_rpc.set_proc(RpcMessage::RPC_DS_INITIALIZE);
+ sent_rpc.set_integer_value(4004);
ProcessMessage(sent_rpc);
// Checks if received message is identical to the one sent earlier.
ASSERT_EQ(1, fake_messenger_->received_count());
const RpcMessage& received_rpc = fake_messenger_->received_rpc();
ASSERT_EQ(fake_messenger_->handle(), received_rpc.handle());
- ASSERT_EQ(RpcMessage::RPC_R_SETVOLUME, received_rpc.proc());
- ASSERT_EQ(3.4, received_rpc.double_value());
+ ASSERT_EQ(RpcMessage::RPC_DS_INITIALIZE, received_rpc.proc());
+ ASSERT_EQ(4004, received_rpc.integer_value());
}
TEST_F(RpcMessengerTest, ProcessMessageWithUnregisteredHandle) {
diff --git a/cast/streaming/sender_session.cc b/cast/streaming/sender_session.cc
index bbb3972f..f59f0d70 100644
--- a/cast/streaming/sender_session.cc
+++ b/cast/streaming/sender_session.cc
@@ -210,6 +210,9 @@ SenderSession::SenderSession(Configuration config)
config_.client->OnError(this, error);
},
config_.environment->task_runner()),
+ rpc_messenger_([this](std::vector<uint8_t> message) {
+ SendRpcMessage(std::move(message));
+ }),
packet_router_(config_.environment) {
OSP_DCHECK(config_.client);
OSP_DCHECK(config_.environment);
@@ -265,7 +268,6 @@ void SenderSession::ResetState() {
current_negotiation_.reset();
current_audio_sender_.reset();
current_video_sender_.reset();
- rpc_messenger_.reset();
}
Error SenderSession::StartNegotiation(
@@ -299,7 +301,7 @@ void SenderSession::OnAnswer(ReceiverMessage message) {
return;
}
- state_ = State::kMirroring;
+ state_ = State::kStreaming;
config_.client->OnNegotiated(
this, std::move(senders),
capture_recommendations::GetRecommendations(answer));
@@ -361,27 +363,12 @@ void SenderSession::OnCapabilitiesResponse(ReceiverMessage message) {
"Failed to negotiate a remoting session."));
return;
}
- rpc_messenger_ = std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) {
- Error error = this->messenger_.SendOutboundMessage(SenderMessage{
- SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true,
- std::move(message)});
-
- if (!error.ok()) {
- OSP_LOG_WARN << "Failed to send RPC message: " << error;
- }
- });
config_.client->OnRemotingNegotiated(
- this, RemotingNegotiation{std::move(senders), ToCapabilities(caps),
- rpc_messenger_.get()});
+ this, RemotingNegotiation{std::move(senders), ToCapabilities(caps)});
}
void SenderSession::OnRpcMessage(ReceiverMessage message) {
- if (!rpc_messenger_) {
- OSP_LOG_INFO << "Received an RPC message without having an RPCMessenger.";
- return;
- }
-
if (!message.valid) {
HandleErrorMessage(
message,
@@ -390,7 +377,7 @@ void SenderSession::OnRpcMessage(ReceiverMessage message) {
}
const auto& body = absl::get<std::vector<uint8_t>>(message.body);
- rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size());
+ rpc_messenger_.ProcessMessageFromRemote(body.data(), body.size());
}
void SenderSession::HandleErrorMessage(ReceiverMessage message,
@@ -492,5 +479,15 @@ SenderSession::ConfiguredSenders SenderSession::SpawnSenders(
return senders;
}
+void SenderSession::SendRpcMessage(std::vector<uint8_t> message_body) {
+ Error error = this->messenger_.SendOutboundMessage(SenderMessage{
+ SenderMessage::Type::kRpc, ++(this->current_sequence_number_), true,
+ std::move(message_body)});
+
+ if (!error.ok()) {
+ OSP_LOG_WARN << "Failed to send RPC message: " << error;
+ }
+}
+
} // namespace cast
} // namespace openscreen
diff --git a/cast/streaming/sender_session.h b/cast/streaming/sender_session.h
index 7995181f..f7fbb97a 100644
--- a/cast/streaming/sender_session.h
+++ b/cast/streaming/sender_session.h
@@ -58,9 +58,6 @@ class SenderSession final {
// about legacy devices, such as pre-1.27 Earth receivers should do
// a version check when using these capabilities to offer remoting.
RemotingCapabilities capabilities;
-
- // The RPC messenger to be used for subscribing to remoting proto messages.
- RpcMessenger* messenger;
};
// The embedder should provide a client for handling negotiation events.
@@ -158,6 +155,11 @@ class SenderSession final {
// feedback. Embedders may use this information to throttle capture devices.
int GetEstimatedNetworkBandwidth() const;
+ // The RPC messenger for this session. NOTE: RPC messages may come at
+ // any time from the receiver, so subscriptions to RPC remoting messages
+ // should be done before calling |NegotiateRemoting|.
+ RpcMessenger* rpc_messenger() { return &rpc_messenger_; }
+
private:
// We store the current negotiation, so that when we get an answer from the
// receiver we can line up the selected streams with the original
@@ -183,7 +185,7 @@ class SenderSession final {
kIdle,
// Currently mirroring content to a receiver.
- kMirroring,
+ kStreaming,
// Currently remoting content to a receiver.
kRemoting
@@ -225,6 +227,9 @@ class SenderSession final {
// Spawn a set of configured senders from the currently stored negotiation.
ConfiguredSenders SpawnSenders(const Answer& answer);
+ // Used by the RPC messenger to send outbound messages.
+ void SendRpcMessage(std::vector<uint8_t> message_body);
+
// This session's configuration.
Configuration config_;
@@ -233,6 +238,10 @@ class SenderSession final {
// cast/protocol/castv2/streaming_schema.json.
SenderSessionMessenger messenger_;
+ // The RPC messenger, which uses the session messager for sending RPC messages
+ // and handles subscriptions to RPC messages.
+ RpcMessenger rpc_messenger_;
+
// The packet router used for RTP/RTCP messaging across all senders.
SenderPacketRouter packet_router_;
@@ -246,7 +255,7 @@ class SenderSession final {
std::unique_ptr<InProcessNegotiation> current_negotiation_;
// The current state of the session. Note that the state is intentionally
- // limited. |kMirroring| or |kRemoting| means that we are either starting
+ // limited. |kStreaming| or |kRemoting| means that we are either starting
// a negotiation or actively sending to a receiver.
State state_ = State::kIdle;
@@ -254,11 +263,6 @@ class SenderSession final {
// senders used for this session. Either or both may be nullptr.
std::unique_ptr<Sender> current_audio_sender_;
std::unique_ptr<Sender> current_video_sender_;
-
- // If remoting, we store the RpcMessenger used by the embedder to send RPC
- // messages from the remoting protobuf specification. For more information,
- // see //cast/streaming/remoting.proto.
- std::unique_ptr<RpcMessenger> rpc_messenger_;
}; // namespace cast
} // namespace cast
diff --git a/cast/streaming/sender_session_unittest.cc b/cast/streaming/sender_session_unittest.cc
index 36b1bae8..bfe4cfab 100644
--- a/cast/streaming/sender_session_unittest.cc
+++ b/cast/streaming/sender_session_unittest.cc
@@ -545,8 +545,9 @@ TEST_F(SenderSessionTest, SuccessfulRemotingNegotiationYieldsValidObject) {
// The messenger is tested elsewhere, but we can sanity check that we got a valid
// one here.
- EXPECT_TRUE(negotiation.messenger);
- const RpcMessenger::Handle handle = negotiation.messenger->GetUniqueHandle();
+ EXPECT_TRUE(session_->rpc_messenger());
+ const RpcMessenger::Handle handle =
+ session_->rpc_messenger()->GetUniqueHandle();
EXPECT_NE(RpcMessenger::kInvalidHandle, handle);
}
diff --git a/cast/streaming/session_messenger.cc b/cast/streaming/session_messenger.cc
index 34c4c02f..389d87bb 100644
--- a/cast/streaming/session_messenger.cc
+++ b/cast/streaming/session_messenger.cc
@@ -146,13 +146,6 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id,
return;
}
- int sequence_number;
- if (!json::TryParseInt(message_body.value()[kSequenceNumber],
- &sequence_number)) {
- OSP_DLOG_WARN << "Received a message without a sequence number";
- return;
- }
-
// If the message is valid JSON and we don't understand it, there are two
// options: (1) it's an unknown type, or (2) the receiver filled out the
// message incorrectly. In the first case we can drop it, it's likely just
@@ -173,6 +166,13 @@ void SenderSessionMessenger::OnMessage(const std::string& source_id,
OSP_DLOG_INFO << "Received RTP message but no callback, dropping";
}
} else {
+ int sequence_number;
+ if (!json::TryParseInt(message_body.value()[kSequenceNumber],
+ &sequence_number)) {
+ OSP_DLOG_WARN << "Received a message without a sequence number";
+ return;
+ }
+
auto it = awaiting_replies_.find(sequence_number);
if (it == awaiting_replies_.end()) {
OSP_DLOG_WARN << "Received a reply I wasn't waiting for: "