summaryrefslogtreecommitdiff
path: root/src/prefetcher/prefetcher_daemon.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/prefetcher/prefetcher_daemon.cc')
-rw-r--r--src/prefetcher/prefetcher_daemon.cc1367
1 files changed, 0 insertions, 1367 deletions
diff --git a/src/prefetcher/prefetcher_daemon.cc b/src/prefetcher/prefetcher_daemon.cc
deleted file mode 100644
index f4b9087..0000000
--- a/src/prefetcher/prefetcher_daemon.cc
+++ /dev/null
@@ -1,1367 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "prefetcher/minijail.h"
-#include "common/cmd_utils.h"
-#include "prefetcher/prefetcher_daemon.h"
-#include "prefetcher/session_manager.h"
-#include "prefetcher/session.h"
-
-#include <android-base/logging.h>
-#include <android-base/properties.h>
-
-#include <deque>
-#include <iomanip>
-#include <string>
-#include <sstream>
-#include <vector>
-
-#include <fcntl.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-namespace iorap::prefetcher {
-
-// Gate super-spammy IPC logging behind a property.
-// This is beyond merely annoying, enabling this logging causes prefetching to be about 1000x slower.
-static bool LogVerboseIpc() {
- static bool initialized = false;
- static bool verbose_ipc;
-
- if (initialized == false) {
- initialized = true;
-
- verbose_ipc =
- ::android::base::GetBoolProperty("iorapd.readahead.verbose_ipc", /*default*/false);
- }
-
- return verbose_ipc;
-}
-
-static const bool kInstallMiniJail =
- ::android::base::GetBoolProperty("iorapd.readahead.minijail", /*default*/true);
-
-static constexpr const char kCommandFileName[] = "/system/bin/iorap.prefetcherd";
-
-static constexpr size_t kPipeBufferSize = 1024 * 1024; // matches /proc/sys/fs/pipe-max-size
-
-using ArgString = const char*;
-
-std::ostream& operator<<(std::ostream& os, ReadAheadKind ps) {
- switch (ps) {
- case ReadAheadKind::kFadvise:
- os << "fadvise";
- break;
- case ReadAheadKind::kMmapLocked:
- os << "mmap";
- break;
- case ReadAheadKind::kMlock:
- os << "mlock";
- break;
- default:
- os << "<invalid>";
- }
- return os;
-}
-
-std::ostream& operator<<(std::ostream& os, CommandChoice choice) {
- switch (choice) {
- case CommandChoice::kRegisterFilePath:
- os << "kRegisterFilePath";
- break;
- case CommandChoice::kUnregisterFilePath:
- os << "kUnregisterFilePath";
- break;
- case CommandChoice::kReadAhead:
- os << "kReadAhead";
- break;
- case CommandChoice::kExit:
- os << "kExit";
- break;
- case CommandChoice::kCreateSession:
- os << "kCreateSession";
- break;
- case CommandChoice::kDestroySession:
- os << "kDestroySession";
- break;
- case CommandChoice::kDumpSession:
- os << "kDumpSession";
- break;
- case CommandChoice::kDumpEverything:
- os << "kDumpEverything";
- break;
- case CommandChoice::kCreateFdSession:
- os << "kCreateFdSession";
- break;
- default:
- CHECK(false) << "forgot to handle this choice";
- break;
- }
- return os;
-}
-
-std::ostream& operator<<(std::ostream& os, const Command& command) {
- os << "Command{";
- os << "choice=" << command.choice << ",";
-
- bool has_session_id = true;
- bool has_id = true;
- switch (command.choice) {
- case CommandChoice::kDumpEverything:
- case CommandChoice::kExit:
- has_session_id = false;
- FALLTHROUGH_INTENDED;
- case CommandChoice::kCreateFdSession:
- case CommandChoice::kCreateSession:
- case CommandChoice::kDestroySession:
- case CommandChoice::kDumpSession:
- has_id = false;
- break;
- default:
- break;
- }
-
- if (has_session_id) {
- os << "sid=" << command.session_id << ",";
- }
-
- if (has_id) {
- os << "id=" << command.id << ",";
- }
-
- switch (command.choice) {
- case CommandChoice::kRegisterFilePath:
- os << "file_path=";
-
- if (command.file_path) {
- os << *(command.file_path);
- } else {
- os << "(nullopt)";
- }
- break;
- case CommandChoice::kUnregisterFilePath:
- break;
- case CommandChoice::kReadAhead:
- os << "read_ahead_kind=" << command.read_ahead_kind << ",";
- os << "length=" << command.length << ",";
- os << "offset=" << command.offset << ",";
- break;
- case CommandChoice::kExit:
- break;
- case CommandChoice::kCreateFdSession:
- os << "fd=";
- if (command.fd.has_value()) {
- os << command.fd.value();
- } else {
- os << "(nullopt)";
- }
- os << ",";
- FALLTHROUGH_INTENDED;
- case CommandChoice::kCreateSession:
- os << "description=";
- if (command.file_path) {
- os << "'" << *(command.file_path) << "'";
- } else {
- os << "(nullopt)";
- }
- break;
- case CommandChoice::kDestroySession:
- break;
- case CommandChoice::kDumpSession:
- break;
- case CommandChoice::kDumpEverything:
- break;
- default:
- CHECK(false) << "forgot to handle this choice";
- break;
- }
-
- os << "}";
-
- return os;
-}
-
-template <typename T>
-struct ParseResult {
- T value;
- char* next_token;
- size_t stream_size;
-
- ParseResult() : value{}, next_token{nullptr}, stream_size{} {
- }
-
- constexpr operator bool() const {
- return next_token != nullptr;
- }
-};
-
-// Very spammy: Keep it off by default. Set to true if changing this code.
-static constexpr bool kDebugParsingRead = false;
-
-#define DEBUG_PREAD if (kDebugParsingRead) LOG(VERBOSE) << "ParsingRead "
-
-
-
-// Parse a strong type T from a buffer stream.
-// If there's insufficient space left to parse the value, an empty ParseResult is returned.
-template <typename T>
-ParseResult<T> ParsingRead(char* stream, size_t stream_size) {
- if (stream == nullptr) {
- DEBUG_PREAD << "stream was null";
- return {};
- }
-
- if constexpr (std::is_same_v<T, std::string>) {
- ParseResult<uint32_t> length = ParsingRead<uint32_t>(stream, stream_size);
-
- if (!length) {
- DEBUG_PREAD << "could not find length";
- // Not enough bytes left?
- return {};
- }
-
- ParseResult<std::string> string_result;
- string_result.value.reserve(length);
-
- stream = length.next_token;
- stream_size = length.stream_size;
-
- for (size_t i = 0; i < length.value; ++i) {
- ParseResult<char> char_result = ParsingRead<char>(stream, stream_size);
-
- stream = char_result.next_token;
- stream_size = char_result.stream_size;
-
- if (!char_result) {
- DEBUG_PREAD << "too few chars in stream, expected length: " << length.value;
- // Not enough bytes left?
- return {};
- }
-
- string_result.value += char_result.value;
-
- DEBUG_PREAD << "string preliminary is : " << string_result.value;
- }
-
- DEBUG_PREAD << "parsed string to: " << string_result.value;
- string_result.next_token = stream;
- return string_result;
- } else {
- if (sizeof(T) > stream_size) {
- return {};
- }
-
- ParseResult<T> result;
- result.next_token = stream + sizeof(T);
- result.stream_size = stream_size - sizeof(T);
-
- memcpy(&result.value, stream, sizeof(T));
-
- return result;
- }
-}
-
-// Convenience overload to chain multiple ParsingRead together.
-template <typename T, typename U>
-ParseResult<T> ParsingRead(ParseResult<U> result) {
- return ParsingRead<T>(result.next_token, result.stream_size);
-}
-
-class CommandParser {
- public:
- CommandParser(PrefetcherForkParameters params) {
- params_ = params;
- }
-
- std::vector<Command> ParseSocketCommands(bool& eof) {
- eof = false;
-
- std::vector<Command> commands_vec;
-
- std::vector<char> buf_vector;
- buf_vector.resize(1024*1024); // 1MB.
- char* buf = &buf_vector[0];
-
- // Binary only parsing. The higher level code can parse text
- // with ifstream if it really wants to.
- char* stream = &buf[0];
- size_t stream_size = buf_vector.size();
-
- while (true) {
- if (stream_size == 0) {
- // TODO: reply with an overflow command.
- LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands.";
- stream = &buf[0];
- stream_size = buf_vector.size();
- memset(&buf[0], /*c*/0, buf_vector.size());
- }
-
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon block recvmsg for commands (fd=" << params_.input_fd << ")";
- }
-
- ssize_t count;
- struct msghdr hdr;
- memset(&hdr, 0, sizeof(hdr));
-
- {
- union {
- struct cmsghdr cmh;
- char control[CMSG_SPACE(sizeof(int))];
- } control_un;
- memset(&control_un, 0, sizeof(control_un));
-
- /* Set 'control_un' to describe ancillary data that we want to receive */
- control_un.cmh.cmsg_len = CMSG_LEN(sizeof(int)); /* fd is sizeof(int) */
- control_un.cmh.cmsg_level = SOL_SOCKET;
- control_un.cmh.cmsg_type = SCM_CREDENTIALS;
-
- // the regular message data will be read into stream
- struct iovec iov;
- memset(&iov, 0, sizeof(iov));
- iov.iov_base = stream;
- iov.iov_len = stream_size;
-
- /* Set hdr fields to describe 'control_un' */
- hdr.msg_control = control_un.control;
- hdr.msg_controllen = sizeof(control_un.control);
- hdr.msg_iov = &iov;
- hdr.msg_iovlen = 1;
- hdr.msg_name = nullptr; /* no peer address */
- hdr.msg_namelen = 0;
-
- count = TEMP_FAILURE_RETRY(recvmsg(params_.input_fd, &hdr, /*flags*/0));
- }
-
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon recvmsg " << count << " for stream size:" << stream_size;
- }
-
- if (count < 0) {
- PLOG(ERROR) << "failed to recvmsg from input fd";
- break;
- // TODO: let the daemon be restarted by higher level code?
- } else if (count == 0) {
- LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating";
- eof = true;
- break;
- // TODO: let the daemon be restarted by higher level code?
- }
-
- {
- /* Extract fd from ancillary data if present */
- struct cmsghdr* hp;
- hp = CMSG_FIRSTHDR(&hdr);
- if (hp &&
- // FIXME: hp->cmsg_len returns an absurdly large value. is it overflowing?
- // (hp->cmsg_len == CMSG_LEN(sizeof(int))) &&
- (hp->cmsg_level == SOL_SOCKET) &&
- (hp->cmsg_type == SCM_RIGHTS)) {
-
- int passed_fd = *(int*) CMSG_DATA(hp);
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon received FD " << passed_fd;
- }
-
- // tack the FD into our dequeue.
- // we assume the FDs are sent in-order same as the regular iov are sent in-order.
- longbuf_fds_.insert(longbuf_fds_.end(), passed_fd);
- } else if (hp != nullptr) {
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon::read got CMSG but it wasn't matching SCM_RIGHTS,"
- << "cmsg_len=" << hp->cmsg_len << ","
- << "cmsg_level=" << hp->cmsg_level << ","
- << "cmsg_type=" << hp->cmsg_type;
- }
- }
- }
-
- longbuf_.insert(longbuf_.end(), stream, stream + count);
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size();
- }
-
- // reconstruct a stream of [iov_Command chdr_fd?]* back into [Command]*
- {
- if (longbuf_.size() == 0) {
- break;
- }
-
- std::vector<char> v(longbuf_.begin(),
- longbuf_.end());
-
- std::vector<int> v_fds{longbuf_fds_.begin(), longbuf_fds_.end()};
-
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size();
- if (WOULD_LOG(VERBOSE)) {
- std::stringstream dump;
- dump << std::hex << std::setfill('0');
- for (size_t i = 0; i < v.size(); ++i) {
- dump << std::setw(2) << static_cast<unsigned>(v[i]);
- }
-
- LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str();
- }
- LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ size: " << v_fds.size();
- if (WOULD_LOG(VERBOSE)) {
- std::stringstream dump;
- for (size_t i = 0; i < v_fds.size(); ++i) {
- dump << v_fds[i] << ", ";
- }
-
- LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ dump: " << dump.str();
- }
-
- }
-
- size_t v_fds_off = 0;
- size_t consumed_fds_total = 0;
-
- size_t v_off = 0;
- size_t consumed_bytes = std::numeric_limits<size_t>::max();
- size_t consumed_total = 0;
-
- while (true) {
- std::optional<Command> maybe_command;
- maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes);
- consumed_total += consumed_bytes;
- // Normal every time we get to the end of a buffer.
- if (!maybe_command) {
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size();
- }
- break;
- }
-
- if (maybe_command->RequiresFd()) {
- if (v_fds_off < v_fds.size()) {
- maybe_command->fd = v_fds[v_fds_off++];
- consumed_fds_total++;
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "Append the FD to " << *maybe_command;
- }
- } else {
- LOG(WARNING) << "Failed to acquire FD for " << *maybe_command;
- }
- }
-
- // in the next pass ignore what we already consumed.
- v_off += consumed_bytes;
-
- // true as long we don't hit the 'break' above.
- DCHECK_EQ(v_off, consumed_total);
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size()
- << "," << *maybe_command;
-
- // Pretty-print a single command for debugging/testing.
- LOG(VERBOSE) << *maybe_command;
- }
-
- // add to the commands we parsed.
- commands_vec.push_back(*maybe_command);
- }
-
- // erase however many were consumed
- longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total);
-
- // erase however many FDs were consumed.
- longbuf_fds_.erase(longbuf_fds_.begin(), longbuf_fds_.begin() + consumed_fds_total);
- }
- break;
- }
-
- return commands_vec;
- }
-
- std::vector<Command> ParseCommands(bool& eof) {
- eof = false;
-
- std::vector<Command> commands_vec;
-
- std::vector<char> buf_vector;
- buf_vector.resize(kPipeBufferSize);
- char* buf = &buf_vector[0];
-
- // Binary only parsing. The higher level code can parse text
- // with ifstream if it really wants to.
- char* stream = &buf[0];
- size_t stream_size = buf_vector.size();
-
- while (true) {
- if (stream_size == 0) {
- // TODO: reply with an overflow command.
- LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands.";
- stream = &buf[0];
- stream_size = buf_vector.size();
- memset(&buf[0], /*c*/0, buf_vector.size());
- }
-
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon block read for commands (fd=" << params_.input_fd << ")";
- }
- ssize_t count = TEMP_FAILURE_RETRY(read(params_.input_fd, stream, stream_size));
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon::read " << count << " for stream size:" << stream_size;
- }
-
- if (count < 0) {
- PLOG(ERROR) << "failed to read from input fd";
- break;
- // TODO: let the daemon be restarted by higher level code?
- } else if (count == 0) {
- LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating";
- eof = true;
- break;
- // TODO: let the daemon be restarted by higher level code?
- }
-
- longbuf_.insert(longbuf_.end(), stream, stream + count);
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size();
- }
-
- std::optional<Command> maybe_command;
- {
- if (longbuf_.size() == 0) {
- break;
- }
-
- std::vector<char> v(longbuf_.begin(),
- longbuf_.end());
-
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size();
- if (WOULD_LOG(VERBOSE)) {
- std::stringstream dump;
- dump << std::hex << std::setfill('0');
- for (size_t i = 0; i < v.size(); ++i) {
- dump << std::setw(2) << static_cast<unsigned>(v[i]);
- }
-
- LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str();
- }
- }
-
- size_t v_off = 0;
- size_t consumed_bytes = std::numeric_limits<size_t>::max();
- size_t consumed_total = 0;
-
- while (true) {
- maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes);
- consumed_total += consumed_bytes;
- // Normal every time we get to the end of a buffer.
- if (!maybe_command) {
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size();
- }
- break;
- }
-
- // in the next pass ignore what we already consumed.
- v_off += consumed_bytes;
-
- // true as long we don't hit the 'break' above.
- DCHECK_EQ(v_off, consumed_total);
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size()
- << "," << *maybe_command;
-
- // Pretty-print a single command for debugging/testing.
- LOG(VERBOSE) << *maybe_command;
- }
-
- // add to the commands we parsed.
- commands_vec.push_back(*maybe_command);
- }
-
- // erase however many were consumed
- longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total);
- }
- break;
- }
-
- return commands_vec;
- }
-
- private:
- bool IsTextMode() const {
- return params_.format_text;
- }
-
- PrefetcherForkParameters params_;
-
- // A buffer long enough to contain a lot of buffers.
- // This handles reads that only contain a partial command.
- std::deque<char> longbuf_;
-
- // File descriptor buffers.
- std::deque<int> longbuf_fds_;
-};
-
-static constexpr bool kDebugCommandRead = true;
-
-#define DEBUG_READ if (kDebugCommandRead) LOG(VERBOSE) << "Command::Read "
-
-std::optional<Command> Command::Read(char* buf, size_t buf_size, /*out*/size_t* consumed_bytes) {
- *consumed_bytes = 0;
- if (buf == nullptr) {
- return std::nullopt;
- }
-
- Command cmd{}; // zero-initialize any unused fields
- ParseResult<CommandChoice> parsed_choice = ParsingRead<CommandChoice>(buf, buf_size);
- cmd.choice = parsed_choice.value;
-
- if (!parsed_choice) {
- DEBUG_READ << "no choice";
- return std::nullopt;
- }
-
- switch (parsed_choice.value) {
- case CommandChoice::kRegisterFilePath: {
- ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
- if (!parsed_session_id) {
- DEBUG_READ << "no parsed session id";
- return std::nullopt;
- }
-
- ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
- if (!parsed_id) {
- DEBUG_READ << "no parsed id";
- return std::nullopt;
- }
-
- ParseResult<std::string> parsed_file_path = ParsingRead<std::string>(parsed_id);
-
- if (!parsed_file_path) {
- DEBUG_READ << "no file path";
- return std::nullopt;
- }
- *consumed_bytes = parsed_file_path.next_token - buf;
-
- cmd.session_id = parsed_session_id.value;
- cmd.id = parsed_id.value;
- cmd.file_path = parsed_file_path.value;
-
- break;
- }
- case CommandChoice::kUnregisterFilePath: {
- ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
- if (!parsed_session_id) {
- DEBUG_READ << "no parsed session id";
- return std::nullopt;
- }
-
- ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
- if (!parsed_id) {
- DEBUG_READ << "no parsed id";
- return std::nullopt;
- }
- *consumed_bytes = parsed_id.next_token - buf;
-
- cmd.session_id = parsed_session_id.value;
- cmd.id = parsed_id.value;
-
- break;
- }
- case CommandChoice::kReadAhead: {
- ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
- if (!parsed_session_id) {
- DEBUG_READ << "no parsed session id";
- return std::nullopt;
- }
-
- ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
- if (!parsed_id) {
- DEBUG_READ << "no parsed id";
- return std::nullopt;
- }
-
- ParseResult<ReadAheadKind> parsed_kind = ParsingRead<ReadAheadKind>(parsed_id);
- if (!parsed_kind) {
- DEBUG_READ << "no parsed kind";
- return std::nullopt;
- }
- ParseResult<uint64_t> parsed_length = ParsingRead<uint64_t>(parsed_kind);
- if (!parsed_length) {
- DEBUG_READ << "no parsed length";
- return std::nullopt;
- }
- ParseResult<uint64_t> parsed_offset = ParsingRead<uint64_t>(parsed_length);
- if (!parsed_offset) {
- DEBUG_READ << "no parsed offset";
- return std::nullopt;
- }
- *consumed_bytes = parsed_offset.next_token - buf;
-
- cmd.session_id = parsed_session_id.value;
- cmd.id = parsed_id.value;
- cmd.read_ahead_kind = parsed_kind.value;
- cmd.length = parsed_length.value;
- cmd.offset = parsed_offset.value;
-
- break;
- }
- case CommandChoice::kCreateSession:
- case CommandChoice::kCreateFdSession: {
- ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
- if (!parsed_session_id) {
- DEBUG_READ << "no parsed session id";
- return std::nullopt;
- }
-
- ParseResult<std::string> parsed_description = ParsingRead<std::string>(parsed_session_id);
-
- if (!parsed_description) {
- DEBUG_READ << "no description";
- return std::nullopt;
- }
- *consumed_bytes = parsed_description.next_token - buf;
-
- cmd.session_id = parsed_session_id.value;
- cmd.file_path = parsed_description.value;
-
- break;
- }
- case CommandChoice::kDestroySession:
- case CommandChoice::kDumpSession: {
- ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
- if (!parsed_session_id) {
- DEBUG_READ << "no parsed session id";
- return std::nullopt;
- }
-
- *consumed_bytes = parsed_session_id.next_token - buf;
-
- cmd.session_id = parsed_session_id.value;
-
- break;
- }
- case CommandChoice::kExit:
- case CommandChoice::kDumpEverything:
- *consumed_bytes = parsed_choice.next_token - buf;
- // Only need to parse the choice.
- break;
- default:
- LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(parsed_choice.value);
- break;
- }
-
- return cmd;
-}
-
-bool Command::Write(char* buf, size_t buf_size, /*out*/size_t* produced_bytes) const {
- *produced_bytes = 0;
- if (buf == nullptr) {
- LOG(WARNING) << "null buf, is this expected?";
- return false;
- }
-
- bool has_enough_space = false;
- size_t space_requirement = std::numeric_limits<size_t>::max();
-
- space_requirement = sizeof(choice);
-
- switch (choice) {
- case CommandChoice::kRegisterFilePath:
- space_requirement += sizeof(session_id);
- space_requirement += sizeof(id);
- space_requirement += sizeof(uint32_t); // string length
-
- if (!file_path) {
- LOG(WARNING) << "Missing file path for kRegisterFilePath";
- return false;
- }
-
- space_requirement += file_path->size(); // string contents
- break;
- case CommandChoice::kUnregisterFilePath:
- space_requirement += sizeof(session_id);
- space_requirement += sizeof(id);
- break;
- case CommandChoice::kReadAhead:
- space_requirement += sizeof(session_id);
- space_requirement += sizeof(id);
- space_requirement += sizeof(read_ahead_kind);
- space_requirement += sizeof(length);
- space_requirement += sizeof(offset);
- break;
- case CommandChoice::kCreateSession:
- case CommandChoice::kCreateFdSession:
- space_requirement += sizeof(session_id);
- space_requirement += sizeof(uint32_t); // string length
-
- if (!file_path) {
- LOG(WARNING) << "Missing file path for kCreateSession";
- return false;
- }
-
- space_requirement += file_path->size(); // string contents
- break;
- case CommandChoice::kDestroySession:
- case CommandChoice::kDumpSession:
- space_requirement += sizeof(session_id);
- break;
- case CommandChoice::kExit:
- case CommandChoice::kDumpEverything:
- // Only need space for the choice.
- break;
- default:
- LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(choice);
- break;
- }
-
- if (buf_size < space_requirement) {
- return false;
- }
-
- *produced_bytes = space_requirement;
-
- // Always write out the choice.
- size_t buf_offset = 0;
-
- memcpy(&buf[buf_offset], &choice, sizeof(choice));
- buf_offset += sizeof(choice);
-
- switch (choice) {
- case CommandChoice::kRegisterFilePath:
- memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
- buf_offset += sizeof(session_id);
- memcpy(&buf[buf_offset], &id, sizeof(id));
- buf_offset += sizeof(id);
-
- {
- uint32_t string_length = static_cast<uint32_t>(file_path->size());
- memcpy(&buf[buf_offset], &string_length, sizeof(string_length));
- buf_offset += sizeof(string_length);
- }
-
- DCHECK(file_path.has_value());
-
- memcpy(&buf[buf_offset], file_path->c_str(), file_path->size());
- buf_offset += file_path->size();
- break;
- case CommandChoice::kUnregisterFilePath:
- memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
- buf_offset += sizeof(session_id);
- memcpy(&buf[buf_offset], &id, sizeof(id));
- buf_offset += sizeof(id);
- break;
- case CommandChoice::kReadAhead:
- memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
- buf_offset += sizeof(session_id);
- memcpy(&buf[buf_offset], &id, sizeof(id));
- buf_offset += sizeof(id);
- memcpy(&buf[buf_offset], &read_ahead_kind, sizeof(read_ahead_kind));
- buf_offset += sizeof(read_ahead_kind);
- memcpy(&buf[buf_offset], &length, sizeof(length));
- buf_offset += sizeof(length);
- memcpy(&buf[buf_offset], &offset, sizeof(offset));
- buf_offset += sizeof(offset);
- break;
- case CommandChoice::kCreateSession:
- case CommandChoice::kCreateFdSession:
- memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
- buf_offset += sizeof(session_id);
-
- {
- uint32_t string_length = static_cast<uint32_t>(file_path->size());
- memcpy(&buf[buf_offset], &string_length, sizeof(string_length));
- buf_offset += sizeof(string_length);
- }
-
- DCHECK(file_path.has_value());
-
- memcpy(&buf[buf_offset], file_path->c_str(), file_path->size());
- buf_offset += file_path->size();
-
- DCHECK_EQ(buf_offset, space_requirement) << *this << ",file_path_size:" << file_path->size();
- DCHECK_EQ(buf_offset, *produced_bytes) << *this;
-
- break;
- case CommandChoice::kDestroySession:
- case CommandChoice::kDumpSession:
- memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
- buf_offset += sizeof(session_id);
- break;
- case CommandChoice::kExit:
- case CommandChoice::kDumpEverything:
- // Only need to write out the choice.
- break;
- default:
- LOG(FATAL) << "should have fallen out in the above switch"
- << static_cast<uint32_t>(choice);
- break;
- }
-
- DCHECK_EQ(buf_offset, space_requirement) << *this;
- DCHECK_EQ(buf_offset, *produced_bytes) << *this;
-
- return true;
-}
-
-class PrefetcherDaemon::Impl {
- public:
- std::optional<PrefetcherForkParameters> StartPipesViaFork() {
- int pipefds[2];
- if (pipe(&pipefds[0]) != 0) {
- PLOG(FATAL) << "Failed to create read/write pipes";
- }
-
- if (WOULD_LOG(VERBOSE)) {
- long pipe_size = static_cast<long>(fcntl(pipefds[0], F_GETPIPE_SZ));
- if (pipe_size < 0) {
- PLOG(ERROR) << "Failed to F_GETPIPE_SZ:";
- }
- LOG(VERBOSE) << "StartPipesViaFork: default pipe size: " << pipe_size;
- }
-
- for (int i = 0; i < 2; ++i) {
- // Default pipe size is usually 64KB.
- // Increase to 1MB so that iorapd has to rarely run during prefetching.
- if (fcntl(pipefds[i], F_SETPIPE_SZ, kPipeBufferSize) < 0) {
- PLOG(FATAL) << "Failed to increase pipe size to max";
- }
- }
-
- pipefd_read_ = pipefds[0];
- pipefd_write_ = pipefds[1];
-
- PrefetcherForkParameters params;
- params.input_fd = pipefd_read_;
- params.output_fd = pipefd_write_;
- params.format_text = false;
- params.use_sockets = false;
-
- bool res = StartViaFork(params);
- if (res) {
- return params;
- } else {
- return std::nullopt;
- }
- }
-
-std::optional<PrefetcherForkParameters> StartSocketViaFork() {
- int socket_fds[2];
- if (socketpair(AF_UNIX, SOCK_STREAM, /*protocol*/0, &socket_fds[0]) != 0) {
- PLOG(FATAL) << "Failed to create read/write socketpair";
- }
-
- pipefd_read_ = socket_fds[0]; // iorapd writer, iorap.prefetcherd reader
- pipefd_write_ = socket_fds[1]; // iorapd reader, iorap.prefetcherd writer
-
- PrefetcherForkParameters params;
- params.input_fd = pipefd_read_;
- params.output_fd = pipefd_write_;
- params.format_text = false;
- params.use_sockets = true;
-
- bool res = StartViaFork(params);
- if (res) {
- return params;
- } else {
- return std::nullopt;
- }
- }
-
- bool StartViaFork(PrefetcherForkParameters params) {
- params_ = params;
-
- forked_ = true;
- child_ = fork();
-
- if (child_ == -1) {
- LOG(FATAL) << "Failed to fork PrefetcherDaemon";
- } else if (child_ > 0) { // we are the caller of this function
- LOG(DEBUG) << "forked into iorap.prefetcherd, pid = " << child_;
-
- return true;
- } else {
- // we are the child that was forked.
- std::stringstream argv; // for logging
- std::vector<std::string> argv_vec;
-
- {
- std::stringstream s;
- s << "--input-fd";
- argv_vec.push_back(s.str());
-
- std::stringstream s2;
- s2 << params.input_fd;
- argv_vec.push_back(s2.str());
-
- argv << " --input-fd" << " " << params.input_fd;
- }
-
- {
- std::stringstream s;
- s << "--output-fd";
- argv_vec.push_back(s.str());
-
- std::stringstream s2;
- s2 << params.output_fd;
- argv_vec.push_back(s2.str());
-
- argv << " --output-fd" << " " << params.output_fd;
- }
-
-
- if (params.use_sockets) {
- std::stringstream s;
- s << "--use-sockets";
- argv_vec.push_back(s.str());
-
- argv << " --use-sockets";
- }
-
- if (WOULD_LOG(VERBOSE)) {
- std::stringstream s;
- s << "--verbose";
- argv_vec.push_back(s.str());
-
- argv << " --verbose";
- }
-
- std::unique_ptr<ArgString[]> argv_ptr = common::VecToArgv(kCommandFileName, argv_vec);
-
- LOG(DEBUG) << "fork+exec: " << kCommandFileName << " "
- << argv.str();
- execve(kCommandFileName, (char **)argv_ptr.get(), /*envp*/nullptr);
- // This should never return.
- _exit(EXIT_FAILURE);
- }
-
- DCHECK(false);
- return false;
- }
-
- // TODO: Not very useful since this can never return 'true'
- // -> in the child we would've already execd which loses all this code.
- bool IsDaemon() {
- // In the child the pid is always 0.
- return child_ > 0;
- }
-
- bool Main(PrefetcherForkParameters params) {
- LOG(VERBOSE) << "PrefetcherDaemon::Main " << params;
-
- CommandParser command_parser{params};
-
- Command next_command{};
-
- std::vector<Command> many_commands;
-
- // Ensure alogd is pre-initialized before installing minijail.
- LOG(DEBUG) << "Installing minijail";
-
- // Install seccomp filter using libminijail.
- if (kInstallMiniJail) {
- MiniJail();
- }
-
- while (true) {
- bool eof = false;
-
- if (params.use_sockets) {
- // use recvmsg(2). supports receiving FDs.
- many_commands = command_parser.ParseSocketCommands(/*out*/eof);
- } else {
- // use read(2). does not support receiving FDs.
- many_commands = command_parser.ParseCommands(/*out*/eof);
- }
-
- if (eof) {
- LOG(WARNING) << "PrefetcherDaemon got EOF, terminating";
- return true;
- }
-
- for (auto& command : many_commands) {
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "PrefetcherDaemon got command: " << command;
- }
-
- if (command.choice == CommandChoice::kExit) {
- LOG(DEBUG) << "PrefetcherDaemon got kExit command, terminating";
- return true;
- }
-
- if (!ReceiveCommand(command)) {
- // LOG(WARNING) << "PrefetcherDaemon command processing failure: " << command;
- }
-
- // ReceiveCommand should dup to keep the FD. Avoid leaks.
- if (command.fd.has_value()) {
- close(*command.fd);
- }
- }
- }
-
- LOG(VERBOSE) << "PrefetcherDaemon::Main got exit, terminating";
-
- return true;
- // Terminate.
- }
-
- Impl(PrefetcherDaemon* daemon) {
- session_manager_ = SessionManager::CreateManager(SessionKind::kInProcessDirect);
- DCHECK(session_manager_ != nullptr);
- };
-
- ~Impl() {
- // Don't do anything if we never called 'StartViaFork'
- if (forked_) {
- if (!IsDaemon()) {
- int status;
- waitpid(child_, /*out*/&status, /*options*/0);
- } else {
- LOG(WARNING) << "execve should have avoided this path";
- // DCHECK(false) << "not possible because the execve would avoid this path";
- }
- }
- }
-
- bool SendCommand(const Command& command) {
- // Only parent is the sender.
- DCHECK(forked_);
- //DCHECK(!IsDaemon());
-
- char buf[1024];
- size_t stream_size;
- if (!command.Write(buf, sizeof(buf), /*out*/&stream_size)) {
- PLOG(ERROR) << "Failed to serialize command: " << command;
- return false;
- }
-
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "pre-write(fd=" << pipefd_write_ << ", buf=" << buf
- << ", size=" << stream_size<< ")";
- }
-
- if (params_.use_sockets) {
- /* iov contains the normal message (Command) */
- struct iovec iov;
- memset(&iov, 0, sizeof(iov));
- iov.iov_base = &buf[0];
- iov.iov_len = stream_size;
-
- struct msghdr msg;
- memset(&msg, 0, sizeof(msg));
-
- /* point to iov to transmit */
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
-
- /* no dest address; socket is connected */
- msg.msg_name = nullptr;
- msg.msg_namelen = 0;
-
- // append a CMSG with SCM_RIGHTS if we have an FD.
- if (command.fd.has_value()) {
- union {
- struct cmsghdr cmh;
- char control[CMSG_SPACE(sizeof(int))]; /* sized to hold an fd (int) */
- } control_un;
- memset(&control_un, 0, sizeof(control_un));
-
- msg.msg_control = &control_un.control[0];
- msg.msg_controllen = sizeof(control_un.control);
-
- struct cmsghdr *hp;
- hp = CMSG_FIRSTHDR(&msg);
- hp->cmsg_len = CMSG_LEN(sizeof(int));
- hp->cmsg_level = SOL_SOCKET;
- hp->cmsg_type = SCM_RIGHTS;
- *((int *) CMSG_DATA(hp)) = *(command.fd);
-
- DCHECK(command.RequiresFd()) << command;
-
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "append FD to sendmsg: " << *(command.fd);
- }
- }
-
- // TODO: add CMSG for the FD passage.
-
- if (TEMP_FAILURE_RETRY(sendmsg(pipefd_write_, &msg, /*flags*/0)) < 0) {
- PLOG(ERROR) << "Failed to sendmsg command: " << command;
- return false;
- }
- } else {
- if (TEMP_FAILURE_RETRY(write(pipefd_write_, buf, stream_size)) < 0) {
- PLOG(ERROR) << "Failed to write command: " << command;
- return false;
- }
- }
-
- if (LogVerboseIpc()) {
- LOG(VERBOSE) << "write(fd=" << pipefd_write_ << ", buf=" << buf
- << ", size=" << stream_size<< ")";
- }
-
- // TODO: also read the reply?
- return true;
- }
-
- bool ReceiveCommand(const Command& command) {
- // Only child is the command receiver.
- // DCHECK(IsDaemon());
-
- switch (command.choice) {
- case CommandChoice::kRegisterFilePath: {
- std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
-
- if (!session) {
- LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
- return false;
- }
-
- CHECK(command.file_path.has_value()) << command;
- return session->RegisterFilePath(command.id, *command.file_path);
- }
- case CommandChoice::kUnregisterFilePath: {
- std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
-
- if (!session) {
- LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
- return false;
- }
-
- return session->UnregisterFilePath(command.id);
- }
- case CommandChoice::kReadAhead: {
- std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
-
- if (!session) {
- LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
- return false;
- }
-
- return session->ReadAhead(command.id, command.read_ahead_kind, command.length, command.offset);
- }
- // TODO: unreadahead
- case CommandChoice::kExit: {
- LOG(WARNING) << "kExit should be handled earlier.";
- return true;
- }
- case CommandChoice::kCreateSession: {
- std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
- if (session != nullptr) {
- LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command;
- return false;
- }
- CHECK(command.file_path.has_value()) << command;
- if (session_manager_->CreateSession(command.session_id, /*description*/*command.file_path)
- == nullptr) {
- LOG(ERROR) << "ReceiveCommand: Failure to kCreateSession: " << command;
- return false;
- }
- return true;
- }
- case CommandChoice::kDestroySession: {
- if (!session_manager_->DestroySession(command.session_id)) {
- LOG(ERROR) << "ReceiveCommand: Failure to kDestroySession: " << command;
- return false;
- }
- return true;
- }
- case CommandChoice::kDumpSession: {
- std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
-
- if (!session) {
- LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
- return false;
- }
-
- // TODO: Consider doing dumpsys support somehow?
- session->Dump(LOG_STREAM(DEBUG), /*multiline*/true);
- return true;
- }
- case CommandChoice::kDumpEverything: {
- session_manager_->Dump(LOG_STREAM(DEBUG), /*multiline*/true);
- break;
- }
- case CommandChoice::kCreateFdSession: {
- std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
- if (session != nullptr) {
- LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command;
- return false;
- }
- CHECK(command.file_path.has_value()) << command;
- CHECK(command.fd.has_value()) << command;
-
- LOG(VERBOSE) << "ReceiveCommand: kCreateFdSession fd=" << *(command.fd);
-
- // TODO: Maybe use CreateFdSession instead?
- session =
- session_manager_->CreateSession(command.session_id,
- /*description*/*command.file_path,
- command.fd.value());
- if (session == nullptr) {
- LOG(ERROR) << "ReceiveCommand: Failure to kCreateFdSession: " << command;
- return false;
- }
-
- return session->ProcessFd(*command.fd);
- }
- }
-
- return true;
- }
-
- pid_t child_;
- bool forked_;
- int pipefd_read_;
- int pipefd_write_;
- PrefetcherForkParameters params_;
- // do not ever use an indirect session manager here, as it would cause a lifetime cycle.
- std::unique_ptr<SessionManager> session_manager_; // direct only.
-};
-
-PrefetcherDaemon::PrefetcherDaemon()
- : impl_{new Impl{this}} {
- LOG(VERBOSE) << "PrefetcherDaemon() constructor";
-}
-
-bool PrefetcherDaemon::StartViaFork(PrefetcherForkParameters params) {
- return impl_->StartViaFork(std::move(params));
-}
-
-
-std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartPipesViaFork() {
- return impl_->StartPipesViaFork();
-}
-
-std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartSocketViaFork() {
- return impl_->StartSocketViaFork();
-}
-
-bool PrefetcherDaemon::Main(PrefetcherForkParameters params) {
- return impl_->Main(params);
-}
-
-bool PrefetcherDaemon::SendCommand(const Command& command) {
- return impl_->SendCommand(command);
-}
-
-PrefetcherDaemon::~PrefetcherDaemon() {
- // required for unique_ptr for incomplete types.
-}
-
-} // namespace iorap::prefetcher