aboutsummaryrefslogtreecommitdiff
path: root/webrtc/base/stream.h
blob: c57daae76c591ba83709da6c134317cd4b0237f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#ifndef WEBRTC_BASE_STREAM_H_
#define WEBRTC_BASE_STREAM_H_

#include <stdio.h>

#include "webrtc/base/basictypes.h"
#include "webrtc/base/buffer.h"
#include "webrtc/base/criticalsection.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/messagehandler.h"
#include "webrtc/base/messagequeue.h"
#include "webrtc/base/scoped_ptr.h"
#include "webrtc/base/sigslot.h"

namespace rtc {

///////////////////////////////////////////////////////////////////////////////
// StreamInterface is a generic asynchronous stream interface, supporting read,
// write, and close operations, and asynchronous signalling of state changes.
// The interface is designed with file, memory, and socket implementations in
// mind.  Some implementations offer extended operations, such as seeking.
///////////////////////////////////////////////////////////////////////////////

// The following enumerations are declared outside of the StreamInterface
// class for brevity in use.

// Lifecycle state of a stream.  A stream in the SS_OPENING state will signal
// open or closed in the future.
enum StreamState {
  SS_CLOSED = 0,
  SS_OPENING = 1,
  SS_OPEN = 2
};

// Value returned by the stream read/write methods to indicate the various
// success and failure conditions described below.
enum StreamResult {
  SR_ERROR = 0,
  SR_SUCCESS = 1,
  SR_BLOCK = 2,
  SR_EOS = 3
};

// StreamEvents are used to asynchronously signal state transitions.  Each
// event occupies its own bit, so the flags may be combined.
//  SE_OPEN: The stream has transitioned to the SS_OPEN state
//  SE_CLOSE: The stream has transitioned to the SS_CLOSED state
//  SE_READ: Data is available, so Read is likely to not return SR_BLOCK
//  SE_WRITE: Data can be written, so Write is likely to not return SR_BLOCK
enum StreamEvent {
  SE_OPEN = 1 << 0,
  SE_READ = 1 << 1,
  SE_WRITE = 1 << 2,
  SE_CLOSE = 1 << 3
};

class Thread;

// Message payload carrying a set of stream events together with an error
// code.
struct StreamEventData : public MessageData {
  int events;  // Bit-wise combination of event flags.
  int error;   // Error code accompanying the events.

  StreamEventData(int ev, int er)
      : events(ev),
        error(er) {}
};

class StreamInterface : public MessageHandler {
 public:
  // Message ids used by this class.
  // NOTE(review): MSG_POST_EVENT is presumably the id of the message posted
  // by PostEvent and handled in OnMessage -- confirm in the implementation.
  enum {
    MSG_POST_EVENT = 0xF1F1, MSG_MAX = MSG_POST_EVENT
  };

  ~StreamInterface() override;

  // Returns the current state of the stream (see StreamState).
  virtual StreamState GetState() const = 0;

  // Read attempts to fill buffer of size buffer_len.  Write attempts to send
  // data_len bytes stored in data.  The variables read and written are set
  // only on SR_SUCCESS (see below).  Likewise, error is only set on SR_ERROR.
  // Read and Write return a value indicating:
  //  SR_ERROR: an error occurred, which is returned in a non-null error
  //    argument.  Interpretation of the error requires knowledge of the
  //    stream's concrete type, which limits its usefulness.
  //  SR_SUCCESS: some number of bytes were successfully read or written,
  //    which is returned in a non-null read/written argument.
  //  SR_BLOCK: the stream is in non-blocking mode, and the operation would
  //    block, or the stream is in SS_OPENING state.
  //  SR_EOS: the end-of-stream has been reached, or the stream is in the
  //    SS_CLOSED state.
  virtual StreamResult Read(void* buffer, size_t buffer_len,
                            size_t* read, int* error) = 0;
  virtual StreamResult Write(const void* data, size_t data_len,
                             size_t* written, int* error) = 0;
  // Attempt to transition to the SS_CLOSED state.  SE_CLOSE will not be
  // signalled as a result of this call.
  virtual void Close() = 0;

  // Streams may signal one or more StreamEvents to indicate state changes.
  // The first argument identifies the stream on which the state change
  // occurred.
  // The second argument is a bit-wise combination of StreamEvents.
  // If SE_CLOSE is signalled, then the third argument is the associated error
  // code.  Otherwise, the value is undefined.
  // Note: Not all streams will support asynchronous event signalling.  However,
  // SS_OPENING and SR_BLOCK returned from stream member functions imply that
  // certain events will be raised in the future.
  sigslot::signal3<StreamInterface*, int, int> SignalEvent;

