aboutsummaryrefslogtreecommitdiff
path: root/cast
diff options
context:
space:
mode:
authorJordan Bayles <jophba@chromium.org>2021-02-01 12:07:16 -0800
committerCommit Bot <commit-bot@chromium.org>2021-02-01 21:25:33 +0000
commit8f0e0430b9a536f1d56683d4cf3ab5bcf70ab1d1 (patch)
tree66f3fff7aebb574c433721a176718bb023a482bc /cast
parentc4d2832da77908dc1802d78081c388ce5aa33b9f (diff)
downloadopenscreen-8f0e0430b9a536f1d56683d4cf3ab5bcf70ab1d1.tar.gz
Fix UDP bind timing issue
This patch fixes a potential timing issue where we attempt to send an Answer message before the UDP socket is bound. The current behavior is that we construct an Answer with a UDP port value of 0, which results in the Answer failing validation and an error Answer message being sent back to the sender. The new behavior is that the Answer message is not sent until the environment is notified that the UDP socket has been bound. This is accomplished by an addition to the UDPSocket::Client abstract class, a new OnBound event. Bug: b/174501332 Change-Id: I4bcebe78d8d41c29bb4b4698c5565f0dd9ef2c17 Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/2645893 Commit-Queue: Jordan Bayles <jophba@chromium.org> Reviewed-by: mark a. foltz <mfoltz@chromium.org> Reviewed-by: Jordan Bayles <jophba@chromium.org>
Diffstat (limited to 'cast')
-rw-r--r--cast/streaming/BUILD.gn1
-rw-r--r--cast/streaming/environment.cc29
-rw-r--r--cast/streaming/environment.h51
-rw-r--r--cast/streaming/receiver_session.cc134
-rw-r--r--cast/streaming/receiver_session.h31
-rw-r--r--cast/streaming/receiver_session_unittest.cc76
-rw-r--r--cast/streaming/receiver_unittest.cc6
-rw-r--r--cast/streaming/sender_packet_router_unittest.cc5
-rw-r--r--cast/streaming/sender_unittest.cc5
-rw-r--r--cast/streaming/testing/simple_socket_subscriber.h22
10 files changed, 287 insertions, 73 deletions
diff --git a/cast/streaming/BUILD.gn b/cast/streaming/BUILD.gn
index e8eeee22..f9a7b81f 100644
--- a/cast/streaming/BUILD.gn
+++ b/cast/streaming/BUILD.gn
@@ -134,6 +134,7 @@ source_set("test_helpers") {
sources = [
"testing/message_pipe.h",
"testing/simple_message_port.h",
+ "testing/simple_socket_subscriber.h",
]
deps = [
diff --git a/cast/streaming/environment.cc b/cast/streaming/environment.cc
index c3e7bea0..74897a72 100644
--- a/cast/streaming/environment.cc
+++ b/cast/streaming/environment.cc
@@ -4,6 +4,7 @@
#include "cast/streaming/environment.h"
+#include <algorithm>
#include <utility>
#include "cast/streaming/rtp_defines.h"
@@ -39,6 +40,10 @@ IPEndpoint Environment::GetBoundLocalEndpoint() const {
return IPEndpoint{};
}
+void Environment::SetSocketSubscriber(SocketSubscriber* subscriber) {
+ socket_subscriber_ = subscriber;
+}
+
void Environment::ConsumeIncomingPackets(PacketConsumer* packet_consumer) {
OSP_DCHECK(packet_consumer);
OSP_DCHECK(!packet_consumer_);
@@ -74,7 +79,17 @@ void Environment::SendPacket(absl::Span<const uint8_t> packet) {
Environment::PacketConsumer::~PacketConsumer() = default;
+void Environment::OnBound(UdpSocket* socket) {
+ OSP_DCHECK(socket == socket_.get());
+ state_ = SocketState::kReady;
+
+ if (socket_subscriber_) {
+ socket_subscriber_->OnSocketReady();
+ }
+}
+
void Environment::OnError(UdpSocket* socket, Error error) {
+ OSP_DCHECK(socket == socket_.get());
// Usually OnError() is only called for non-recoverable Errors. However,
// OnSendError() and OnRead() delegate to this method, to handle their hard
// error cases as well. So, return early here if |error| is recoverable.
@@ -82,14 +97,14 @@ void Environment::OnError(UdpSocket* socket, Error error) {
return;
}
- if (socket_error_handler_) {
- socket_error_handler_(error);
- return;
+ state_ = SocketState::kInvalid;
+ if (socket_subscriber_) {
+ socket_subscriber_->OnSocketInvalid(error);
+ } else {
+ // Default behavior when there are no subscribers.
+ OSP_LOG_ERROR << "For UDP socket bound to " << socket_->GetLocalEndpoint()
+ << ": " << error;
}
-
- // Default behavior when no error handler is set.
- OSP_LOG_ERROR << "For UDP socket bound to " << socket_->GetLocalEndpoint()
- << ": " << error;
}
void Environment::OnSendError(UdpSocket* socket, Error error) {
diff --git a/cast/streaming/environment.h b/cast/streaming/environment.h
index 0ab9a399..606f408f 100644
--- a/cast/streaming/environment.h
+++ b/cast/streaming/environment.h
@@ -33,6 +33,36 @@ class Environment : public UdpSocket::Client {
virtual ~PacketConsumer();
};
+ // Consumers of the environment's UDP socket should be careful to check the
+ // socket's state before accessing its methods, especially
+ // GetBoundLocalEndpoint(). If the environment is |kStarting|, the
+ // local endpoint may not be set yet and will be zero initialized.
+ enum class SocketState {
+ // Socket is still initializing. Usually the UDP socket bind is
+ // the last piece.
+ kStarting,
+
+ // The socket is ready for use and has been bound.
+ kReady,
+
+ // The socket is either closed (normally or due to an error) or in an
+ // invalid state. Currently the environment does not create a new socket
+ // in this case, so to be used again the environment itself needs to be
+ // recreated.
+ kInvalid
+ };
+
+ // Classes concerned with the Environment's UDP socket state may inherit from
+ // |Subscriber| and then |Subscribe|.
+ class SocketSubscriber {
+ public:
+ // Event that occurs when the environment is ready for use.
+ virtual void OnSocketReady() = 0;
+
+ // Event that occurs when the environment has experienced a fatal error.
+ virtual void OnSocketInvalid(Error error) = 0;
+ };
+
// Construct with the given clock source and TaskRunner. Creates and
// internally-owns a UdpSocket, and immediately binds it to the given
// |local_endpoint|. If embedders do not care what interface/address the UDP
@@ -54,12 +84,6 @@ class Environment : public UdpSocket::Client {
// is a bound socket.
virtual IPEndpoint GetBoundLocalEndpoint() const;
- // Set a handler function to run whenever non-recoverable socket errors occur.
- // If never set, the default is to emit log messages at error priority.
- void set_socket_error_handler(std::function<void(Error)> handler) {
- socket_error_handler_ = handler;
- }
-
// Get/Set the remote endpoint. This is separate from the constructor because
// the remote endpoint is, in some cases, discovered only after receiving a
// packet.
@@ -68,6 +92,15 @@ class Environment : public UdpSocket::Client {
remote_endpoint_ = endpoint;
}
+ // Returns the current state of the UDP socket. This method is virtual
+ // to allow tests to simulate socket state.
+ SocketState socket_state() const { return state_; }
+ void set_socket_state_for_testing(SocketState state) { state_ = state; }
+
+ // Subscribe to socket changes. Callers can unsubscribe by passing
+ // nullptr.
+ void SetSocketSubscriber(SocketSubscriber* subscriber);
+
// Start/Resume delivery of incoming packets to the given |packet_consumer|.
// Delivery will continue until DropIncomingPackets() is called.
void ConsumeIncomingPackets(PacketConsumer* packet_consumer);
@@ -97,20 +130,22 @@ class Environment : public UdpSocket::Client {
private:
// UdpSocket::Client implementation.
+ void OnBound(UdpSocket* socket) final;
void OnError(UdpSocket* socket, Error error) final;
void OnSendError(UdpSocket* socket, Error error) final;
void OnRead(UdpSocket* socket, ErrorOr<UdpPacket> packet_or_error) final;
-
// The UDP socket bound to the local endpoint that was passed into the
// constructor, or null if socket creation failed.
const std::unique_ptr<UdpSocket> socket_;
// These are externally set/cleared. Behaviors are described in getter/setter
// method comments above.
- std::function<void(Error)> socket_error_handler_;
+ IPEndpoint local_endpoint_{};
IPEndpoint remote_endpoint_{};
PacketConsumer* packet_consumer_ = nullptr;
+ SocketState state_ = SocketState::kStarting;
+ SocketSubscriber* socket_subscriber_ = nullptr;
};
} // namespace cast
diff --git a/cast/streaming/receiver_session.cc b/cast/streaming/receiver_session.cc
index 70abe7aa..4e976e01 100644
--- a/cast/streaming/receiver_session.cc
+++ b/cast/streaming/receiver_session.cc
@@ -31,14 +31,15 @@ using ConfiguredReceivers = ReceiverSession::ConfiguredReceivers;
namespace {
template <typename Stream, typename Codec>
-const Stream* SelectStream(const std::vector<Codec>& preferred_codecs,
- const std::vector<Stream>& offered_streams) {
+std::unique_ptr<Stream> SelectStream(
+ const std::vector<Codec>& preferred_codecs,
+ const std::vector<Stream>& offered_streams) {
for (auto codec : preferred_codecs) {
for (const Stream& offered_stream : offered_streams) {
if (offered_stream.codec == codec) {
OSP_DVLOG << "Selected " << CodecToString(codec)
<< " as codec for streaming";
- return &offered_stream;
+ return std::make_unique<Stream>(offered_stream);
}
}
}
@@ -91,12 +92,36 @@ ReceiverSession::ReceiverSession(Client* const client,
messager_.SetHandler(
SenderMessage::Type::kOffer,
[this](SenderMessage message) { OnOffer(std::move(message)); });
+ environment_->SetSocketSubscriber(this);
}
ReceiverSession::~ReceiverSession() {
ResetReceivers(Client::kEndOfSession);
}
+void ReceiverSession::OnSocketReady() {
+ if (pending_session_) {
+ InitializeSession(*pending_session_);
+ pending_session_.reset();
+ }
+}
+
+void ReceiverSession::OnSocketInvalid(Error error) {
+ if (pending_session_) {
+ SendErrorAnswerReply(pending_session_->sequence_number,
+ "Failed to bind UDP socket");
+ pending_session_.reset();
+ }
+
+ client_->OnError(this,
+ Error(Error::Code::kSocketFailure,
+ "The environment is invalid and should be replaced."));
+}
+
+bool ReceiverSession::SessionProperties::IsValid() const {
+ return (selected_audio || selected_video) && sequence_number >= 0;
+}
+
void ReceiverSession::OnOffer(SenderMessage message) {
// We just drop offers we can't respond to. Note that libcast senders will
// always send a strictly positive sequence numbers, but zero is permitted
@@ -115,41 +140,62 @@ void ReceiverSession::OnOffer(SenderMessage message) {
return;
}
+ auto properties = std::make_unique<SessionProperties>();
+ properties->sequence_number = message.sequence_number;
+
const Offer& offer = absl::get<Offer>(message.body);
- const AudioStream* selected_audio_stream = nullptr;
if (!offer.audio_streams.empty() && !preferences_.audio_codecs.empty()) {
- selected_audio_stream =
+ properties->selected_audio =
SelectStream(preferences_.audio_codecs, offer.audio_streams);
}
- const VideoStream* selected_video_stream = nullptr;
if (!offer.video_streams.empty() && !preferences_.video_codecs.empty()) {
- selected_video_stream =
+ properties->selected_video =
SelectStream(preferences_.video_codecs, offer.video_streams);
}
- if (!selected_audio_stream && !selected_video_stream) {
+ if (!properties->IsValid()) {
SendErrorAnswerReply(message.sequence_number,
"Failed to select any streams from OFFER");
return;
}
- Answer answer = ConstructAnswer(selected_audio_stream, selected_video_stream);
+ switch (environment_->socket_state()) {
+ // If the environment is ready or in a bad state, we can respond
+ // immediately.
+ case Environment::SocketState::kInvalid:
+ SendErrorAnswerReply(message.sequence_number,
+ "UDP socket is closed, likely due to a bind error.");
+ break;
+
+ case Environment::SocketState::kReady:
+ InitializeSession(*properties);
+ break;
+
+ // Else we need to store the properties we just created until we get a
+ // ready or error event.
+ case Environment::SocketState::kStarting:
+ pending_session_ = std::move(properties);
+ break;
+ }
+}
+
+void ReceiverSession::InitializeSession(const SessionProperties& properties) {
+ Answer answer = ConstructAnswer(properties);
if (!answer.IsValid()) {
// If the answer message is invalid, there is no point in setting up a
// negotiation because the sender won't be able to connect to it.
- SendErrorAnswerReply(message.sequence_number,
+ SendErrorAnswerReply(properties.sequence_number,
"Failed to construct an ANSWER message");
return;
}
// Only spawn receivers if we know we have a valid answer message.
- ConfiguredReceivers receivers =
- SpawnReceivers(selected_audio_stream, selected_video_stream);
+ ConfiguredReceivers receivers = SpawnReceivers(properties);
client_->OnNegotiated(this, std::move(receivers));
- const Error result = messager_.SendMessage(
- ReceiverMessage{ReceiverMessage::Type::kAnswer, message.sequence_number,
- true /* valid */, std::move(answer)});
+ const Error result = messager_.SendMessage(ReceiverMessage{
+ ReceiverMessage::Type::kAnswer, properties.sequence_number,
+ true /* valid */, std::move(answer)});
if (!result.ok()) {
client_->OnError(this, std::move(result));
}
@@ -166,32 +212,38 @@ std::unique_ptr<Receiver> ReceiverSession::ConstructReceiver(
std::move(config));
}
-ConfiguredReceivers ReceiverSession::SpawnReceivers(const AudioStream* audio,
- const VideoStream* video) {
- OSP_DCHECK(audio || video);
+ConfiguredReceivers ReceiverSession::SpawnReceivers(
+ const SessionProperties& properties) {
+ OSP_DCHECK(properties.IsValid());
ResetReceivers(Client::kRenegotiated);
AudioCaptureConfig audio_config;
- if (audio) {
- current_audio_receiver_ = ConstructReceiver(audio->stream);
- audio_config = AudioCaptureConfig{
- audio->codec, audio->stream.channels, audio->bit_rate,
- audio->stream.rtp_timebase, audio->stream.target_delay};
+ if (properties.selected_audio) {
+ current_audio_receiver_ =
+ ConstructReceiver(properties.selected_audio->stream);
+ audio_config =
+ AudioCaptureConfig{properties.selected_audio->codec,
+ properties.selected_audio->stream.channels,
+ properties.selected_audio->bit_rate,
+ properties.selected_audio->stream.rtp_timebase,
+ properties.selected_audio->stream.target_delay};
}
VideoCaptureConfig video_config;
- if (video) {
- current_video_receiver_ = ConstructReceiver(video->stream);
+ if (properties.selected_video) {
+ current_video_receiver_ =
+ ConstructReceiver(properties.selected_video->stream);
std::vector<DisplayResolution> display_resolutions;
- std::transform(video->resolutions.begin(), video->resolutions.end(),
+ std::transform(properties.selected_video->resolutions.begin(),
+ properties.selected_video->resolutions.end(),
std::back_inserter(display_resolutions),
ToDisplayResolution);
- video_config =
- VideoCaptureConfig{video->codec,
- FrameRate{video->max_frame_rate.numerator,
- video->max_frame_rate.denominator},
- video->max_bit_rate, std::move(display_resolutions),
- video->stream.target_delay};
+ video_config = VideoCaptureConfig{
+ properties.selected_video->codec,
+ FrameRate{properties.selected_video->max_frame_rate.numerator,
+ properties.selected_video->max_frame_rate.denominator},
+ properties.selected_video->max_bit_rate, std::move(display_resolutions),
+ properties.selected_video->stream.target_delay};
}
return ConfiguredReceivers{
@@ -207,21 +259,19 @@ void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) {
}
}
-Answer ReceiverSession::ConstructAnswer(
- const AudioStream* selected_audio_stream,
- const VideoStream* selected_video_stream) {
- OSP_DCHECK(selected_audio_stream || selected_video_stream);
+Answer ReceiverSession::ConstructAnswer(const SessionProperties& properties) {
+ OSP_DCHECK(properties.IsValid());
std::vector<int> stream_indexes;
std::vector<Ssrc> stream_ssrcs;
- if (selected_audio_stream) {
- stream_indexes.push_back(selected_audio_stream->stream.index);
- stream_ssrcs.push_back(selected_audio_stream->stream.ssrc + 1);
+ if (properties.selected_audio) {
+ stream_indexes.push_back(properties.selected_audio->stream.index);
+ stream_ssrcs.push_back(properties.selected_audio->stream.ssrc + 1);
}
- if (selected_video_stream) {
- stream_indexes.push_back(selected_video_stream->stream.index);
- stream_ssrcs.push_back(selected_video_stream->stream.ssrc + 1);
+ if (properties.selected_video) {
+ stream_indexes.push_back(properties.selected_video->stream.index);
+ stream_ssrcs.push_back(properties.selected_video->stream.ssrc + 1);
}
absl::optional<Constraints> constraints;
diff --git a/cast/streaming/receiver_session.h b/cast/streaming/receiver_session.h
index b448bd00..b29b6d52 100644
--- a/cast/streaming/receiver_session.h
+++ b/cast/streaming/receiver_session.h
@@ -26,7 +26,7 @@ namespace cast {
class Environment;
class Receiver;
-class ReceiverSession final {
+class ReceiverSession final : public Environment::SocketSubscriber {
public:
// Upon successful negotiation, a set of configured receivers is constructed
// for handling audio and video. Note that either receiver may be null.
@@ -115,20 +115,37 @@ class ReceiverSession final {
const std::string& session_id() const { return session_id_; }
+ // Environment::SocketSubscriber event callbacks.
+ void OnSocketReady() override;
+ void OnSocketInvalid(Error error) override;
+
private:
+ struct SessionProperties {
+ std::unique_ptr<AudioStream> selected_audio;
+ std::unique_ptr<VideoStream> selected_video;
+ int sequence_number;
+
+ // To be valid either the audio or video must be selected, and we must
+ // have a sequence number we can reference.
+ bool IsValid() const;
+ };
+
// Specific message type handler methods.
void OnOffer(SenderMessage message);
+ // Creates receivers and sends an appropriate Answer message using the
+ // session properties.
+ void InitializeSession(const SessionProperties& properties);
+
// Used by SpawnReceivers to generate a receiver for a specific stream.
std::unique_ptr<Receiver> ConstructReceiver(const Stream& stream);
// Creates a set of configured receivers from a given pair of audio and
// video streams. NOTE: either audio or video may be null, but not both.
- ConfiguredReceivers SpawnReceivers(const AudioStream* audio,
- const VideoStream* video);
+ ConfiguredReceivers SpawnReceivers(const SessionProperties& properties);
// Callers of this method should ensure at least one stream is non-null.
- Answer ConstructAnswer(const AudioStream* audio, const VideoStream* video);
+ Answer ConstructAnswer(const SessionProperties& properties);
// Handles resetting receivers and notifying the client.
void ResetReceivers(Client::ReceiversDestroyingReason reason);
@@ -143,6 +160,12 @@ class ReceiverSession final {
const std::string session_id_;
ReceiverSessionMessager messager_;
+ // In some cases, the session initialization may be pending waiting for the
+ // UDP socket to be ready. In this case, the receivers and the answer
+ // message will not be configured and sent until the UDP socket has finished
+ // binding.
+ std::unique_ptr<SessionProperties> pending_session_;
+
bool supports_wifi_status_reporting_ = false;
ReceiverPacketRouter packet_router_;
diff --git a/cast/streaming/receiver_session_unittest.cc b/cast/streaming/receiver_session_unittest.cc
index fd2c48f6..afbc556e 100644
--- a/cast/streaming/receiver_session_unittest.cc
+++ b/cast/streaming/receiver_session_unittest.cc
@@ -282,8 +282,9 @@ class ReceiverSessionTest : public ::testing::Test {
auto environment_ = std::make_unique<NiceMock<MockEnvironment>>(
&FakeClock::now, &task_runner_);
ON_CALL(*environment_, GetBoundLocalEndpoint())
- .WillByDefault(
- Return(IPEndpoint{IPAddress::Parse("127.0.0.1").value(), 12345}));
+ .WillByDefault(Return(IPEndpoint{{127, 0, 0, 1}, 12345}));
+ environment_->set_socket_state_for_testing(
+ Environment::SocketState::kReady);
return environment_;
}
@@ -609,9 +610,8 @@ TEST_F(ReceiverSessionTest, NotifiesReceiverDestruction) {
TEST_F(ReceiverSessionTest, HandlesInvalidAnswer) {
// Simulate an unbound local endpoint.
- EXPECT_CALL(*environment_, GetBoundLocalEndpoint).WillOnce([]() {
- return IPEndpoint{};
- });
+ EXPECT_CALL(*environment_, GetBoundLocalEndpoint)
+ .WillOnce(Return(IPEndpoint{}));
message_port_->ReceiveMessage(kValidOfferMessage);
const auto& messages = message_port_->posted_messages();
@@ -624,5 +624,71 @@ TEST_F(ReceiverSessionTest, HandlesInvalidAnswer) {
EXPECT_EQ("ANSWER", answer["type"].asString());
EXPECT_EQ("error", answer["result"].asString());
}
+
+TEST_F(ReceiverSessionTest, DelaysAnswerUntilEnvironmentIsReady) {
+ environment_->set_socket_state_for_testing(
+ Environment::SocketState::kStarting);
+
+ // We should not have sent an answer yet--the UDP socket is not ready.
+ message_port_->ReceiveMessage(kValidOfferMessage);
+ ASSERT_TRUE(message_port_->posted_messages().empty());
+
+ // Simulate the environment calling back into us with the socket being ready.
+ // state() will not be called again--we just need to get the bind event.
+ EXPECT_CALL(*environment_, GetBoundLocalEndpoint())
+ .WillOnce(Return(IPEndpoint{{10, 0, 0, 2}, 4567}));
+ EXPECT_CALL(client_, OnNegotiated(session_.get(), _));
+ EXPECT_CALL(client_,
+ OnReceiversDestroying(session_.get(),
+ ReceiverSession::Client::kEndOfSession));
+ session_->OnSocketReady();
+ const auto& messages = message_port_->posted_messages();
+ ASSERT_EQ(1u, messages.size());
+
+ // We should have set the UDP port based on the ready socket value.
+ auto message_body = json::Parse(messages[0]);
+ EXPECT_TRUE(message_body.is_value());
+ const Json::Value& answer_body = message_body.value()["answer"];
+ EXPECT_TRUE(answer_body.isObject());
+ EXPECT_EQ(4567, answer_body["udpPort"].asInt());
+}
+
+TEST_F(ReceiverSessionTest,
+ ReturnsErrorAnswerIfEnvironmentIsAlreadyInvalidated) {
+ environment_->set_socket_state_for_testing(
+ Environment::SocketState::kInvalid);
+
+ // If the environment is already in a bad state, we can respond immediately.
+ message_port_->ReceiveMessage(kValidOfferMessage);
+ const auto& messages = message_port_->posted_messages();
+ ASSERT_EQ(1u, messages.size());
+
+ auto message_body = json::Parse(messages[0]);
+ EXPECT_TRUE(message_body.is_value());
+ EXPECT_EQ("ANSWER", message_body.value()["type"].asString());
+ EXPECT_EQ("error", message_body.value()["result"].asString());
+}
+
+TEST_F(ReceiverSessionTest, ReturnsErrorAnswerIfEnvironmentIsInvalidated) {
+ environment_->set_socket_state_for_testing(
+ Environment::SocketState::kStarting);
+
+ // We should not have sent an answer yet--the environment is not ready.
+ message_port_->ReceiveMessage(kValidOfferMessage);
+ ASSERT_TRUE(message_port_->posted_messages().empty());
+
+ // Simulate the environment calling back into us with invalidation.
+ EXPECT_CALL(client_, OnError(_, _)).Times(1);
+ session_->OnSocketInvalid(Error::Code::kSocketBindFailure);
+ const auto& messages = message_port_->posted_messages();
+ ASSERT_EQ(1u, messages.size());
+
+ // We should have an error answer.
+ auto message_body = json::Parse(messages[0]);
+ EXPECT_TRUE(message_body.is_value());
+ EXPECT_EQ("ANSWER", message_body.value()["type"].asString());
+ EXPECT_EQ("error", message_body.value()["result"].asString());
+}
+
} // namespace cast
} // namespace openscreen
diff --git a/cast/streaming/receiver_unittest.cc b/cast/streaming/receiver_unittest.cc
index 0a6817e6..86fd95ff 100644
--- a/cast/streaming/receiver_unittest.cc
+++ b/cast/streaming/receiver_unittest.cc
@@ -26,6 +26,7 @@
#include "cast/streaming/sender_report_builder.h"
#include "cast/streaming/session_config.h"
#include "cast/streaming/ssrc.h"
+#include "cast/streaming/testing/simple_socket_subscriber.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/api/time.h"
@@ -249,7 +250,6 @@ class MockSender : public CompoundRtcpParser::Client {
CompoundRtcpParser rtcp_parser_;
FrameCrypto crypto_;
RtpPacketizer rtp_packetizer_;
-
FrameId max_feedback_frame_id_ = FrameId::first() + kMaxUnackedFrames;
EncryptedFrame frame_being_sent_;
@@ -278,8 +278,7 @@ class ReceiverTest : public testing::Test {
/* .aes_iv_mask = */ kCastIvMask,
/* .is_pli_enabled = */ true}),
sender_(&task_runner_, &env_) {
- env_.set_socket_error_handler(
- [](Error error) { ASSERT_TRUE(error.ok()) << error; });
+ env_.SetSocketSubscriber(&socket_subscriber_);
ON_CALL(env_, SendPacket(_))
.WillByDefault(Invoke([this](absl::Span<const uint8_t> packet) {
task_runner_.PostTaskWithDelay(
@@ -360,6 +359,7 @@ class ReceiverTest : public testing::Test {
Receiver receiver_;
testing::NiceMock<MockSender> sender_;
testing::NiceMock<MockConsumer> consumer_;
+ SimpleSubscriber socket_subscriber_;
};
// Tests that the Receiver processes RTCP packets correctly and sends RTCP
diff --git a/cast/streaming/sender_packet_router_unittest.cc b/cast/streaming/sender_packet_router_unittest.cc
index 3c1d1f6a..13377a6d 100644
--- a/cast/streaming/sender_packet_router_unittest.cc
+++ b/cast/streaming/sender_packet_router_unittest.cc
@@ -8,6 +8,7 @@
#include "cast/streaming/constants.h"
#include "cast/streaming/mock_environment.h"
+#include "cast/streaming/testing/simple_socket_subscriber.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/base/ip_address.h"
@@ -155,8 +156,7 @@ class SenderPacketRouterTest : public testing::Test {
task_runner_(&clock_),
env_(&FakeClock::now, &task_runner_),
router_(&env_, kMaxPacketsPerBurst, kBurstInterval) {
- env_.set_socket_error_handler(
- [](Error error) { ASSERT_TRUE(error.ok()) << error; });
+ env_.SetSocketSubscriber(&socket_subscriber_);
}
~SenderPacketRouterTest() override = default;
@@ -182,6 +182,7 @@ class SenderPacketRouterTest : public testing::Test {
SenderPacketRouter router_;
testing::NiceMock<MockSender> audio_sender_;
testing::NiceMock<MockSender> video_sender_;
+ SimpleSubscriber socket_subscriber_;
};
// Tests that the SenderPacketRouter is correctly configured from the specific
diff --git a/cast/streaming/sender_unittest.cc b/cast/streaming/sender_unittest.cc
index 1296e4a1..01c11f8f 100644
--- a/cast/streaming/sender_unittest.cc
+++ b/cast/streaming/sender_unittest.cc
@@ -32,6 +32,7 @@
#include "cast/streaming/sender_report_parser.h"
#include "cast/streaming/session_config.h"
#include "cast/streaming/ssrc.h"
+#include "cast/streaming/testing/simple_socket_subscriber.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/test/fake_clock.h"
@@ -350,8 +351,7 @@ class SenderTest : public testing::Test {
receiver_to_sender_pipe_(&task_runner_, &sender_packet_router_),
receiver_(&receiver_to_sender_pipe_),
sender_to_receiver_pipe_(&task_runner_, &receiver_) {
- sender_environment_.set_socket_error_handler(
- [](Error error) { ASSERT_TRUE(error.ok()) << error; });
+ sender_environment_.SetSocketSubscriber(&socket_subscriber_);
sender_environment_.set_remote_endpoint(
receiver_to_sender_pipe_.local_endpoint());
ON_CALL(sender_environment_, SendPacket(_))
@@ -442,6 +442,7 @@ class SenderTest : public testing::Test {
SimulatedNetworkPipe receiver_to_sender_pipe_;
NiceMock<MockReceiver> receiver_;
SimulatedNetworkPipe sender_to_receiver_pipe_;
+ SimpleSubscriber socket_subscriber_;
};
// Tests that the Sender can send EncodedFrames over an ideal network (i.e., low
diff --git a/cast/streaming/testing/simple_socket_subscriber.h b/cast/streaming/testing/simple_socket_subscriber.h
new file mode 100644
index 00000000..f6208b42
--- /dev/null
+++ b/cast/streaming/testing/simple_socket_subscriber.h
@@ -0,0 +1,22 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CAST_STREAMING_TESTING_SIMPLE_SOCKET_SUBSCRIBER_H_
+#define CAST_STREAMING_TESTING_SIMPLE_SOCKET_SUBSCRIBER_H_
+
+#include "cast/streaming/environment.h"
+#include "gtest/gtest.h"
+
+namespace openscreen {
+namespace cast {
+
+class SimpleSubscriber : public Environment::SocketSubscriber {
+ void OnSocketReady() {}
+ void OnSocketInvalid(Error error) { ASSERT_TRUE(error.ok()) << error; }
+};
+
+} // namespace cast
+} // namespace openscreen
+
+#endif // CAST_STREAMING_TESTING_SIMPLE_SOCKET_SUBSCRIBER_H_