diff options
Diffstat (limited to 'fcp/tensorflow/file_descriptor_filesystem.cc')
-rw-r--r-- | fcp/tensorflow/file_descriptor_filesystem.cc | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/fcp/tensorflow/file_descriptor_filesystem.cc b/fcp/tensorflow/file_descriptor_filesystem.cc new file mode 100644 index 0000000..1659839 --- /dev/null +++ b/fcp/tensorflow/file_descriptor_filesystem.cc @@ -0,0 +1,250 @@ +#include "fcp/tensorflow/file_descriptor_filesystem.h" + +#include <errno.h> +#include <sys/stat.h> +#include <unistd.h> + +#include <algorithm> +#include <memory> +#include <vector> + +#include "absl/memory/memory.h" +#include "absl/strings/numbers.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/strings/strip.h" +#include "fcp/base/monitoring.h" +#include "tensorflow/core/platform/env.h" + +namespace tensorflow { + +namespace fcp { + +using ::tensorflow::Status; + +static constexpr char kFdFilesystemPrefix[] = "fd:///"; +static constexpr size_t kMaxWriteChunkSize = 64u * 1024; // 64KB + +namespace { +// Copied from base implementation in +// //external/tensorflow/tensorflow/tsl/platform/default/posix_file_system.cc +absl::Status ReadBytesFromFd(int fd, uint64_t offset, size_t n, + absl::string_view* result, char* scratch) { + absl::Status s; + char* dst = scratch; + while (n > 0 && s.ok()) { + ssize_t r = pread(fd, dst, n, static_cast<off_t>(offset)); + if (r > 0) { + dst += r; + n -= r; + offset += r; + } else if (r == 0) { + s = absl::OutOfRangeError(absl::StrCat( + "Read fewer bytes than requested. Total read bytes ", offset)); + } else if (errno == EINTR || errno == EAGAIN) { + // Retry + } else { + s = absl::UnknownError(absl::StrCat("Failed to read: errno ", errno)); + } + } + *result = absl::string_view(scratch, dst - scratch); + return s; +} + +class FdRandomAccessFile : public RandomAccessFile { + public: + explicit FdRandomAccessFile(int fd) : fd_(fd) {} + + ~FdRandomAccessFile() override { close(fd_); } + + Status Read(uint64 offset, size_t n, StringPiece* result, + char* scratch) const override { + absl::string_view sv; + absl::Status s = ReadBytesFromFd(fd_, offset, n, &sv, scratch); + if (s.ok()) { + *result = StringPiece(sv.data(), sv.size()); + return Status(); + } else { + return Status(static_cast<tensorflow::errors::Code>(s.code()), + s.message()); + } + } + + private: + int fd_; +}; + +class FdWritableFile : public WritableFile { + public: + explicit FdWritableFile(int fd) : fd_(fd) {} + + ~FdWritableFile() override { Close(); } + + Status Append(StringPiece data) override { + return Write(data.data(), data.size()); + } + + Status Close() override { + if (has_closed_) return Status(); + close(fd_); + has_closed_ = true; + return Status(); + } + Status Flush() override { return Status(); } + Status Sync() override { return Status(); } + + private: + Status Write(const void* data, size_t data_size) { + size_t write_len = data_size; + do { + size_t chunk_size = std::min<size_t>(write_len, kMaxWriteChunkSize); + ssize_t wrote = write(fd_, data, chunk_size); + if (wrote < 0) { + return Status(tensorflow::error::Code::UNKNOWN, + absl::StrCat("Failed to write: ", errno)); + } + data = static_cast<const uint8_t*>(data) + wrote; + write_len -= wrote; + } while (write_len > 0); + return Status(); + } + int fd_; + bool has_closed_ = false; +}; + +// Gets the file descriptor in the URI. +Status GetFd(absl::string_view fname, int* result) { + // Consume scheme and empty authority (fd:///) + if (!absl::ConsumePrefix(&fname, kFdFilesystemPrefix)) { + return errors::InvalidArgument("Bad uri: ", fname); + } + + // Try to parse remainder of path as an integer fd + if (!absl::SimpleAtoi(fname, result)) { + return errors::InvalidArgument("Bad path: ", fname); + } + + return OkStatus(); +} + +} // anonymous namespace + +Status FileDescriptorFileSystem::NewRandomAccessFile( + const string& filename, std::unique_ptr<RandomAccessFile>* result) { + int fd; + TF_RETURN_IF_ERROR(GetFd(filename, &fd)); + FileStatistics stat; + TF_RETURN_IF_ERROR(Stat(filename, &stat)); // check against directory FD + + int dup_fd = dup(fd); + if (dup_fd == -1) { + return errors::Unknown("Failed to dup: errno ", errno); + } + + *result = std::make_unique<FdRandomAccessFile>(dup_fd); + return OkStatus(); +} + +Status FileDescriptorFileSystem::GetMatchingPaths( + const string& pattern, std::vector<string>* results) { + results->clear(); + FileStatistics statistics; + if (Stat(pattern, &statistics).ok()) { + results->push_back(pattern); + } + return OkStatus(); +} + +Status FileDescriptorFileSystem::Stat(const string& fname, + FileStatistics* stats) { + if (stats == nullptr) { + return errors::InvalidArgument("FileStatistics pointer must not be NULL"); + } + + int fd; + TF_RETURN_IF_ERROR(GetFd(fname, &fd)); + + struct stat st; + if (fstat(fd, &st) == -1) { + return errors::Unknown("Failed to fstat: errno ", errno); + } + + if (S_ISDIR(st.st_mode)) { + return errors::NotFound("File not found: is a directory"); + } + stats->length = st.st_size; + stats->mtime_nsec = st.st_mtime * 1e9; + stats->is_directory = S_ISDIR(st.st_mode); + + return OkStatus(); +} + +Status FileDescriptorFileSystem::GetFileSize(const string& fname, + uint64* size) { + FileStatistics stat; + TF_RETURN_IF_ERROR(Stat(fname, &stat)); + *size = stat.length; + return OkStatus(); +} + +Status FileDescriptorFileSystem::NewReadOnlyMemoryRegionFromFile( + const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>* result) { + return errors::Unimplemented("Not implemented by the fd filesystem"); +} + +Status FileDescriptorFileSystem::FileExists(const string& fname) { + return errors::Unimplemented("Not implemented by the fd filesystem"); +} + +Status FileDescriptorFileSystem::GetChildren(const string& dir, + std::vector<string>* r) { + return errors::Unimplemented("fd filesystem is non-hierarchical"); +} + +Status FileDescriptorFileSystem::NewWritableFile( + const string& fname, std::unique_ptr<WritableFile>* result) { + int fd; + TF_RETURN_IF_ERROR(GetFd(fname, &fd)); + FileStatistics stat; + TF_RETURN_IF_ERROR(Stat(fname, &stat)); // check against directory FD + + int dup_fd = dup(fd); + if (dup_fd == -1) { + return errors::Unknown("Failed to dup: errno ", errno); + } + + *result = std::make_unique<FdWritableFile>(dup_fd); + return OkStatus(); +} + +Status FileDescriptorFileSystem::NewAppendableFile( + const string& fname, std::unique_ptr<WritableFile>* result) { + return errors::Unimplemented("Not implemented by the fd filesystem"); +} + +Status FileDescriptorFileSystem::DeleteFile(const string& f) { + return errors::Unimplemented("Not implemented by the fd filesystem"); +} + +Status FileDescriptorFileSystem::CreateDir(const string& d) { + return errors::Unimplemented("Not implemented by the fd filesystem"); +} + +Status FileDescriptorFileSystem::DeleteDir(const string& d) { + return errors::Unimplemented("Not implemented by the fd filesystem"); +} + +Status FileDescriptorFileSystem::RenameFile(const string& s, const string& t) { + return errors::Unimplemented("Not implemented by the fd filesystem"); +} + +Status FileDescriptorFileSystem::CanCreateTempFile(const std::string& fname, + bool* can_create_temp_file) { + *can_create_temp_file = false; + return OkStatus(); +} + +REGISTER_FILE_SYSTEM("fd", FileDescriptorFileSystem); + +} // namespace fcp +} // namespace tensorflow |