diff options
Diffstat (limited to 'media/cast/test/transport/transport.cc')
-rw-r--r-- | media/cast/test/transport/transport.cc | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/media/cast/test/transport/transport.cc b/media/cast/test/transport/transport.cc new file mode 100644 index 0000000000..22f41ba32e --- /dev/null +++ b/media/cast/test/transport/transport.cc @@ -0,0 +1,218 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/cast/test/transport/transport.h" + +#include <string> + +#include "base/bind.h" +#include "base/logging.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/rand_util.h" +#include "net/base/completion_callback.h" +#include "net/base/io_buffer.h" +#include "net/base/rand_callback.h" +#include "net/base/test_completion_callback.h" + +namespace media { +namespace cast { +namespace test { + +const int kMaxPacketSize = 1500; + +class LocalUdpTransportData; + +void CreateUDPAddress(std::string ip_str, int port, net::IPEndPoint* address) { + net::IPAddressNumber ip_number; + bool rv = net::ParseIPLiteralToNumber(ip_str, &ip_number); + if (!rv) + return; + *address = net::IPEndPoint(ip_number, port); +} + +class LocalUdpTransportData + : public base::RefCountedThreadSafe<LocalUdpTransportData> { + public: + LocalUdpTransportData(net::DatagramServerSocket* udp_socket, + scoped_refptr<base::TaskRunner> io_thread_proxy) + : udp_socket_(udp_socket), + buffer_(new net::IOBufferWithSize(kMaxPacketSize)), + io_thread_proxy_(io_thread_proxy) { + } + + void ListenTo(net::IPEndPoint bind_address) { + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); + + bind_address_ = bind_address; + io_thread_proxy_->PostTask(FROM_HERE, + base::Bind(&LocalUdpTransportData::RecvFromSocketLoop, this)); + } + + void DeletePacket(uint8* data) { + // Should be called from the receiver (not on the transport thread). + DCHECK(!(io_thread_proxy_->RunsTasksOnCurrentThread())); + delete [] data; + } + + void PacketReceived(int size) { + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); + // Got a packet with length result. + uint8* data = new uint8[size]; + memcpy(data, buffer_->data(), size); + packet_receiver_->ReceivedPacket(data, size, + base::Bind(&LocalUdpTransportData::DeletePacket, this, data)); + RecvFromSocketLoop(); + + } + + void RecvFromSocketLoop() { + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); + // Callback should always trigger with a packet. + int res = udp_socket_->RecvFrom(buffer_.get(), kMaxPacketSize, + &bind_address_, base::Bind(&LocalUdpTransportData::PacketReceived, + this)); + DCHECK(res >= net::ERR_IO_PENDING); + if (res > 0) { + PacketReceived(res); + } + } + + void set_packet_receiver(PacketReceiver* packet_receiver) { + packet_receiver_ = packet_receiver; + } + + void Close() { + udp_socket_->Close(); + } + + protected: + virtual ~LocalUdpTransportData() {} + + private: + friend class base::RefCountedThreadSafe<LocalUdpTransportData>; + + net::DatagramServerSocket* udp_socket_; + net::IPEndPoint bind_address_; + PacketReceiver* packet_receiver_; + scoped_refptr<net::IOBufferWithSize> buffer_; + scoped_refptr<base::TaskRunner> io_thread_proxy_; + + DISALLOW_COPY_AND_ASSIGN(LocalUdpTransportData); +}; + +class LocalPacketSender : public PacketSender, + public base::RefCountedThreadSafe<LocalPacketSender> { + public: + LocalPacketSender(net::DatagramServerSocket* udp_socket, + scoped_refptr<base::TaskRunner> io_thread_proxy) + : udp_socket_(udp_socket), + send_address_(), + loss_limit_(0), + io_thread_proxy_(io_thread_proxy) {} + + virtual bool SendPacket(const Packet& packet) OVERRIDE { + io_thread_proxy_->PostTask(FROM_HERE, + base::Bind(&LocalPacketSender::SendPacketToNetwork, + this, packet)); + return true; + } + + virtual void SendPacketToNetwork(const Packet& packet) { + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); + const uint8* data = packet.data(); + if (loss_limit_ > 0) { + int r = base::RandInt(0, 100); + if (r < loss_limit_) { + VLOG(1) << "Drop packet f:" << static_cast<int>(data[12 + 1]) + << " p:" << static_cast<int>(data[12 + 3]) + << " m:" << static_cast<int>(data[12 + 5]); + } + } + net::TestCompletionCallback callback; + scoped_refptr<net::WrappedIOBuffer> buffer( + new net::WrappedIOBuffer(reinterpret_cast<const char*>(data))); + udp_socket_->SendTo(buffer.get(), static_cast<int>(packet.size()), + send_address_, callback.callback()); + } + + virtual bool SendPackets(const PacketList& packets) OVERRIDE { + bool out_val = true; + for (size_t i = 0; i < packets.size(); ++i) { + const Packet& packet = packets[i]; + out_val |= SendPacket(packet); + } + return out_val; + } + + void SetPacketLoss(int percentage) { + DCHECK_GE(percentage, 0); + DCHECK_LT(percentage, 100); + loss_limit_ = percentage; + } + + void SetSendAddress(const net::IPEndPoint& send_address) { + send_address_ = send_address; + } + + protected: + virtual ~LocalPacketSender() {} + + private: + friend class base::RefCountedThreadSafe<LocalPacketSender>; + + net::DatagramServerSocket* udp_socket_; // Not owned by this class. + net::IPEndPoint send_address_; + int loss_limit_; + scoped_refptr<base::TaskRunner> io_thread_proxy_; +}; + +Transport::Transport( + scoped_refptr<base::TaskRunner> io_thread_proxy) + : udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())), + local_udp_transport_data_(new LocalUdpTransportData(udp_socket_.get(), + io_thread_proxy)), + packet_sender_(new LocalPacketSender(udp_socket_.get(), io_thread_proxy)), + io_thread_proxy_(io_thread_proxy) {} + +Transport::~Transport() {} + +PacketSender* Transport::packet_sender() { + return static_cast<PacketSender*>(packet_sender_.get()); +} + +void Transport::SetSendSidePacketLoss(int percentage) { + packet_sender_->SetPacketLoss(percentage); +} + +void Transport::StopReceiving() { + local_udp_transport_data_->Close(); +} + +void Transport::SetLocalReceiver(PacketReceiver* packet_receiver, + std::string ip_address, + std::string local_ip_address, + int port) { + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); + net::IPEndPoint bind_address, local_bind_address; + CreateUDPAddress(ip_address, port, &bind_address); + CreateUDPAddress(local_ip_address, port, &local_bind_address); + local_udp_transport_data_->set_packet_receiver(packet_receiver); + udp_socket_->AllowAddressReuse(); + udp_socket_->SetMulticastLoopbackMode(true); + udp_socket_->Listen(local_bind_address); + + // Start listening once receiver has been set. + local_udp_transport_data_->ListenTo(bind_address); +} + +void Transport::SetSendDestination(std::string ip_address, int port) { + net::IPEndPoint send_address; + CreateUDPAddress(ip_address, port, &send_address); + packet_sender_->SetSendAddress(send_address); +} + +} // namespace test +} // namespace cast +} // namespace media |