diff options
Diffstat (limited to 'src/prefetcher/prefetcher_daemon.cc')
-rw-r--r-- | src/prefetcher/prefetcher_daemon.cc | 1367 |
1 files changed, 0 insertions, 1367 deletions
diff --git a/src/prefetcher/prefetcher_daemon.cc b/src/prefetcher/prefetcher_daemon.cc deleted file mode 100644 index f4b9087..0000000 --- a/src/prefetcher/prefetcher_daemon.cc +++ /dev/null @@ -1,1367 +0,0 @@ -// Copyright (C) 2019 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "prefetcher/minijail.h" -#include "common/cmd_utils.h" -#include "prefetcher/prefetcher_daemon.h" -#include "prefetcher/session_manager.h" -#include "prefetcher/session.h" - -#include <android-base/logging.h> -#include <android-base/properties.h> - -#include <deque> -#include <iomanip> -#include <string> -#include <sstream> -#include <vector> - -#include <fcntl.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/wait.h> -#include <sys/un.h> -#include <unistd.h> - -namespace iorap::prefetcher { - -// Gate super-spammy IPC logging behind a property. -// This is beyond merely annoying, enabling this logging causes prefetching to be about 1000x slower. -static bool LogVerboseIpc() { - static bool initialized = false; - static bool verbose_ipc; - - if (initialized == false) { - initialized = true; - - verbose_ipc = - ::android::base::GetBoolProperty("iorapd.readahead.verbose_ipc", /*default*/false); - } - - return verbose_ipc; -} - -static const bool kInstallMiniJail = - ::android::base::GetBoolProperty("iorapd.readahead.minijail", /*default*/true); - -static constexpr const char kCommandFileName[] = "/system/bin/iorap.prefetcherd"; - -static constexpr size_t kPipeBufferSize = 1024 * 1024; // matches /proc/sys/fs/pipe-max-size - -using ArgString = const char*; - -std::ostream& operator<<(std::ostream& os, ReadAheadKind ps) { - switch (ps) { - case ReadAheadKind::kFadvise: - os << "fadvise"; - break; - case ReadAheadKind::kMmapLocked: - os << "mmap"; - break; - case ReadAheadKind::kMlock: - os << "mlock"; - break; - default: - os << "<invalid>"; - } - return os; -} - -std::ostream& operator<<(std::ostream& os, CommandChoice choice) { - switch (choice) { - case CommandChoice::kRegisterFilePath: - os << "kRegisterFilePath"; - break; - case CommandChoice::kUnregisterFilePath: - os << "kUnregisterFilePath"; - break; - case CommandChoice::kReadAhead: - os << "kReadAhead"; - break; - case CommandChoice::kExit: - os << "kExit"; - break; - case CommandChoice::kCreateSession: - os << "kCreateSession"; - break; - case CommandChoice::kDestroySession: - os << "kDestroySession"; - break; - case CommandChoice::kDumpSession: - os << "kDumpSession"; - break; - case CommandChoice::kDumpEverything: - os << "kDumpEverything"; - break; - case CommandChoice::kCreateFdSession: - os << "kCreateFdSession"; - break; - default: - CHECK(false) << "forgot to handle this choice"; - break; - } - return os; -} - -std::ostream& operator<<(std::ostream& os, const Command& command) { - os << "Command{"; - os << "choice=" << command.choice << ","; - - bool has_session_id = true; - bool has_id = true; - switch (command.choice) { - case CommandChoice::kDumpEverything: - case CommandChoice::kExit: - has_session_id = false; - FALLTHROUGH_INTENDED; - case CommandChoice::kCreateFdSession: - case CommandChoice::kCreateSession: - case CommandChoice::kDestroySession: - case CommandChoice::kDumpSession: - has_id = false; - break; - default: - break; - } - - if (has_session_id) { - os << "sid=" << command.session_id << ","; - } - - if (has_id) { - os << "id=" << command.id << ","; - } - - switch (command.choice) { - case CommandChoice::kRegisterFilePath: - os << "file_path="; - - if (command.file_path) { - os << *(command.file_path); - } else { - os << "(nullopt)"; - } - break; - case CommandChoice::kUnregisterFilePath: - break; - case CommandChoice::kReadAhead: - os << "read_ahead_kind=" << command.read_ahead_kind << ","; - os << "length=" << command.length << ","; - os << "offset=" << command.offset << ","; - break; - case CommandChoice::kExit: - break; - case CommandChoice::kCreateFdSession: - os << "fd="; - if (command.fd.has_value()) { - os << command.fd.value(); - } else { - os << "(nullopt)"; - } - os << ","; - FALLTHROUGH_INTENDED; - case CommandChoice::kCreateSession: - os << "description="; - if (command.file_path) { - os << "'" << *(command.file_path) << "'"; - } else { - os << "(nullopt)"; - } - break; - case CommandChoice::kDestroySession: - break; - case CommandChoice::kDumpSession: - break; - case CommandChoice::kDumpEverything: - break; - default: - CHECK(false) << "forgot to handle this choice"; - break; - } - - os << "}"; - - return os; -} - -template <typename T> -struct ParseResult { - T value; - char* next_token; - size_t stream_size; - - ParseResult() : value{}, next_token{nullptr}, stream_size{} { - } - - constexpr operator bool() const { - return next_token != nullptr; - } -}; - -// Very spammy: Keep it off by default. Set to true if changing this code. -static constexpr bool kDebugParsingRead = false; - -#define DEBUG_PREAD if (kDebugParsingRead) LOG(VERBOSE) << "ParsingRead " - - - -// Parse a strong type T from a buffer stream. -// If there's insufficient space left to parse the value, an empty ParseResult is returned. -template <typename T> -ParseResult<T> ParsingRead(char* stream, size_t stream_size) { - if (stream == nullptr) { - DEBUG_PREAD << "stream was null"; - return {}; - } - - if constexpr (std::is_same_v<T, std::string>) { - ParseResult<uint32_t> length = ParsingRead<uint32_t>(stream, stream_size); - - if (!length) { - DEBUG_PREAD << "could not find length"; - // Not enough bytes left? - return {}; - } - - ParseResult<std::string> string_result; - string_result.value.reserve(length); - - stream = length.next_token; - stream_size = length.stream_size; - - for (size_t i = 0; i < length.value; ++i) { - ParseResult<char> char_result = ParsingRead<char>(stream, stream_size); - - stream = char_result.next_token; - stream_size = char_result.stream_size; - - if (!char_result) { - DEBUG_PREAD << "too few chars in stream, expected length: " << length.value; - // Not enough bytes left? - return {}; - } - - string_result.value += char_result.value; - - DEBUG_PREAD << "string preliminary is : " << string_result.value; - } - - DEBUG_PREAD << "parsed string to: " << string_result.value; - string_result.next_token = stream; - return string_result; - } else { - if (sizeof(T) > stream_size) { - return {}; - } - - ParseResult<T> result; - result.next_token = stream + sizeof(T); - result.stream_size = stream_size - sizeof(T); - - memcpy(&result.value, stream, sizeof(T)); - - return result; - } -} - -// Convenience overload to chain multiple ParsingRead together. -template <typename T, typename U> -ParseResult<T> ParsingRead(ParseResult<U> result) { - return ParsingRead<T>(result.next_token, result.stream_size); -} - -class CommandParser { - public: - CommandParser(PrefetcherForkParameters params) { - params_ = params; - } - - std::vector<Command> ParseSocketCommands(bool& eof) { - eof = false; - - std::vector<Command> commands_vec; - - std::vector<char> buf_vector; - buf_vector.resize(1024*1024); // 1MB. - char* buf = &buf_vector[0]; - - // Binary only parsing. The higher level code can parse text - // with ifstream if it really wants to. - char* stream = &buf[0]; - size_t stream_size = buf_vector.size(); - - while (true) { - if (stream_size == 0) { - // TODO: reply with an overflow command. - LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands."; - stream = &buf[0]; - stream_size = buf_vector.size(); - memset(&buf[0], /*c*/0, buf_vector.size()); - } - - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon block recvmsg for commands (fd=" << params_.input_fd << ")"; - } - - ssize_t count; - struct msghdr hdr; - memset(&hdr, 0, sizeof(hdr)); - - { - union { - struct cmsghdr cmh; - char control[CMSG_SPACE(sizeof(int))]; - } control_un; - memset(&control_un, 0, sizeof(control_un)); - - /* Set 'control_un' to describe ancillary data that we want to receive */ - control_un.cmh.cmsg_len = CMSG_LEN(sizeof(int)); /* fd is sizeof(int) */ - control_un.cmh.cmsg_level = SOL_SOCKET; - control_un.cmh.cmsg_type = SCM_CREDENTIALS; - - // the regular message data will be read into stream - struct iovec iov; - memset(&iov, 0, sizeof(iov)); - iov.iov_base = stream; - iov.iov_len = stream_size; - - /* Set hdr fields to describe 'control_un' */ - hdr.msg_control = control_un.control; - hdr.msg_controllen = sizeof(control_un.control); - hdr.msg_iov = &iov; - hdr.msg_iovlen = 1; - hdr.msg_name = nullptr; /* no peer address */ - hdr.msg_namelen = 0; - - count = TEMP_FAILURE_RETRY(recvmsg(params_.input_fd, &hdr, /*flags*/0)); - } - - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon recvmsg " << count << " for stream size:" << stream_size; - } - - if (count < 0) { - PLOG(ERROR) << "failed to recvmsg from input fd"; - break; - // TODO: let the daemon be restarted by higher level code? - } else if (count == 0) { - LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating"; - eof = true; - break; - // TODO: let the daemon be restarted by higher level code? - } - - { - /* Extract fd from ancillary data if present */ - struct cmsghdr* hp; - hp = CMSG_FIRSTHDR(&hdr); - if (hp && - // FIXME: hp->cmsg_len returns an absurdly large value. is it overflowing? - // (hp->cmsg_len == CMSG_LEN(sizeof(int))) && - (hp->cmsg_level == SOL_SOCKET) && - (hp->cmsg_type == SCM_RIGHTS)) { - - int passed_fd = *(int*) CMSG_DATA(hp); - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon received FD " << passed_fd; - } - - // tack the FD into our dequeue. - // we assume the FDs are sent in-order same as the regular iov are sent in-order. - longbuf_fds_.insert(longbuf_fds_.end(), passed_fd); - } else if (hp != nullptr) { - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon::read got CMSG but it wasn't matching SCM_RIGHTS," - << "cmsg_len=" << hp->cmsg_len << "," - << "cmsg_level=" << hp->cmsg_level << "," - << "cmsg_type=" << hp->cmsg_type; - } - } - } - - longbuf_.insert(longbuf_.end(), stream, stream + count); - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size(); - } - - // reconstruct a stream of [iov_Command chdr_fd?]* back into [Command]* - { - if (longbuf_.size() == 0) { - break; - } - - std::vector<char> v(longbuf_.begin(), - longbuf_.end()); - - std::vector<int> v_fds{longbuf_fds_.begin(), longbuf_fds_.end()}; - - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size(); - if (WOULD_LOG(VERBOSE)) { - std::stringstream dump; - dump << std::hex << std::setfill('0'); - for (size_t i = 0; i < v.size(); ++i) { - dump << std::setw(2) << static_cast<unsigned>(v[i]); - } - - LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str(); - } - LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ size: " << v_fds.size(); - if (WOULD_LOG(VERBOSE)) { - std::stringstream dump; - for (size_t i = 0; i < v_fds.size(); ++i) { - dump << v_fds[i] << ", "; - } - - LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ dump: " << dump.str(); - } - - } - - size_t v_fds_off = 0; - size_t consumed_fds_total = 0; - - size_t v_off = 0; - size_t consumed_bytes = std::numeric_limits<size_t>::max(); - size_t consumed_total = 0; - - while (true) { - std::optional<Command> maybe_command; - maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes); - consumed_total += consumed_bytes; - // Normal every time we get to the end of a buffer. - if (!maybe_command) { - if (LogVerboseIpc()) { - LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size(); - } - break; - } - - if (maybe_command->RequiresFd()) { - if (v_fds_off < v_fds.size()) { - maybe_command->fd = v_fds[v_fds_off++]; - consumed_fds_total++; - if (LogVerboseIpc()) { - LOG(VERBOSE) << "Append the FD to " << *maybe_command; - } - } else { - LOG(WARNING) << "Failed to acquire FD for " << *maybe_command; - } - } - - // in the next pass ignore what we already consumed. - v_off += consumed_bytes; - - // true as long we don't hit the 'break' above. - DCHECK_EQ(v_off, consumed_total); - if (LogVerboseIpc()) { - LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size() - << "," << *maybe_command; - - // Pretty-print a single command for debugging/testing. - LOG(VERBOSE) << *maybe_command; - } - - // add to the commands we parsed. - commands_vec.push_back(*maybe_command); - } - - // erase however many were consumed - longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total); - - // erase however many FDs were consumed. - longbuf_fds_.erase(longbuf_fds_.begin(), longbuf_fds_.begin() + consumed_fds_total); - } - break; - } - - return commands_vec; - } - - std::vector<Command> ParseCommands(bool& eof) { - eof = false; - - std::vector<Command> commands_vec; - - std::vector<char> buf_vector; - buf_vector.resize(kPipeBufferSize); - char* buf = &buf_vector[0]; - - // Binary only parsing. The higher level code can parse text - // with ifstream if it really wants to. - char* stream = &buf[0]; - size_t stream_size = buf_vector.size(); - - while (true) { - if (stream_size == 0) { - // TODO: reply with an overflow command. - LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands."; - stream = &buf[0]; - stream_size = buf_vector.size(); - memset(&buf[0], /*c*/0, buf_vector.size()); - } - - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon block read for commands (fd=" << params_.input_fd << ")"; - } - ssize_t count = TEMP_FAILURE_RETRY(read(params_.input_fd, stream, stream_size)); - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon::read " << count << " for stream size:" << stream_size; - } - - if (count < 0) { - PLOG(ERROR) << "failed to read from input fd"; - break; - // TODO: let the daemon be restarted by higher level code? - } else if (count == 0) { - LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating"; - eof = true; - break; - // TODO: let the daemon be restarted by higher level code? - } - - longbuf_.insert(longbuf_.end(), stream, stream + count); - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size(); - } - - std::optional<Command> maybe_command; - { - if (longbuf_.size() == 0) { - break; - } - - std::vector<char> v(longbuf_.begin(), - longbuf_.end()); - - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size(); - if (WOULD_LOG(VERBOSE)) { - std::stringstream dump; - dump << std::hex << std::setfill('0'); - for (size_t i = 0; i < v.size(); ++i) { - dump << std::setw(2) << static_cast<unsigned>(v[i]); - } - - LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str(); - } - } - - size_t v_off = 0; - size_t consumed_bytes = std::numeric_limits<size_t>::max(); - size_t consumed_total = 0; - - while (true) { - maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes); - consumed_total += consumed_bytes; - // Normal every time we get to the end of a buffer. - if (!maybe_command) { - if (LogVerboseIpc()) { - LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size(); - } - break; - } - - // in the next pass ignore what we already consumed. - v_off += consumed_bytes; - - // true as long we don't hit the 'break' above. - DCHECK_EQ(v_off, consumed_total); - if (LogVerboseIpc()) { - LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size() - << "," << *maybe_command; - - // Pretty-print a single command for debugging/testing. - LOG(VERBOSE) << *maybe_command; - } - - // add to the commands we parsed. - commands_vec.push_back(*maybe_command); - } - - // erase however many were consumed - longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total); - } - break; - } - - return commands_vec; - } - - private: - bool IsTextMode() const { - return params_.format_text; - } - - PrefetcherForkParameters params_; - - // A buffer long enough to contain a lot of buffers. - // This handles reads that only contain a partial command. - std::deque<char> longbuf_; - - // File descriptor buffers. - std::deque<int> longbuf_fds_; -}; - -static constexpr bool kDebugCommandRead = true; - -#define DEBUG_READ if (kDebugCommandRead) LOG(VERBOSE) << "Command::Read " - -std::optional<Command> Command::Read(char* buf, size_t buf_size, /*out*/size_t* consumed_bytes) { - *consumed_bytes = 0; - if (buf == nullptr) { - return std::nullopt; - } - - Command cmd{}; // zero-initialize any unused fields - ParseResult<CommandChoice> parsed_choice = ParsingRead<CommandChoice>(buf, buf_size); - cmd.choice = parsed_choice.value; - - if (!parsed_choice) { - DEBUG_READ << "no choice"; - return std::nullopt; - } - - switch (parsed_choice.value) { - case CommandChoice::kRegisterFilePath: { - ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); - if (!parsed_session_id) { - DEBUG_READ << "no parsed session id"; - return std::nullopt; - } - - ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); - if (!parsed_id) { - DEBUG_READ << "no parsed id"; - return std::nullopt; - } - - ParseResult<std::string> parsed_file_path = ParsingRead<std::string>(parsed_id); - - if (!parsed_file_path) { - DEBUG_READ << "no file path"; - return std::nullopt; - } - *consumed_bytes = parsed_file_path.next_token - buf; - - cmd.session_id = parsed_session_id.value; - cmd.id = parsed_id.value; - cmd.file_path = parsed_file_path.value; - - break; - } - case CommandChoice::kUnregisterFilePath: { - ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); - if (!parsed_session_id) { - DEBUG_READ << "no parsed session id"; - return std::nullopt; - } - - ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); - if (!parsed_id) { - DEBUG_READ << "no parsed id"; - return std::nullopt; - } - *consumed_bytes = parsed_id.next_token - buf; - - cmd.session_id = parsed_session_id.value; - cmd.id = parsed_id.value; - - break; - } - case CommandChoice::kReadAhead: { - ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); - if (!parsed_session_id) { - DEBUG_READ << "no parsed session id"; - return std::nullopt; - } - - ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); - if (!parsed_id) { - DEBUG_READ << "no parsed id"; - return std::nullopt; - } - - ParseResult<ReadAheadKind> parsed_kind = ParsingRead<ReadAheadKind>(parsed_id); - if (!parsed_kind) { - DEBUG_READ << "no parsed kind"; - return std::nullopt; - } - ParseResult<uint64_t> parsed_length = ParsingRead<uint64_t>(parsed_kind); - if (!parsed_length) { - DEBUG_READ << "no parsed length"; - return std::nullopt; - } - ParseResult<uint64_t> parsed_offset = ParsingRead<uint64_t>(parsed_length); - if (!parsed_offset) { - DEBUG_READ << "no parsed offset"; - return std::nullopt; - } - *consumed_bytes = parsed_offset.next_token - buf; - - cmd.session_id = parsed_session_id.value; - cmd.id = parsed_id.value; - cmd.read_ahead_kind = parsed_kind.value; - cmd.length = parsed_length.value; - cmd.offset = parsed_offset.value; - - break; - } - case CommandChoice::kCreateSession: - case CommandChoice::kCreateFdSession: { - ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); - if (!parsed_session_id) { - DEBUG_READ << "no parsed session id"; - return std::nullopt; - } - - ParseResult<std::string> parsed_description = ParsingRead<std::string>(parsed_session_id); - - if (!parsed_description) { - DEBUG_READ << "no description"; - return std::nullopt; - } - *consumed_bytes = parsed_description.next_token - buf; - - cmd.session_id = parsed_session_id.value; - cmd.file_path = parsed_description.value; - - break; - } - case CommandChoice::kDestroySession: - case CommandChoice::kDumpSession: { - ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); - if (!parsed_session_id) { - DEBUG_READ << "no parsed session id"; - return std::nullopt; - } - - *consumed_bytes = parsed_session_id.next_token - buf; - - cmd.session_id = parsed_session_id.value; - - break; - } - case CommandChoice::kExit: - case CommandChoice::kDumpEverything: - *consumed_bytes = parsed_choice.next_token - buf; - // Only need to parse the choice. - break; - default: - LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(parsed_choice.value); - break; - } - - return cmd; -} - -bool Command::Write(char* buf, size_t buf_size, /*out*/size_t* produced_bytes) const { - *produced_bytes = 0; - if (buf == nullptr) { - LOG(WARNING) << "null buf, is this expected?"; - return false; - } - - bool has_enough_space = false; - size_t space_requirement = std::numeric_limits<size_t>::max(); - - space_requirement = sizeof(choice); - - switch (choice) { - case CommandChoice::kRegisterFilePath: - space_requirement += sizeof(session_id); - space_requirement += sizeof(id); - space_requirement += sizeof(uint32_t); // string length - - if (!file_path) { - LOG(WARNING) << "Missing file path for kRegisterFilePath"; - return false; - } - - space_requirement += file_path->size(); // string contents - break; - case CommandChoice::kUnregisterFilePath: - space_requirement += sizeof(session_id); - space_requirement += sizeof(id); - break; - case CommandChoice::kReadAhead: - space_requirement += sizeof(session_id); - space_requirement += sizeof(id); - space_requirement += sizeof(read_ahead_kind); - space_requirement += sizeof(length); - space_requirement += sizeof(offset); - break; - case CommandChoice::kCreateSession: - case CommandChoice::kCreateFdSession: - space_requirement += sizeof(session_id); - space_requirement += sizeof(uint32_t); // string length - - if (!file_path) { - LOG(WARNING) << "Missing file path for kCreateSession"; - return false; - } - - space_requirement += file_path->size(); // string contents - break; - case CommandChoice::kDestroySession: - case CommandChoice::kDumpSession: - space_requirement += sizeof(session_id); - break; - case CommandChoice::kExit: - case CommandChoice::kDumpEverything: - // Only need space for the choice. - break; - default: - LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(choice); - break; - } - - if (buf_size < space_requirement) { - return false; - } - - *produced_bytes = space_requirement; - - // Always write out the choice. - size_t buf_offset = 0; - - memcpy(&buf[buf_offset], &choice, sizeof(choice)); - buf_offset += sizeof(choice); - - switch (choice) { - case CommandChoice::kRegisterFilePath: - memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); - buf_offset += sizeof(session_id); - memcpy(&buf[buf_offset], &id, sizeof(id)); - buf_offset += sizeof(id); - - { - uint32_t string_length = static_cast<uint32_t>(file_path->size()); - memcpy(&buf[buf_offset], &string_length, sizeof(string_length)); - buf_offset += sizeof(string_length); - } - - DCHECK(file_path.has_value()); - - memcpy(&buf[buf_offset], file_path->c_str(), file_path->size()); - buf_offset += file_path->size(); - break; - case CommandChoice::kUnregisterFilePath: - memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); - buf_offset += sizeof(session_id); - memcpy(&buf[buf_offset], &id, sizeof(id)); - buf_offset += sizeof(id); - break; - case CommandChoice::kReadAhead: - memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); - buf_offset += sizeof(session_id); - memcpy(&buf[buf_offset], &id, sizeof(id)); - buf_offset += sizeof(id); - memcpy(&buf[buf_offset], &read_ahead_kind, sizeof(read_ahead_kind)); - buf_offset += sizeof(read_ahead_kind); - memcpy(&buf[buf_offset], &length, sizeof(length)); - buf_offset += sizeof(length); - memcpy(&buf[buf_offset], &offset, sizeof(offset)); - buf_offset += sizeof(offset); - break; - case CommandChoice::kCreateSession: - case CommandChoice::kCreateFdSession: - memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); - buf_offset += sizeof(session_id); - - { - uint32_t string_length = static_cast<uint32_t>(file_path->size()); - memcpy(&buf[buf_offset], &string_length, sizeof(string_length)); - buf_offset += sizeof(string_length); - } - - DCHECK(file_path.has_value()); - - memcpy(&buf[buf_offset], file_path->c_str(), file_path->size()); - buf_offset += file_path->size(); - - DCHECK_EQ(buf_offset, space_requirement) << *this << ",file_path_size:" << file_path->size(); - DCHECK_EQ(buf_offset, *produced_bytes) << *this; - - break; - case CommandChoice::kDestroySession: - case CommandChoice::kDumpSession: - memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); - buf_offset += sizeof(session_id); - break; - case CommandChoice::kExit: - case CommandChoice::kDumpEverything: - // Only need to write out the choice. - break; - default: - LOG(FATAL) << "should have fallen out in the above switch" - << static_cast<uint32_t>(choice); - break; - } - - DCHECK_EQ(buf_offset, space_requirement) << *this; - DCHECK_EQ(buf_offset, *produced_bytes) << *this; - - return true; -} - -class PrefetcherDaemon::Impl { - public: - std::optional<PrefetcherForkParameters> StartPipesViaFork() { - int pipefds[2]; - if (pipe(&pipefds[0]) != 0) { - PLOG(FATAL) << "Failed to create read/write pipes"; - } - - if (WOULD_LOG(VERBOSE)) { - long pipe_size = static_cast<long>(fcntl(pipefds[0], F_GETPIPE_SZ)); - if (pipe_size < 0) { - PLOG(ERROR) << "Failed to F_GETPIPE_SZ:"; - } - LOG(VERBOSE) << "StartPipesViaFork: default pipe size: " << pipe_size; - } - - for (int i = 0; i < 2; ++i) { - // Default pipe size is usually 64KB. - // Increase to 1MB so that iorapd has to rarely run during prefetching. - if (fcntl(pipefds[i], F_SETPIPE_SZ, kPipeBufferSize) < 0) { - PLOG(FATAL) << "Failed to increase pipe size to max"; - } - } - - pipefd_read_ = pipefds[0]; - pipefd_write_ = pipefds[1]; - - PrefetcherForkParameters params; - params.input_fd = pipefd_read_; - params.output_fd = pipefd_write_; - params.format_text = false; - params.use_sockets = false; - - bool res = StartViaFork(params); - if (res) { - return params; - } else { - return std::nullopt; - } - } - -std::optional<PrefetcherForkParameters> StartSocketViaFork() { - int socket_fds[2]; - if (socketpair(AF_UNIX, SOCK_STREAM, /*protocol*/0, &socket_fds[0]) != 0) { - PLOG(FATAL) << "Failed to create read/write socketpair"; - } - - pipefd_read_ = socket_fds[0]; // iorapd writer, iorap.prefetcherd reader - pipefd_write_ = socket_fds[1]; // iorapd reader, iorap.prefetcherd writer - - PrefetcherForkParameters params; - params.input_fd = pipefd_read_; - params.output_fd = pipefd_write_; - params.format_text = false; - params.use_sockets = true; - - bool res = StartViaFork(params); - if (res) { - return params; - } else { - return std::nullopt; - } - } - - bool StartViaFork(PrefetcherForkParameters params) { - params_ = params; - - forked_ = true; - child_ = fork(); - - if (child_ == -1) { - LOG(FATAL) << "Failed to fork PrefetcherDaemon"; - } else if (child_ > 0) { // we are the caller of this function - LOG(DEBUG) << "forked into iorap.prefetcherd, pid = " << child_; - - return true; - } else { - // we are the child that was forked. - std::stringstream argv; // for logging - std::vector<std::string> argv_vec; - - { - std::stringstream s; - s << "--input-fd"; - argv_vec.push_back(s.str()); - - std::stringstream s2; - s2 << params.input_fd; - argv_vec.push_back(s2.str()); - - argv << " --input-fd" << " " << params.input_fd; - } - - { - std::stringstream s; - s << "--output-fd"; - argv_vec.push_back(s.str()); - - std::stringstream s2; - s2 << params.output_fd; - argv_vec.push_back(s2.str()); - - argv << " --output-fd" << " " << params.output_fd; - } - - - if (params.use_sockets) { - std::stringstream s; - s << "--use-sockets"; - argv_vec.push_back(s.str()); - - argv << " --use-sockets"; - } - - if (WOULD_LOG(VERBOSE)) { - std::stringstream s; - s << "--verbose"; - argv_vec.push_back(s.str()); - - argv << " --verbose"; - } - - std::unique_ptr<ArgString[]> argv_ptr = common::VecToArgv(kCommandFileName, argv_vec); - - LOG(DEBUG) << "fork+exec: " << kCommandFileName << " " - << argv.str(); - execve(kCommandFileName, (char **)argv_ptr.get(), /*envp*/nullptr); - // This should never return. - _exit(EXIT_FAILURE); - } - - DCHECK(false); - return false; - } - - // TODO: Not very useful since this can never return 'true' - // -> in the child we would've already execd which loses all this code. - bool IsDaemon() { - // In the child the pid is always 0. - return child_ > 0; - } - - bool Main(PrefetcherForkParameters params) { - LOG(VERBOSE) << "PrefetcherDaemon::Main " << params; - - CommandParser command_parser{params}; - - Command next_command{}; - - std::vector<Command> many_commands; - - // Ensure alogd is pre-initialized before installing minijail. - LOG(DEBUG) << "Installing minijail"; - - // Install seccomp filter using libminijail. - if (kInstallMiniJail) { - MiniJail(); - } - - while (true) { - bool eof = false; - - if (params.use_sockets) { - // use recvmsg(2). supports receiving FDs. - many_commands = command_parser.ParseSocketCommands(/*out*/eof); - } else { - // use read(2). does not support receiving FDs. - many_commands = command_parser.ParseCommands(/*out*/eof); - } - - if (eof) { - LOG(WARNING) << "PrefetcherDaemon got EOF, terminating"; - return true; - } - - for (auto& command : many_commands) { - if (LogVerboseIpc()) { - LOG(VERBOSE) << "PrefetcherDaemon got command: " << command; - } - - if (command.choice == CommandChoice::kExit) { - LOG(DEBUG) << "PrefetcherDaemon got kExit command, terminating"; - return true; - } - - if (!ReceiveCommand(command)) { - // LOG(WARNING) << "PrefetcherDaemon command processing failure: " << command; - } - - // ReceiveCommand should dup to keep the FD. Avoid leaks. - if (command.fd.has_value()) { - close(*command.fd); - } - } - } - - LOG(VERBOSE) << "PrefetcherDaemon::Main got exit, terminating"; - - return true; - // Terminate. - } - - Impl(PrefetcherDaemon* daemon) { - session_manager_ = SessionManager::CreateManager(SessionKind::kInProcessDirect); - DCHECK(session_manager_ != nullptr); - }; - - ~Impl() { - // Don't do anything if we never called 'StartViaFork' - if (forked_) { - if (!IsDaemon()) { - int status; - waitpid(child_, /*out*/&status, /*options*/0); - } else { - LOG(WARNING) << "execve should have avoided this path"; - // DCHECK(false) << "not possible because the execve would avoid this path"; - } - } - } - - bool SendCommand(const Command& command) { - // Only parent is the sender. - DCHECK(forked_); - //DCHECK(!IsDaemon()); - - char buf[1024]; - size_t stream_size; - if (!command.Write(buf, sizeof(buf), /*out*/&stream_size)) { - PLOG(ERROR) << "Failed to serialize command: " << command; - return false; - } - - if (LogVerboseIpc()) { - LOG(VERBOSE) << "pre-write(fd=" << pipefd_write_ << ", buf=" << buf - << ", size=" << stream_size<< ")"; - } - - if (params_.use_sockets) { - /* iov contains the normal message (Command) */ - struct iovec iov; - memset(&iov, 0, sizeof(iov)); - iov.iov_base = &buf[0]; - iov.iov_len = stream_size; - - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - - /* point to iov to transmit */ - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - - /* no dest address; socket is connected */ - msg.msg_name = nullptr; - msg.msg_namelen = 0; - - // append a CMSG with SCM_RIGHTS if we have an FD. - if (command.fd.has_value()) { - union { - struct cmsghdr cmh; - char control[CMSG_SPACE(sizeof(int))]; /* sized to hold an fd (int) */ - } control_un; - memset(&control_un, 0, sizeof(control_un)); - - msg.msg_control = &control_un.control[0]; - msg.msg_controllen = sizeof(control_un.control); - - struct cmsghdr *hp; - hp = CMSG_FIRSTHDR(&msg); - hp->cmsg_len = CMSG_LEN(sizeof(int)); - hp->cmsg_level = SOL_SOCKET; - hp->cmsg_type = SCM_RIGHTS; - *((int *) CMSG_DATA(hp)) = *(command.fd); - - DCHECK(command.RequiresFd()) << command; - - if (LogVerboseIpc()) { - LOG(VERBOSE) << "append FD to sendmsg: " << *(command.fd); - } - } - - // TODO: add CMSG for the FD passage. - - if (TEMP_FAILURE_RETRY(sendmsg(pipefd_write_, &msg, /*flags*/0)) < 0) { - PLOG(ERROR) << "Failed to sendmsg command: " << command; - return false; - } - } else { - if (TEMP_FAILURE_RETRY(write(pipefd_write_, buf, stream_size)) < 0) { - PLOG(ERROR) << "Failed to write command: " << command; - return false; - } - } - - if (LogVerboseIpc()) { - LOG(VERBOSE) << "write(fd=" << pipefd_write_ << ", buf=" << buf - << ", size=" << stream_size<< ")"; - } - - // TODO: also read the reply? - return true; - } - - bool ReceiveCommand(const Command& command) { - // Only child is the command receiver. - // DCHECK(IsDaemon()); - - switch (command.choice) { - case CommandChoice::kRegisterFilePath: { - std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); - - if (!session) { - LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; - return false; - } - - CHECK(command.file_path.has_value()) << command; - return session->RegisterFilePath(command.id, *command.file_path); - } - case CommandChoice::kUnregisterFilePath: { - std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); - - if (!session) { - LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; - return false; - } - - return session->UnregisterFilePath(command.id); - } - case CommandChoice::kReadAhead: { - std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); - - if (!session) { - LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; - return false; - } - - return session->ReadAhead(command.id, command.read_ahead_kind, command.length, command.offset); - } - // TODO: unreadahead - case CommandChoice::kExit: { - LOG(WARNING) << "kExit should be handled earlier."; - return true; - } - case CommandChoice::kCreateSession: { - std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); - if (session != nullptr) { - LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command; - return false; - } - CHECK(command.file_path.has_value()) << command; - if (session_manager_->CreateSession(command.session_id, /*description*/*command.file_path) - == nullptr) { - LOG(ERROR) << "ReceiveCommand: Failure to kCreateSession: " << command; - return false; - } - return true; - } - case CommandChoice::kDestroySession: { - if (!session_manager_->DestroySession(command.session_id)) { - LOG(ERROR) << "ReceiveCommand: Failure to kDestroySession: " << command; - return false; - } - return true; - } - case CommandChoice::kDumpSession: { - std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); - - if (!session) { - LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; - return false; - } - - // TODO: Consider doing dumpsys support somehow? - session->Dump(LOG_STREAM(DEBUG), /*multiline*/true); - return true; - } - case CommandChoice::kDumpEverything: { - session_manager_->Dump(LOG_STREAM(DEBUG), /*multiline*/true); - break; - } - case CommandChoice::kCreateFdSession: { - std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); - if (session != nullptr) { - LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command; - return false; - } - CHECK(command.file_path.has_value()) << command; - CHECK(command.fd.has_value()) << command; - - LOG(VERBOSE) << "ReceiveCommand: kCreateFdSession fd=" << *(command.fd); - - // TODO: Maybe use CreateFdSession instead? - session = - session_manager_->CreateSession(command.session_id, - /*description*/*command.file_path, - command.fd.value()); - if (session == nullptr) { - LOG(ERROR) << "ReceiveCommand: Failure to kCreateFdSession: " << command; - return false; - } - - return session->ProcessFd(*command.fd); - } - } - - return true; - } - - pid_t child_; - bool forked_; - int pipefd_read_; - int pipefd_write_; - PrefetcherForkParameters params_; - // do not ever use an indirect session manager here, as it would cause a lifetime cycle. - std::unique_ptr<SessionManager> session_manager_; // direct only. -}; - -PrefetcherDaemon::PrefetcherDaemon() - : impl_{new Impl{this}} { - LOG(VERBOSE) << "PrefetcherDaemon() constructor"; -} - -bool PrefetcherDaemon::StartViaFork(PrefetcherForkParameters params) { - return impl_->StartViaFork(std::move(params)); -} - - -std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartPipesViaFork() { - return impl_->StartPipesViaFork(); -} - -std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartSocketViaFork() { - return impl_->StartSocketViaFork(); -} - -bool PrefetcherDaemon::Main(PrefetcherForkParameters params) { - return impl_->Main(params); -} - -bool PrefetcherDaemon::SendCommand(const Command& command) { - return impl_->SendCommand(command); -} - -PrefetcherDaemon::~PrefetcherDaemon() { - // required for unique_ptr for incomplete types. -} - -} // namespace iorap::prefetcher |