aboutsummaryrefslogtreecommitdiff
path: root/rtc_base/virtual_socket_server.h
blob: 6c58a4bdfe5668fb02364381746bcf7c457427c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#ifndef RTC_BASE_VIRTUAL_SOCKET_SERVER_H_
#define RTC_BASE_VIRTUAL_SOCKET_SERVER_H_

#include <deque>
#include <list>
#include <map>
#include <memory>
#include <utility>
#include <vector>

#include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/event.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/message_handler.h"
#include "rtc_base/socket_server.h"
#include "rtc_base/synchronization/mutex.h"

namespace rtc {

// Forward declarations. VirtualSocket is defined further down in this header;
// Packet and SocketAddressPair are not defined here and are only referred to
// by pointer (Packet*) or as a template argument of a type alias
// (SocketAddressPair).
class Packet;
class VirtualSocket;
class SocketAddressPair;

// Simulates a network in the same manner as a loopback interface.  The
// interface can create as many addresses as you want.  All of the sockets
// created by this network will be able to communicate with one another, unless
// they are bound to addresses from incompatible families.
class VirtualSocketServer : public SocketServer {
 public:
  VirtualSocketServer();
  // This constructor needs to be used if the test uses a fake clock and
  // ProcessMessagesUntilIdle, since ProcessMessagesUntilIdle needs a way of
  // advancing time.
  explicit VirtualSocketServer(ThreadProcessingFakeClock* fake_clock);
  ~VirtualSocketServer() override;

  // The default route indicates which local address to use when a socket is
  // bound to the 'any' address, e.g. 0.0.0.0.
  IPAddress GetDefaultRoute(int family);
  void SetDefaultRoute(const IPAddress& from_addr);

  // Limits the network bandwidth (maximum bytes per second).  Zero means that
  // all sends occur instantly.  Defaults to 0.
  uint32_t bandwidth() const { return bandwidth_; }
  void set_bandwidth(uint32_t bandwidth) { bandwidth_ = bandwidth; }

  // Limits the amount of data which can be in flight on the network without
  // packet loss (on a per sender basis).  Defaults to 64 KB.
  uint32_t network_capacity() const { return network_capacity_; }
  void set_network_capacity(uint32_t capacity) { network_capacity_ = capacity; }

  // The amount of data which can be buffered by tcp on the sender's side.
  uint32_t send_buffer_capacity() const { return send_buffer_capacity_; }
  void set_send_buffer_capacity(uint32_t capacity) {
    send_buffer_capacity_ = capacity;
  }

  // The amount of data which can be buffered by tcp on the receiver's side.
  uint32_t recv_buffer_capacity() const { return recv_buffer_capacity_; }
  void set_recv_buffer_capacity(uint32_t capacity) {
    recv_buffer_capacity_ = capacity;
  }

  // Controls the (transit) delay for packets sent in the network.  This does
  // not include the time required to sit in the send queue.  Both of these
  // values are measured in milliseconds.  Defaults to no delay.
  uint32_t delay_mean() const { return delay_mean_; }
  uint32_t delay_stddev() const { return delay_stddev_; }
  uint32_t delay_samples() const { return delay_samples_; }
  void set_delay_mean(uint32_t delay_mean) { delay_mean_ = delay_mean; }
  void set_delay_stddev(uint32_t delay_stddev) { delay_stddev_ = delay_stddev; }
  void set_delay_samples(uint32_t delay_samples) {
    delay_samples_ = delay_samples;
  }

  // If the (transit) delay parameters are modified, this method should be
  // called to recompute the new distribution.
  void UpdateDelayDistribution();

  // Controls the (uniform) probability that any sent packet is dropped.  This
  // is separate from calculations to drop based on queue size.
  // The setter DCHECKs that the probability lies within [0.0, 1.0].
  double drop_probability() const { return drop_prob_; }
  void set_drop_probability(double drop_prob) {
    RTC_DCHECK_GE(drop_prob, 0.0);
    RTC_DCHECK_LE(drop_prob, 1.0);
    drop_prob_ = drop_prob;
  }

  // Controls the maximum UDP payload for the networks simulated
  // by this server. Any UDP payload sent that is larger than this will
  // be dropped.
  size_t max_udp_payload() const { return max_udp_payload_; }
  void set_max_udp_payload(size_t payload_size) {
    max_udp_payload_ = payload_size;
  }

  // The largest UDP payload seen so far.
  size_t largest_seen_udp_payload() const { return largest_seen_udp_payload_; }

