diff options
Diffstat (limited to 'mojo/public/cpp/system/file_data_pipe_producer.cc')
-rw-r--r-- | mojo/public/cpp/system/file_data_pipe_producer.cc | 289 |
1 files changed, 289 insertions, 0 deletions
diff --git a/mojo/public/cpp/system/file_data_pipe_producer.cc b/mojo/public/cpp/system/file_data_pipe_producer.cc new file mode 100644 index 0000000000..6038bbe16b --- /dev/null +++ b/mojo/public/cpp/system/file_data_pipe_producer.cc @@ -0,0 +1,289 @@ +// Copyright 2017 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/public/cpp/system/file_data_pipe_producer.h" + +#include <algorithm> +#include <limits> +#include <memory> +#include <utility> + +#include "base/bind.h" +#include "base/callback.h" +#include "base/location.h" +#include "base/memory/ref_counted_delete_on_sequence.h" +#include "base/numerics/safe_conversions.h" +#include "base/sequenced_task_runner.h" +#include "base/synchronization/lock.h" +#include "base/task_scheduler/post_task.h" +#include "base/threading/sequenced_task_runner_handle.h" +#include "mojo/public/cpp/system/simple_watcher.h" + +namespace mojo { + +namespace { + +// No good reason not to attempt very large pipe transactions in case the data +// pipe in use has a very large capacity available, so we default to trying +// 64 MB chunks whenever a producer is writable. +constexpr uint32_t kDefaultMaxReadSize = 64 * 1024 * 1024; + +MojoResult FileErrorToMojoResult(base::File::Error error) { + switch (error) { + case base::File::FILE_OK: + return MOJO_RESULT_OK; + case base::File::FILE_ERROR_NOT_FOUND: + return MOJO_RESULT_NOT_FOUND; + case base::File::FILE_ERROR_SECURITY: + case base::File::FILE_ERROR_ACCESS_DENIED: + return MOJO_RESULT_PERMISSION_DENIED; + case base::File::FILE_ERROR_TOO_MANY_OPENED: + case base::File::FILE_ERROR_NO_MEMORY: + return MOJO_RESULT_RESOURCE_EXHAUSTED; + case base::File::FILE_ERROR_ABORT: + return MOJO_RESULT_ABORTED; + default: + return MOJO_RESULT_UNKNOWN; + } +} + +} // namespace + +class FileDataPipeProducer::FileSequenceState + : public base::RefCountedDeleteOnSequence<FileSequenceState> { + public: + using CompletionCallback = + base::OnceCallback<void(ScopedDataPipeProducerHandle producer, + MojoResult result)>; + + FileSequenceState( + ScopedDataPipeProducerHandle producer_handle, + scoped_refptr<base::SequencedTaskRunner> file_task_runner, + CompletionCallback callback, + scoped_refptr<base::SequencedTaskRunner> callback_task_runner, + std::unique_ptr<Observer> observer) + : base::RefCountedDeleteOnSequence<FileSequenceState>( + std::move(file_task_runner)), + callback_task_runner_(std::move(callback_task_runner)), + producer_handle_(std::move(producer_handle)), + callback_(std::move(callback)), + observer_(std::move(observer)) {} + + void Cancel() { + base::AutoLock lock(lock_); + is_cancelled_ = true; + } + + void StartFromFile(base::File file, size_t max_bytes) { + owning_task_runner()->PostTask( + FROM_HERE, + base::BindOnce(&FileSequenceState::StartFromFileOnFileSequence, this, + std::move(file), max_bytes)); + } + + void StartFromPath(const base::FilePath& path) { + owning_task_runner()->PostTask( + FROM_HERE, + base::BindOnce(&FileSequenceState::StartFromPathOnFileSequence, this, + path)); + } + + private: + friend class base::DeleteHelper<FileSequenceState>; + friend class base::RefCountedDeleteOnSequence<FileSequenceState>; + + ~FileSequenceState() = default; + + void StartFromFileOnFileSequence(base::File file, size_t max_bytes) { + if (file.error_details() != base::File::FILE_OK) { + Finish(FileErrorToMojoResult(file.error_details())); + return; + } + file_ = std::move(file); + max_bytes_ = max_bytes; + TransferSomeBytes(); + if (producer_handle_.is_valid()) { + // If we didn't nail it all on the first transaction attempt, setup a + // watcher and complete the read asynchronously. + watcher_ = std::make_unique<SimpleWatcher>( + FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC, + base::SequencedTaskRunnerHandle::Get()); + watcher_->Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, + MOJO_WATCH_CONDITION_SATISFIED, + base::Bind(&FileSequenceState::OnHandleReady, this)); + } + } + + void StartFromPathOnFileSequence(const base::FilePath& path) { + StartFromFileOnFileSequence( + base::File(path, base::File::FLAG_OPEN | base::File::FLAG_READ), + std::numeric_limits<size_t>::max()); + } + + void OnHandleReady(MojoResult result, const HandleSignalsState& state) { + { + // Stop ourselves from doing redundant work if we've been cancelled from + // another thread. Note that we do not rely on this for any kind of thread + // safety concerns. + base::AutoLock lock(lock_); + if (is_cancelled_) + return; + } + + if (result != MOJO_RESULT_OK) { + // Either the consumer pipe has been closed or something terrible + // happened. In any case, we'll never be able to write more data. + Finish(result); + return; + } + + TransferSomeBytes(); + } + + void TransferSomeBytes() { + while (true) { + // Lock as much of the pipe as we can. + void* pipe_buffer; + uint32_t size = kDefaultMaxReadSize; + MojoResult result = producer_handle_->BeginWriteData( + &pipe_buffer, &size, MOJO_WRITE_DATA_FLAG_NONE); + if (result == MOJO_RESULT_SHOULD_WAIT) + return; + if (result != MOJO_RESULT_OK) { + Finish(result); + return; + } + + // Attempt to read that many bytes from the file, directly into the data + // pipe. Note that while |max_bytes_remaining| may be very large, the + // length we attempt read is bounded by the much smaller + // |kDefaultMaxReadSize| via |size|. + DCHECK(base::IsValueInRangeForNumericType<int>(size)); + const size_t max_bytes_remaining = max_bytes_ - bytes_transferred_; + int attempted_read_size = static_cast<int>( + std::min(static_cast<size_t>(size), max_bytes_remaining)); + int read_size = file_.ReadAtCurrentPos(static_cast<char*>(pipe_buffer), + attempted_read_size); + base::File::Error read_error; + if (read_size < 0) { + read_error = base::File::GetLastFileError(); + DCHECK_NE(base::File::FILE_OK, read_error); + if (observer_) + observer_->OnBytesRead(pipe_buffer, 0u, read_error); + } else { + read_error = base::File::FILE_OK; + if (observer_) { + observer_->OnBytesRead(pipe_buffer, static_cast<size_t>(read_size), + base::File::FILE_OK); + } + } + producer_handle_->EndWriteData( + read_size >= 0 ? static_cast<uint32_t>(read_size) : 0); + + if (read_size < 0) { + Finish(FileErrorToMojoResult(read_error)); + return; + } + + bytes_transferred_ += read_size; + DCHECK_LE(bytes_transferred_, max_bytes_); + + if (read_size < attempted_read_size) { + // ReadAtCurrentPos makes a best effort to read all requested bytes. We + // reasonably assume if it fails to read what we ask for, we've hit EOF. + Finish(MOJO_RESULT_OK); + return; + } + + if (bytes_transferred_ == max_bytes_) { + // We've read as much as we were asked to read. + Finish(MOJO_RESULT_OK); + return; + } + } + } + + void Finish(MojoResult result) { + if (observer_) { + observer_->OnDoneReading(); + observer_ = nullptr; + } + watcher_.reset(); + callback_task_runner_->PostTask( + FROM_HERE, base::BindOnce(std::move(callback_), + std::move(producer_handle_), result)); + } + + const scoped_refptr<base::SequencedTaskRunner> callback_task_runner_; + + // State which is effectively owned and used only on the file sequence. + ScopedDataPipeProducerHandle producer_handle_; + base::File file_; + size_t max_bytes_ = 0; + size_t bytes_transferred_ = 0; + CompletionCallback callback_; + std::unique_ptr<SimpleWatcher> watcher_; + + // Guards |is_cancelled_|. + base::Lock lock_; + bool is_cancelled_ = false; + std::unique_ptr<Observer> observer_; + + DISALLOW_COPY_AND_ASSIGN(FileSequenceState); +}; + +FileDataPipeProducer::FileDataPipeProducer( + ScopedDataPipeProducerHandle producer, + std::unique_ptr<Observer> observer) + : producer_(std::move(producer)), + observer_(std::move(observer)), + weak_factory_(this) {} + +FileDataPipeProducer::~FileDataPipeProducer() { + if (file_sequence_state_) + file_sequence_state_->Cancel(); +} + +void FileDataPipeProducer::WriteFromFile(base::File file, + CompletionCallback callback) { + WriteFromFile(std::move(file), std::numeric_limits<size_t>::max(), + std::move(callback)); +} + +void FileDataPipeProducer::WriteFromFile(base::File file, + size_t max_bytes, + CompletionCallback callback) { + InitializeNewRequest(std::move(callback)); + file_sequence_state_->StartFromFile(std::move(file), max_bytes); +} + +void FileDataPipeProducer::WriteFromPath(const base::FilePath& path, + CompletionCallback callback) { + InitializeNewRequest(std::move(callback)); + file_sequence_state_->StartFromPath(path); +} + +void FileDataPipeProducer::InitializeNewRequest(CompletionCallback callback) { + DCHECK(!file_sequence_state_); + + LOG(FATAL) << "unsupported in libchrome"; + // auto file_task_runner = base::CreateSequencedTaskRunnerWithTraits( + // {base::MayBlock(), base::TaskPriority::BACKGROUND}); + // file_sequence_state_ = new FileSequenceState( + // std::move(producer_), file_task_runner, + // base::BindOnce(&FileDataPipeProducer::OnWriteComplete, + // weak_factory_.GetWeakPtr(), std::move(callback)), + // base::SequencedTaskRunnerHandle::Get(), std::move(observer_)); +} + +void FileDataPipeProducer::OnWriteComplete( + CompletionCallback callback, + ScopedDataPipeProducerHandle producer, + MojoResult ready_result) { + producer_ = std::move(producer); + file_sequence_state_ = nullptr; + std::move(callback).Run(ready_result); +} + +} // namespace mojo |