summaryrefslogtreecommitdiff
path: root/mojo/edk/system/data_pipe_consumer_dispatcher.cc
diff options
context:
space:
mode:
Diffstat (limited to 'mojo/edk/system/data_pipe_consumer_dispatcher.cc')
-rw-r--r--mojo/edk/system/data_pipe_consumer_dispatcher.cc562
1 files changed, 562 insertions, 0 deletions
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
new file mode 100644
index 0000000000..f3387324fc
--- /dev/null
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -0,0 +1,562 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <algorithm>
+#include <limits>
+#include <utility>
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/memory/ref_counted.h"
+#include "base/message_loop/message_loop.h"
+#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/embedder/platform_shared_buffer.h"
+#include "mojo/edk/system/core.h"
+#include "mojo/edk/system/data_pipe_control_message.h"
+#include "mojo/edk/system/node_controller.h"
+#include "mojo/edk/system/ports_message.h"
+#include "mojo/edk/system/request_context.h"
+#include "mojo/public/c/system/data_pipe.h"
+
+namespace mojo {
+namespace edk {
+
+namespace {
+
+const uint8_t kFlagPeerClosed = 0x01;
+
+#pragma pack(push, 1)
+
+struct SerializedState {
+ MojoCreateDataPipeOptions options;
+ uint64_t pipe_id;
+ uint32_t read_offset;
+ uint32_t bytes_available;
+ uint8_t flags;
+ char padding[7];
+};
+
+static_assert(sizeof(SerializedState) % 8 == 0,
+ "Invalid SerializedState size.");
+
+#pragma pack(pop)
+
+} // namespace
+
+// A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
+// reference to the dispatcher to ensure it lives as long as the observed port.
+class DataPipeConsumerDispatcher::PortObserverThunk
+ : public NodeController::PortObserver {
+ public:
+ explicit PortObserverThunk(
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
+ : dispatcher_(dispatcher) {}
+
+ private:
+ ~PortObserverThunk() override {}
+
+ // NodeController::PortObserver:
+ void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
+
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
+
+ DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
+};
+
+DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
+ NodeController* node_controller,
+ const ports::PortRef& control_port,
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
+ const MojoCreateDataPipeOptions& options,
+ bool initialized,
+ uint64_t pipe_id)
+ : options_(options),
+ node_controller_(node_controller),
+ control_port_(control_port),
+ pipe_id_(pipe_id),
+ watchers_(this),
+ shared_ring_buffer_(shared_ring_buffer) {
+ if (initialized) {
+ base::AutoLock lock(lock_);
+ InitializeNoLock();
+ }
+}
+
+Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
+ return Type::DATA_PIPE_CONSUMER;
+}
+
+MojoResult DataPipeConsumerDispatcher::Close() {
+ base::AutoLock lock(lock_);
+ DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
+ return CloseNoLock();
+}
+
+MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
+ uint32_t* num_bytes,
+ MojoReadDataFlags flags) {
+ base::AutoLock lock(lock_);
+
+ if (!shared_ring_buffer_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ if (in_two_phase_read_)
+ return MOJO_RESULT_BUSY;
+
+ const bool had_new_data = new_data_available_;
+ new_data_available_ = false;
+
+ if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
+ if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
+ (flags & MOJO_READ_DATA_FLAG_DISCARD))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
+ DVLOG_IF(2, elements)
+ << "Query mode: ignoring non-null |elements|";
+ *num_bytes = static_cast<uint32_t>(bytes_available_);
+
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+ return MOJO_RESULT_OK;
+ }
+
+ bool discard = false;
+ if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
+ // These flags are mutally exclusive.
+ if (flags & MOJO_READ_DATA_FLAG_PEEK)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ DVLOG_IF(2, elements)
+ << "Discard mode: ignoring non-null |elements|";
+ discard = true;
+ }
+
+ uint32_t max_num_bytes_to_read = *num_bytes;
+ if (max_num_bytes_to_read % options_.element_num_bytes != 0)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
+ uint32_t min_num_bytes_to_read =
+ all_or_none ? max_num_bytes_to_read : 0;
+
+ if (min_num_bytes_to_read > bytes_available_) {
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
+ : MOJO_RESULT_OUT_OF_RANGE;
+ }
+
+ uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
+ if (bytes_to_read == 0) {
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
+ : MOJO_RESULT_SHOULD_WAIT;
+ }
+
+ if (!discard) {
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
+ CHECK(data);
+
+ uint8_t* destination = static_cast<uint8_t*>(elements);
+ CHECK(destination);
+
+ DCHECK_LE(read_offset_, options_.capacity_num_bytes);
+ uint32_t tail_bytes_to_copy =
+ std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
+ uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
+ if (tail_bytes_to_copy > 0)
+ memcpy(destination, data + read_offset_, tail_bytes_to_copy);
+ if (head_bytes_to_copy > 0)
+ memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
+ }
+ *num_bytes = bytes_to_read;
+
+ bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
+ if (discard || !peek) {
+ read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
+ bytes_available_ -= bytes_to_read;
+
+ base::AutoUnlock unlock(lock_);
+ NotifyRead(bytes_to_read);
+ }
+
+ // We may have just read the last available data and thus changed the signals
+ // state.
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
+ uint32_t* buffer_num_bytes,
+ MojoReadDataFlags flags) {
+ base::AutoLock lock(lock_);
+ if (!shared_ring_buffer_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ if (in_two_phase_read_)
+ return MOJO_RESULT_BUSY;
+
+ // These flags may not be used in two-phase mode.
+ if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
+ (flags & MOJO_READ_DATA_FLAG_QUERY) ||
+ (flags & MOJO_READ_DATA_FLAG_PEEK))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ const bool had_new_data = new_data_available_;
+ new_data_available_ = false;
+
+ if (bytes_available_ == 0) {
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
+ : MOJO_RESULT_SHOULD_WAIT;
+ }
+
+ DCHECK_LT(read_offset_, options_.capacity_num_bytes);
+ uint32_t bytes_to_read = std::min(bytes_available_,
+ options_.capacity_num_bytes - read_offset_);
+
+ CHECK(ring_buffer_mapping_);
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
+ CHECK(data);
+
+ in_two_phase_read_ = true;
+ *buffer = data + read_offset_;
+ *buffer_num_bytes = bytes_to_read;
+ two_phase_max_bytes_read_ = bytes_to_read;
+
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
+ base::AutoLock lock(lock_);
+ if (!in_two_phase_read_)
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ if (in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ CHECK(shared_ring_buffer_);
+
+ MojoResult rv;
+ if (num_bytes_read > two_phase_max_bytes_read_ ||
+ num_bytes_read % options_.element_num_bytes != 0) {
+ rv = MOJO_RESULT_INVALID_ARGUMENT;
+ } else {
+ rv = MOJO_RESULT_OK;
+ read_offset_ =
+ (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
+
+ DCHECK_GE(bytes_available_, num_bytes_read);
+ bytes_available_ -= num_bytes_read;
+
+ base::AutoUnlock unlock(lock_);
+ NotifyRead(num_bytes_read);
+ }
+
+ in_two_phase_read_ = false;
+ two_phase_max_bytes_read_ = 0;
+
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+
+ return rv;
+}
+
+HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
+ base::AutoLock lock(lock_);
+ return GetHandleSignalsStateNoLock();
+}
+
+MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
+ const scoped_refptr<WatcherDispatcher>& watcher,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
+}
+
+MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
+ WatcherDispatcher* watcher,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Remove(watcher, context);
+}
+
+void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
+ uint32_t* num_ports,
+ uint32_t* num_handles) {
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
+ *num_ports = 1;
+ *num_handles = 1;
+}
+
+bool DataPipeConsumerDispatcher::EndSerialize(
+ void* destination,
+ ports::PortName* ports,
+ PlatformHandle* platform_handles) {
+ SerializedState* state = static_cast<SerializedState*>(destination);
+ memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
+ memset(state->padding, 0, sizeof(state->padding));
+
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ state->pipe_id = pipe_id_;
+ state->read_offset = read_offset_;
+ state->bytes_available = bytes_available_;
+ state->flags = peer_closed_ ? kFlagPeerClosed : 0;
+
+ ports[0] = control_port_.name();
+
+ buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
+ platform_handles[0] = buffer_handle_for_transit_.get();
+
+ return true;
+}
+
+bool DataPipeConsumerDispatcher::BeginTransit() {
+ base::AutoLock lock(lock_);
+ if (in_transit_)
+ return false;
+ in_transit_ = !in_two_phase_read_;
+ return in_transit_;
+}
+
+void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
+ node_controller_->SetPortObserver(control_port_, nullptr);
+
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ in_transit_ = false;
+ transferred_ = true;
+ ignore_result(buffer_handle_for_transit_.release());
+ CloseNoLock();
+}
+
+void DataPipeConsumerDispatcher::CancelTransit() {
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ in_transit_ = false;
+ buffer_handle_for_transit_.reset();
+ UpdateSignalsStateNoLock();
+}
+
+// static
+scoped_refptr<DataPipeConsumerDispatcher>
+DataPipeConsumerDispatcher::Deserialize(const void* data,
+ size_t num_bytes,
+ const ports::PortName* ports,
+ size_t num_ports,
+ PlatformHandle* handles,
+ size_t num_handles) {
+ if (num_ports != 1 || num_handles != 1 ||
+ num_bytes != sizeof(SerializedState)) {
+ return nullptr;
+ }
+
+ const SerializedState* state = static_cast<const SerializedState*>(data);
+
+ NodeController* node_controller = internal::g_core->GetNodeController();
+ ports::PortRef port;
+ if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
+ return nullptr;
+
+ PlatformHandle buffer_handle;
+ std::swap(buffer_handle, handles[0]);
+ scoped_refptr<PlatformSharedBuffer> ring_buffer =
+ PlatformSharedBuffer::CreateFromPlatformHandle(
+ state->options.capacity_num_bytes,
+ false /* read_only */,
+ ScopedPlatformHandle(buffer_handle));
+ if (!ring_buffer) {
+ DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
+ return nullptr;
+ }
+
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
+ new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
+ state->options, false /* initialized */,
+ state->pipe_id);
+
+ {
+ base::AutoLock lock(dispatcher->lock_);
+ dispatcher->read_offset_ = state->read_offset;
+ dispatcher->bytes_available_ = state->bytes_available;
+ dispatcher->new_data_available_ = state->bytes_available > 0;
+ dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
+ dispatcher->InitializeNoLock();
+ dispatcher->UpdateSignalsStateNoLock();
+ }
+
+ return dispatcher;
+}
+
+DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
+ DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
+ !in_transit_);
+}
+
+void DataPipeConsumerDispatcher::InitializeNoLock() {
+ lock_.AssertAcquired();
+
+ if (shared_ring_buffer_) {
+ DCHECK(!ring_buffer_mapping_);
+ ring_buffer_mapping_ =
+ shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
+ if (!ring_buffer_mapping_) {
+ DLOG(ERROR) << "Failed to map shared buffer.";
+ shared_ring_buffer_ = nullptr;
+ }
+ }
+
+ base::AutoUnlock unlock(lock_);
+ node_controller_->SetPortObserver(
+ control_port_,
+ make_scoped_refptr(new PortObserverThunk(this)));
+}
+
+MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
+ lock_.AssertAcquired();
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ is_closed_ = true;
+ ring_buffer_mapping_.reset();
+ shared_ring_buffer_ = nullptr;
+
+ watchers_.NotifyClosed();
+ if (!transferred_) {
+ base::AutoUnlock unlock(lock_);
+ node_controller_->ClosePort(control_port_);
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+HandleSignalsState
+DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
+ lock_.AssertAcquired();
+
+ HandleSignalsState rv;
+ if (shared_ring_buffer_ && bytes_available_) {
+ if (!in_two_phase_read_) {
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
+ if (new_data_available_)
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
+ }
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
+ } else if (!peer_closed_ && shared_ring_buffer_) {
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
+ }
+
+ if (shared_ring_buffer_) {
+ if (new_data_available_ || !peer_closed_)
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
+ }
+
+ if (peer_closed_)
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+
+ return rv;
+}
+
+void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
+ << num_bytes << " bytes read. [control_port="
+ << control_port_.name() << "]";
+
+ SendDataPipeControlMessage(node_controller_, control_port_,
+ DataPipeCommand::DATA_WAS_READ, num_bytes);
+}
+
+void DataPipeConsumerDispatcher::OnPortStatusChanged() {
+ DCHECK(RequestContext::current());
+
+ base::AutoLock lock(lock_);
+
+ // We stop observing the control port as soon it's transferred, but this can
+ // race with events which are raised right before that happens. This is fine
+ // to ignore.
+ if (transferred_)
+ return;
+
+ DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
+
+ UpdateSignalsStateNoLock();
+}
+
+void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
+ lock_.AssertAcquired();
+
+ bool was_peer_closed = peer_closed_;
+ size_t previous_bytes_available = bytes_available_;
+
+ ports::PortStatus port_status;
+ int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
+ if (rv != ports::OK || !port_status.receiving_messages) {
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
+ << " [control_port=" << control_port_.name() << "]";
+ peer_closed_ = true;
+ } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
+ ports::ScopedMessage message;
+ do {
+ int rv = node_controller_->node()->GetMessage(
+ control_port_, &message, nullptr);
+ if (rv != ports::OK)
+ peer_closed_ = true;
+ if (message) {
+ if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
+ peer_closed_ = true;
+ break;
+ }
+
+ const DataPipeControlMessage* m =
+ static_cast<const DataPipeControlMessage*>(
+ message->payload_bytes());
+
+ if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
+ DLOG(ERROR) << "Unexpected control message from producer.";
+ peer_closed_ = true;
+ break;
+ }
+
+ if (static_cast<size_t>(bytes_available_) + m->num_bytes >
+ options_.capacity_num_bytes) {
+ DLOG(ERROR) << "Producer claims to have written too many bytes.";
+ peer_closed_ = true;
+ break;
+ }
+
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
+ << m->num_bytes << " bytes were written. [control_port="
+ << control_port_.name() << "]";
+
+ bytes_available_ += m->num_bytes;
+ }
+ } while (message);
+ }
+
+ bool has_new_data = bytes_available_ != previous_bytes_available;
+ if (has_new_data)
+ new_data_available_ = true;
+
+ if (peer_closed_ != was_peer_closed || has_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+}
+
+} // namespace edk
+} // namespace mojo