  // Like calling SignalEvent, but posts a message to the specified thread,
  // which will call SignalEvent.  This helps unroll the stack and prevent
  // re-entrancy.
  void PostEvent(Thread* t, int events, int err);
  // Like the aforementioned method, but posts to the current thread.
  void PostEvent(int events, int err);

  //
  // OPTIONAL OPERATIONS
  //
  // Not all implementations will support the following operations.  In general,
  // a stream will only support an operation if it is reasonably efficient to
  // do so.  For example, while a socket could buffer incoming data to support
  // seeking, it will not do so.  Instead, a buffering stream adapter should
  // be used.
  //
  // Even though several of these operations are related, you should
  // always use whichever operation is most relevant.  For example, you may
  // be tempted to use GetSize() and GetPosition() to deduce the result of
  // GetAvailable().  However, a stream which is read-once may support the
  // latter operation but not the former.
  //

  // The following four methods are used to avoid copying data multiple times.

  // GetReadData returns a pointer to a buffer which is owned by the stream.
  // The buffer contains data_len bytes.  NULL is returned if no data is
  // available, or if the method fails.  If the caller processes the data, it
  // must call ConsumeReadData with the number of processed bytes.  GetReadData
  // does not require a matching call to ConsumeReadData if the data is not
  // processed.  Read and ConsumeReadData invalidate the buffer returned by
  // GetReadData.
  // The default ConsumeReadData implementation below does nothing.
  virtual const void* GetReadData(size_t* data_len);
  virtual void ConsumeReadData(size_t used) {}

  // GetWriteBuffer returns a pointer to a buffer which is owned by the stream.
  // The buffer has a capacity of buf_len bytes.  NULL is returned if there is
  // no buffer available, or if the method fails.  The caller may write data to
  // the buffer, and then call ConsumeWriteBuffer with the number of bytes
  // written.  GetWriteBuffer does not require a matching call to
  // ConsumeWriteBuffer if no data is written.  Write, ForceWrite, and
  // ConsumeWriteBuffer invalidate the buffer returned by GetWriteBuffer.
  // The default ConsumeWriteBuffer implementation below does nothing.
  // TODO: Allow the caller to specify a minimum buffer size.  If the specified
  // amount of buffer is not yet available, return NULL and Signal SE_WRITE
  // when it is available.  If the requested amount is too large, return an
  // error.
  virtual void* GetWriteBuffer(size_t* buf_len);
  virtual void ConsumeWriteBuffer(size_t used) {}

  // Write data_len bytes found in data, circumventing any throttling which
  // could cause SR_BLOCK to be returned.  Returns true if all the data
  // was written.  Otherwise, the method is unsupported, or an unrecoverable
  // error occurred, and the error value is set.  This method should be used
  // sparingly to write critical data which should not be throttled.  A stream
  // which cannot circumvent its blocking constraints should not implement this
  // method.
  // NOTE: This interface is being considered experimentally at the moment.  It
  // would be used by JUDP and BandwidthStream as a way to circumvent certain
  // soft limits in writing.
  //virtual bool ForceWrite(const void* data, size_t data_len, int* error) {
  //  if (error) *error = -1;
  //  return false;
  //}

  // Seek to a byte offset from the beginning of the stream.  Returns false if
  // the stream does not support seeking, or cannot seek to the specified
  // position.
  virtual bool SetPosition(size_t position);

  // Get the byte offset of the current position from the start of the stream.
  // Returns false if the position is not known.
  virtual bool GetPosition(size_t* position) const;

  // Get the byte length of the entire stream.  Returns false if the length
  // is not known.
  virtual bool GetSize(size_t* size) const;

  // Return the number of Read()-able bytes remaining before end-of-stream.
  // Returns false if not known.
  virtual bool GetAvailable(size_t* size) const;

  // Return the number of Write()-able bytes remaining before end-of-stream.
  // Returns false if not known.
  virtual bool GetWriteRemaining(size_t* size) const;

  // Return true if flush is successful.
  virtual bool Flush();

