path: root/src/tracing/core/trace_buffer.cc
blob: baa6a3486d38478d5fde9fc460788367376a66ef
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/trace_buffer.h"

#include <limits>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/protozero/proto_utils.h"

#define TRACE_BUFFER_VERBOSE_LOGGING() 0  // Set to 1 when debugging unittests.
#if TRACE_BUFFER_VERBOSE_LOGGING()
#define TRACE_BUFFER_DLOG PERFETTO_DLOG
#else
#define TRACE_BUFFER_DLOG(...) void()
#endif

namespace perfetto {

namespace {
constexpr uint8_t kFirstPacketContinuesFromPrevChunk =
    SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk;
constexpr uint8_t kLastPacketContinuesOnNextChunk =
    SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk;
constexpr uint8_t kChunkNeedsPatching =
    SharedMemoryABI::ChunkHeader::kChunkNeedsPatching;
}  // namespace.

const size_t TraceBuffer::InlineChunkHeaderSize = sizeof(ChunkRecord);

// static
std::unique_ptr<TraceBuffer> TraceBuffer::Create(size_t size_in_bytes,
                                                 OverwritePolicy pol) {
  std::unique_ptr<TraceBuffer> trace_buffer(new TraceBuffer(pol));
  if (!trace_buffer->Initialize(size_in_bytes))
    return nullptr;
  return trace_buffer;
}

TraceBuffer::TraceBuffer(OverwritePolicy pol) : overwrite_policy_(pol) {
  // See comments in ChunkRecord for the rationale of this.
  static_assert(sizeof(ChunkRecord) == sizeof(SharedMemoryABI::PageHeader) +
                                           sizeof(SharedMemoryABI::ChunkHeader),
                "ChunkRecord out of sync with the layout of SharedMemoryABI");
}

TraceBuffer::~TraceBuffer() = default;

bool TraceBuffer::Initialize(size_t size) {
  static_assert(
      SharedMemoryABI::kMinPageSize % sizeof(ChunkRecord) == 0,
      "sizeof(ChunkRecord) must be an integer divisor of the page size");
  auto max_size = std::numeric_limits<decltype(ChunkMeta::record_off)>::max();
  PERFETTO_CHECK(size <= static_cast<size_t>(max_size));
  data_ = base::PagedMemory::Allocate(
      size, base::PagedMemory::kMayFail | base::PagedMemory::kDontCommit);
  if (!data_.IsValid()) {
    PERFETTO_ELOG("Trace buffer allocation failed (size: %zu)", size);
    return false;
  }
  size_ = size;
  stats_.set_buffer_size(size);
  max_chunk_size_ = std::min(size, ChunkRecord::kMaxSize);
  wptr_ = begin();
  index_.clear();
  last_chunk_id_written_.clear();
  read_iter_ = GetReadIterForSequence(index_.end());
  return true;
}

// Note: |src| points to a shmem region that is shared with the producer. Assume
// that the producer is malicious and will change the content of |src|
// while we execute here. Don't do any processing on it other than memcpy().
void TraceBuffer::CopyChunkUntrusted(ProducerID producer_id_trusted,
                                     uid_t producer_uid_trusted,
                                     pid_t producer_pid_trusted,
                                     WriterID writer_id,
                                     ChunkID chunk_id,
                                     uint16_t num_fragments,
                                     uint8_t chunk_flags,
                                     bool chunk_complete,
                                     const uint8_t* src,
                                     size_t size) {
  PERFETTO_CHECK(!read_only_);

  // |record_size| = |size| + sizeof(ChunkRecord), rounded up to avoid ending
  // up in a fragmented state where size_to_end() < sizeof(ChunkRecord).
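  // For example, if sizeof(ChunkRecord) is 16 bytes, a |size| of 50 bytes
  // yields record_size == AlignUp<16>(66) == 80.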
  const size_t record_size =
      base::AlignUp<sizeof(ChunkRecord)>(size + sizeof(ChunkRecord));
  TRACE_BUFFER_DLOG("CopyChunk @ %" PRIdPTR ", size=%zu", wptr_ - begin(), record_size);
  if (PERFETTO_UNLIKELY(record_size > max_chunk_size_)) {
    stats_.set_abi_violations(stats_.abi_violations() + 1);
    PERFETTO_DCHECK(suppress_client_dchecks_for_testing_);
    return;
  }

#if PERFETTO_DCHECK_IS_ON()
  changed_since_last_read_ = true;
#endif

  // If the chunk hasn't been completed, we should only consider the first
  // |num_fragments - 1| packets complete. For simplicity, we disregard the
  // last one when we copy the chunk.
  if (PERFETTO_UNLIKELY(!chunk_complete)) {
    if (num_fragments > 0) {
      num_fragments--;
      // These flags should only affect the last packet in the chunk. We clear
      // them, so that TraceBuffer is able to look at the remaining packets in
      // this chunk.
      chunk_flags &= ~kLastPacketContinuesOnNextChunk;
      chunk_flags &= ~kChunkNeedsPatching;
    }
  }

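  // Build the ChunkRecord header that will prefix the copied payload in the
  // buffer, together with the {ProducerID, WriterID, ChunkID} key used to
  // look the chunk up in |index_|.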
  ChunkRecord record(record_size);
  record.producer_id = producer_id_trusted;
  record.chunk_id = chunk_id;
  record.writer_id = writer_id;
  record.num_fragments = num_fragments;
  record.flags = chunk_flags;
  ChunkMeta::Key key(record);

  // Check whether we have already copied the same chunk previously. This may
  // happen if the service scrapes chunks in a potentially incomplete state
  // before receiving commit requests for them from the producer. Note that the
  // service may scrape and thus override chunks in arbitrary order since the
  // chunks aren't ordered in the SMB.
  const auto it = index_.find(key);
  if (PERFETTO_UNLIKELY(it != index_.end())) {
    ChunkMeta* record_meta = &it->second;
    ChunkRecord* prev = GetChunkRecordAt(begin() + record_meta->record_off);

    // Verify that the old chunk's metadata corresponds to the new one.
    // Overridden chunks should never change size, since the page layout is
    // fixed per writer. The number of fragments should also never decrease and
    // flags should not be removed.
    if (PERFETTO_UNLIKELY(ChunkMeta::Key(*prev) != key ||
                          prev->size != record_size ||
                          prev->num_fragments > num_fragments ||
                          (prev->flags & chunk_flags) != prev->flags)) {
      stats_.set_abi_violations(stats_.abi_violations() + 1);
      PERFETTO_DCHECK(suppress_client_dchecks_for_testing_);
      return;
    }

    // If the number of fragments hasn't changed since the previous copy of
    // this chunk, there's no need to copy it again. If the previous copy was
    // already complete, this should always be the case.
    PERFETTO_DCHECK(suppress_client_dchecks_for_testing_ ||
                    !record_meta->is_complete() ||
                    (chunk_complete && prev->num_fragments == num_fragments));
    if (prev->num_fragments == num_fragments) {
      TRACE_BUFFER_DLOG("  skipping recommit of identical chunk");
      return;
    }

    // If we've already started reading from chunk N+1 following this chunk N,
    // don't override chunk N. Otherwise we may end up reading a packet from
    // chunk N after having read from chunk N+1, thereby violating sequential
    // read of packets. This shouldn't happen if the producer is well-behaved,
    // because it shouldn't start chunk N+1 before completing chunk N.
    ChunkMeta::Key subsequent_key = key;
    static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
                  "ChunkID wraps");
    subsequent_key.chunk_id++;
    const auto subsequent_it = index_.find(subsequent_key);
    if (subsequent_it != index_.end() &&
        subsequent_it->second.num_fragments_read > 0) {
      stats_.set_abi_violations(stats_.abi_violations() + 1);
      PERFETTO_DCHECK(suppress_client_dchecks_for_testing_);
      return;
    }

    // We should not have read past the last packet.
    if (record_meta->num_fragments_read > prev->num_fragments) {
      PERFETTO_ELOG(
          "TraceBuffer read too many fragments from an incomplete chunk");
      PERFETTO_DCHECK(suppress_client_dchecks_for_testing_);
      return;
    }

    uint8_t* wptr = reinterpret_cast<uint8_t*>(prev);
    TRACE_BUFFER_DLOG("  overriding chunk @ %" PRIdPTR ", size=%zu", wptr - begin(),
                      record_size);

    // Update the chunk metadata stored in the index, as it may have changed.
    record_meta->num_fragments = num_fragments;
    record_meta->flags = chunk_flags;
    record_meta->set_complete(chunk_complete);

    // Override the ChunkRecord contents at the original |wptr|.
    TRACE_BUFFER_DLOG("  copying @ [%" PRIdPTR " - %" PRIdPTR "] %zu", wptr - begin(),
                      uintptr_t(wptr - begin()) + record_size, record_size);
    WriteChunkRecord(wptr, record, src, size);
    TRACE_BUFFER_DLOG("Chunk raw: %s",
                      base::HexDump(wptr, record_size).c_str());
    stats_.set_chunks_rewritten(stats_.chunks_rewritten() + 1);
    return;
  }

  if (PERFETTO_UNLIKELY(discard_writes_))
    return DiscardWrite();

  // If there isn't enough room from the given write position, write a padding
  // record to clear the end of the buffer and wrap back.
  const size_t cached_size_to_end = size_to_end();
  if (PERFETTO_UNLIKELY(record_size > cached_size_to_end)) {
    ssize_t res = DeleteNextChunksFor(cached_size_to_end);
    if (res == -1)
      return DiscardWrite();
    PERFETTO_DCHECK(static_cast<size_t>(res) <= cached_size_to_end);
    AddPaddingRecord(cached_size_to_end);
    wptr_ = begin();
    stats_.set_write_wrap_count(stats_.write_wrap_count() + 1);
    PERFETTO_DCHECK(size_to_end() >= record_size);
  }

  // At this point either |wptr_| points to an untouched part of the buffer
  // (i.e. *wptr_ == 0) or we are about to overwrite one or more ChunkRecord(s).
  // In the latter case we need to first figure out where the next valid
  // ChunkRecord is (if it exists) and add padding between the new record and
  // that one.
  // Example ((w) == write cursor):
  //
  // Initial state (wptr_ == 0):
  // |0 (w)    |10               |30                  |50
  // +---------+-----------------+--------------------+--------------------+
  // | Chunk 1 | Chunk 2         | Chunk 3            | Chunk 4            |
  // +---------+-----------------+--------------------+--------------------+
  //
  // Let's now assume we want to write a 5th Chunk of size == 35. The final
  // state should look like this:
  // |0                                |35 (w)         |50
  // +---------------------------------+---------------+--------------------+
  // | Chunk 5                         | Padding Chunk | Chunk 4            |
  // +---------------------------------+---------------+--------------------+

  // Deletes all chunks from |wptr_| to |wptr_| + |record_size|.
  ssize_t del_res = DeleteNextChunksFor(record_size);
  if (del_res == -1)
    return DiscardWrite();
  size_t padding_size = static_cast<size_t>(del_res);

  // Now first insert the new chunk. At the end, if necessary, add the padding.
  stats_.set_chunks_written(stats_.chunks_written() + 1);
  stats_.set_bytes_written(stats_.bytes_written() + record_size);

  uint32_t chunk_off = GetOffset(GetChunkRecordAt(wptr_));
  auto it_and_inserted = index_.emplace(
      key, ChunkMeta(chunk_off, num_fragments, chunk_complete, chunk_flags,
                     producer_uid_trusted, producer_pid_trusted));
  PERFETTO_DCHECK(it_and_inserted.second);
  TRACE_BUFFER_DLOG("  copying @ [%" PRIdPTR " - %" PRIdPTR "] %zu", wptr_ - begin(),
                    uintptr_t(wptr_ - begin()) + record_size, record_size);
  WriteChunkRecord(wptr_, record, src, size);
  TRACE_BUFFER_DLOG("Chunk raw: %s", base::HexDump(wptr_, record_size).c_str());
  wptr_ += record_size;
  if (wptr_ >= end()) {
    PERFETTO_DCHECK(padding_size == 0);
    wptr_ = begin();
    stats_.set_write_wrap_count(stats_.write_wrap_count() + 1);
  }
  DcheckIsAlignedAndWithinBounds(wptr_);

  // Chunks may be received out of order, so only update last_chunk_id if the
  // new chunk_id is larger. But take into account overflows by only selecting
  // the new ID if its distance to the latest ID is smaller than half the number
  // space.
  //
  // This accounts for both the case where the new ID has just overflown and
  // last_chunk_id should be updated even though it's smaller (e.g. |chunk_id|
  // = 1 and |last_chunk_id| = kMaxChunkId; chunk_id - last_chunk_id = 2) and
  // the case where the new ID is an out-of-order ID right after an overflow
  // and last_chunk_id shouldn't be updated even though it's larger (e.g.
  // |chunk_id| = kMaxChunkId and |last_chunk_id| = 1; chunk_id - last_chunk_id
  // = kMaxChunkId - 1).
  auto producer_and_writer_id = std::make_pair(producer_id_trusted, writer_id);
  ChunkID& last_chunk_id = last_chunk_id_written_[producer_and_writer_id];
  static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
                "This code assumes that ChunkID wraps at kMaxChunkID");
  if (chunk_id - last_chunk_id < kMaxChunkID / 2) {
    last_chunk_id = chunk_id;
  } else {
    stats_.set_chunks_committed_out_of_order(
        stats_.chunks_committed_out_of_order() + 1);
  }

  if (padding_size)
    AddPaddingRecord(padding_size);
}

ssize_t TraceBuffer::DeleteNextChunksFor(size_t bytes_to_clear) {
  PERFETTO_CHECK(!discard_writes_);

  // Find the position of the first chunk which begins at or after
  // (|wptr_| + |bytes_to_clear|). Note that such a chunk might not exist and
  // we might either reach the end of the buffer or a zeroed region of the
  // buffer.
  uint8_t* next_chunk_ptr = wptr_;
  uint8_t* search_end = wptr_ + bytes_to_clear;
  TRACE_BUFFER_DLOG("Delete [%zu %zu]", wptr_ - begin(), search_end - begin());
  DcheckIsAlignedAndWithinBounds(wptr_);
  PERFETTO_DCHECK(search_end <= end());
  std::vector<ChunkMap::iterator> index_delete;
  uint64_t chunks_overwritten = stats_.chunks_overwritten();
  uint64_t bytes_overwritten = stats_.bytes_overwritten();
  uint64_t padding_bytes_cleared = stats_.padding_bytes_cleared();
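  // Walk the chain of ChunkRecords starting at |wptr_|, evicting every chunk
  // that overlaps [wptr_, search_end), until we either reach |search_end| or
  // hit the zeroed (still untouched) tail of the buffer.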
  while (next_chunk_ptr < search_end) {
    const ChunkRecord& next_chunk = *GetChunkRecordAt(next_chunk_ptr);
    TRACE_BUFFER_DLOG(
        "  scanning chunk [%zu %zu] (valid=%d)", next_chunk_ptr - begin(),
        next_chunk_ptr - begin() + next_chunk.size, next_chunk.is_valid());

    // We just reached the untouched part of the buffer; it's going to be all
    // zeroes from here to end().
    // Optimization: if we filled the buffer with padding records during
    // Initialize(), we could get rid of this branch.
    if (PERFETTO_UNLIKELY(!next_chunk.is_valid())) {
      // This should happen only at the first iteration. The zeroed area can
      // only begin precisely at the |wptr_|, not after. Otherwise it means that
      // we wrapped but screwed up the ChunkRecord chain.
      PERFETTO_DCHECK(next_chunk_ptr == wptr_);
      return 0;
    }

    // Remove |next_chunk| from the index, unless it's a padding record (padding
    // records are not part of the index).
    if (PERFETTO_LIKELY(!next_chunk.is_padding)) {
      ChunkMeta::Key key(next_chunk);
      auto it = index_.find(key);
      bool will_remove = false;
      if (PERFETTO_LIKELY(it != index_.end())) {
        const ChunkMeta& meta = it->second;
        if (PERFETTO_UNLIKELY(meta.num_fragments_read < meta.num_fragments)) {
          if (overwrite_policy_ == kDiscard)
            return -1;
          chunks_overwritten++;
          bytes_overwritten += next_chunk.size;
        }
        index_delete.push_back(it);
        will_remove = true;
      }
      TRACE_BUFFER_DLOG(
          "  del index {%" PRIu32 ",%" PRIu32 ",%u} @ [%" PRIdPTR " - %" PRIdPTR "] %d",
          key.producer_id, key.writer_id, key.chunk_id,
          next_chunk_ptr - begin(), next_chunk_ptr - begin() + next_chunk.size,
          will_remove);
      PERFETTO_DCHECK(will_remove);
    } else {
      padding_bytes_cleared += next_chunk.size;
    }

    next_chunk_ptr += next_chunk.size;

    // We should never hit this, unless we managed to screw up while writing
    // to the buffer and break the ChunkRecord(s) chain.
    // TODO(primiano): Write more meaningful logging with the status of the
    // buffer, to get more actionable bugs in case we hit this.
    PERFETTO_CHECK(next_chunk_ptr <= end());
  }

  // Remove from the index.
  for (auto it : index_delete) {
    index_.erase(it);
  }
  stats_.set_chunks_overwritten(chunks_overwritten);
  stats_.set_bytes_overwritten(bytes_overwritten);
  stats_.set_padding_bytes_cleared(padding_bytes_cleared);

  PERFETTO_DCHECK(next_chunk_ptr >= search_end && next_chunk_ptr <= end());
  return static_cast<ssize_t>(next_chunk_ptr - search_end);
}

void TraceBuffer::AddPaddingRecord(size_t size) {
  PERFETTO_DCHECK(size >= sizeof(ChunkRecord) && size <= ChunkRecord::kMaxSize);
  ChunkRecord record(size);
  record.is_padding = 1;
  TRACE_BUFFER_DLOG("AddPaddingRecord @ [%" PRIdPTR " - %" PRIdPTR "] %zu", wptr_ - begin(),
                    uintptr_t(wptr_ - begin()) + size, size);
  WriteChunkRecord(wptr_, record, nullptr, size - sizeof(ChunkRecord));
  stats_.set_padding_bytes_written(stats_.padding_bytes_written() + size);
  // |wptr_| is deliberately not advanced when writing a padding record.
}

bool TraceBuffer::TryPatchChunkContents(ProducerID producer_id,
                                        WriterID writer_id,
                                        ChunkID chunk_id,
                                        const Patch* patches,
                                        size_t patches_size,
                                        bool other_patches_pending) {
  PERFETTO_CHECK(!read_only_);
  ChunkMeta::Key key(producer_id, writer_id, chunk_id);
  auto it = index_.find(key);
  if (it == index_.end()) {
    stats_.set_patches_failed(stats_.patches_failed() + 1);
    return false;
  }
  ChunkMeta& chunk_meta = it->second;

  // Check that the index is consistent with the actual ProducerID/WriterID
  // stored in the ChunkRecord.

  ChunkRecord* chunk_record = GetChunkRecordAt(begin() + chunk_meta.record_off);
  PERFETTO_DCHECK(ChunkMeta::Key(*chunk_record) == key);
  uint8_t* chunk_begin = reinterpret_cast<uint8_t*>(chunk_record);
  PERFETTO_DCHECK(chunk_begin >= begin());
  uint8_t* chunk_end = chunk_begin + chunk_record->size;
  PERFETTO_DCHECK(chunk_end <= end());

  static_assert(Patch::kSize == SharedMemoryABI::kPacketHeaderSize,
                "Patch::kSize out of sync with SharedMemoryABI");

  for (size_t i = 0; i < patches_size; i++) {
    uint8_t* ptr =
        chunk_begin + sizeof(ChunkRecord) + patches[i].offset_untrusted;
    TRACE_BUFFER_DLOG("PatchChunk {%" PRIu32 ",%" PRIu32
                      ",%u} size=%zu @ %zu with {%02x %02x %02x %02x} cur "
                      "{%02x %02x %02x %02x}",
                      producer_id, writer_id, chunk_id, chunk_end - chunk_begin,
                      patches[i].offset_untrusted, patches[i].data[0],
                      patches[i].data[1], patches[i].data[2],
                      patches[i].data[3], ptr[0], ptr[1], ptr[2], ptr[3]);
    if (ptr < chunk_begin + sizeof(ChunkRecord) ||
        ptr > chunk_end - Patch::kSize) {
      // Either the IPC was so slow that in the meantime the writer managed to
      // wrap over |chunk_id|, or the producer sent a malicious IPC.
      stats_.set_patches_failed(stats_.patches_failed() + 1);
      return false;
    }

    memcpy(ptr, &patches[i].data[0], Patch::kSize);
  }
  TRACE_BUFFER_DLOG("Chunk raw (after patch): %s",
                    base::HexDump(chunk_begin, chunk_record->size).c_str());

  stats_.set_patches_succeeded(stats_.patches_succeeded() + patches_size);
  if (!other_patches_pending) {
    chunk_meta.flags &= ~kChunkNeedsPatching;
    chunk_record->flags = chunk_meta.flags;
  }
  return true;
}

void TraceBuffer::BeginRead() {
  read_iter_ = GetReadIterForSequence(index_.begin());
#if PERFETTO_DCHECK_IS_ON()
  changed_since_last_read_ = false;
#endif
}

TraceBuffer::SequenceIterator TraceBuffer::GetReadIterForSequence(
    ChunkMap::iterator seq_begin) {
  SequenceIterator iter;
  iter.seq_begin = seq_begin;
  if (seq_begin == index_.end()) {
    iter.cur = iter.seq_end = index_.end();
    return iter;
  }

#if PERFETTO_DCHECK_IS_ON()
  // Either |seq_begin| is == index_.begin() or the item immediately before must
  // belong to a different {ProducerID, WriterID} sequence.
  if (seq_begin != index_.begin() && seq_begin != index_.end()) {
    auto prev_it = seq_begin;
    prev_it--;
    PERFETTO_DCHECK(
        seq_begin == index_.begin() ||
        std::tie(prev_it->first.producer_id, prev_it->first.writer_id) <
            std::tie(seq_begin->first.producer_id, seq_begin->first.writer_id));
  }
#endif

  // Find the first entry that has a greater {ProducerID, WriterID} (or just
  // index_.end() if we reached the end).
  ChunkMeta::Key key = seq_begin->first;  // Deliberate copy.
  key.chunk_id = kMaxChunkID;
  iter.seq_end = index_.upper_bound(key);
  PERFETTO_DCHECK(iter.seq_begin != iter.seq_end);

  // Now find the first entry between [seq_begin, seq_end) that is
  // > last_chunk_id_written_. This is where the sequence will start (see
  // notes about wrapping of IDs in the header).
  auto producer_and_writer_id = std::make_pair(key.producer_id, key.writer_id);
  PERFETTO_DCHECK(last_chunk_id_written_.count(producer_and_writer_id));
  iter.wrapping_id = last_chunk_id_written_[producer_and_writer_id];
  key.chunk_id = iter.wrapping_id;
  iter.cur = index_.upper_bound(key);
  if (iter.cur == iter.seq_end)
    iter.cur = iter.seq_begin;
  return iter;
}

void TraceBuffer::SequenceIterator::MoveNext() {
  // Stop iterating when we reach the end of the sequence.
  // Note: |seq_begin| might be == |seq_end|.
  if (cur == seq_end || cur->first.chunk_id == wrapping_id) {
    cur = seq_end;
    return;
  }

  // If the current chunk wasn't completed yet, we shouldn't advance past it as
  // it may be rewritten with additional packets.
  if (!cur->second.is_complete()) {
    cur = seq_end;
    return;
  }

  ChunkID last_chunk_id = cur->first.chunk_id;
  if (++cur == seq_end)
    cur = seq_begin;

  // There may be a missing chunk in the sequence of chunks, in which case the
  // next chunk's ID won't follow the last one's. If so, skip the rest of the
  // sequence. We'll return to it later once the hole is filled.
  if (last_chunk_id + 1 != cur->first.chunk_id)
    cur = seq_end;
}

bool TraceBuffer::ReadNextTracePacket(
    TracePacket* packet,
    PacketSequenceProperties* sequence_properties,
    bool* previous_packet_on_sequence_dropped) {
  // Note: MoveNext() only moves to the next chunk within the same
  // {ProducerID, WriterID} sequence. Here we want to:
  // - return the next patched+complete packet in the current sequence, if any.
  // - return the first patched+complete packet in the next sequence, if any.
  // - return false if none of the above is found.
  TRACE_BUFFER_DLOG("ReadNextTracePacket()");

  // Just in case we forget to initialize these below.
  *sequence_properties = {0, kInvalidUid, base::kInvalidPid, 0};
  *previous_packet_on_sequence_dropped = false;

  // At the start of each sequence iteration, we consider the last read packet
  // dropped. While iterating over the chunks in the sequence, we update this
  // flag based on our knowledge about the last packet that was read from each
  // chunk (|last_read_packet_skipped| in ChunkMeta).
  bool previous_packet_dropped = true;

#if PERFETTO_DCHECK_IS_ON()
  PERFETTO_DCHECK(!changed_since_last_read_);
#endif
  for (;; read_iter_.MoveNext()) {
    if (PERFETTO_UNLIKELY(!read_iter_.is_valid())) {
      // We ran out of chunks in the current {ProducerID, WriterID} sequence or
      // we just reached the index_.end().

      if (PERFETTO_UNLIKELY(read_iter_.seq_end == index_.end()))
        return false;

      // We reached the end of sequence, move to the next one.
      // Note: ++read_iter_.seq_end might become index_.end(), but
      // GetReadIterForSequence() knows how to deal with that.
      read_iter_ = GetReadIterForSequence(read_iter_.seq_end);
      PERFETTO_DCHECK(read_iter_.is_valid() && read_iter_.cur != index_.end());
      previous_packet_dropped = true;
    }

    ChunkMeta* chunk_meta = &*read_iter_;

    // If the chunk has holes that are still awaiting out-of-band patching,
    // skip the current sequence and move to the next one.
    if (chunk_meta->flags & kChunkNeedsPatching) {
      read_iter_.MoveToEnd();
      continue;
    }

    const ProducerID trusted_producer_id = read_iter_.producer_id();
    const WriterID writer_id = read_iter_.writer_id();
    const ProducerAndWriterID producer_and_writer_id =
        MkProducerAndWriterID(trusted_producer_id, writer_id);
    const uid_t trusted_uid = chunk_meta->trusted_uid;
    const pid_t trusted_pid = chunk_meta->trusted_pid;

    // At this point we have a chunk in |chunk_meta| that has not been fully
    // read. We don't know yet whether we have enough data to read the full
    // packet (in the case it's fragmented over several chunks) and we are about
    // to find that out. Specifically:
    // A) If the first fragment is unread and is a fragment continuing from a
    //    previous chunk, it means we have missed the previous ChunkID. In
    //    fact, if this wasn't the case, a previous call to ReadNext() shouldn't
    //    have moved the cursor to this chunk.
    // B) Any fragment > 0 && < last is always readable. By definition an inner
    //    packet is never fragmented and hence requires neither stitching nor
    //    any out-of-band patching. The same applies to the last packet
    //    iff it doesn't continue on the next chunk.
    // C) If the last packet (which might be also the only packet in the chunk)
    //    is a fragment and continues on the next chunk, we peek at the next
    //    chunks and, if we have all of them, mark as read and move the cursor.
    //
    // +---------------+   +-------------------+  +---------------+
    // | ChunkID: 1    |   | ChunkID: 2        |  | ChunkID: 3    |
    // |---------------+   +-------------------+  +---------------+
    // | Packet 1      |   |                   |  | ... Packet 3  |
    // | Packet 2      |   | ... Packet 3  ... |  | Packet 4      |
    // | Packet 3  ... |   |                   |  | Packet 5 ...  |
    // +---------------+   +-------------------+  +---------------+

    PERFETTO_DCHECK(chunk_meta->num_fragments_read <=
                    chunk_meta->num_fragments);

    // If we didn't read any packets from this chunk, the last packet was from
    // the previous chunk we iterated over; so don't update
    // |previous_packet_dropped| in this case.
    if (chunk_meta->num_fragments_read > 0)
      previous_packet_dropped = chunk_meta->last_read_packet_skipped();

    while (chunk_meta->num_fragments_read < chunk_meta->num_fragments) {
      enum { kSkip = 0, kReadOnePacket, kTryReadAhead } action;
      if (chunk_meta->num_fragments_read == 0) {
        if (chunk_meta->flags & kFirstPacketContinuesFromPrevChunk) {
          action = kSkip;  // Case A.
        } else if (chunk_meta->num_fragments == 1 &&
                   (chunk_meta->flags & kLastPacketContinuesOnNextChunk)) {
          action = kTryReadAhead;  // Case C.
        } else {
          action = kReadOnePacket;  // Case B.
        }
      } else if (chunk_meta->num_fragments_read <
                     chunk_meta->num_fragments - 1 ||
                 !(chunk_meta->flags & kLastPacketContinuesOnNextChunk)) {
        action = kReadOnePacket;  // Case B.
      } else {
        action = kTryReadAhead;  // Case C.
      }

      TRACE_BUFFER_DLOG("  chunk %u, packet %hu of %hu, action=%d",
                        read_iter_.chunk_id(), chunk_meta->num_fragments_read,
                        chunk_meta->num_fragments, action);

      if (action == kSkip) {
        // This fragment will be skipped forever, not just in this ReadPacket()
        // iteration. This happens by virtue of ReadNextPacketInChunk()
        // incrementing |num_fragments_read| and marking the fragment as read
        // even though we didn't really read it.
        ReadNextPacketInChunk(producer_and_writer_id, chunk_meta, nullptr);
        chunk_meta->set_last_read_packet_skipped(true);
        previous_packet_dropped = true;
        continue;
      }

      if (action == kReadOnePacket) {
        // The easy peasy case B.
        ReadPacketResult result =
            ReadNextPacketInChunk(producer_and_writer_id, chunk_meta, packet);

        if (PERFETTO_LIKELY(result == ReadPacketResult::kSucceeded)) {
          *sequence_properties = {trusted_producer_id, trusted_uid, trusted_pid,
                                  writer_id};
          *previous_packet_on_sequence_dropped = previous_packet_dropped;
          return true;
        } else if (result == ReadPacketResult::kFailedEmptyPacket) {
          // We can ignore and skip empty packets.
          PERFETTO_DCHECK(packet->slices().empty());
          continue;
        }

        // In extremely rare cases (producer bugged / malicious) the chunk might
        // contain an invalid fragment. In such a case we don't want to stall
        // the sequence but just skip the chunk and move on.
        // ReadNextPacketInChunk() marks the chunk as fully read, so we don't
        // attempt to read from it again in a future call to ReadBuffers(). It
        // also already records an ABI violation for this.
        PERFETTO_DCHECK(result == ReadPacketResult::kFailedInvalidPacket);
        chunk_meta->set_last_read_packet_skipped(true);
        previous_packet_dropped = true;
        break;
      }

      PERFETTO_DCHECK(action == kTryReadAhead);
      ReadAheadResult ra_res = ReadAhead(packet);
      if (ra_res == ReadAheadResult::kSucceededReturnSlices) {
        stats_.set_readaheads_succeeded(stats_.readaheads_succeeded() + 1);
        *sequence_properties = {trusted_producer_id, trusted_uid, trusted_pid,
                                writer_id};
        *previous_packet_on_sequence_dropped = previous_packet_dropped;
        return true;
      }

      if (ra_res == ReadAheadResult::kFailedMoveToNextSequence) {
        // Readahead didn't find a contiguous packet sequence. We'll try again
        // on the next ReadPacket() call.
        stats_.set_readaheads_failed(stats_.readaheads_failed() + 1);

        // TODO(primiano): optimization: this MoveToEnd() is the reason why
        // MoveNext() (that is called in the outer for(;;MoveNext)) needs to
        // deal gracefully with the case of |cur|==|seq_end|. Maybe we can do
        // something to avoid that check by reshuffling the code here?
        read_iter_.MoveToEnd();

        // This break will go back to beginning of the for(;;MoveNext()). That
        // will move to the next sequence because we set the read iterator to
        // its end.
        break;
      }

      PERFETTO_DCHECK(ra_res == ReadAheadResult::kFailedStayOnSameSequence);

      // In this case ReadAhead() might advance |read_iter_|, so we need to
      // re-cache the |chunk_meta| pointer to point to the current chunk.
      chunk_meta = &*read_iter_;
      chunk_meta->set_last_read_packet_skipped(true);
      previous_packet_dropped = true;
    }  // while(...)  [iterate over packet fragments for the current chunk].
  }    // for(;;MoveNext()) [iterate over chunks].
}

TraceBuffer::ReadAheadResult TraceBuffer::ReadAhead(TracePacket* packet) {
  static_assert(static_cast<ChunkID>(kMaxChunkID + 1) == 0,
                "relying on kMaxChunkID to wrap naturally");
  TRACE_BUFFER_DLOG(" readahead start @ chunk %u", read_iter_.chunk_id());
  ChunkID next_chunk_id = read_iter_.chunk_id() + 1;
  SequenceIterator it = read_iter_;
  for (it.MoveNext(); it.is_valid(); it.MoveNext(), next_chunk_id++) {
    // We should stay within the same sequence while iterating here.
    PERFETTO_DCHECK(it.producer_id() == read_iter_.producer_id() &&
                    it.writer_id() == read_iter_.writer_id());

    TRACE_BUFFER_DLOG("   expected chunk ID: %u, actual ID: %u", next_chunk_id,
                      it.chunk_id());

    if (PERFETTO_UNLIKELY((*it).num_fragments == 0))
      continue;

    // If we miss the next chunk, stop looking in the current sequence and
    // try another sequence. This chunk might come in the near future.
    // The second condition is the edge case of a buggy/malicious
    // producer. The ChunkID is contiguous but its flags don't make sense.
    if (it.chunk_id() != next_chunk_id ||
        PERFETTO_UNLIKELY(
            !((*it).flags & kFirstPacketContinuesFromPrevChunk))) {
      return ReadAheadResult::kFailedMoveToNextSequence;
    }

    // If the chunk is contiguous but has not been patched yet, move to the
    // next sequence and try coming back here on the next ReadNextTracePacket()
    // call.
    // TODO(primiano): add a test to cover this, it's a subtle case.
    if ((*it).flags & kChunkNeedsPatching)
      return ReadAheadResult::kFailedMoveToNextSequence;

    // This is the case of an intermediate chunk which contains only one
    // fragment which continues on the next chunk. This is the case for large
    // packets, e.g.: [Packet0, Packet1(0)] [Packet1(1)] [Packet1(2), ...]
    // (Packet1(X) := fragment X of Packet1).
    if ((*it).num_fragments == 1 &&
        ((*it).flags & kLastPacketContinuesOnNextChunk)) {
      continue;
    }

    // We made it! We got all fragments for the packet without holes.
    TRACE_BUFFER_DLOG("  readahead success @ chunk %u", it.chunk_id());
    PERFETTO_DCHECK(((*it).num_fragments == 1 &&
                     !((*it).flags & kLastPacketContinuesOnNextChunk)) ||
                    (*it).num_fragments > 1);

    // Now let's re-iterate over the [read_iter_, it] sequence and mark
    // all the fragments as read.
    bool packet_corruption = false;
    for (;;) {
      PERFETTO_DCHECK(read_iter_.is_valid());
      TRACE_BUFFER_DLOG("    commit chunk %u", read_iter_.chunk_id());
      if (PERFETTO_LIKELY((*read_iter_).num_fragments > 0)) {
        // In the unlikely case of a corrupted packet (corrupted or empty
        // fragment), invalidate all the stitching and move on to the next
        // chunk in the same sequence, if any.
        auto pw_id = MkProducerAndWriterID(it.producer_id(), it.writer_id());
        packet_corruption |=
            ReadNextPacketInChunk(pw_id, &*read_iter_, packet) ==
            ReadPacketResult::kFailedInvalidPacket;
      }
      if (read_iter_.cur == it.cur)
        break;
      read_iter_.MoveNext();
    }  // for(;;)
    PERFETTO_DCHECK(read_iter_.cur == it.cur);

    if (PERFETTO_UNLIKELY(packet_corruption)) {
      // ReadNextPacketInChunk() already records an abi violation for this case.
      *packet = TracePacket();  // clear.
      return ReadAheadResult::kFailedStayOnSameSequence;
    }

    return ReadAheadResult::kSucceededReturnSlices;
  }  // for(it...)  [readahead loop]
  return ReadAheadResult::kFailedMoveToNextSequence;
}

TraceBuffer::ReadPacketResult TraceBuffer::ReadNextPacketInChunk(
    ProducerAndWriterID producer_and_writer_id,
    ChunkMeta* const chunk_meta,
    TracePacket* packet) {
  PERFETTO_DCHECK(chunk_meta->num_fragments_read < chunk_meta->num_fragments);
  PERFETTO_DCHECK(!(chunk_meta->flags & kChunkNeedsPatching));

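  // Locate the chunk's payload within the buffer and the offset of the next
  // unread fragment inside it.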
  const uint8_t* record_begin = begin() + chunk_meta->record_off;
  DcheckIsAlignedAndWithinBounds(record_begin);
  auto* chunk_record = reinterpret_cast<const ChunkRecord*>(record_begin);
  const uint8_t* record_end = record_begin + chunk_record->size;
  const uint8_t* packets_begin = record_begin + sizeof(ChunkRecord);
  const uint8_t* packet_begin = packets_begin + chunk_meta->cur_fragment_offset;

  if (PERFETTO_UNLIKELY(packet_begin < packets_begin ||
                        packet_begin >= record_end)) {
    // The producer has a bug or is malicious and declared that the chunk
    // contains packets beyond its boundaries.
    stats_.set_abi_violations(stats_.abi_violations() + 1);
    PERFETTO_DCHECK(suppress_client_dchecks_for_testing_);
    chunk_meta->cur_fragment_offset = 0;
    chunk_meta->num_fragments_read = chunk_meta->num_fragments;
    if (PERFETTO_LIKELY(chunk_meta->is_complete())) {
      stats_.set_chunks_read(stats_.chunks_read() + 1);
      stats_.set_bytes_read(stats_.bytes_read() + chunk_record->size);
    }
    return ReadPacketResult::kFailedInvalidPacket;
  }

  // A packet (or a fragment) starts with a varint stating its size, followed
  // by its content. The varint shouldn't be larger than 4 bytes (just in case
  // the producer is using a redundant encoding)
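  // For example, a packet (or fragment) of 300 bytes starts with the two-byte
  // varint {0xAC, 0x02}, followed by its 300 content bytes.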
  uint64_t packet_size = 0;
  const uint8_t* header_end =
      std::min(packet_begin + protozero::proto_utils::kMessageLengthFieldSize,
               record_end);
  const uint8_t* packet_data = protozero::proto_utils::ParseVarInt(
      packet_begin, header_end, &packet_size);

  const uint8_t* next_packet = packet_data + packet_size;
  if (PERFETTO_UNLIKELY(next_packet <= packet_begin ||
                        next_packet > record_end)) {
    // In BufferExhaustedPolicy::kDrop mode, TraceWriter may abort a fragmented
    // packet by writing an invalid size in the last fragment's header. We
    // should handle this case without recording an ABI violation (since Android
    // R).
    if (packet_size != SharedMemoryABI::kPacketSizeDropPacket) {
      stats_.set_abi_violations(stats_.abi_violations() + 1);
      PERFETTO_DCHECK(suppress_client_dchecks_for_testing_);
    } else {
      stats_.set_trace_writer_packet_loss(stats_.trace_writer_packet_loss() +
                                          1);
    }
    chunk_meta->cur_fragment_offset = 0;
    chunk_meta->num_fragments_read = chunk_meta->num_fragments;
    if (PERFETTO_LIKELY(chunk_meta->is_complete())) {
      stats_.set_chunks_read(stats_.chunks_read() + 1);
      stats_.set_bytes_read(stats_.bytes_read() + chunk_record->size);
    }
    return ReadPacketResult::kFailedInvalidPacket;
  }

  chunk_meta->cur_fragment_offset =
      static_cast<uint16_t>(next_packet - packets_begin);
  chunk_meta->num_fragments_read++;

  if (PERFETTO_UNLIKELY(chunk_meta->num_fragments_read ==
                            chunk_meta->num_fragments &&
                        chunk_meta->is_complete())) {
    stats_.set_chunks_read(stats_.chunks_read() + 1);
    stats_.set_bytes_read(stats_.bytes_read() + chunk_record->size);
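    // At this point |cur_fragment_offset| points just past the last fragment,
    // i.e. the number of payload bytes actually used in this chunk. Track it
    // in the per-writer chunk-usage histogram.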
    auto* writer_stats = writer_stats_.Insert(producer_and_writer_id, {}).first;
    writer_stats->used_chunk_hist.Add(chunk_meta->cur_fragment_offset);
  } else {
    // We have at least one more packet to parse. It should be within the chunk.
    if (chunk_meta->cur_fragment_offset + sizeof(ChunkRecord) >=
        chunk_record->size) {
      PERFETTO_DCHECK(suppress_client_dchecks_for_testing_);
    }
  }

  chunk_meta->set_last_read_packet_skipped(false);

  if (PERFETTO_UNLIKELY(packet_size == 0))
    return ReadPacketResult::kFailedEmptyPacket;

  if (PERFETTO_LIKELY(packet))
    packet->AddSlice(packet_data, static_cast<size_t>(packet_size));

  return ReadPacketResult::kSucceeded;
}

void TraceBuffer::DiscardWrite() {
  PERFETTO_DCHECK(overwrite_policy_ == kDiscard);
  discard_writes_ = true;
  stats_.set_chunks_discarded(stats_.chunks_discarded() + 1);
  TRACE_BUFFER_DLOG("  discarding write");
}

std::unique_ptr<TraceBuffer> TraceBuffer::CloneReadOnly() const {
  std::unique_ptr<TraceBuffer> buf(new TraceBuffer(CloneCtor(), *this));
  if (!buf->data_.IsValid())
    return nullptr;  // PagedMemory::Allocate() failed. We are out of memory.
  return buf;
}

TraceBuffer::TraceBuffer(CloneCtor, const TraceBuffer& src)
    : overwrite_policy_(src.overwrite_policy_),
      read_only_(true),
      discard_writes_(src.discard_writes_) {
  if (!Initialize(src.data_.size()))
    return;  // TraceBuffer::Clone() will check |data_| and return nullptr.

  // The assignments below must be done after Initialize().

  data_.EnsureCommitted(data_.size());
  memcpy(data_.Get(), src.data_.Get(), src.data_.size());
  last_chunk_id_written_ = src.last_chunk_id_written_;

  stats_ = src.stats_;
  stats_.set_bytes_read(0);
  stats_.set_chunks_read(0);
  stats_.set_readaheads_failed(0);
  stats_.set_readaheads_succeeded(0);

  // Copy the index of chunk metadata and reset the read states.
  index_ = ChunkMap(src.index_);
  for (auto& kv : index_) {
    ChunkMeta& chunk_meta = kv.second;
    chunk_meta.num_fragments_read = 0;
    chunk_meta.cur_fragment_offset = 0;
    chunk_meta.set_last_read_packet_skipped(false);
  }
  read_iter_ = SequenceIterator();
}

}  // namespace perfetto