aboutsummaryrefslogtreecommitdiff
path: root/cast
diff options
context:
space:
mode:
Diffstat (limited to 'cast')
-rw-r--r--cast/streaming/BUILD.gn1
-rw-r--r--cast/streaming/environment.cc29
-rw-r--r--cast/streaming/environment.h51
-rw-r--r--cast/streaming/receiver_session.cc134
-rw-r--r--cast/streaming/receiver_session.h31
-rw-r--r--cast/streaming/receiver_session_unittest.cc76
-rw-r--r--cast/streaming/receiver_unittest.cc6
-rw-r--r--cast/streaming/sender_packet_router_unittest.cc5
-rw-r--r--cast/streaming/sender_unittest.cc5
-rw-r--r--cast/streaming/testing/simple_socket_subscriber.h22
10 files changed, 287 insertions, 73 deletions
diff --git a/cast/streaming/BUILD.gn b/cast/streaming/BUILD.gn
index e8eeee22..f9a7b81f 100644
--- a/cast/streaming/BUILD.gn
+++ b/cast/streaming/BUILD.gn
@@ -134,6 +134,7 @@ source_set("test_helpers") {
sources = [
"testing/message_pipe.h",
"testing/simple_message_port.h",
+ "testing/simple_socket_subscriber.h",
]
deps = [
diff --git a/cast/streaming/environment.cc b/cast/streaming/environment.cc
index c3e7bea0..74897a72 100644
--- a/cast/streaming/environment.cc
+++ b/cast/streaming/environment.cc
@@ -4,6 +4,7 @@
#include "cast/streaming/environment.h"
+#include <algorithm>
#include <utility>
#include "cast/streaming/rtp_defines.h"
@@ -39,6 +40,10 @@ IPEndpoint Environment::GetBoundLocalEndpoint() const {
return IPEndpoint{};
}
+void Environment::SetSocketSubscriber(SocketSubscriber* subscriber) {
+ socket_subscriber_ = subscriber;
+}
+
void Environment::ConsumeIncomingPackets(PacketConsumer* packet_consumer) {
OSP_DCHECK(packet_consumer);
OSP_DCHECK(!packet_consumer_);
@@ -74,7 +79,17 @@ void Environment::SendPacket(absl::Span<const uint8_t> packet) {
Environment::PacketConsumer::~PacketConsumer() = default;
+void Environment::OnBound(UdpSocket* socket) {
+ OSP_DCHECK(socket == socket_.get());
+ state_ = SocketState::kReady;
+
+ if (socket_subscriber_) {
+ socket_subscriber_->OnSocketReady();
+ }
+}
+
void Environment::OnError(UdpSocket* socket, Error error) {
+ OSP_DCHECK(socket == socket_.get());
// Usually OnError() is only called for non-recoverable Errors. However,
// OnSendError() and OnRead() delegate to this method, to handle their hard
// error cases as well. So, return early here if |error| is recoverable.
@@ -82,14 +97,14 @@ void Environment::OnError(UdpSocket* socket, Error error) {
return;
}
- if (socket_error_handler_) {
- socket_error_handler_(error);
- return;
+ state_ = SocketState::kInvalid;
+ if (socket_subscriber_) {
+ socket_subscriber_->OnSocketInvalid(error);
+ } else {
+ // Default behavior when there are no subscribers.
+ OSP_LOG_ERROR << "For UDP socket bound to " << socket_->GetLocalEndpoint()
+ << ": " << error;
}
-
- // Default behavior when no error handler is set.
- OSP_LOG_ERROR << "For UDP socket bound to " << socket_->GetLocalEndpoint()
- << ": " << error;
}
void Environment::OnSendError(UdpSocket* socket, Error error) {
diff --git a/cast/streaming/environment.h b/cast/streaming/environment.h
index 0ab9a399..606f408f 100644
--- a/cast/streaming/environment.h
+++ b/cast/streaming/environment.h
@@ -33,6 +33,36 @@ class Environment : public UdpSocket::Client {
virtual ~PacketConsumer();
};
+ // Consumers of the environment's UDP socket should be careful to check the
+ // socket's state before accessing its methods, especially
+ // GetBoundLocalEndpoint(). If the environment is |kStarting|, the
+ // local endpoint may not be set yet and will be zero initialized.
+ enum class SocketState {
+ // Socket is still initializing. Usually the UDP socket bind is
+ // the last piece.
+ kStarting,
+
+ // The socket is ready for use and has been bound.
+ kReady,
+
+ // The socket is either closed (normally or due to an error) or in an
+ // invalid state. Currently the environment does not create a new socket
+ // in this case, so to be used again the environment itself needs to be
+ // recreated.
+ kInvalid
+ };
+
+ // Classes concerned with the Environment's UDP socket state may inherit from
+ // |Subscriber| and then |Subscribe|.
+ class SocketSubscriber {
+ public:
+ // Event that occurs when the environment is ready for use.
+ virtual void OnSocketReady() = 0;
+
+ // Event that occurs when the environment has experienced a fatal error.
+ virtual void OnSocketInvalid(Error error) = 0;
+ };
+
// Construct with the given clock source and TaskRunner. Creates and
// internally-owns a UdpSocket, and immediately binds it to the given
// |local_endpoint|. If embedders do not care what interface/address the UDP
@@ -54,12 +84,6 @@ class Environment : public UdpSocket::Client {
// is a bound socket.
virtual IPEndpoint GetBoundLocalEndpoint() const;
- // Set a handler function to run whenever non-recoverable socket errors occur.
- // If never set, the default is to emit log messages at error priority.
- void set_socket_error_handler(std::function<void(Error)> handler) {
- socket_error_handler_ = handler;
- }
-
// Get/Set the remote endpoint. This is separate from the constructor because
// the remote endpoint is, in some cases, discovered only after receiving a
// packet.
@@ -68,6 +92,15 @@ class Environment : public UdpSocket::Client {
remote_endpoint_ = endpoint;
}
+ // Returns the current state of the UDP socket. This method is virtual
+ // to allow tests to simulate socket state.
+ SocketState socket_state() const { return state_; }
+ void set_socket_state_for_testing(SocketState state) { state_ = state; }
+
+ // Subscribe to socket changes. Callers can unsubscribe by passing
+ // nullptr.
+ void SetSocketSubscriber(SocketSubscriber* subscriber);
+
// Start/Resume delivery of incoming packets to the given |packet_consumer|.
// Delivery will continue until DropIncomingPackets() is called.
void ConsumeIncomingPackets(PacketConsumer* packet_consumer);
@@ -97,20 +130,22 @@ class Environment : public UdpSocket::Client {
private:
// UdpSocket::Client implementation.
+ void OnBound(UdpSocket* socket) final;
void OnError(UdpSocket* socket, Error error) final;
void OnSendError(UdpSocket* socket, Error error) final;
void OnRead(UdpSocket* socket, ErrorOr<UdpPacket> packet_or_error) final;
-
// The UDP socket bound to the local endpoint that was passed into the
// constructor, or null if socket creation failed.
const std::unique_ptr<UdpSocket> socket_;
// These are externally set/cleared. Behaviors are described in getter/setter
// method comments above.
- std::function<void(Error)> socket_error_handler_;
+ IPEndpoint local_endpoint_{};
IPEndpoint remote_endpoint_{};
PacketConsumer* packet_consumer_ = nullptr;
+ SocketState state_ = SocketState::kStarting;
+ SocketSubscriber* socket_subscriber_ = nullptr;
};
} // namespace cast
diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc
index 70abe7aa..4e976e01 100644
--- a/cast/streaming/receiver_session.cc
+++ b/cast/streaming/receiver_session.cc
@@ -31,14 +31,15 @@ using ConfiguredReceivers = ReceiverSession::ConfiguredReceivers;
namespace {
template <typename Stream, typename Codec>
-const Stream* SelectStream(const std::vector<Codec>& preferred_codecs,
- const std::vector<Stream>& offered_streams) {
+std::unique_ptr<Stream> SelectStream(
+ const std::vector<Codec>& preferred_codecs,
+ const std::vector<Stream>& offered_streams) {
for (auto codec : preferred_codecs) {
for (const Stream& offered_stream : offered_streams) {
if (offered_stream.codec == codec) {
OSP_DVLOG << "Selected " << CodecToString(codec)
<< " as codec for streaming";
- return &offered_stream;
+ return std::make_unique<Stream>(offered_stream);
}
}
}
@@ -91,12 +92,36 @@ ReceiverSession::ReceiverSession(Client* const client,
messager_.SetHandler(
SenderMessage::Type::kOffer,
[this](SenderMessage message) { OnOffer(std::move(message)); });
+ environment_->SetSocketSubscriber(this);
}
ReceiverSession::~ReceiverSession() {
ResetReceivers(Client::kEndOfSession);
}
+void ReceiverSession::OnSocketReady() {
+ if (pending_session_) {
+ InitializeSession(*pending_session_);
+ pending_session_.reset();
+ }
+}
+
+void ReceiverSession::OnSocketInvalid(Error error) {
+ if (pending_session_) {
+ SendErrorAnswerReply(pending_session_->sequence_number,
+ "Failed to bind UDP socket");
+ pending_session_.reset();
+ }
+
+ client_->OnError(this,
+ Error(Error::Code::kSocketFailure,
+ "The environment is invalid and should be replaced."));
+}
+
+bool ReceiverSession::SessionProperties::IsValid() const {
+ return (selected_audio || selected_video) && sequence_number >= 0;
+}
+
void ReceiverSession::OnOffer(SenderMessage message) {
// We just drop offers we can't respond to. Note that libcast senders will
// always send a strictly positive sequence numbers, but zero is permitted
@@ -115,41 +140,62 @@ void ReceiverSession::OnOffer(SenderMessage message) {
return;
}
+ auto properties = std::make_unique<SessionProperties>();
+ properties->sequence_number = message.sequence_number;
+
const Offer& offer = absl::get<Offer>(message.body);
- const AudioStream* selected_audio_stream = nullptr;
if (!offer.audio_streams.empty() && !preferences_.audio_codecs.empty()) {
- selected_audio_stream =
+ properties->selected_audio =
SelectStream(preferences_.audio_codecs, offer.audio_streams);
}
- const VideoStream* selected_video_stream = nullptr;
if (!offer.video_streams.empty() && !preferences_.video_codecs.empty()) {
- selected_video_stream =
+ properties->selected_video =
SelectStream(preferences_.video_codecs, offer.video_streams);
}
- if (!selected_audio_stream && !selected_video_stream) {
+ if (!properties->IsValid()) {
SendErrorAnswerReply(message.sequence_number,
"Failed to select any streams from OFFER");
return;
}
- Answer answer = ConstructAnswer(selected_audio_stream, selected_video_stream);
+ switch (environment_->socket_state()) {
+ // If the environment is ready or in a bad state, we can respond
+ // immediately.
+ case Environment::SocketState::kInvalid:
+ SendErrorAnswerReply(message.sequence_number,
+ "UDP socket is closed, likely due to a bind error.");
+ break;
+
+ case Environment::SocketState::kReady:
+ InitializeSession(*properties);
+ break;
+
+ // Else we need to store the properties we just created until we get a
+ // ready or error event.
+ case Environment::SocketState::kStarting:
+ pending_session_ = std::move(properties);
+ break;
+ }
+}
+
+void ReceiverSession::InitializeSession(const SessionProperties& properties) {
+ Answer answer = ConstructAnswer(properties);
if (!answer.IsValid()) {
// If the answer message is invalid, there is no point in setting up a
// negotiation because the sender won't be able to connect to it.
- SendErrorAnswerReply(message.sequence_number,
+ SendErrorAnswerReply(properties.sequence_number,
"Failed to construct an ANSWER message");
return;
}
// Only spawn receivers if we know we have a valid answer message.
- ConfiguredReceivers receivers =
- SpawnReceivers(selected_audio_stream, selected_video_stream);
+ ConfiguredReceivers receivers = SpawnReceivers(properties);
client_->OnNegotiated(this, std::move(receivers));
- const Error result = messager_.SendMessage(
- ReceiverMessage{ReceiverMessage::Type::kAnswer, message.sequence_number,
- true /* valid */, std::move(answer)});
+ const Error result = messager_.SendMessage(ReceiverMessage{
+ ReceiverMessage::Type::kAnswer, properties.sequence_number,
+ true /* valid */, std::move(answer)});
if (!result.ok()) {
client_->OnError(this, std::move(result));
}
@@ -166,32 +212,38 @@ std::unique_ptr<Receiver> ReceiverSession::ConstructReceiver(
std::move(config));
}
-ConfiguredReceivers ReceiverSession::SpawnReceivers(const AudioStream* audio,
- const VideoStream* video) {
- OSP_DCHECK(audio || video);
+ConfiguredReceivers ReceiverSession::SpawnReceivers(
+ const SessionProperties& properties) {
+ OSP_DCHECK(properties.IsValid());
ResetReceivers(Client::kRenegotiated);
AudioCaptureConfig audio_config;
- if (audio) {
- current_audio_receiver_ = ConstructReceiver(audio->stream);
- audio_config = AudioCaptureConfig{
- audio->codec, audio->stream.channels, audio->bit_rate,
- audio->stream.rtp_timebase, audio->stream.target_delay};
+ if (properties.selected_audio) {
+ current_audio_receiver_ =
+ ConstructReceiver(properties.selected_audio->stream);
+ audio_config =
+ AudioCaptureConfig{properties.selected_audio->codec,
+ properties.selected_audio->stream.channels,
+ properties.selected_audio->bit_rate,
+ properties.selected_audio->stream.rtp_timebase,
+ properties.selected_audio->stream.target_delay};
}
VideoCaptureConfig video_config;
- if (video) {
- current_video_receiver_ = ConstructReceiver(video->stream);
+ if (properties.selected_video) {
+ current_video_receiver_ =
+ ConstructReceiver(properties.selected_video->stream);
std::vector<DisplayResolution> display_resolutions;
- std::transform(video->resolutions.begin(), video->resolutions.end(),
+ std::transform(properties.selected_video->resolutions.begin(),
+ properties.selected_video->resolutions.end(),
std::back_inserter(display_resolutions),
ToDisplayResolution);
- video_config =
- VideoCaptureConfig{video->codec,
- FrameRate{video->max_frame_rate.numerator,
- video->max_frame_rate.denominator},
- video->max_bit_rate, std::move(display_resolutions),
- video->stream.target_delay};
+ video_config = VideoCaptureConfig{
+ properties.selected_video->codec,
+ FrameRate{properties.selected_video->max_frame_rate.numerator,
+ properties.selected_video->max_frame_rate.denominator},
+ properties.selected_video->max_bit_rate, std::move(display_resolutions),
+ properties.selected_video->stream.target_delay};
}
return ConfiguredReceivers{
@@ -207,21 +259,19 @@ void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) {
}
}
-Answer ReceiverSession::ConstructAnswer(
- const AudioStream* selected_audio_stream,
- const VideoStream* selected_video_stream) {
- OSP_DCHECK(selected_audio_stream || selected_video_stream);
+Answer ReceiverSession::ConstructAnswer(const SessionProperties& properties) {
+ OSP_DCHECK(properties.IsValid());
std::vector<int> stream_indexes;
std::vector<Ssrc> stream_ssrcs;
- if (selected_audio_stream) {
- stream_indexes.push_back(selected_audio_stream->stream.index);
- stream_ssrcs.push_back(selected_audio_stream->stream.ssrc + 1);
+ if (properties.selected_audio) {
+ stream_indexes.push_back(properties.selected_audio->stream.index);
+ stream_ssrcs.push_back(properties.selected_audio->stream.ssrc + 1);
}
- if (selected_video_stream) {
- stream_indexes.push_back(selected_video_stream->stream.index);
- stream_ssrcs.push_back(selected_video_stream->stream.ssrc + 1);
+ if (properties.selected_video) {
+ stream_indexes.push_back(properties.selected_video->stream.index);
+ stream_ssrcs.push_back(properties.selected_video->stream.ssrc + 1);
}
absl::optional<Constraints> constraints;
diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h
index b448bd00..b29b6d52 100644
--- a/cast/streaming/receiver_session.h
+++ b/cast/streaming/receiver_session.h
@@ -26,7 +26,7 @@ namespace cast {
class Environment;
class Receiver;
-class ReceiverSession final {
+class ReceiverSession final : public Environment::SocketSubscriber {
public:
// Upon successful negotiation, a set of configured receivers is constructed
// for handling audio and video. Note that either receiver may be null.
@@ -115,20 +115,37 @@ class ReceiverSession final {
const std::string& session_id() const { return session_id_; }
+ // Environment::SocketSubscriber event callbacks.
+ void OnSocketReady() override;
+ void OnSocketInvalid(Error error) override;
+
private:
+ struct SessionProperties {
+ std::unique_ptr<AudioStream> selected_audio;
+ std::unique_ptr<VideoStream> selected_video;
+ int sequence_number;
+
+ // To be valid either the audio or video must be selected, and we must
+ // have a sequence number we can reference.
+ bool IsValid() const;
+ };
+
// Specific message type handler methods.
void OnOffer(SenderMessage message);
+ // Creates receivers and sends an appropriate Answer message using the
+ // session properties.
+ void InitializeSession(const SessionProperties& properties);
+
// Used by SpawnReceivers to generate a receiver for a specific stream.
std::unique_ptr<Receiver> ConstructReceiver(const Stream& stream);
// Creates a set of configured receivers from a given pair of audio and
// video streams. NOTE: either audio or video may be null, but not both.
- ConfiguredReceivers SpawnReceivers(const AudioStream* audio,
- const VideoStream* video);
+ ConfiguredReceivers SpawnReceivers(const SessionProperties& properties);
// Callers of this method should ensure at least one stream is non-null.
- Answer ConstructAnswer(const AudioStream* audio, const VideoStream* video);
+ Answer ConstructAnswer(const SessionProperties& properties);
// Handles resetting receivers and notifying the client.
void ResetReceivers(Client::ReceiversDestroyingReason reason);
@@ -143,6 +160,12 @@ class ReceiverSession final {
const std::string session_id_;
ReceiverSessionMessager messager_;
+ // In some cases, the session initialization may be pending waiting for the
+ // UDP socket to be ready. In this case, the receivers and the answer
+ // message will not be configured and sent until the UDP socket has finished
+ // binding.
+ std::unique_ptr<SessionProperties> pending_session_;
+
bool supports_wifi_status_reporting_ = false;
ReceiverPacketRouter packet_router_;
diff --git a/cast/streaming/receiver_session_unittest.cc b/cast/streaming/receiver_session_unittest.cc
index fd2c48f6..afbc556e 100644
--- a/cast/streaming/receiver_session_unittest.cc
+++ b/cast/streaming/receiver_session_unittest.cc
@@ -282,8 +282,9 @@ class ReceiverSessionTest : public ::testing::Test {
auto environment_ = std::make_unique<NiceMock<MockEnvironment>>(
&FakeClock::now, &task_runner_);
ON_CALL(*environment_, GetBoundLocalEndpoint())
- .WillByDefault(
- Return(IPEndpoint{IPAddress::Parse("127.0.0.1").value(), 12345}));
+ .WillByDefault(Return(IPEndpoint{{127, 0, 0, 1}, 12345}));
+ environment_->set_socket_state_for_testing(
+ Environment::SocketState::kReady);
return environment_;
}
@@ -609,9 +610,8 @@ TEST_F(ReceiverSessionTest, NotifiesReceiverDestruction) {
TEST_F(ReceiverSessionTest, HandlesInvalidAnswer) {
// Simulate an unbound local endpoint.
- EXPECT_CALL(*environment_, GetBoundLocalEndpoint).WillOnce([]() {
- return IPEndpoint{};
- });
+ EXPECT_CALL(*environment_, GetBoundLocalEndpoint)
+ .WillOnce(Return(IPEndpoint{}));
message_port_->ReceiveMessage(kValidOfferMessage);
const auto& messages = message_port_->posted_messages();
@@ -624,5 +624,71 @@ TEST_F(ReceiverSessionTest, HandlesInvalidAnswer) {
EXPECT_EQ("ANSWER", answer["type"].asString());
EXPECT_EQ("error", answer["result"].asString());
}
+
+TEST_F(ReceiverSessionTest, DelaysAnswerUntilEnvironmentIsReady) {
+ environment_->set_socket_state_for_testing(
+ Environment::SocketState::kStarting);
+
+ // We should not have sent an answer yet--the UDP socket is not ready.
+ message_port_->ReceiveMessage(kValidOfferMessage);
+ ASSERT_TRUE(message_port_->posted_messages().empty());
+
+ // Simulate the environment calling back into us with the socket being ready.
+ // state() will not be called again--we just need to get the bind event.
+ EXPECT_CALL(*environment_, GetBoundLocalEndpoint())
+ .WillOnce(Return(IPEndpoint{{10, 0, 0, 2}, 4567}));
+ EXPECT_CALL(client_, OnNegotiated(session_.get(), _));
+ EXPECT_CALL(client_,
+ OnReceiversDestroying(session_.get(),
+ ReceiverSession::Client::kEndOfSession));
+ session_->OnSocketReady();
+ const auto& messages = message_port_->posted_messages();
+ ASSERT_EQ(1u, messages.size());
+
+ // We should have set the UDP port based on the ready socket value.
+ auto message_body = json::Parse(messages[0]);
+ EXPECT_TRUE(message_body.is_value());
+ const Json::Value& answer_body = message_body.value()["answer"];
+ EXPECT_TRUE(answer_body.isObject());
+ EXPECT_EQ(4567, answer_body["udpPort"].asInt());
+}
+
+TEST_F(ReceiverSessionTest,
+ ReturnsErrorAnswerIfEnvironmentIsAlreadyInvalidated) {
+ environment_->set_socket_state_for_testing(
+ Environment::SocketState::kInvalid);
+
+ // If the environment is already in a bad state, we can respond immediately.
+ message_port_->ReceiveMessage(kValidOfferMessage);
+ const auto& messages = message_port_->posted_messages();
+ ASSERT_EQ(1u, messages.size());
+
+ auto message_body = json::Parse(messages[0]);
+ EXPECT_TRUE(message_body.is_value());
+ EXPECT_EQ("ANSWER", message_body.value()["type"].asString());
+ EXPECT_EQ("error", message_body.value()["result"].asString());
+}
+
+TEST_F(ReceiverSessionTest, ReturnsErrorAnswerIfEnvironmentIsInvalidated) {
+ environment_->set_socket_state_for_testing(
+ Environment::SocketState::kStarting);
+
+ // We should not have sent an answer yet--the environment is not ready.
+ message_port_->ReceiveMessage(kValidOfferMessage);
+ ASSERT_TRUE(message_port_->posted_messages().empty());
+
+ // Simulate the environment calling back into us with invalidation.
+ EXPECT_CALL(client_, OnError(_, _)).Times(1);
+ session_->OnSocketInvalid(Error::Code::kSocketBindFailure);
+ const auto& messages = message_port_->posted_messages();
+ ASSERT_EQ(1u, messages.size());
+
+ // We should have an error answer.
+ auto message_body = json::Parse(messages[0]);
+ EXPECT_TRUE(message_body.is_value());
+ EXPECT_EQ("ANSWER", message_body.value()["type"].asString());
+ EXPECT_EQ("error", message_body.value()["result"].asString());
+}
+
} // namespace cast
} // namespace openscreen
diff --git a/cast/streaming/receiver_unittest.cc b/cast/streaming/receiver_unittest.cc
index 0a6817e6..86fd95ff 100644
--- a/cast/streaming/receiver_unittest.cc
+++ b/cast/streaming/receiver_unittest.cc
@@ -26,6 +26,7 @@
#include "cast/streaming/sender_report_builder.h"
#include "cast/streaming/session_config.h"
#include "cast/streaming/ssrc.h"
+#include "cast/streaming/testing/simple_socket_subscriber.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/api/time.h"
@@ -249,7 +250,6 @@ class MockSender : public CompoundRtcpParser::Client {
CompoundRtcpParser rtcp_parser_;
FrameCrypto crypto_;
RtpPacketizer rtp_packetizer_;
-
FrameId max_feedback_frame_id_ = FrameId::first() + kMaxUnackedFrames;
EncryptedFrame frame_being_sent_;
@@ -278,8 +278,7 @@ class ReceiverTest : public testing::Test {
/* .aes_iv_mask = */ kCastIvMask,
/* .is_pli_enabled = */ true}),
sender_(&task_runner_, &env_) {
- env_.set_socket_error_handler(
- [](Error error) { ASSERT_TRUE(error.ok()) << error; });
+ env_.SetSocketSubscriber(&socket_subscriber_);
ON_CALL(env_, SendPacket(_))
.WillByDefault(Invoke([this](absl::Span<const uint8_t> packet) {
task_runner_.PostTaskWithDelay(
@@ -360,6 +359,7 @@ class ReceiverTest : public testing::Test {
Receiver receiver_;
testing::NiceMock<MockSender> sender_;
testing::NiceMock<MockConsumer> consumer_;
+ SimpleSubscriber socket_subscriber_;
};
// Tests that the Receiver processes RTCP packets correctly and sends RTCP
diff --git a/cast/streaming/sender_packet_router_unittest.cc b/cast/streaming/sender_packet_router_unittest.cc
index 3c1d1f6a..13377a6d 100644
--- a/cast/streaming/sender_packet_router_unittest.cc
+++ b/cast/streaming/sender_packet_router_unittest.cc
@@ -8,6 +8,7 @@
#include "cast/streaming/constants.h"
#include "cast/streaming/mock_environment.h"
+#include "cast/streaming/testing/simple_socket_subscriber.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/base/ip_address.h"
@@ -155,8 +156,7 @@ class SenderPacketRouterTest : public testing::Test {
task_runner_(&clock_),
env_(&FakeClock::now, &task_runner_),
router_(&env_, kMaxPacketsPerBurst, kBurstInterval) {
- env_.set_socket_error_handler(
- [](Error error) { ASSERT_TRUE(error.ok()) << error; });
+ env_.SetSocketSubscriber(&socket_subscriber_);
}
~SenderPacketRouterTest() override = default;
@@ -182,6 +182,7 @@ class SenderPacketRouterTest : public testing::Test {
SenderPacketRouter router_;
testing::NiceMock<MockSender> audio_sender_;
testing::NiceMock<MockSender> video_sender_;
+ SimpleSubscriber socket_subscriber_;
};
// Tests that the SenderPacketRouter is correctly configured from the specific
diff --git a/cast/streaming/sender_unittest.cc b/cast/streaming/sender_unittest.cc
index 1296e4a1..01c11f8f 100644
--- a/cast/streaming/sender_unittest.cc
+++ b/cast/streaming/sender_unittest.cc
@@ -32,6 +32,7 @@
#include "cast/streaming/sender_report_parser.h"
#include "cast/streaming/session_config.h"
#include "cast/streaming/ssrc.h"
+#include "cast/streaming/testing/simple_socket_subscriber.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/test/fake_clock.h"
@@ -350,8 +351,7 @@ class SenderTest : public testing::Test {
receiver_to_sender_pipe_(&task_runner_, &sender_packet_router_),
receiver_(&receiver_to_sender_pipe_),
sender_to_receiver_pipe_(&task_runner_, &receiver_) {
- sender_environment_.set_socket_error_handler(
- [](Error error) { ASSERT_TRUE(error.ok()) << error; });
+ sender_environment_.SetSocketSubscriber(&socket_subscriber_);
sender_environment_.set_remote_endpoint(
receiver_to_sender_pipe_.local_endpoint());
ON_CALL(sender_environment_, SendPacket(_))
@@ -442,6 +442,7 @@ class SenderTest : public testing::Test {
SimulatedNetworkPipe receiver_to_sender_pipe_;
NiceMock<MockReceiver> receiver_;
SimulatedNetworkPipe sender_to_receiver_pipe_;
+ SimpleSubscriber socket_subscriber_;
};
// Tests that the Sender can send EncodedFrames over an ideal network (i.e., low
diff --git a/cast/streaming/testing/simple_socket_subscriber.h b/cast/streaming/testing/simple_socket_subscriber.h
new file mode 100644
index 00000000..f6208b42
--- /dev/null
+++ b/cast/streaming/testing/simple_socket_subscriber.h
@@ -0,0 +1,22 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CAST_STREAMING_TESTING_SIMPLE_SOCKET_SUBSCRIBER_H_
+#define CAST_STREAMING_TESTING_SIMPLE_SOCKET_SUBSCRIBER_H_
+
+#include "cast/streaming/environment.h"
+#include "gtest/gtest.h"
+
+namespace openscreen {
+namespace cast {
+
+class SimpleSubscriber : public Environment::SocketSubscriber {
+ void OnSocketReady() {}
+ void OnSocketInvalid(Error error) { ASSERT_TRUE(error.ok()) << error; }
+};
+
+} // namespace cast
+} // namespace openscreen
+
+#endif // CAST_STREAMING_TESTING_SIMPLE_SOCKET_SUBSCRIBER_H_