  // Communicates the amount of data which will be written to the stream.  The
  // stream may choose to preallocate memory to accommodate this data.  The
  // stream may return false to indicate that there is not enough room (ie,
  // Write will return SR_EOS/SR_ERROR at some point).  Note that calling this
  // function should not affect the existing state of data in the stream.
  virtual bool ReserveSize(size_t size);

  //
  // CONVENIENCE METHODS
  //
  // These methods are implemented in terms of other methods, for convenience.
  //

  // Seek to the start of the stream.  Equivalent to SetPosition(0), so it
  // returns false under the same conditions as SetPosition.
  inline bool Rewind() { return SetPosition(0); }

  // WriteAll is a helper function which repeatedly calls Write until all the
  // data is written, or something other than SR_SUCCESS is returned.  Note that
  // unlike Write, the argument 'written' is always set, and may be non-zero
  // on results other than SR_SUCCESS.  The remaining arguments have the
  // same semantics as Write.
  StreamResult WriteAll(const void* data, size_t data_len,
                        size_t* written, int* error);

  // Similar to WriteAll.  Calls Read until buffer_len bytes have been read, or
  // until a non-SR_SUCCESS result is returned.  'read' is always set.
  StreamResult ReadAll(void* buffer, size_t buffer_len,
                       size_t* read, int* error);

  // ReadLine is a helper function which repeatedly calls Read until it hits
  // the end-of-line character, or something other than SR_SUCCESS.
  // TODO: this is too inefficient to keep here.  Break this out into a buffered
  // readline object or adapter
  StreamResult ReadLine(std::string* line);

 protected:
  // Protected: StreamInterface is only constructible through a subclass.
  StreamInterface();

  // MessageHandler Interface
  void OnMessage(Message* msg) override;

 private:
  RTC_DISALLOW_COPY_AND_ASSIGN(StreamInterface);
};

///////////////////////////////////////////////////////////////////////////////
// StreamAdapterInterface is a convenient base-class for adapting a stream.
// By default, all operations are pass-through.  Override the methods that you
// require adaptation.  Streams should really be upgraded to reference-counted.
// In the meantime, use the owned flag to indicate whether the adapter should
// own the adapted stream.
///////////////////////////////////////////////////////////////////////////////

class StreamAdapterInterface : public StreamInterface,
                               public sigslot::has_slots<> {
 public:
  // Wraps |stream|.  |owned| indicates whether the adapter should own the
  // adapted stream.
  explicit StreamAdapterInterface(StreamInterface* stream, bool owned = true);

  // Core Stream Interface
  StreamState GetState() const override;
  StreamResult Read(void* buffer,
                    size_t buffer_len,
                    size_t* read,
                    int* error) override;
  StreamResult Write(const void* data,
                     size_t data_len,
                     size_t* written,
                     int* error) override;
  void Close() override;

  // Optional Stream Interface
  /*  Note: Many stream adapters were implemented prior to this Read/Write
      interface.  Therefore, a simple pass through of data in those cases may
      be broken.  At a later time, we should do a once-over pass of all
      adapters, and make them compliant with these interfaces, after which this
      code can be uncommented.
  virtual const void* GetReadData(size_t* data_len) {
    return stream_->GetReadData(data_len);
  }
  virtual void ConsumeReadData(size_t used) {
    stream_->ConsumeReadData(used);
  }

  virtual void* GetWriteBuffer(size_t* buf_len) {
    return stream_->GetWriteBuffer(buf_len);
  }
  virtual void ConsumeWriteBuffer(size_t used) {
    stream_->ConsumeWriteBuffer(used);
  }
  */

  /*  Note: This interface is currently undergoing evaluation.
  virtual bool ForceWrite(const void* data, size_t data_len, int* error) {
    return stream_->ForceWrite(data, data_len, error);
  }
  */

  bool SetPosition(size_t position) override;
  bool GetPosition(size_t* position) const override;
  bool GetSize(size_t* size) const override;
  bool GetAvailable(size_t* size) const override;
  bool GetWriteRemaining(size_t* size) const override;
  bool ReserveSize(size_t size) override;
  bool Flush() override;

  // Attach starts adapting |stream|, with the same meaning of |owned| as in
  // the constructor.  Detach returns a stream pointer to the caller.
  // NOTE(review): presumably Attach releases any previously adapted stream
  // and Detach hands back the adapted stream without deleting it -- confirm
  // in the implementation.
  void Attach(StreamInterface* stream, bool owned = true);
  StreamInterface* Detach();

 protected:
  // Protected destructor: adapters are not deleted through a
  // StreamAdapterInterface pointer by outside code.
  ~StreamAdapterInterface() override;

  // Note that the adapter presents itself as the origin of the stream events,
  // since users of the adapter may not recognize the adapted object.
  virtual void OnEvent(StreamInterface* stream, int events, int err);
  // Gives subclasses access to the adapted stream.
  StreamInterface* stream() { return stream_; }

 private:
  StreamInterface* stream_;  // The adapted stream.
  bool owned_;  // Whether the adapter should own the adapted stream.
  RTC_DISALLOW_COPY_AND_ASSIGN(StreamAdapterInterface);
};

///////////////////////////////////////////////////////////////////////////////
// StreamTap is a non-modifying, pass-through adapter, which copies all data
// in either direction to the tap.  Note that errors or blocking on writing to
// the tap will prevent further tap writes from occurring.
///////////////////////////////////////////////////////////////////////////////

class StreamTap : public StreamAdapterInterface {
 public:
  // |stream| is the stream being adapted; |tap| is the stream that receives
  // the copy of the data.
  explicit StreamTap(StreamInterface* stream, StreamInterface* tap);
  ~StreamTap() override;

