summaryrefslogtreecommitdiff
path: root/base/message_loop/message_pump_libevent.h
blob: 4017d93a118019837a10a2d89f2e9d1e30b6b422 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_MESSAGE_LOOP_MESSAGE_PUMP_LIBEVENT_H_
#define BASE_MESSAGE_LOOP_MESSAGE_PUMP_LIBEVENT_H_

#include <memory>
#include <tuple>

#include "base/base_export.h"
#include "base/compiler_specific.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/raw_ptr_exclusion.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump.h"
#include "base/message_loop/message_pump_buildflags.h"
#include "base/message_loop/watchable_io_message_pump_posix.h"
#include "base/threading/thread_checker.h"
#include "third_party/libevent/event.h"

// Declare structs we need from libevent.h rather than including it
struct event_base;
struct event;
namespace base {

class MessagePumpEpoll;

// Class to monitor sockets and issue callbacks when sockets are ready for I/O
// TODO(dkegel): add support for background file IO somehow
class BASE_EXPORT MessagePumpLibevent : public MessagePump,
                                        public WatchableIOMessagePumpPosix {
 public:
  class FdWatchController;

  // Parameters used to construct and describe an EpollInterest.
  struct EpollInterestParams {
    // The file descriptor of interest.
    int fd;

    // Indicates an interest in being able to read() from `fd`.
    bool read;

    // Indicates an interest in being able to write() to `fd`.
    bool write;

    // Indicates whether this interest is a one-shot interest, meaning that it
    // must be automatically deactivated every time it triggers an epoll event.
    bool one_shot;

    bool IsEqual(const EpollInterestParams& rhs) const {
      return std::tie(fd, read, write, one_shot) ==
             std::tie(rhs.fd, rhs.read, rhs.write, rhs.one_shot);
    }
  };

  // Represents a single controller's interest in a file descriptor via epoll,
  // and tracks whether that interest is currently active. Though an interest
  // persists as long as its controller is alive and hasn't changed interests,
  // it only participates in epoll waits while active. These objects are only
  // used when MessagePumpLibevent is configured to use the epoll API instead of
  // libevent.
  class EpollInterest : public RefCounted<EpollInterest> {
   public:
    EpollInterest(FdWatchController* controller,
                  const EpollInterestParams& params);
    EpollInterest(const EpollInterest&) = delete;
    EpollInterest& operator=(const EpollInterest&) = delete;

    FdWatchController* controller() { return controller_; }
    const EpollInterestParams& params() const { return params_; }

    bool active() const { return active_; }
    void set_active(bool active) { active_ = active; }

    // Only meaningful between WatchForControllerDestruction() and
    // StopWatchingForControllerDestruction().
    bool was_controller_destroyed() const { return was_controller_destroyed_; }

    void WatchForControllerDestruction() {
      DCHECK(!controller_->was_destroyed_);
      controller_->was_destroyed_ = &was_controller_destroyed_;
    }

    void StopWatchingForControllerDestruction() {
      if (!was_controller_destroyed_) {
        DCHECK_EQ(controller_->was_destroyed_, &was_controller_destroyed_);
        controller_->was_destroyed_ = nullptr;
      }
    }

   private:
    friend class RefCounted<EpollInterest>;
    ~EpollInterest();

    const raw_ptr<FdWatchController, DanglingUntriaged> controller_;
    const EpollInterestParams params_;
    bool active_ = true;
    bool was_controller_destroyed_ = false;
  };

  // Note that this class is used as the FdWatchController for both
  // MessagePumpLibevent *and* MessagePumpEpoll in order to avoid unnecessary
  // code churn during experimentation and eventual transition. Consumers
  // construct their own FdWatchController instances, so switching this type
  // at runtime would require potentially complex logic changes to all
  // consumers.
  class FdWatchController : public FdWatchControllerInterface {
   public:
    explicit FdWatchController(const Location& from_here);

    FdWatchController(const FdWatchController&) = delete;
    FdWatchController& operator=(const FdWatchController&) = delete;

    // Implicitly calls StopWatchingFileDescriptor.
    ~FdWatchController() override;

    // FdWatchControllerInterface:
    bool StopWatchingFileDescriptor() override;

   private:
    friend class MessagePumpEpoll;
    friend class MessagePumpLibevent;
    friend class MessagePumpLibeventTest;

    // Common methods called by both pump implementations.
    void set_watcher(FdWatcher* watcher) { watcher_ = watcher; }

    // Methods called only by MessagePumpLibevent
    void set_libevent_pump(MessagePumpLibevent* pump) { libevent_pump_ = pump; }
    MessagePumpLibevent* libevent_pump() const { return libevent_pump_; }

    void Init(std::unique_ptr<event> e);
    std::unique_ptr<event> ReleaseEvent();

    void OnFileCanReadWithoutBlocking(int fd, MessagePumpLibevent* pump);
    void OnFileCanWriteWithoutBlocking(int fd, MessagePumpLibevent* pump);

    // Methods called only by MessagePumpEpoll
    void set_epoll_pump(WeakPtr<MessagePumpEpoll> pump) {
      epoll_pump_ = std::move(pump);
    }
    const scoped_refptr<EpollInterest>& epoll_interest() const {
      return epoll_interest_;
    }

    // Creates a new Interest described by `params` and adopts it as this
    // controller's exclusive interest. Any prior interest is dropped by the
    // controller and should be unregistered on the MessagePumpEpoll.
    const scoped_refptr<EpollInterest>& AssignEpollInterest(
        const EpollInterestParams& params);

    void OnFdReadable();
    void OnFdWritable();

    // Common state
    raw_ptr<FdWatcher> watcher_ = nullptr;

    // If this pointer is non-null when the FdWatchController is destroyed, the
    // pointee is set to true.
    raw_ptr<bool> was_destroyed_ = nullptr;

    // State used only with libevent
    std::unique_ptr<event> event_;
    raw_ptr<MessagePumpLibevent> libevent_pump_ = nullptr;

    // State used only with epoll
    WeakPtr<MessagePumpEpoll> epoll_pump_;
    scoped_refptr<EpollInterest> epoll_interest_;
  };

  MessagePumpLibevent();

#if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
  // Constructs a MessagePumpLibevent which is forced to use epoll directly
  // instead of libevent.
  enum { kUseEpoll };
  explicit MessagePumpLibevent(decltype(kUseEpoll));
#endif

  MessagePumpLibevent(const MessagePumpLibevent&) = delete;
  MessagePumpLibevent& operator=(const MessagePumpLibevent&) = delete;

  ~MessagePumpLibevent() override;

  // Must be called early in process startup, but after FeatureList
  // initialization. This allows MessagePumpLibevent to query and cache the
  // enabled state of any relevant features.
  static void InitializeFeatures();

  bool WatchFileDescriptor(int fd,
                           bool persistent,
                           int mode,
                           FdWatchController* controller,
                           FdWatcher* delegate);

  // MessagePump methods:
  void Run(Delegate* delegate) override;
  void Quit() override;
  void ScheduleWork() override;
  void ScheduleDelayedWork(
      const Delegate::NextWorkInfo& next_work_info) override;

 private:
  friend class MessagePumpLibeventTest;

  // Risky part of constructor.  Returns true on success.
  bool Init();

  // Called by libevent to tell us a registered FD can be read/written to.
  static void OnLibeventNotification(int fd, short flags, void* context);

  // Unix pipe used to implement ScheduleWork()
  // ... callback; called by libevent inside Run() when pipe is ready to read
  static void OnWakeup(int socket, short flags, void* context);

  struct RunState {
    explicit RunState(Delegate* delegate_in) : delegate(delegate_in) {}

    // `delegate` is not a raw_ptr<...> for performance reasons (based on
    // analysis of sampling profiler data and tab_search:top100:2020).
    RAW_PTR_EXCLUSION Delegate* const delegate;

    // Used to flag that the current Run() invocation should return ASAP.
    bool should_quit = false;
  };

#if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
  // If direct use of epoll is enabled, this is the MessagePumpEpoll instance
  // used. In that case, all libevent state below is ignored and unused.
  // Otherwise this is null.
  std::unique_ptr<MessagePumpEpoll> epoll_pump_;
#endif

  // State for the current invocation of Run(). null if not running.
  // This field is not a raw_ptr<> because it was filtered by the rewriter for:
  // #addr-of
  RAW_PTR_EXCLUSION RunState* run_state_ = nullptr;

  // This flag is set if libevent has processed I/O events.
  bool processed_io_events_ = false;

  struct EventBaseFree {
    inline void operator()(event_base* e) const {
      if (e)
        event_base_free(e);
    }
  };
  // Libevent dispatcher.  Watches all sockets registered with it, and sends
  // readiness callbacks when a socket is ready for I/O.
  std::unique_ptr<event_base, EventBaseFree> event_base_{event_base_new()};

  // ... write end; ScheduleWork() writes a single byte to it
  int wakeup_pipe_in_ = -1;
  // ... read end; OnWakeup reads it and then breaks Run() out of its sleep
  int wakeup_pipe_out_ = -1;
  // ... libevent wrapper for read end
  std::unique_ptr<event> wakeup_event_;

  ThreadChecker watch_file_descriptor_caller_checker_;
};

}  // namespace base

#endif  // BASE_MESSAGE_LOOP_MESSAGE_PUMP_LIBEVENT_H_