// Copyright 2020 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "cast/standalone_sender/looping_file_cast_agent.h" #include #include #include #include "cast/common/channel/message_util.h" #include "cast/standalone_sender/looping_file_sender.h" #include "cast/streaming/capture_recommendations.h" #include "cast/streaming/constants.h" #include "cast/streaming/offer_messages.h" #include "json/value.h" #include "platform/api/tls_connection_factory.h" #include "util/json/json_helpers.h" #include "util/stringprintf.h" #include "util/trace_logging.h" namespace openscreen { namespace cast { namespace { using DeviceMediaPolicy = SenderSocketFactory::DeviceMediaPolicy; } // namespace LoopingFileCastAgent::LoopingFileCastAgent(TaskRunner* task_runner, ShutdownCallback shutdown_callback) : task_runner_(task_runner), shutdown_callback_(std::move(shutdown_callback)), connection_handler_(&router_, this), socket_factory_(this, task_runner_), connection_factory_( TlsConnectionFactory::CreateFactory(&socket_factory_, task_runner_)), message_port_(&router_) { router_.AddHandlerForLocalId(kPlatformSenderId, this); socket_factory_.set_factory(connection_factory_.get()); } LoopingFileCastAgent::~LoopingFileCastAgent() { Shutdown(); } void LoopingFileCastAgent::Connect(ConnectionSettings settings) { TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender); OSP_DCHECK(!connection_settings_); connection_settings_ = std::move(settings); const auto policy = connection_settings_->should_include_video ? DeviceMediaPolicy::kIncludesVideo : DeviceMediaPolicy::kAudioOnly; task_runner_->PostTask([this, policy] { wake_lock_ = ScopedWakeLock::Create(task_runner_); socket_factory_.Connect(connection_settings_->receiver_endpoint, policy, &router_); }); } void LoopingFileCastAgent::OnConnected(SenderSocketFactory* factory, const IPEndpoint& endpoint, std::unique_ptr socket) { TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender); if (message_port_.GetSocketId() != ToCastSocketId(nullptr)) { OSP_LOG_WARN << "Already connected, dropping peer at: " << endpoint; return; } message_port_.SetSocket(socket->GetWeakPtr()); router_.TakeSocket(this, std::move(socket)); OSP_LOG_INFO << "Launching Mirroring App on the Cast Receiver..."; static constexpr char kLaunchMessageTemplate[] = R"({"type":"LAUNCH", "requestId":%d, "appId":"%s"})"; router_.Send(VirtualConnection{kPlatformSenderId, kPlatformReceiverId, message_port_.GetSocketId()}, MakeSimpleUTF8Message( kReceiverNamespace, StringPrintf(kLaunchMessageTemplate, next_request_id_++, GetMirroringAppId()))); } void LoopingFileCastAgent::OnError(SenderSocketFactory* factory, const IPEndpoint& endpoint, Error error) { OSP_LOG_ERROR << "Cast agent received socket factory error: " << error; Shutdown(); } void LoopingFileCastAgent::OnClose(CastSocket* cast_socket) { OSP_VLOG << "Cast agent socket closed."; Shutdown(); } void LoopingFileCastAgent::OnError(CastSocket* socket, Error error) { OSP_LOG_ERROR << "Cast agent received socket error: " << error; Shutdown(); } bool LoopingFileCastAgent::IsConnectionAllowed( const VirtualConnection& virtual_conn) const { return true; } void LoopingFileCastAgent::OnMessage(VirtualConnectionRouter* router, CastSocket* socket, ::cast::channel::CastMessage message) { if (message_port_.GetSocketId() == ToCastSocketId(socket) && !message_port_.client_sender_id().empty() && message_port_.client_sender_id() == message.destination_id()) { OSP_DCHECK(message_port_.client_sender_id() != kPlatformSenderId); message_port_.OnMessage(router, socket, std::move(message)); return; } if (message.destination_id() != kPlatformSenderId && message.destination_id() != kBroadcastId) { return; // Message not for us. } if (message.namespace_() == kReceiverNamespace && message_port_.GetSocketId() == ToCastSocketId(socket)) { const ErrorOr payload = json::Parse(message.payload_utf8()); if (payload.is_error()) { OSP_LOG_ERROR << "Failed to parse message: " << payload.error(); } if (HasType(payload.value(), CastMessageType::kReceiverStatus)) { HandleReceiverStatus(payload.value()); } else if (HasType(payload.value(), CastMessageType::kLaunchError)) { std::string reason; if (!json::TryParseString(payload.value()[kMessageKeyReason], &reason)) { reason = "UNKNOWN"; } OSP_LOG_ERROR << "Failed to launch the Cast Mirroring App on the Receiver! Reason: " << reason; Shutdown(); } else if (HasType(payload.value(), CastMessageType::kInvalidRequest)) { std::string reason; if (!json::TryParseString(payload.value()[kMessageKeyReason], &reason)) { reason = "UNKNOWN"; } OSP_LOG_ERROR << "Cast Receiver thinks our request is invalid: " << reason; } } } const char* LoopingFileCastAgent::GetMirroringAppId() const { if (connection_settings_ && !connection_settings_->should_include_video) { return kMirroringAudioOnlyAppId; } return kMirroringAppId; } void LoopingFileCastAgent::HandleReceiverStatus(const Json::Value& status) { const Json::Value& details = (status[kMessageKeyStatus].isObject() && status[kMessageKeyStatus][kMessageKeyApplications].isArray()) ? status[kMessageKeyStatus][kMessageKeyApplications][0] : Json::Value(); std::string running_app_id; if (!json::TryParseString(details[kMessageKeyAppId], &running_app_id) || running_app_id != GetMirroringAppId()) { // The mirroring app is not running. If it was just stopped, Shutdown() will // tear everything down. If it has been stopped already, Shutdown() is a // no-op. Shutdown(); return; } std::string session_id; if (!json::TryParseString(details[kMessageKeySessionId], &session_id) || session_id.empty()) { OSP_LOG_ERROR << "Cannot continue: Cast Receiver did not provide a session ID for " "the Mirroring App running on it."; Shutdown(); return; } if (app_session_id_ != session_id) { if (app_session_id_.empty()) { app_session_id_ = session_id; } else { OSP_LOG_ERROR << "Cannot continue: Different Mirroring App session is " "now running on the Cast Receiver."; Shutdown(); return; } } if (remote_connection_) { // The mirroring app is running and this LoopingFileCastAgent is already // streaming to it (or is awaiting message routing to be established). There // are no additional actions to be taken in response to this extra // RECEIVER_STATUS message. return; } std::string message_destination_id; if (!json::TryParseString(details[kMessageKeyTransportId], &message_destination_id) || message_destination_id.empty()) { OSP_LOG_ERROR << "Cannot continue: Cast Receiver did not provide a transport ID for " "routing messages to the Mirroring App running on it."; Shutdown(); return; } remote_connection_.emplace( VirtualConnection{MakeUniqueSessionId("streaming_sender"), message_destination_id, message_port_.GetSocketId()}); OSP_LOG_INFO << "Starting-up message routing to the Cast Receiver's " "Mirroring App (sessionId=" << app_session_id_ << ")..."; connection_handler_.OpenRemoteConnection( *remote_connection_, [this](bool success) { OnRemoteMessagingOpened(success); }); } void LoopingFileCastAgent::OnRemoteMessagingOpened(bool success) { if (!remote_connection_) { return; // Shutdown() was called in the meantime. } if (success) { OSP_LOG_INFO << "Starting streaming session..."; CreateAndStartSession(); } else { OSP_LOG_INFO << "Failed to establish messaging to the Cast Receiver's " "Mirroring App. Perhaps another Cast Sender is using it?"; Shutdown(); } } void LoopingFileCastAgent::CreateAndStartSession() { TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender); OSP_DCHECK(remote_connection_.has_value()); environment_ = std::make_unique(&Clock::now, task_runner_, IPEndpoint{}); SenderSession::Configuration config{ connection_settings_->receiver_endpoint.address, this, environment_.get(), &message_port_, remote_connection_->local_id, remote_connection_->peer_id, connection_settings_->use_android_rtp_hack}; current_session_ = std::make_unique(std::move(config)); OSP_DCHECK(!message_port_.client_sender_id().empty()); AudioCaptureConfig audio_config; // Opus does best at 192kbps, so we cap that here. audio_config.bit_rate = 192 * 1000; VideoCaptureConfig video_config = { .codec = connection_settings_->codec, // The video config is allowed to use whatever is left over after audio. .max_bit_rate = connection_settings_->max_bitrate - audio_config.bit_rate}; // Use default display resolution of 1080P. video_config.resolutions.emplace_back(Resolution{1920, 1080}); OSP_VLOG << "Starting session negotiation."; Error negotiation_error; if (connection_settings_->use_remoting) { remoting_sender_ = std::make_unique( current_session_->rpc_messenger(), AudioCodec::kOpus, connection_settings_->codec, [this]() { OnRemotingReceiverReady(); }); negotiation_error = current_session_->NegotiateRemoting(audio_config, video_config); } else { negotiation_error = current_session_->Negotiate({audio_config}, {video_config}); } if (!negotiation_error.ok()) { OSP_LOG_ERROR << "Failed to negotiate a session: " << negotiation_error; } } void LoopingFileCastAgent::OnNegotiated( const SenderSession* session, SenderSession::ConfiguredSenders senders, capture_recommendations::Recommendations capture_recommendations) { if (senders.audio_sender == nullptr || senders.video_sender == nullptr) { OSP_LOG_ERROR << "Missing both audio and video, so exiting..."; return; } file_sender_ = std::make_unique( environment_.get(), connection_settings_.value(), session, std::move(senders), [this]() { shutdown_callback_(); }); } void LoopingFileCastAgent::OnRemotingNegotiated( const SenderSession* session, SenderSession::RemotingNegotiation negotiation) { if (negotiation.senders.audio_sender == nullptr && negotiation.senders.video_sender == nullptr) { OSP_LOG_ERROR << "Missing both audio and video, so exiting..."; return; } current_negotiation_ = std::make_unique(negotiation); if (is_ready_for_remoting_) { StartRemotingSenders(); } } void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) { OSP_LOG_ERROR << "SenderSession fatal error: " << error; Shutdown(); } void LoopingFileCastAgent::OnRemotingReceiverReady() { is_ready_for_remoting_ = true; if (current_negotiation_) { StartRemotingSenders(); } } void LoopingFileCastAgent::StartRemotingSenders() { OSP_DCHECK(current_negotiation_); file_sender_ = std::make_unique( environment_.get(), connection_settings_.value(), current_session_.get(), std::move(current_negotiation_->senders), [this]() { shutdown_callback_(); }); current_negotiation_.reset(); is_ready_for_remoting_ = false; } void LoopingFileCastAgent::Shutdown() { TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender); file_sender_.reset(); if (current_session_) { OSP_LOG_INFO << "Stopping mirroring session..."; current_session_.reset(); } OSP_DCHECK(message_port_.client_sender_id().empty()); environment_.reset(); if (remote_connection_) { const VirtualConnection connection = *remote_connection_; // Reset |remote_connection_| because ConnectionNamespaceHandler may // call-back into OnRemoteMessagingOpened(). remote_connection_.reset(); connection_handler_.CloseRemoteConnection(connection); } if (!app_session_id_.empty()) { OSP_LOG_INFO << "Stopping the Cast Receiver's Mirroring App..."; static constexpr char kStopMessageTemplate[] = R"({"type":"STOP", "requestId":%d, "sessionId":"%s"})"; std::string stop_json = StringPrintf( kStopMessageTemplate, next_request_id_++, app_session_id_.c_str()); router_.Send( VirtualConnection{kPlatformSenderId, kPlatformReceiverId, message_port_.GetSocketId()}, MakeSimpleUTF8Message(kReceiverNamespace, std::move(stop_json))); app_session_id_.clear(); } if (message_port_.GetSocketId() != ToCastSocketId(nullptr)) { router_.CloseSocket(message_port_.GetSocketId()); message_port_.SetSocket({}); } wake_lock_.reset(); if (shutdown_callback_) { const ShutdownCallback callback = std::move(shutdown_callback_); callback(); } } } // namespace cast } // namespace openscreen