  // Install or remove the tap stream.
  // NOTE(review): tap_ is a scoped_ptr, so AttachTap presumably takes
  // ownership of |tap| and DetachTap presumably returns ownership to the
  // caller -- confirm in the implementation.
  void AttachTap(StreamInterface* tap);
  StreamInterface* DetachTap();
  // NOTE(review): presumably reports the result (and, through |error|, the
  // error code) of writing to the tap -- confirm in the implementation.
  StreamResult GetTapResult(int* error);

  // StreamAdapterInterface Interface
  StreamResult Read(void* buffer,
                    size_t buffer_len,
                    size_t* read,
                    int* error) override;
  StreamResult Write(const void* data,
                     size_t data_len,
                     size_t* written,
                     int* error) override;

 private:
  scoped_ptr<StreamInterface> tap_;  // The tap stream, held by scoped_ptr.
  // NOTE(review): presumably the outcome and error code of the most recent
  // write to the tap -- confirm in the implementation.
  StreamResult tap_result_;
  int tap_error_;
  RTC_DISALLOW_COPY_AND_ASSIGN(StreamTap);
};

///////////////////////////////////////////////////////////////////////////////
// NullStream gives errors on read, and silently discards all written data.
///////////////////////////////////////////////////////////////////////////////

// A StreamInterface implementation that overrides only the four pure virtual
// core operations; none of the optional operations are overridden.
class NullStream : public StreamInterface {
 public:
  NullStream();
  ~NullStream() override;

  // StreamInterface Interface
  StreamState GetState() const override;
  StreamResult Read(void* buffer,
                    size_t buffer_len,
                    size_t* read,
                    int* error) override;
  StreamResult Write(const void* data,
                     size_t data_len,
                     size_t* written,
                     int* error) override;
  void Close() override;
};

///////////////////////////////////////////////////////////////////////////////
// FileStream is a simple implementation of a StreamInterface, which does not
// support asynchronous notification.
///////////////////////////////////////////////////////////////////////////////

class FileStream : public StreamInterface {
 public:
  FileStream();
  ~FileStream() override;

  // The semantics of filename and mode are the same as stdio's fopen
  // NOTE(review): |error| is presumably an optional out-parameter receiving
  // an error code on failure, and |shflag| presumably a platform sharing
  // flag (see the win32 remark near TryLock) -- confirm in the
  // implementation.
  virtual bool Open(const std::string& filename, const char* mode, int* error);
  virtual bool OpenShare(const std::string& filename, const char* mode,
                         int shflag, int* error);

  // By default, reads and writes are buffered for efficiency.  Disabling
  // buffering causes writes to block until the bytes on disk are updated.
  virtual bool DisableBuffering();

  // StreamInterface methods, core and optional.
  StreamState GetState() const override;
  StreamResult Read(void* buffer,
                    size_t buffer_len,
                    size_t* read,
                    int* error) override;
  StreamResult Write(const void* data,
                     size_t data_len,
                     size_t* written,
                     int* error) override;
  void Close() override;
  bool SetPosition(size_t position) override;
  bool GetPosition(size_t* position) const override;
  bool GetSize(size_t* size) const override;
  bool GetAvailable(size_t* size) const override;
  bool ReserveSize(size_t size) override;

