aboutsummaryrefslogtreecommitdiff
path: root/pw_multisink/public/pw_multisink/multisink.h
blob: 6627840d3d28dad3dfb889675f30b83b56a3e5ad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once

#include <limits>
#include <mutex>

#include "pw_bytes/span.h"
#include "pw_function/function.h"
#include "pw_multisink/config.h"
#include "pw_result/result.h"
#include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"

namespace pw {
namespace multisink {

// An asynchronous single-writer multi-reader queue that ensures readers can
// poll for dropped message counts, which is useful for logging or similar
// scenarios where readers need to be aware of the input message sequence.
//
// This class is thread-safe but NOT IRQ-safe when
// PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled.
class MultiSink {
 public:
  // An asynchronous reader which is attached to a MultiSink via AttachDrain.
  // Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away
  // entry sequence information for clients when popping.
  class Drain {
   public:
    // Holds the context for a peeked entry, tha the user may pass to `PopEntry`
    // to advance the drain.
    class PeekedEntry {
     public:
      // Provides access to the peeked entry's data.
      ConstByteSpan entry() const { return entry_; }

     private:
      friend MultiSink;
      friend MultiSink::Drain;

      constexpr PeekedEntry(ConstByteSpan entry, uint32_t sequence_id)
          : entry_(entry), sequence_id_(sequence_id) {}

      uint32_t sequence_id() const { return sequence_id_; }

      const ConstByteSpan entry_;
      const uint32_t sequence_id_;
    };

    constexpr Drain()
        : last_handled_sequence_id_(0),
          last_peek_sequence_id_(0),
          multisink_(nullptr) {}

    // Returns the next available entry if it exists and acquires the latest
    // drop count in parallel.
    //
    // The `drop_count_out` is set to the number of entries that were dropped
    // since the last call to PopEntry, if the read operation was successful or
    // returned OutOfRange (i.e. no entries to read). Otherwise, it is set to
    // zero, so should always be processed.
    //
    // Drop counts are internally maintained with a 32-bit counter. If
    // UINT32_MAX entries have been handled by the attached multisink between
    // subsequent calls to PopEntry, the drop count will overflow and will
    // report a lower count erroneously. Users should ensure that sinks call
    // PopEntry at least once every UINT32_MAX entries.
    //
    // Example Usage:
    //
    // void ProcessEntriesFromDrain(Drain& drain) {
    //   std::array<std::byte, kEntryBufferSize> buffer;
    //   uint32_t drop_count = 0;
    //
    //   // Example#1: Request the drain for a new entry.
    //   {
    //     const Result<ConstByteSpan> result = drain.PopEntry(buffer,
    //                                                         drop_count);
    //
    //     // If a non-zero drop count is received, process them.
    //     if (drop_count > 0) {
    //       ProcessDropCount(drop_count);
    //     }
    //
    //     // If the call was successful, process the entry that was received.
    //     if (result.ok()) {
    //       ProcessEntry(result.value());
    //     }
    //   }
    //
    //   // Example#2: Drain out all messages.
    //   {
    //     Result<ConstByteSpan> result = Status::OutOfRange();
    //     do {
    //       result = drain.PopEntry(buffer, drop_count);
    //
    //       if (drop_count > 0) {
    //         ProcessDropCount(drop_count);
    //       }
    //
    //       if (result.ok()) {
    //         ProcessEntry(result.value());
    //       }
    //
    //       // Keep trying until we hit OutOfRange. Note that a new entry may
    //       // have arrived after the PopEntry call.
    //     } while (!result.IsOutOfRange());
    //   }
    // }
    // Precondition: the buffer data must not be corrupt, otherwise there will
    // be a crash.
    //
    // Return values:
    // OK - An entry was successfully read from the multisink.
    // OUT_OF_RANGE - No entries were available.
    // FAILED_PRECONDITION - The drain must be attached to a sink.
    // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
    // the next available entry, which was discarded.
    Result<ConstByteSpan> PopEntry(ByteSpan buffer, uint32_t& drop_count_out)
        PW_LOCKS_EXCLUDED(multisink_->lock_);

