diff options
Diffstat (limited to 'mojo/edk/system/data_pipe_consumer_dispatcher.cc')
-rw-r--r-- | mojo/edk/system/data_pipe_consumer_dispatcher.cc | 562 |
1 files changed, 562 insertions, 0 deletions
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc new file mode 100644 index 0000000000..f3387324fc --- /dev/null +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc @@ -0,0 +1,562 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/edk/system/data_pipe_consumer_dispatcher.h" + +#include <stddef.h> +#include <stdint.h> + +#include <algorithm> +#include <limits> +#include <utility> + +#include "base/bind.h" +#include "base/logging.h" +#include "base/memory/ref_counted.h" +#include "base/message_loop/message_loop.h" +#include "mojo/edk/embedder/embedder_internal.h" +#include "mojo/edk/embedder/platform_shared_buffer.h" +#include "mojo/edk/system/core.h" +#include "mojo/edk/system/data_pipe_control_message.h" +#include "mojo/edk/system/node_controller.h" +#include "mojo/edk/system/ports_message.h" +#include "mojo/edk/system/request_context.h" +#include "mojo/public/c/system/data_pipe.h" + +namespace mojo { +namespace edk { + +namespace { + +const uint8_t kFlagPeerClosed = 0x01; + +#pragma pack(push, 1) + +struct SerializedState { + MojoCreateDataPipeOptions options; + uint64_t pipe_id; + uint32_t read_offset; + uint32_t bytes_available; + uint8_t flags; + char padding[7]; +}; + +static_assert(sizeof(SerializedState) % 8 == 0, + "Invalid SerializedState size."); + +#pragma pack(pop) + +} // namespace + +// A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a +// reference to the dispatcher to ensure it lives as long as the observed port. +class DataPipeConsumerDispatcher::PortObserverThunk + : public NodeController::PortObserver { + public: + explicit PortObserverThunk( + scoped_refptr<DataPipeConsumerDispatcher> dispatcher) + : dispatcher_(dispatcher) {} + + private: + ~PortObserverThunk() override {} + + // NodeController::PortObserver: + void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } + + scoped_refptr<DataPipeConsumerDispatcher> dispatcher_; + + DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); +}; + +DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( + NodeController* node_controller, + const ports::PortRef& control_port, + scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, + const MojoCreateDataPipeOptions& options, + bool initialized, + uint64_t pipe_id) + : options_(options), + node_controller_(node_controller), + control_port_(control_port), + pipe_id_(pipe_id), + watchers_(this), + shared_ring_buffer_(shared_ring_buffer) { + if (initialized) { + base::AutoLock lock(lock_); + InitializeNoLock(); + } +} + +Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { + return Type::DATA_PIPE_CONSUMER; +} + +MojoResult DataPipeConsumerDispatcher::Close() { + base::AutoLock lock(lock_); + DVLOG(1) << "Closing data pipe consumer " << pipe_id_; + return CloseNoLock(); +} + +MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, + uint32_t* num_bytes, + MojoReadDataFlags flags) { + base::AutoLock lock(lock_); + + if (!shared_ring_buffer_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + if (in_two_phase_read_) + return MOJO_RESULT_BUSY; + + const bool had_new_data = new_data_available_; + new_data_available_ = false; + + if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { + if ((flags & MOJO_READ_DATA_FLAG_PEEK) || + (flags & MOJO_READ_DATA_FLAG_DISCARD)) + return MOJO_RESULT_INVALID_ARGUMENT; + DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. + DVLOG_IF(2, elements) + << "Query mode: ignoring non-null |elements|"; + *num_bytes = static_cast<uint32_t>(bytes_available_); + + if (had_new_data) + watchers_.NotifyState(GetHandleSignalsStateNoLock()); + return MOJO_RESULT_OK; + } + + bool discard = false; + if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { + // These flags are mutally exclusive. + if (flags & MOJO_READ_DATA_FLAG_PEEK) + return MOJO_RESULT_INVALID_ARGUMENT; + DVLOG_IF(2, elements) + << "Discard mode: ignoring non-null |elements|"; + discard = true; + } + + uint32_t max_num_bytes_to_read = *num_bytes; + if (max_num_bytes_to_read % options_.element_num_bytes != 0) + return MOJO_RESULT_INVALID_ARGUMENT; + + bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; + uint32_t min_num_bytes_to_read = + all_or_none ? max_num_bytes_to_read : 0; + + if (min_num_bytes_to_read > bytes_available_) { + if (had_new_data) + watchers_.NotifyState(GetHandleSignalsStateNoLock()); + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION + : MOJO_RESULT_OUT_OF_RANGE; + } + + uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); + if (bytes_to_read == 0) { + if (had_new_data) + watchers_.NotifyState(GetHandleSignalsStateNoLock()); + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION + : MOJO_RESULT_SHOULD_WAIT; + } + + if (!discard) { + uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); + CHECK(data); + + uint8_t* destination = static_cast<uint8_t*>(elements); + CHECK(destination); + + DCHECK_LE(read_offset_, options_.capacity_num_bytes); + uint32_t tail_bytes_to_copy = + std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read); + uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy; + if (tail_bytes_to_copy > 0) + memcpy(destination, data + read_offset_, tail_bytes_to_copy); + if (head_bytes_to_copy > 0) + memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy); + } + *num_bytes = bytes_to_read; + + bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); + if (discard || !peek) { + read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; + bytes_available_ -= bytes_to_read; + + base::AutoUnlock unlock(lock_); + NotifyRead(bytes_to_read); + } + + // We may have just read the last available data and thus changed the signals + // state. + watchers_.NotifyState(GetHandleSignalsStateNoLock()); + + return MOJO_RESULT_OK; +} + +MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, + uint32_t* buffer_num_bytes, + MojoReadDataFlags flags) { + base::AutoLock lock(lock_); + if (!shared_ring_buffer_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + if (in_two_phase_read_) + return MOJO_RESULT_BUSY; + + // These flags may not be used in two-phase mode. + if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || + (flags & MOJO_READ_DATA_FLAG_QUERY) || + (flags & MOJO_READ_DATA_FLAG_PEEK)) + return MOJO_RESULT_INVALID_ARGUMENT; + + const bool had_new_data = new_data_available_; + new_data_available_ = false; + + if (bytes_available_ == 0) { + if (had_new_data) + watchers_.NotifyState(GetHandleSignalsStateNoLock()); + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION + : MOJO_RESULT_SHOULD_WAIT; + } + + DCHECK_LT(read_offset_, options_.capacity_num_bytes); + uint32_t bytes_to_read = std::min(bytes_available_, + options_.capacity_num_bytes - read_offset_); + + CHECK(ring_buffer_mapping_); + uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); + CHECK(data); + + in_two_phase_read_ = true; + *buffer = data + read_offset_; + *buffer_num_bytes = bytes_to_read; + two_phase_max_bytes_read_ = bytes_to_read; + + if (had_new_data) + watchers_.NotifyState(GetHandleSignalsStateNoLock()); + + return MOJO_RESULT_OK; +} + +MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { + base::AutoLock lock(lock_); + if (!in_two_phase_read_) + return MOJO_RESULT_FAILED_PRECONDITION; + + if (in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + CHECK(shared_ring_buffer_); + + MojoResult rv; + if (num_bytes_read > two_phase_max_bytes_read_ || + num_bytes_read % options_.element_num_bytes != 0) { + rv = MOJO_RESULT_INVALID_ARGUMENT; + } else { + rv = MOJO_RESULT_OK; + read_offset_ = + (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; + + DCHECK_GE(bytes_available_, num_bytes_read); + bytes_available_ -= num_bytes_read; + + base::AutoUnlock unlock(lock_); + NotifyRead(num_bytes_read); + } + + in_two_phase_read_ = false; + two_phase_max_bytes_read_ = 0; + + watchers_.NotifyState(GetHandleSignalsStateNoLock()); + + return rv; +} + +HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { + base::AutoLock lock(lock_); + return GetHandleSignalsStateNoLock(); +} + +MojoResult DataPipeConsumerDispatcher::AddWatcherRef( + const scoped_refptr<WatcherDispatcher>& watcher, + uintptr_t context) { + base::AutoLock lock(lock_); + if (is_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); +} + +MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( + WatcherDispatcher* watcher, + uintptr_t context) { + base::AutoLock lock(lock_); + if (is_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + return watchers_.Remove(watcher, context); +} + +void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, + uint32_t* num_ports, + uint32_t* num_handles) { + base::AutoLock lock(lock_); + DCHECK(in_transit_); + *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); + *num_ports = 1; + *num_handles = 1; +} + +bool DataPipeConsumerDispatcher::EndSerialize( + void* destination, + ports::PortName* ports, + PlatformHandle* platform_handles) { + SerializedState* state = static_cast<SerializedState*>(destination); + memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); + memset(state->padding, 0, sizeof(state->padding)); + + base::AutoLock lock(lock_); + DCHECK(in_transit_); + state->pipe_id = pipe_id_; + state->read_offset = read_offset_; + state->bytes_available = bytes_available_; + state->flags = peer_closed_ ? kFlagPeerClosed : 0; + + ports[0] = control_port_.name(); + + buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); + platform_handles[0] = buffer_handle_for_transit_.get(); + + return true; +} + +bool DataPipeConsumerDispatcher::BeginTransit() { + base::AutoLock lock(lock_); + if (in_transit_) + return false; + in_transit_ = !in_two_phase_read_; + return in_transit_; +} + +void DataPipeConsumerDispatcher::CompleteTransitAndClose() { + node_controller_->SetPortObserver(control_port_, nullptr); + + base::AutoLock lock(lock_); + DCHECK(in_transit_); + in_transit_ = false; + transferred_ = true; + ignore_result(buffer_handle_for_transit_.release()); + CloseNoLock(); +} + +void DataPipeConsumerDispatcher::CancelTransit() { + base::AutoLock lock(lock_); + DCHECK(in_transit_); + in_transit_ = false; + buffer_handle_for_transit_.reset(); + UpdateSignalsStateNoLock(); +} + +// static +scoped_refptr<DataPipeConsumerDispatcher> +DataPipeConsumerDispatcher::Deserialize(const void* data, + size_t num_bytes, + const ports::PortName* ports, + size_t num_ports, + PlatformHandle* handles, + size_t num_handles) { + if (num_ports != 1 || num_handles != 1 || + num_bytes != sizeof(SerializedState)) { + return nullptr; + } + + const SerializedState* state = static_cast<const SerializedState*>(data); + + NodeController* node_controller = internal::g_core->GetNodeController(); + ports::PortRef port; + if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) + return nullptr; + + PlatformHandle buffer_handle; + std::swap(buffer_handle, handles[0]); + scoped_refptr<PlatformSharedBuffer> ring_buffer = + PlatformSharedBuffer::CreateFromPlatformHandle( + state->options.capacity_num_bytes, + false /* read_only */, + ScopedPlatformHandle(buffer_handle)); + if (!ring_buffer) { + DLOG(ERROR) << "Failed to deserialize shared buffer handle."; + return nullptr; + } + + scoped_refptr<DataPipeConsumerDispatcher> dispatcher = + new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, + state->options, false /* initialized */, + state->pipe_id); + + { + base::AutoLock lock(dispatcher->lock_); + dispatcher->read_offset_ = state->read_offset; + dispatcher->bytes_available_ = state->bytes_available; + dispatcher->new_data_available_ = state->bytes_available > 0; + dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; + dispatcher->InitializeNoLock(); + dispatcher->UpdateSignalsStateNoLock(); + } + + return dispatcher; +} + +DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { + DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && + !in_transit_); +} + +void DataPipeConsumerDispatcher::InitializeNoLock() { + lock_.AssertAcquired(); + + if (shared_ring_buffer_) { + DCHECK(!ring_buffer_mapping_); + ring_buffer_mapping_ = + shared_ring_buffer_->Map(0, options_.capacity_num_bytes); + if (!ring_buffer_mapping_) { + DLOG(ERROR) << "Failed to map shared buffer."; + shared_ring_buffer_ = nullptr; + } + } + + base::AutoUnlock unlock(lock_); + node_controller_->SetPortObserver( + control_port_, + make_scoped_refptr(new PortObserverThunk(this))); +} + +MojoResult DataPipeConsumerDispatcher::CloseNoLock() { + lock_.AssertAcquired(); + if (is_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + is_closed_ = true; + ring_buffer_mapping_.reset(); + shared_ring_buffer_ = nullptr; + + watchers_.NotifyClosed(); + if (!transferred_) { + base::AutoUnlock unlock(lock_); + node_controller_->ClosePort(control_port_); + } + + return MOJO_RESULT_OK; +} + +HandleSignalsState +DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { + lock_.AssertAcquired(); + + HandleSignalsState rv; + if (shared_ring_buffer_ && bytes_available_) { + if (!in_two_phase_read_) { + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; + if (new_data_available_) + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; + } + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; + } else if (!peer_closed_ && shared_ring_buffer_) { + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; + } + + if (shared_ring_buffer_) { + if (new_data_available_ || !peer_closed_) + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; + } + + if (peer_closed_) + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; + + return rv; +} + +void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { + DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " + << num_bytes << " bytes read. [control_port=" + << control_port_.name() << "]"; + + SendDataPipeControlMessage(node_controller_, control_port_, + DataPipeCommand::DATA_WAS_READ, num_bytes); +} + +void DataPipeConsumerDispatcher::OnPortStatusChanged() { + DCHECK(RequestContext::current()); + + base::AutoLock lock(lock_); + + // We stop observing the control port as soon it's transferred, but this can + // race with events which are raised right before that happens. This is fine + // to ignore. + if (transferred_) + return; + + DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; + + UpdateSignalsStateNoLock(); +} + +void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { + lock_.AssertAcquired(); + + bool was_peer_closed = peer_closed_; + size_t previous_bytes_available = bytes_available_; + + ports::PortStatus port_status; + int rv = node_controller_->node()->GetStatus(control_port_, &port_status); + if (rv != ports::OK || !port_status.receiving_messages) { + DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure" + << " [control_port=" << control_port_.name() << "]"; + peer_closed_ = true; + } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { + ports::ScopedMessage message; + do { + int rv = node_controller_->node()->GetMessage( + control_port_, &message, nullptr); + if (rv != ports::OK) + peer_closed_ = true; + if (message) { + if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) { + peer_closed_ = true; + break; + } + + const DataPipeControlMessage* m = + static_cast<const DataPipeControlMessage*>( + message->payload_bytes()); + + if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) { + DLOG(ERROR) << "Unexpected control message from producer."; + peer_closed_ = true; + break; + } + + if (static_cast<size_t>(bytes_available_) + m->num_bytes > + options_.capacity_num_bytes) { + DLOG(ERROR) << "Producer claims to have written too many bytes."; + peer_closed_ = true; + break; + } + + DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " + << m->num_bytes << " bytes were written. [control_port=" + << control_port_.name() << "]"; + + bytes_available_ += m->num_bytes; + } + } while (message); + } + + bool has_new_data = bytes_available_ != previous_bytes_available; + if (has_new_data) + new_data_available_ = true; + + if (peer_closed_ != was_peer_closed || has_new_data) + watchers_.NotifyState(GetHandleSignalsStateNoLock()); +} + +} // namespace edk +} // namespace mojo |