  bool Flush() override;

#if defined(WEBRTC_POSIX) && !defined(__native_client__)
  // Tries to acquire an exclusive lock on the file.
  // Use OpenShare(...) on win32 to get similar functionality.
  bool TryLock();
  bool Unlock();
#endif

  // Static overload that takes a file name rather than operating on an open
  // stream.
  // Note: Deprecated in favor of Filesystem::GetFileSize().
  static bool GetSize(const std::string& filename, size_t* size);

 protected:
  // Virtual hook available to subclasses.
  // NOTE(review): presumably performs the actual close of file_ on behalf of
  // Close() -- confirm in the implementation.
  virtual void DoClose();

  FILE* file_;  // The stdio handle backing this stream.

 private:
  RTC_DISALLOW_COPY_AND_ASSIGN(FileStream);
};

///////////////////////////////////////////////////////////////////////////////
// MemoryStream is a simple implementation of a StreamInterface over in-memory
// data.  Data is read and written at the current seek position.  Reads return
// end-of-stream when they reach the end of data.  Writes actually extend the
// end of data mark.
///////////////////////////////////////////////////////////////////////////////

// Common base for the in-memory streams.  The constructor is protected, so
// instances are created through subclasses.
class MemoryStreamBase : public StreamInterface {
 public:
  // StreamInterface methods, core and optional.
  StreamState GetState() const override;
  StreamResult Read(void* buffer,
                    size_t bytes,
                    size_t* bytes_read,
                    int* error) override;
  StreamResult Write(const void* buffer,
                     size_t bytes,
                     size_t* bytes_written,
                     int* error) override;
  void Close() override;
  bool SetPosition(size_t position) override;
  bool GetPosition(size_t* position) const override;
  bool GetSize(size_t* size) const override;
  bool GetAvailable(size_t* size) const override;
  bool ReserveSize(size_t size) override;

  // Direct access to the underlying storage (returns buffer_).
  char* GetBuffer() { return buffer_; }
  const char* GetBuffer() const { return buffer_; }

 protected:
  MemoryStreamBase();

  // Virtual hook that subclasses may override.
  // NOTE(review): presumably asked to make room for at least |size| bytes,
  // reporting failure through the result and |error| -- confirm in the
  // implementation.
  virtual StreamResult DoReserve(size_t size, int* error);

  // Invariant: 0 <= seek_position_ <= data_length_ <= buffer_length_
  char* buffer_;
  size_t buffer_length_;
  size_t data_length_;
  size_t seek_position_;

 private:
  RTC_DISALLOW_COPY_AND_ASSIGN(MemoryStreamBase);
};

// MemoryStream dynamically resizes to accommodate written data.

// In-memory stream built on MemoryStreamBase; overrides DoReserve.
class MemoryStream : public MemoryStreamBase {
 public:
  MemoryStream();
  explicit MemoryStream(const char* data);  // Calls SetData(data, strlen(data))
  MemoryStream(const void* data, size_t length);  // Calls SetData(data, length)
  ~MemoryStream() override;

  // Sets the stream contents from |length| bytes at |data|.
  // NOTE(review): presumably copies the bytes into the stream's own buffer
  // and resets the seek position -- confirm in the implementation.
  void SetData(const void* data, size_t length);

 protected:
  StreamResult DoReserve(size_t size, int* error) override;
  // Memory Streams are aligned for efficiency.
  static const int kAlignment = 16;
  // NOTE(review): presumably the raw allocation from which the aligned
  // buffer_ is derived -- confirm in the implementation.
  char* buffer_alloc_;
};

// ExternalMemoryStream adapts an external memory buffer, so writes which would
// extend past the end of the buffer will return end-of-stream.

// In-memory stream built on MemoryStreamBase over a caller-supplied buffer.
// It does not override DoReserve.
class ExternalMemoryStream : public MemoryStreamBase {
 public:
  ExternalMemoryStream();
  ExternalMemoryStream(void* data, size_t length);
  ~ExternalMemoryStream() override;