  // If |blocked| is true, subsequent attempts to send will result in -1 being
  // returned, with the socket error set to EWOULDBLOCK.
  //
  // If this method is later called with |blocked| set to false, any sockets
  // that previously failed to send with EWOULDBLOCK will emit SignalWriteEvent.
  //
  // This can be used to simulate the send buffer on a network interface being
  // full, and test functionality related to EWOULDBLOCK/SignalWriteEvent.
  void SetSendingBlocked(bool blocked);

  // SocketFactory:
  Socket* CreateSocket(int family, int type) override;
  AsyncSocket* CreateAsyncSocket(int family, int type) override;

  // SocketServer:
  void SetMessageQueue(Thread* queue) override;
  bool Wait(int cms, bool process_io) override;
  void WakeUp() override;

  // Records |delay_ms| as the delay for the IP of |address|.  Only the IP part
  // of |address| is used as the key, so the port is ignored.  See
  // GetTransitDelay() for how the recorded value is consumed.
  void SetDelayOnAddress(const rtc::SocketAddress& address, int delay_ms) {
    delay_by_ip_[address.ipaddr()] = delay_ms;
  }

  // Used by TurnPortTest and TcpPortTest (for example), to mimic a case where
  // a proxy returns the local host address instead of the original one the
  // port was bound against. Please see WebRTC issue 3927 for more detail.
  //
  // If SetAlternativeLocalAddress(A, B) is called, then when something
  // attempts to bind a socket to address A, it will get a socket bound to
  // address B instead.
  void SetAlternativeLocalAddress(const rtc::IPAddress& address,
                                  const rtc::IPAddress& alternative);

  // A sampled function, stored as a sequence of (x, y) points.
  using Point = std::pair<double, double>;
  using Function = std::vector<Point>;

  static std::unique_ptr<Function> CreateDistribution(uint32_t mean,
                                                      uint32_t stddev,
                                                      uint32_t samples);

  // Similar to Thread::ProcessMessages, but it only processes messages until
  // there are no immediate messages or pending network traffic.  Returns false
  // if Thread::Stop() was called.
  bool ProcessMessagesUntilIdle();

  // Sets the next port number to use for testing.
  void SetNextPortForTesting(uint16_t port);

  // Close a pair of Tcp connections by addresses. Both connections will have
  // its own OnClose invoked.
  bool CloseTcpConnections(const SocketAddress& addr_local,
                           const SocketAddress& addr_remote);

  // Number of packets that clients have attempted to send through this virtual
  // socket server. Intended to be used for test assertions.
  uint32_t sent_packets() const { return sent_packets_; }

  // Binds the given socket to addr, assigning an IP and port if necessary.
  int Bind(VirtualSocket* socket, SocketAddress* addr);

  // Binds the given socket to the given (fully-defined) address.
  int Bind(VirtualSocket* socket, const SocketAddress& addr);

  int Unbind(const SocketAddress& addr, VirtualSocket* socket);

  // Adds a mapping between this socket pair and the socket.
  void AddConnection(const SocketAddress& client,
                     const SocketAddress& server,
                     VirtualSocket* socket);

  // Connects the given socket to the socket at the given address.
  int Connect(VirtualSocket* socket,
              const SocketAddress& remote_addr,
              bool use_delay);

  // Sends a disconnect message to the socket at the given address.
  bool Disconnect(VirtualSocket* socket);

  // Lookup address, and disconnect corresponding socket.
  bool Disconnect(const SocketAddress& addr);

  // Lookup connection, close corresponding socket.
  bool Disconnect(const SocketAddress& local_addr,
                  const SocketAddress& remote_addr);

  // Sends the given packet to the socket at the given address (if one exists).
  int SendUdp(VirtualSocket* socket,
              const char* data,
              size_t data_size,
              const SocketAddress& remote_addr);

  // Moves as much data as possible from the sender's buffer to the network.
  void SendTcp(VirtualSocket* socket);

  // Like above, but lookup sender by address.
  void SendTcp(const SocketAddress& addr);

  // Computes the number of milliseconds required to send a packet of this size.
  uint32_t SendDelay(uint32_t size);

  // Cancel attempts to connect to a socket that is being closed.
  void CancelConnects(VirtualSocket* socket);

  // Clear incoming messages for a socket that is being closed.
  void Clear(VirtualSocket* socket);

  void ProcessOneMessage();

  void PostSignalReadEvent(VirtualSocket* socket);

  // Sending was previously blocked, but now isn't.
  sigslot::signal0<> SignalReadyToSend;

 protected:
  // Returns a new IP not used before in this network.
  IPAddress GetNextIP(int family);

