aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java369
-rw-r--r--jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java127
2 files changed, 353 insertions, 143 deletions
diff --git a/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java b/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
index bc52103..b240a0c 100644
--- a/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
+++ b/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
@@ -36,12 +36,14 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.OpenOption;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
/**
* A {@link FileChannel} implementation that reads and writes to a {@link RegularFile} object. The
@@ -55,13 +57,12 @@ import javax.annotation.Nullable;
final class JimfsFileChannel extends FileChannel {
/**
- * Thread that is currently doing an interruptible blocking operation; that is, doing something
- * that requires acquiring the file's lock. Since a thread has to already have this channel's
- * lock to do that, there can only be one such thread at a time. This thread must be interrupted
- * if the channel is closed by another thread.
+ * Set of threads that are currently doing an interruptible blocking operation; that is, doing
+ * something that requires acquiring the file's lock. These threads must be interrupted if the
+ * channel is closed by another thread.
*/
- @Nullable
- private volatile Thread blockingThread;
+ @GuardedBy("blockingThreads")
+ private final Set<Thread> blockingThreads = new HashSet<Thread>();
private final RegularFile file;
private final FileSystemState fileSystemState;
@@ -70,6 +71,7 @@ final class JimfsFileChannel extends FileChannel {
private final boolean write;
private final boolean append;
+ @GuardedBy("this")
private long position;
public JimfsFileChannel(
@@ -110,11 +112,20 @@ final class JimfsFileChannel extends FileChannel {
}
/**
- * Begins a blocking operation, making the operation interruptible.
+ * Begins a blocking operation, making the operation interruptible. Returns {@code true} if the
+ * channel was open and the thread was added as a blocking thread; returns {@code false} if the
+ * channel was closed.
*/
- private void beginBlocking() {
+ private boolean beginBlocking() {
begin();
- blockingThread = Thread.currentThread();
+ synchronized (blockingThreads) {
+ if (isOpen()) {
+ blockingThreads.add(Thread.currentThread());
+ return true;
+ }
+
+ return false;
+ }
}
/**
@@ -122,7 +133,9 @@ final class JimfsFileChannel extends FileChannel {
* or if the channel was closed from another thread.
*/
private void endBlocking(boolean completed) throws AsynchronousCloseException {
- blockingThread = null;
+ synchronized (blockingThreads) {
+ blockingThreads.remove(Thread.currentThread());
+ }
end(completed);
}
@@ -132,24 +145,22 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkReadable();
+ int read = 0; // will definitely either be assigned or an exception will be thrown
+
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return 0; // AsynchronousCloseException will be thrown
}
-
file.readLock().lockInterruptibly();
try {
- int read = file.read(position, dst);
+ read = file.read(position, dst);
if (read != -1) {
position += read;
}
-
file.updateAccessTime();
completed = true;
- return read;
} finally {
file.readLock().unlock();
}
@@ -158,10 +169,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return read;
}
@Override
@@ -172,24 +182,22 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkReadable();
+ long read = 0; // will definitely either be assigned or an exception will be thrown
+
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return 0; // AsynchronousCloseException will be thrown
}
-
file.readLock().lockInterruptibly();
try {
- long read = file.read(position, buffers);
+ read = file.read(position, buffers);
if (read != -1) {
position += read;
}
-
file.updateAccessTime();
completed = true;
- return read;
} finally {
file.readLock().unlock();
}
@@ -198,10 +206,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return read;
}
@Override
@@ -210,25 +217,23 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkWritable();
+ int written = 0; // will definitely either be assigned or an exception will be thrown
+
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return 0; // AsynchronousCloseException will be thrown
}
-
file.writeLock().lockInterruptibly();
try {
if (append) {
position = file.size();
}
- int written = file.write(position, src);
+ written = file.write(position, src);
position += written;
-
file.updateModifiedTime();
completed = true;
- return written;
} finally {
file.writeLock().unlock();
}
@@ -237,10 +242,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return written;
}
@Override
@@ -251,25 +255,23 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkWritable();
+ long written = 0; // will definitely either be assigned or an exception will be thrown
+
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return 0; // AsynchronousCloseException will be thrown
}
-
file.writeLock().lockInterruptibly();
try {
if (append) {
position = file.size();
}
- long written = file.write(position, buffers);
+ written = file.write(position, buffers);
position += written;
-
file.updateModifiedTime();
completed = true;
- return written;
} finally {
file.writeLock().unlock();
}
@@ -278,19 +280,32 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return written;
}
@Override
public long position() throws IOException {
checkOpen();
+ long pos;
+
synchronized (this) {
- return position;
+ boolean completed = false;
+ try {
+ begin(); // don't call beginBlocking() because this method doesn't block
+ if (!isOpen()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ pos = this.position;
+ completed = true;
+ } finally {
+ end(completed);
+ }
}
+
+ return pos;
}
@Override
@@ -299,7 +314,17 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
synchronized (this) {
- this.position = newPosition;
+ boolean completed = false;
+ try {
+ begin(); // don't call beginBlocking() because this method doesn't block
+ if (!isOpen()) {
+ return this; // AsynchronousCloseException will be thrown
+ }
+ this.position = newPosition;
+ completed = true;
+ } finally {
+ end(completed);
+ }
}
return this;
@@ -308,7 +333,28 @@ final class JimfsFileChannel extends FileChannel {
@Override
public long size() throws IOException {
checkOpen();
- return file.size();
+
+ long size = 0; // will definitely either be assigned or an exception will be thrown
+
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ size = file.sizeWithoutLocking();
+ completed = true;
+ } finally {
+ file.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+
+ return size;
}
@Override
@@ -320,21 +366,17 @@ final class JimfsFileChannel extends FileChannel {
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return this; // AsynchronousCloseException will be thrown
}
-
file.writeLock().lockInterruptibly();
try {
file.truncate(size);
if (position > size) {
position = size;
}
-
file.updateModifiedTime();
completed = true;
- return this;
} finally {
file.writeLock().unlock();
}
@@ -343,16 +385,24 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return this;
}
@Override
public void force(boolean metaData) throws IOException {
checkOpen();
- // do nothing... writes are all synchronous anyway
+
+ // nothing to do since writes are all direct to the storage
+ // however, we should handle the thread being interrupted anyway
+ boolean completed = false;
+ try {
+ begin();
+ completed = true;
+ } finally {
+ end(completed);
+ }
}
@Override
@@ -363,32 +413,29 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkReadable();
- synchronized (this) {
- boolean completed = false;
- try {
- beginBlocking();
- if (!isOpen()) {
- return 0; // AsynchronousCloseException will be thrown
- }
+ long transferred = 0; // will definitely either be assigned or an exception will be thrown
- file.readLock().lockInterruptibly();
- try {
- long transferred = file.transferTo(position, count, target);
- file.updateAccessTime();
- completed = true;
- return transferred;
- } finally {
- file.readLock().unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ // no need to synchronize here; this method does not make use of the channel's position
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ transferred = file.transferTo(position, count, target);
+ file.updateAccessTime();
+ completed = true;
} finally {
- endBlocking(completed);
+ file.readLock().unlock();
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
}
+
+ return transferred;
}
@Override
@@ -399,29 +446,45 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkWritable();
- synchronized (this) {
- boolean completed = false;
- try {
- beginBlocking();
- if (!isOpen()) {
- return 0; // AsynchronousCloseException will be thrown
- }
+ long transferred = 0; // will definitely either be assigned or an exception will be thrown
- file.writeLock().lockInterruptibly();
+ if (append) {
+ // synchronize because appending does update the channel's position
+ synchronized (this) {
+ boolean completed = false;
try {
- if (append) {
- position = file.size();
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
}
- long transferred = file.transferFrom(src, position, count);
-
- if (append) {
+ file.writeLock().lockInterruptibly();
+ try {
+ position = file.sizeWithoutLocking();
+ transferred = file.transferFrom(src, position, count);
this.position = position + transferred;
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
}
-
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+ } else {
+ // don't synchronize because the channel's position is not involved
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ transferred = file.transferFrom(src, position, count);
file.updateModifiedTime();
completed = true;
- return transferred;
} finally {
file.writeLock().unlock();
}
@@ -430,10 +493,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return transferred;
}
@Override
@@ -443,32 +505,29 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkReadable();
- synchronized (this) {
- boolean completed = false;
- try {
- beginBlocking();
- if (!isOpen()) {
- return 0; // AsynchronousCloseException will be thrown
- }
+ int read = 0; // will definitely either be assigned or an exception will be thrown
- file.readLock().lockInterruptibly();
- try {
- int read = file.read(position, dst);
- file.updateAccessTime();
- completed = true;
- return read;
- } finally {
- file.readLock().unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ // no need to synchronize here; this method does not make use of the channel's position
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ read = file.read(position, dst);
+ file.updateAccessTime();
+ completed = true;
} finally {
- endBlocking(completed);
+ file.readLock().unlock();
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
}
+
+ return read;
}
@Override
@@ -478,29 +537,45 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkWritable();
- synchronized (this) {
- boolean completed = false;
- try {
- beginBlocking();
- if (!isOpen()) {
- return 0; // AsynchronousCloseException will be thrown
- }
+ int written = 0; // will definitely either be assigned or an exception will be thrown
- file.writeLock().lockInterruptibly();
+ if (append) {
+ // synchronize because appending does update the channel's position
+ synchronized (this) {
+ boolean completed = false;
try {
- if (append) {
- position = file.sizeWithoutLocking();
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
}
- int written = file.write(position, src);
-
- if (append) {
+ file.writeLock().lockInterruptibly();
+ try {
+ position = file.sizeWithoutLocking();
+ written = file.write(position, src);
this.position = position + written;
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
}
-
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+ } else {
+ // don't synchronize because the channel's position is not involved
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ written = file.write(position, src);
file.updateModifiedTime();
completed = true;
- return written;
} finally {
file.writeLock().unlock();
}
@@ -509,10 +584,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return written;
}
@Override
@@ -533,7 +607,14 @@ final class JimfsFileChannel extends FileChannel {
checkWritable();
}
- return new FakeFileLock(this, position, size, shared);
+ boolean completed = false;
+ try {
+ begin();
+ completed = true;
+ return new FakeFileLock(this, position, size, shared);
+ } finally {
+ end(completed);
+ }
}
@Override
@@ -544,11 +625,13 @@ final class JimfsFileChannel extends FileChannel {
@Override
protected void implCloseChannel() {
- // interrupt the current blocking thread, if any, causing it to throw ClosedByInterruptException
+ // interrupt the current blocking threads, if any, causing them to throw
+ // ClosedByInterruptException
try {
- final Thread thread = blockingThread;
- if (thread != null) {
- thread.interrupt();
+ synchronized (blockingThreads) {
+ for (Thread thread : blockingThreads) {
+ thread.interrupt();
+ }
}
} finally {
fileSystemState.unregister(this);
diff --git a/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java b/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java
index 2eb0743..756364a 100644
--- a/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java
+++ b/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java
@@ -871,4 +871,131 @@ public class JimfsFileChannelTest {
return futures;
}
+
+ /**
+ * Tests that the methods on the default FileChannel that support InterruptibleChannel behavior
+ * also support it on JimfsFileChannel, by just interrupting the thread before calling the
+ * method.
+ */
+ @Test
+ public void testInterruptedThreads() throws IOException {
+ final ByteBuffer buf = ByteBuffer.allocate(10);
+ final ByteBuffer[] bufArray = { buf };
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.size();
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.position();
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.position(0);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.write(buf);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.write(bufArray, 0, 1);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.read(buf);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.read(bufArray, 0, 1);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.write(buf, 0);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.read(buf, 0);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.transferTo(0, 1, channel(regularFile(10), READ, WRITE));
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.transferFrom(channel(regularFile(10), READ, WRITE), 0, 1);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.force(true);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.truncate(0);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.lock(0, 1, true);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.tryLock(0, 1, true);
+ }
+ });
+
+ // the map() method always throws UnsupportedOperationException; it doesn't make sense for it
+ // to try to handle thread interruption
+ }
+
+ private interface FileChannelMethod {
+ void call(FileChannel channel) throws IOException;
+ }
+
+ /**
+ * Asserts that when the given operation is run on an interrupted thread,
+ * {@code ClosedByInterruptException} is thrown, the channel is closed and the thread is no
+ * longer interrupted.
+ */
+ private static void assertClosedByInterrupt(FileChannelMethod method) throws IOException {
+ FileChannel channel = channel(regularFile(10), READ, WRITE);
+ Thread.currentThread().interrupt();
+ try {
+ method.call(channel);
+ fail("expected the method to throw ClosedByInterruptException");
+ } catch (ClosedByInterruptException expected) {
+ assertFalse("expected the channel to be closed", channel.isOpen());
+ assertTrue("expected the thread to still be interrupted", Thread.interrupted());
+ } finally {
+ Thread.interrupted(); // ensure the thread isn't interrupted when this method returns
+ }
+ }
}