  // Points the stream at the buffer |data| of |length| bytes.
  // NOTE(review): the buffer is presumably not copied and not owned, so it
  // must outlive the stream -- confirm in the implementation.
  void SetData(void* data, size_t length);
};

// FifoBuffer allows for efficient, thread-safe buffering of data between
// writer and reader. As the data can wrap around the end of the buffer,
// MemoryStreamBase can't help us here.

class FifoBuffer : public StreamInterface {
 public:
  // Creates a FIFO buffer with the specified capacity.
  explicit FifoBuffer(size_t length);
  // Creates a FIFO buffer with the specified capacity and owner
  FifoBuffer(size_t length, Thread* owner);
  ~FifoBuffer() override;
  // Gets the amount of data currently readable from the buffer.
  bool GetBuffered(size_t* data_len) const;
  // Resizes the buffer to the specified capacity. Fails if
  // data_length_ > length
  bool SetCapacity(size_t length);

  // Read into |buffer| with an offset from the current read position, offset
  // is specified in number of bytes.
  // This method doesn't adjust read position nor the number of available
  // bytes, user has to call ConsumeReadData() to do this.
  StreamResult ReadOffset(void* buffer, size_t bytes, size_t offset,
                          size_t* bytes_read);

  // Write |buffer| with an offset from the current write position, offset is
  // specified in number of bytes.
  // This method doesn't adjust the number of buffered bytes, user has to call
  // ConsumeWriteBuffer() to do this.
  StreamResult WriteOffset(const void* buffer, size_t bytes, size_t offset,
                           size_t* bytes_written);

  // StreamInterface methods
  StreamState GetState() const override;
  StreamResult Read(void* buffer,
                    size_t bytes,
                    size_t* bytes_read,
                    int* error) override;
  StreamResult Write(const void* buffer,
                     size_t bytes,
                     size_t* bytes_written,
                     int* error) override;
  void Close() override;
  const void* GetReadData(size_t* data_len) override;
  void ConsumeReadData(size_t used) override;
  void* GetWriteBuffer(size_t* buf_len) override;
  void ConsumeWriteBuffer(size_t used) override;
  bool GetWriteRemaining(size_t* size) const override;

 private:
  // Helper method that implements ReadOffset. Caller must acquire a lock
  // when calling this method.
  StreamResult ReadOffsetLocked(void* buffer, size_t bytes, size_t offset,
                                size_t* bytes_read);

  // Helper method that implements WriteOffset. Caller must acquire a lock
  // when calling this method.
  StreamResult WriteOffsetLocked(const void* buffer, size_t bytes,
                                 size_t offset, size_t* bytes_written);

  StreamState state_;  // keeps the opened/closed state of the stream
  scoped_ptr<char[]> buffer_;  // the allocated buffer
  size_t buffer_length_;  // size of the allocated buffer
  size_t data_length_;  // amount of readable data in the buffer
  size_t read_position_;  // offset to the readable data
  Thread* owner_;  // stream callbacks are dispatched on this thread
  // Declared mutable so that const member functions can also take the lock.
  mutable CriticalSection crit_;  // object lock
  RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);
};

///////////////////////////////////////////////////////////////////////////////

// Stream adapter that overrides Read, Write, Close and OnEvent of
// StreamAdapterInterface.
// NOTE(review): judging by its name and members, it presumably logs the
// traffic and events of the adapted stream at severity |level|, prefixed
// with |label| and optionally hex-encoded -- confirm in the implementation.
class LoggingAdapter : public StreamAdapterInterface {
 public:
  // |stream| is the stream to adapt; |hex_mode| defaults to false.
  LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
                 const std::string& label, bool hex_mode = false);

  // Replaces the label supplied at construction.
  // NOTE(review): the stored label may be a decorated form of |label| --
  // confirm in the implementation.
  void set_label(const std::string& label);

  StreamResult Read(void* buffer,
                    size_t buffer_len,
                    size_t* read,
                    int* error) override;
  StreamResult Write(const void* data,
                     size_t data_len,
                     size_t* written,
                     int* error) override;
  void Close() override;

 protected:
  void OnEvent(StreamInterface* stream, int events, int err) override;

 private:
  LoggingSeverity level_;
  std::string label_;
  bool hex_mode_;
  // NOTE(review): presumably carries multi-line logging state between
  // successive log calls -- confirm against the logging header.
  LogMultilineState lms_;

  RTC_DISALLOW_COPY_AND_ASSIGN(LoggingAdapter);
};

///////////////////////////////////////////////////////////////////////////////
// StringStream - Reads/Writes to an external std::string
///////////////////////////////////////////////////////////////////////////////