    // Removes the previously peeked entry from the multisink.
    //
    // Example Usage:
    //
    //  // Peek entry to send it, and remove entry from multisink on success.
    //  uint32_t drop_count;
    //  const Result<PeekedEntry> peek_result =
    //      PeekEntry(out_buffer, drop_count);
    //  if (!peek_result.ok()) {
    //    return peek_result.status();
    //  }
    //  Status send_status = UserSendFunction(peek_result.value().entry())
    //  if (!send_status.ok())
    //    return send_status;
    //  }
    //  PW_CHECK_OK(PopEntry(peek_result.value());
    //
    // Precondition: the buffer data must not be corrupt, otherwise there will
    // be a crash.
    //
    // Return values:
    // OK - the entry or entries were removed from the multisink succesfully.
    // FAILED_PRECONDITION - The drain must be attached to a sink.
    Status PopEntry(const PeekedEntry& entry)
        PW_LOCKS_EXCLUDED(multisink_->lock_);

    // Returns a copy of the next available entry if it exists and acquires the
    // latest drop count, without moving the drain forward, except if there is a
    // RESOURCE_EXHAUSTED error when peeking, in which case the drain is
    // automatically advanced.
    // The `drop_count_out` follows the same logic as `PopEntry`. The user must
    // call `PopEntry` once the data in peek was used successfully.
    //
    // Precondition: the buffer data must not be corrupt, otherwise there will
    // be a crash.
    //
    // Return values:
    // OK - An entry was successfully read from the multisink.
    // OUT_OF_RANGE - No entries were available.
    // FAILED_PRECONDITION - The drain must be attached to a sink.
    // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
    // the next available entry, which was discarded.
    Result<PeekedEntry> PeekEntry(ByteSpan buffer, uint32_t& drop_count_out)
        PW_LOCKS_EXCLUDED(multisink_->lock_);

    // Drains are not copyable or movable.
    Drain(const Drain&) = delete;
    Drain& operator=(const Drain&) = delete;
    Drain(Drain&&) = delete;
    Drain& operator=(Drain&&) = delete;

   protected:
    friend MultiSink;

    // The `reader_` and `last_handled_sequence_id_` are managed by attached
    // multisink and are guarded by `multisink_->lock_` when used.
    ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
    uint32_t last_handled_sequence_id_;
    uint32_t last_peek_sequence_id_;
    MultiSink* multisink_;
  };

  // A pure-virtual listener of a MultiSink, attached via AttachListener.
  // MultiSink's invoke listeners when new data arrives, allowing them to
  // schedule the draining of messages out of the MultiSink.
  class Listener : public IntrusiveList<Listener>::Item {
   public:
    constexpr Listener() {}
    virtual ~Listener() = default;

    // Listeners are not copyable or movable.
    Listener(const Listener&) = delete;
    Listener& operator=(const Drain&) = delete;
    Listener(Listener&&) = delete;
    Listener& operator=(Drain&&) = delete;

   protected:
    friend MultiSink;

    // Invoked by the attached multisink when a new entry or drop count is
    // available. The multisink lock is held during this call, so neither the
    // multisink nor it's drains can be used during this callback.
    virtual void OnNewEntryAvailable() = 0;
  };

  class iterator {
   public:
    iterator& operator++() {
      it_++;
      return *this;
    }
    iterator operator++(int) {
      iterator original = *this;
      ++*this;
      return original;
    }

    ConstByteSpan& operator*() {
      entry_ = (*it_).buffer;
      return entry_;
    }
    ConstByteSpan* operator->() { return &operator*(); }

    constexpr bool operator==(const iterator& rhs) const {
      return it_ == rhs.it_;
    }

    constexpr bool operator!=(const iterator& rhs) const {
      return it_ != rhs.it_;
    }

    // Returns the status of the last iteration operation. If the iterator
    // fails to read an entry, it will move to iterator::end() and indicate
    // the failure reason here.
    //
    // Return values:
    // OK - iteration is successful and iterator points to the next entry.
    // DATA_LOSS - Failed to read the metadata at this location.
    Status status() const { return it_.status(); }

   private:
    friend class MultiSink;

    iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
        : it_(reader) {}
    iterator() {}

    ring_buffer::PrefixedEntryRingBufferMulti::iterator it_;
    ConstByteSpan entry_;
  };

  class UnsafeIterationWrapper {
   public:
    using element_type = ConstByteSpan;
    using value_type = std::remove_cv_t<ConstByteSpan>;
    using pointer = ConstByteSpan*;
    using reference = ConstByteSpan&;
    using const_iterator = iterator;  // Standard alias for iterable types.

    iterator begin() const { return iterator(*reader_); }
    iterator end() const { return iterator(); }
    const_iterator cbegin() const { return begin(); }
    const_iterator cend() const { return end(); }

   private:
    friend class MultiSink;
    UnsafeIterationWrapper(
        ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
        : reader_(&reader) {}
    ring_buffer::PrefixedEntryRingBufferMulti::Reader* reader_;
  };

