aboutsummaryrefslogtreecommitdiff
path: root/mojo/public/cpp/bindings/connector.h
blob: cb065c174d7d76b0d8372ffffbb9812960de0ffe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef MOJO_PUBLIC_CPP_BINDINGS_CONNECTOR_H_
#define MOJO_PUBLIC_CPP_BINDINGS_CONNECTOR_H_

#include <memory>

#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/optional.h"
#include "base/single_thread_task_runner.h"
#include "base/threading/thread_checker.h"
#include "mojo/public/cpp/bindings/bindings_export.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
#include "mojo/public/cpp/system/core.h"
#include "mojo/public/cpp/system/simple_watcher.h"

namespace base {
class Lock;
}

namespace mojo {

// The Connector class is responsible for performing read/write operations on a
// MessagePipe. It writes messages it receives through the MessageReceiver
// interface that it subclasses, and it forwards messages it reads through the
// MessageReceiver interface assigned as its incoming receiver.
//
// NOTE:
//   - MessagePipe I/O is non-blocking.
//   - Sending messages can be configured to be thread safe (please see comments
//     of the constructor). Other than that, the object should only be accessed
//     on the creating thread.
class MOJO_CPP_BINDINGS_EXPORT Connector
    : NON_EXPORTED_BASE(public MessageReceiver) {
 public:
  enum ConnectorConfig {
    // Connector::Accept() is only called from a single thread.
    SINGLE_THREADED_SEND,
    // Connector::Accept() is allowed to be called from multiple threads.
    MULTI_THREADED_SEND
  };

  // The Connector takes ownership of |message_pipe|.
  Connector(ScopedMessagePipeHandle message_pipe,
            ConnectorConfig config,
            scoped_refptr<base::SingleThreadTaskRunner> runner);
  ~Connector() override;

  // Sets the receiver to handle messages read from the message pipe.  The
  // Connector will read messages from the pipe regardless of whether or not an
  // incoming receiver has been set.
  void set_incoming_receiver(MessageReceiver* receiver) {
    DCHECK(thread_checker_.CalledOnValidThread());
    incoming_receiver_ = receiver;
  }

  // Errors from incoming receivers will force the connector into an error
  // state, where no more messages will be processed. This method is used
  // during testing to prevent that from happening.
  void set_enforce_errors_from_incoming_receiver(bool enforce) {
    DCHECK(thread_checker_.CalledOnValidThread());
    enforce_errors_from_incoming_receiver_ = enforce;
  }

  // Sets the error handler to receive notifications when an error is
  // encountered while reading from the pipe or waiting to read from the pipe.
  void set_connection_error_handler(const base::Closure& error_handler) {
    DCHECK(thread_checker_.CalledOnValidThread());
    connection_error_handler_ = error_handler;
  }

  // Returns true if an error was encountered while reading from the pipe or
  // waiting to read from the pipe.
  bool encountered_error() const {
    DCHECK(thread_checker_.CalledOnValidThread());
    return error_;
  }

  // Closes the pipe. The connector is put into a quiescent state.
  //
  // Please note that this method shouldn't be called unless it results from an
  // explicit request of the user of bindings (e.g., the user sets an
  // InterfacePtr to null or closes a Binding).
  void CloseMessagePipe();

  // Releases the pipe. Connector is put into a quiescent state.
  ScopedMessagePipeHandle PassMessagePipe();

  // Enters the error state. The upper layer may do this for unrecoverable
  // issues such as invalid messages are received. If a connection error handler
  // has been set, it will be called asynchronously.
  //
  // It is a no-op if the connector is already in the error state or there isn't
  // a bound message pipe. Otherwise, it closes the message pipe, which notifies
  // the other end and also prevents potential danger (say, the caller raises
  // an error because it believes the other end is malicious). In order to
  // appear to the user that the connector still binds to a message pipe, it
  // creates a new message pipe, closes one end and binds to the other.
  void RaiseError();

  // Is the connector bound to a MessagePipe handle?
  bool is_valid() const {
    DCHECK(thread_checker_.CalledOnValidThread());
    return message_pipe_.is_valid();
  }

  // Waits for the next message on the pipe, blocking until one arrives,
  // |deadline| elapses, or an error happens. Returns |true| if a message has
  // been delivered, |false| otherwise.
  bool WaitForIncomingMessage(MojoDeadline deadline);

  // See Binding for details of pause/resume.
  void PauseIncomingMethodCallProcessing();
  void ResumeIncomingMethodCallProcessing();

  // MessageReceiver implementation:
  bool Accept(Message* message) override;

  MessagePipeHandle handle() const {
    DCHECK(thread_checker_.CalledOnValidThread());
    return message_pipe_.get();
  }

  // Allows |message_pipe_| to be watched while others perform sync handle
  // watching on the same thread. Please see comments of
  // SyncHandleWatcher::AllowWokenUpBySyncWatchOnSameThread().
  void AllowWokenUpBySyncWatchOnSameThread();

  // Watches |message_pipe_| (as well as other handles registered to be watched
  // together) synchronously.
  // This method:
  //   - returns true when |should_stop| is set to true;
  //   - return false when any error occurs, including |message_pipe_| being
  //     closed.
  bool SyncWatch(const bool* should_stop);

  // Whether currently the control flow is inside the sync handle watcher
  // callback.
  // It always returns false after CloseMessagePipe()/PassMessagePipe().
  bool during_sync_handle_watcher_callback() const {
    return sync_handle_watcher_callback_count_ > 0;
  }

  base::SingleThreadTaskRunner* task_runner() const {
    return task_runner_.get();
  }

  // Sets the tag used by the heap profiler.
  // |tag| must be a const string literal.
  void SetWatcherHeapProfilerTag(const char* tag);

 private:
  class ActiveDispatchTracker;
  class MessageLoopNestingObserver;

  // Callback of mojo::SimpleWatcher.
  void OnWatcherHandleReady(MojoResult result);
  // Callback of SyncHandleWatcher.
  void OnSyncHandleWatcherHandleReady(MojoResult result);
  void OnHandleReadyInternal(MojoResult result);

  void WaitToReadMore();

  // Returns false if it is impossible to receive more messages in the future.
  // |this| may have been destroyed in that case.
  WARN_UNUSED_RESULT bool ReadSingleMessage(MojoResult* read_result);

  // |this| can be destroyed during message dispatch.
  void ReadAllAvailableMessages();

  // If |force_pipe_reset| is true, this method replaces the existing
  // |message_pipe_| with a dummy message pipe handle (whose peer is closed).
  // If |force_async_handler| is true, |connection_error_handler_| is called
  // asynchronously.
  void HandleError(bool force_pipe_reset, bool force_async_handler);

  // Cancels any calls made to |waiter_|.
  void CancelWait();

  void EnsureSyncWatcherExists();

  base::Closure connection_error_handler_;

  ScopedMessagePipeHandle message_pipe_;
  MessageReceiver* incoming_receiver_ = nullptr;

  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
  std::unique_ptr<SimpleWatcher> handle_watcher_;

  bool error_ = false;
  bool drop_writes_ = false;
  bool enforce_errors_from_incoming_receiver_ = true;

  bool paused_ = false;

  // If sending messages is allowed from multiple threads, |lock_| is used to
  // protect modifications to |message_pipe_| and |drop_writes_|.
  base::Optional<base::Lock> lock_;

  std::unique_ptr<SyncHandleWatcher> sync_watcher_;
  bool allow_woken_up_by_others_ = false;
  // If non-zero, currently the control flow is inside the sync handle watcher
  // callback.
  size_t sync_handle_watcher_callback_count_ = 0;

  base::ThreadChecker thread_checker_;

  base::Lock connected_lock_;
  bool connected_ = true;

  // The tag used to track heap allocations that originated from a Watcher
  // notification.
  const char* heap_profiler_tag_ = nullptr;

  // A cached pointer to the MessageLoopNestingObserver for the MessageLoop on
  // which this Connector was created.
  MessageLoopNestingObserver* const nesting_observer_;

  // |true| iff the Connector is currently dispatching a message. Used to detect
  // nested dispatch operations.
  bool is_dispatching_ = false;

  // Create a single weak ptr and use it everywhere, to avoid the malloc/free
  // cost of creating a new weak ptr whenever it is needed.
  // NOTE: This weak pointer is invalidated when the message pipe is closed or
  // transferred (i.e., when |connected_| is set to false).
  base::WeakPtr<Connector> weak_self_;
  base::WeakPtrFactory<Connector> weak_factory_;

  DISALLOW_COPY_AND_ASSIGN(Connector);
};

}  // namespace mojo

#endif  // MOJO_PUBLIC_CPP_BINDINGS_CONNECTOR_H_