class StringStream : public StreamInterface {
 public:
  // Two ways of binding the stream to a string: by mutable pointer or by
  // const reference.
  // NOTE(review): the const-reference form presumably yields a read-only
  // stream (see read_only_) -- confirm in the implementation.
  explicit StringStream(std::string* str);
  explicit StringStream(const std::string& str);

  // StreamInterface methods, core and optional.
  StreamState GetState() const override;
  StreamResult Read(void* buffer,
                    size_t buffer_len,
                    size_t* read,
                    int* error) override;
  StreamResult Write(const void* data,
                     size_t data_len,
                     size_t* written,
                     int* error) override;
  void Close() override;
  bool SetPosition(size_t position) override;
  bool GetPosition(size_t* position) const override;
  bool GetSize(size_t* size) const override;
  bool GetAvailable(size_t* size) const override;
  bool ReserveSize(size_t size) override;

 private:
  // Held by reference, not by value: the stream does not keep its own copy
  // of the string.
  std::string& str_;
  // NOTE(review): presumably the offset into str_ at which the next Read
  // starts -- confirm in the implementation.
  size_t read_pos_;
  // NOTE(review): presumably true when writes must be rejected -- confirm in
  // the implementation.
  bool read_only_;
};

///////////////////////////////////////////////////////////////////////////////
// StreamReference - A reference counting stream adapter
///////////////////////////////////////////////////////////////////////////////

// Keep in mind that the streams and adapters defined in this file are
// not thread-safe, so this has limited uses.

// A StreamRefCount holds the reference count and a pointer to the
// wrapped stream. It deletes the wrapped stream when there are no
// more references. We can then have multiple StreamReference
// instances pointing to one StreamRefCount, all wrapping the same
// stream.

class StreamReference : public StreamAdapterInterface {
  class StreamRefCount;
 public:
  // Creates the first reference to |stream|.
  // Note: call this constructor only once for a given stream; every further
  // reference must be obtained through NewReference().
  explicit StreamReference(StreamInterface* stream);
  // Returns the wrapped stream.
  StreamInterface* GetStream() { return this->stream(); }
  StreamInterface* NewReference();
  ~StreamReference() override;

 private:
  // Pairs the wrapped stream with its reference count.  The count starts at
  // one; the Release() call that brings it to zero deletes both the wrapped
  // stream and this object.
  class StreamRefCount {
   public:
    explicit StreamRefCount(StreamInterface* stream)
        : stream_(stream),
          ref_count_(1) {}

    // Registers one more reference, under the lock.
    void AddReference() {
      CritScope lock(&cs_);
      ref_count_ += 1;
    }

    // Drops one reference.  The lock guards only the decrement, so it has
    // already been released by the time the object destroys itself.
    void Release() {
      bool last_reference = false;
      {  // Atomic ops would have been a better fit here.
        CritScope lock(&cs_);
        ref_count_ -= 1;
        last_reference = (ref_count_ == 0);
      }
      if (!last_reference) {
        return;
      }
      delete stream_;
      delete this;
    }

   private:
    StreamInterface* stream_;
    int ref_count_;
    CriticalSection cs_;
    RTC_DISALLOW_COPY_AND_ASSIGN(StreamRefCount);
  };

  // Constructor used when adding a reference to an existing StreamRefCount.
  explicit StreamReference(StreamRefCount* stream_ref_count,
                           StreamInterface* stream);

  StreamRefCount* stream_ref_count_;
  RTC_DISALLOW_COPY_AND_ASSIGN(StreamReference);
};

///////////////////////////////////////////////////////////////////////////////

// Flow attempts to move bytes from source to sink via a buffer of size
// buffer_len.  The function returns SR_SUCCESS when source reaches
// end-of-stream (returns SR_EOS) and all the data has been written
// successfully to sink.  Alternatively, if source returns SR_BLOCK or
// SR_ERROR, or if sink returns SR_BLOCK, SR_ERROR, or SR_EOS, then the
// function immediately returns with the unexpected StreamResult value.
// data_len is both an input and an output parameter, and defaults to null:
//  - On input, it is the length of the valid data already in buffer which
//    should be moved to sink.
//  - On output, in case of error, it is the length of the data in buffer that
//    was read from source but could not be moved to sink.
StreamResult Flow(StreamInterface* source,
                  char* buffer, size_t buffer_len,
                  StreamInterface* sink, size_t* data_len = nullptr);

///////////////////////////////////////////////////////////////////////////////

}  // namespace rtc

#endif  // WEBRTC_BASE_STREAM_H_