  // Find the socket bound to the given address.
  VirtualSocket* LookupBinding(const SocketAddress& addr);

 private:
  uint16_t GetNextPort();

  VirtualSocket* CreateSocketInternal(int family, int type);

  // Find the socket pair corresponding to this server address.
  VirtualSocket* LookupConnection(const SocketAddress& client,
                                  const SocketAddress& server);

  void RemoveConnection(const SocketAddress& client,
                        const SocketAddress& server);

  // Places a packet on the network.
  void AddPacketToNetwork(VirtualSocket* socket,
                          VirtualSocket* recipient,
                          int64_t cur_time,
                          const char* data,
                          size_t data_size,
                          size_t header_size,
                          bool ordered);

  // If the delay has been set for the address of the socket, returns the set
  // delay. Otherwise, returns a random transit delay chosen from the
  // appropriate distribution.
  uint32_t GetTransitDelay(Socket* socket);

  // Basic operations on functions.
  static std::unique_ptr<Function> Accumulate(std::unique_ptr<Function> f);
  static std::unique_ptr<Function> Invert(std::unique_ptr<Function> f);
  static std::unique_ptr<Function> Resample(std::unique_ptr<Function> f,
                                            double x1,
                                            double x2,
                                            uint32_t samples);
  static double Evaluate(const Function* f, double x);

  // Determine if two sockets should be able to communicate.
  // We don't (currently) specify an address family for sockets; instead,
  // the currently bound address is used to infer the address family.
  // Any socket that is not explicitly bound to an IPv4 address is assumed to be
  // dual-stack capable.
  // This function tests if two addresses can communicate, as well as the
  // sockets to which they may be bound (the addresses may or may not yet be
  // bound to the sockets).
  // First the addresses are tested (after normalization):
  // If both have the same family, then communication is OK.
  // If only one is IPv4 then false, unless the other is bound to ::.
  // This applies even if the IPv4 address is 0.0.0.0.
  // The socket arguments are optional; the sockets are checked to see if they
  // were explicitly bound to IPv6-any ('::'), and if so communication is
  // permitted.
  // NB: This scheme doesn't permit non-dualstack IPv6 sockets.
  static bool CanInteractWith(VirtualSocket* local, VirtualSocket* remote);

  // Bound address -> socket, and (client, server) address pair -> socket.
  using AddressMap = std::map<SocketAddress, VirtualSocket*>;
  using ConnectionMap = std::map<SocketAddressPair, VirtualSocket*>;

  // May be null if the test doesn't use a fake clock, or it does but doesn't
  // use ProcessMessagesUntilIdle.
  ThreadProcessingFakeClock* fake_clock_ = nullptr;

  // Used to implement Wait/WakeUp.
  Event wakeup_;
  Thread* msg_queue_;
  bool stop_on_idle_;
  in_addr next_ipv4_;
  in6_addr next_ipv6_;
  uint16_t next_port_;
  AddressMap* bindings_;
  ConnectionMap* connections_;

  IPAddress default_route_v4_;
  IPAddress default_route_v6_;

  uint32_t bandwidth_;
  uint32_t network_capacity_;
  uint32_t send_buffer_capacity_;
  uint32_t recv_buffer_capacity_;
  uint32_t delay_mean_;
  uint32_t delay_stddev_;
  uint32_t delay_samples_;

  // Used for testing.
  uint32_t sent_packets_ = 0;

  // Per-IP delay values written by SetDelayOnAddress().
  std::map<rtc::IPAddress, int> delay_by_ip_;
  std::map<rtc::IPAddress, rtc::IPAddress> alternative_address_mapping_;
  std::unique_ptr<Function> delay_dist_;

  double drop_prob_;
  // The largest UDP payload permitted on this virtual socket server.
  // The default is the max size of IPv4 fragmented UDP packet payload:
  // 65535 bytes - 8 bytes UDP header - 20 bytes IP header.
  size_t max_udp_payload_ = 65507;
  // The largest UDP payload seen so far.
  size_t largest_seen_udp_payload_ = 0;

