aboutsummaryrefslogtreecommitdiff
path: root/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java')
-rw-r--r--jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java675
1 files changed, 675 insertions, 0 deletions
diff --git a/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java b/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
new file mode 100644
index 0000000..95863cc
--- /dev/null
+++ b/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
@@ -0,0 +1,675 @@
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.common.jimfs;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static java.nio.file.StandardOpenOption.APPEND;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.FileLockInterruptionException;
+import java.nio.channels.NonReadableChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link FileChannel} implementation that reads and writes to a {@link RegularFile} object. The
+ * read and write methods and other methods that read or change the position of the channel are
+ * locked because the {@link ReadableByteChannel} and {@link WritableByteChannel} interfaces specify
+ * that the read and write methods block when another thread is currently doing a read or write
+ * operation.
+ *
+ * @author Colin Decker
+ */
+final class JimfsFileChannel extends FileChannel {
+
+ /**
+ * Set of threads that are currently doing an interruptible blocking operation; that is, doing
+ * something that requires acquiring the file's lock. These threads must be interrupted if the
+ * channel is closed by another thread.
+ */
+ @GuardedBy("blockingThreads")
+ private final Set<Thread> blockingThreads = new HashSet<Thread>();
+
+ private final RegularFile file;
+ private final FileSystemState fileSystemState;
+
+ private final boolean read;
+ private final boolean write;
+ private final boolean append;
+
+ @GuardedBy("this")
+ private long position;
+
+ public JimfsFileChannel(
+ RegularFile file, Set<OpenOption> options, FileSystemState fileSystemState) {
+ this.file = file;
+ this.fileSystemState = fileSystemState;
+ this.read = options.contains(READ);
+ this.write = options.contains(WRITE);
+ this.append = options.contains(APPEND);
+
+ fileSystemState.register(this);
+ }
+
+ /**
+ * Returns an {@link AsynchronousFileChannel} view of this channel using the given executor for
+ * asynchronous operations.
+ */
+ public AsynchronousFileChannel asAsynchronousFileChannel(ExecutorService executor) {
+ return new JimfsAsynchronousFileChannel(this, executor);
+ }
+
+ void checkReadable() {
+ if (!read) {
+ throw new NonReadableChannelException();
+ }
+ }
+
+ void checkWritable() {
+ if (!write) {
+ throw new NonWritableChannelException();
+ }
+ }
+
+ void checkOpen() throws ClosedChannelException {
+ if (!isOpen()) {
+ throw new ClosedChannelException();
+ }
+ }
+
+ /**
+ * Begins a blocking operation, making the operation interruptible. Returns {@code true} if the
+ * channel was open and the thread was added as a blocking thread; returns {@code false} if the
+ * channel was closed.
+ */
+ private boolean beginBlocking() {
+ begin();
+ synchronized (blockingThreads) {
+ if (isOpen()) {
+ blockingThreads.add(Thread.currentThread());
+ return true;
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Ends a blocking operation, throwing an exception if the thread was interrupted while blocking
+ * or if the channel was closed from another thread.
+ */
+ private void endBlocking(boolean completed) throws AsynchronousCloseException {
+ synchronized (blockingThreads) {
+ blockingThreads.remove(Thread.currentThread());
+ }
+ end(completed);
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ checkNotNull(dst);
+ checkOpen();
+ checkReadable();
+
+ int read = 0; // will definitely either be assigned or an exception will be thrown
+
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ read = file.read(position, dst);
+ if (read != -1) {
+ position += read;
+ }
+ file.updateAccessTime();
+ completed = true;
+ } finally {
+ file.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+
+ return read;
+ }
+
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+ checkPositionIndexes(offset, offset + length, dsts.length);
+ List<ByteBuffer> buffers = Arrays.asList(dsts).subList(offset, offset + length);
+ Util.checkNoneNull(buffers);
+ checkOpen();
+ checkReadable();
+
+ long read = 0; // will definitely either be assigned or an exception will be thrown
+
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ read = file.read(position, buffers);
+ if (read != -1) {
+ position += read;
+ }
+ file.updateAccessTime();
+ completed = true;
+ } finally {
+ file.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+
+ return read;
+ }
+
+ @Override
+ public int read(ByteBuffer dst, long position) throws IOException {
+ checkNotNull(dst);
+ Util.checkNotNegative(position, "position");
+ checkOpen();
+ checkReadable();
+
+ int read = 0; // will definitely either be assigned or an exception will be thrown
+
+ // no need to synchronize here; this method does not make use of the channel's position
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ read = file.read(position, dst);
+ file.updateAccessTime();
+ completed = true;
+ } finally {
+ file.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+
+ return read;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ checkNotNull(src);
+ checkOpen();
+ checkWritable();
+
+ int written = 0; // will definitely either be assigned or an exception will be thrown
+
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ if (append) {
+ position = file.size();
+ }
+ written = file.write(position, src);
+ position += written;
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+
+ return written;
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+ checkPositionIndexes(offset, offset + length, srcs.length);
+ List<ByteBuffer> buffers = Arrays.asList(srcs).subList(offset, offset + length);
+ Util.checkNoneNull(buffers);
+ checkOpen();
+ checkWritable();
+
+ long written = 0; // will definitely either be assigned or an exception will be thrown
+
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ if (append) {
+ position = file.size();
+ }
+ written = file.write(position, buffers);
+ position += written;
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+
+ return written;
+ }
+
+ @Override
+ public int write(ByteBuffer src, long position) throws IOException {
+ checkNotNull(src);
+ Util.checkNotNegative(position, "position");
+ checkOpen();
+ checkWritable();
+
+ int written = 0; // will definitely either be assigned or an exception will be thrown
+
+ if (append) {
+ // synchronize because appending does update the channel's position
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+
+ file.writeLock().lockInterruptibly();
+ try {
+ position = file.sizeWithoutLocking();
+ written = file.write(position, src);
+ this.position = position + written;
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+ } else {
+ // don't synchronize because the channel's position is not involved
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ written = file.write(position, src);
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+
+ return written;
+ }
+
+ @Override
+ public long position() throws IOException {
+ checkOpen();
+
+ long pos;
+
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ begin(); // don't call beginBlocking() because this method doesn't block
+ if (!isOpen()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ pos = this.position;
+ completed = true;
+ } finally {
+ end(completed);
+ }
+ }
+
+ return pos;
+ }
+
+ @Override
+ public FileChannel position(long newPosition) throws IOException {
+ Util.checkNotNegative(newPosition, "newPosition");
+ checkOpen();
+
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ begin(); // don't call beginBlocking() because this method doesn't block
+ if (!isOpen()) {
+ return this; // AsynchronousCloseException will be thrown
+ }
+ this.position = newPosition;
+ completed = true;
+ } finally {
+ end(completed);
+ }
+ }
+
+ return this;
+ }
+
+ @Override
+ public long size() throws IOException {
+ checkOpen();
+
+ long size = 0; // will definitely either be assigned or an exception will be thrown
+
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ size = file.sizeWithoutLocking();
+ completed = true;
+ } finally {
+ file.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+
+ return size;
+ }
+
+ @Override
+ public FileChannel truncate(long size) throws IOException {
+ Util.checkNotNegative(size, "size");
+ checkOpen();
+ checkWritable();
+
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return this; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ file.truncate(size);
+ if (position > size) {
+ position = size;
+ }
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+
+ return this;
+ }
+
+ @Override
+ public void force(boolean metaData) throws IOException {
+ checkOpen();
+
+ // nothing to do since writes are all direct to the storage
+ // however, we should handle the thread being interrupted anyway
+ boolean completed = false;
+ try {
+ begin();
+ completed = true;
+ } finally {
+ end(completed);
+ }
+ }
+
+ @Override
+ public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+ checkNotNull(target);
+ Util.checkNotNegative(position, "position");
+ Util.checkNotNegative(count, "count");
+ checkOpen();
+ checkReadable();
+
+ long transferred = 0; // will definitely either be assigned or an exception will be thrown
+
+ // no need to synchronize here; this method does not make use of the channel's position
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ transferred = file.transferTo(position, count, target);
+ file.updateAccessTime();
+ completed = true;
+ } finally {
+ file.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+
+ return transferred;
+ }
+
+ @Override
+ public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+ checkNotNull(src);
+ Util.checkNotNegative(position, "position");
+ Util.checkNotNegative(count, "count");
+ checkOpen();
+ checkWritable();
+
+ long transferred = 0; // will definitely either be assigned or an exception will be thrown
+
+ if (append) {
+ // synchronize because appending does update the channel's position
+ synchronized (this) {
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+
+ file.writeLock().lockInterruptibly();
+ try {
+ position = file.sizeWithoutLocking();
+ transferred = file.transferFrom(src, position, count);
+ this.position = position + transferred;
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+ } else {
+ // don't synchronize because the channel's position is not involved
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ transferred = file.transferFrom(src, position, count);
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+
+ return transferred;
+ }
+
+ @Override
+ public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
+ // would like this to pretend to work, but can't create an implementation of MappedByteBuffer
+ // well, a direct buffer could be cast to MappedByteBuffer, but it couldn't work in general
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileLock lock(long position, long size, boolean shared) throws IOException {
+ checkLockArguments(position, size, shared);
+
+ // lock is interruptible
+ boolean completed = false;
+ try {
+ begin();
+ completed = true;
+ return new FakeFileLock(this, position, size, shared);
+ } finally {
+ try {
+ end(completed);
+ } catch (ClosedByInterruptException e) {
+ throw new FileLockInterruptionException();
+ }
+ }
+ }
+
+ @Override
+ public FileLock tryLock(long position, long size, boolean shared) throws IOException {
+ checkLockArguments(position, size, shared);
+
+ // tryLock is not interruptible
+ return new FakeFileLock(this, position, size, shared);
+ }
+
+ private void checkLockArguments(long position, long size, boolean shared) throws IOException {
+ Util.checkNotNegative(position, "position");
+ Util.checkNotNegative(size, "size");
+ checkOpen();
+ if (shared) {
+ checkReadable();
+ } else {
+ checkWritable();
+ }
+ }
+
+ @Override
+ protected void implCloseChannel() {
+ // interrupt the current blocking threads, if any, causing them to throw
+ // ClosedByInterruptException
+ try {
+ synchronized (blockingThreads) {
+ for (Thread thread : blockingThreads) {
+ thread.interrupt();
+ }
+ }
+ } finally {
+ fileSystemState.unregister(this);
+ file.closed();
+ }
+ }
+
+ /** A file lock that does nothing, since only one JVM process has access to this file system. */
+ static final class FakeFileLock extends FileLock {
+
+ private final AtomicBoolean valid = new AtomicBoolean(true);
+
+ public FakeFileLock(FileChannel channel, long position, long size, boolean shared) {
+ super(channel, position, size, shared);
+ }
+
+ public FakeFileLock(AsynchronousFileChannel channel, long position, long size, boolean shared) {
+ super(channel, position, size, shared);
+ }
+
+ @Override
+ public boolean isValid() {
+ return valid.get();
+ }
+
+ @Override
+ public void release() throws IOException {
+ valid.set(false);
+ }
+ }
+}