  UnsafeIterationWrapper UnsafeIteration() PW_NO_LOCK_SAFETY_ANALYSIS {
    return UnsafeIterationWrapper(oldest_entry_drain_.reader_);
  }

  // Constructs a multisink using a ring buffer backed by the provided buffer.
  MultiSink(ByteSpan buffer) : ring_buffer_(true), sequence_id_(0) {
    ring_buffer_.SetBuffer(buffer)
        .IgnoreError();  // TODO(pwbug/387): Handle Status properly
    AttachDrain(oldest_entry_drain_);
  }

  // Write an entry to the multisink. If available space is less than the
  // size of the entry, the internal ring buffer will push the oldest entries
  // out to make space, so long as the entry is not larger than the buffer.
  // The sequence ID of the multisink will always increment as a result of
  // calling HandleEntry, regardless of whether pushing the entry succeeds.
  //
  // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this
  // function must not be called from an interrupt context.
  // Precondition: entry.size() <= `ring_buffer_` size
  void HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_);

  // Notifies the multisink of messages dropped before ingress. The writer
  // may use this to signal to readers that an entry (or entries) failed
  // before being sent to the multisink (e.g. the writer failed to encode
  // the message). This API increments the sequence ID of the multisink by
  // the provided `drop_count`.
  void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_);

  // Attach a drain to the multisink. Drains may not be associated with more
  // than one multisink at a time. Drains can consume entries pushed before
  // the drain was attached, so long as they have not yet been evicted from
  // the underlying ring buffer.
  //
  // Precondition: The drain must not be attached to a multisink.
  void AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);

  // Detaches a drain from the multisink. Drains may only be detached if they
  // were previously attached to this multisink.
  //
  // Precondition: The drain must be attached to this multisink.
  void DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);

  // Attach a listener to the multisink. The listener will be notified
  // immediately when attached, to allow late drain users to consume existing
  // entries. If draining in response to the notification, ensure that the drain
  // is attached prior to registering the listener; attempting to drain when
  // unattached will crash. Once attached, listeners are invoked on all new
  // messages.
  //
  // Precondition: The listener must not be attached to a multisink.
  void AttachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);

  // Detaches a listener from the multisink.
  //
  // Precondition: The listener must be attached to this multisink.
  void DetachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);

  // Removes all data from the internal buffer. The multisink's sequence ID is
  // not modified, so readers may interpret this event as droppping entries.
  void Clear() PW_LOCKS_EXCLUDED(lock_);

  // Uses MultiSink's unsafe iteration to dump the contents to a user-provided
  // callback. max_num_entries can be used to limit the dump to the N most
  // recent entries.
  //
  // Returns:
  //   OK - Successfully dumped entire multisink.
  //   DATA_LOSS - Corruption detected, some entries may have been lost.
  Status UnsafeForEachEntry(
      const Function<void(ConstByteSpan)>& callback,
      size_t max_num_entries = std::numeric_limits<size_t>::max());

 protected:
  friend Drain;

  enum class Request { kPop, kPeek };
  // Removes the previously peeked entry from the front of the multisink.
  Status PopEntry(Drain& drain, const Drain::PeekedEntry& entry)
      PW_LOCKS_EXCLUDED(lock_);

  // Gets a copy of the entry from the provided drain and unpacks sequence ID
  // information. The entry is removed from the multisink when `request` is set
  // to `Request::kPop`. Drains use this API to strip away sequence ID
  // information for drop calculation.
  //
  // Precondition: the buffer data must not be corrupt, otherwise there will
  // be a crash.
  //
  // Returns:
  // OK - An entry was successfully read from the multisink. The
  // `drop_count_out` is set to the difference between the current sequence ID
  // and the last handled ID.
  // FAILED_PRECONDITION - The drain is not attached to
  // a multisink.
  // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
  // the next available entry, which was discarded.
  Result<ConstByteSpan> PeekOrPopEntry(Drain& drain,
                                       ByteSpan buffer,
                                       Request request,
                                       uint32_t& drop_count_out,
                                       uint32_t& entry_sequence_id_out)
      PW_LOCKS_EXCLUDED(lock_);

 private:
  // Notifies attached listeners of new entries or an updated drop count.
  void NotifyListeners() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);

  IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_);
  ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
  Drain oldest_entry_drain_ PW_GUARDED_BY(lock_);
  uint32_t sequence_id_ PW_GUARDED_BY(lock_);
  LockType lock_;
};

}  // namespace multisink
}  // namespace pw