  bool sending_blocked_ = false;
  RTC_DISALLOW_COPY_AND_ASSIGN(VirtualSocketServer);
};

// Implements the socket interface using the virtual network.  Packets are
// passed as messages using the message queue of the socket server.
class VirtualSocket : public AsyncSocket,
                      public MessageHandler,
                      public sigslot::has_slots<> {
 public:
  VirtualSocket(VirtualSocketServer* server, int family, int type, bool async);
  ~VirtualSocket() override;

  // Socket / AsyncSocket interface.
  SocketAddress GetLocalAddress() const override;
  SocketAddress GetRemoteAddress() const override;

  int Bind(const SocketAddress& addr) override;
  int Connect(const SocketAddress& addr) override;
  int Close() override;
  int Send(const void* pv, size_t cb) override;
  int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override;
  int Recv(void* pv, size_t cb, int64_t* timestamp) override;
  int RecvFrom(void* pv,
               size_t cb,
               SocketAddress* paddr,
               int64_t* timestamp) override;
  int Listen(int backlog) override;
  VirtualSocket* Accept(SocketAddress* paddr) override;

  int GetError() const override;
  void SetError(int error) override;
  ConnState GetState() const override;
  int GetOption(Option opt, int* value) override;
  int SetOption(Option opt, int value) override;

  // MessageHandler interface.
  void OnMessage(Message* pmsg) override;

  // Read-only views of the buffering state: the receive-side byte count, and
  // the size and raw contents of the pending tcp send buffer.
  size_t recv_buffer_size() const { return recv_buffer_size_; }
  size_t send_buffer_size() const { return send_buffer_.size(); }
  const char* send_buffer_data() const { return send_buffer_.data(); }

  // Used by server sockets to set the local address without binding.
  void SetLocalAddress(const SocketAddress& addr);

  // Whether this socket was originally bound to the 'any' address; see the
  // comment on was_any_ below.
  bool was_any() const { return was_any_; }
  void set_was_any(bool was_any) { was_any_ = was_any; }

  void SetToBlocked();

  void UpdateRecv(size_t data_size);
  void UpdateSend(size_t data_size);

  void MaybeSignalWriteEvent(size_t capacity);

  // Adds a packet to be sent. Returns delay, based on network_size_.
  uint32_t AddPacket(int64_t cur_time, size_t packet_size);

  int64_t UpdateOrderedDelivery(int64_t ts);

  // Removes stale packets from the network. Returns current size.
  size_t PurgeNetworkPackets(int64_t cur_time);

 private:
  // One entry in the network model: a size and the time at which it is done.
  struct NetworkEntry {
    size_t size;
    int64_t done_time;
  };

  using ListenQueue = std::deque<SocketAddress>;
  using NetworkQueue = std::deque<NetworkEntry>;
  using SendBuffer = std::vector<char>;
  using RecvBuffer = std::list<Packet*>;
  using OptionsMap = std::map<Option, int>;

  int InitiateConnect(const SocketAddress& addr, bool use_delay);
  void CompleteConnect(const SocketAddress& addr);
  int SendUdp(const void* pv, size_t cb, const SocketAddress& addr);
  int SendTcp(const void* pv, size_t cb);

  // NOTE(review): presumably the slot connected to
  // VirtualSocketServer::SignalReadyToSend - confirm in the implementation.
  void OnSocketServerReadyToSend();

  VirtualSocketServer* const server_;
  const int type_;
  const bool async_;
  ConnState state_;
  int error_;
  SocketAddress local_addr_;
  SocketAddress remote_addr_;

  // Pending sockets which can be Accepted.
  std::unique_ptr<ListenQueue> listen_queue_ RTC_GUARDED_BY(mutex_)
      RTC_PT_GUARDED_BY(mutex_);

  // Data which tcp has buffered for sending.
  SendBuffer send_buffer_;
  // Set to false if the last attempt to send resulted in EWOULDBLOCK.
  // Set back to true when the socket can send again.
  bool ready_to_send_ = true;

  // Mutex to protect recv_buffer_ and listen_queue_.
  webrtc::Mutex mutex_;

  // Network model that enforces bandwidth and capacity constraints.
  NetworkQueue network_;
  size_t network_size_;
  // The scheduled delivery time of the last packet sent on this socket.
  // It is used to ensure ordered delivery of packets sent on this socket.
  int64_t last_delivery_time_ = 0;

  // Data which has been received from the network.
  RecvBuffer recv_buffer_ RTC_GUARDED_BY(mutex_);
  // The amount of data which is in flight or in recv_buffer_.
  size_t recv_buffer_size_;

  // Is this socket bound?
  bool bound_;

  // When we bind a socket to Any, VSS's Bind gives it another address. For
  // dual-stack sockets, we want to distinguish between sockets that were
  // explicitly given a particular address and sockets that had one picked
  // for them by VSS.
  bool was_any_;

  // Store the options that are set.
  OptionsMap options_map_;
};

}  // namespace rtc

#endif  // RTC_BASE_VIRTUAL_SOCKET_SERVER_H_