aboutsummaryrefslogtreecommitdiff
path: root/jimfs/src
diff options
context:
space:
mode:
authorcgdecker <cgdecker@google.com>2015-04-17 13:31:08 -0700
committerColin Decker <cgdecker@google.com>2015-04-17 17:26:16 -0400
commit5ce1143d312382df61b0f98e31d5870b94b2470f (patch)
treec3949b9956a98bfd4dd194c8403c21c255dd0f1b /jimfs/src
parentc1a7fcd013ea594762e1d348a7e2f6bafd783411 (diff)
downloadjimfs-5ce1143d312382df61b0f98e31d5870b94b2470f.tar.gz
Several changes to JimfsFileChannel.
- Update it to handle thread interruption for all methods that handle interruption in the default FileChannel implementation (with the exception of map(), for which we always throw UOE). For most methods that weren't already handling interruption, this mostly just means that if they're called on a thread that's already interrupted they'll throw ClosedByInterruptException and close the channel. Requested in GitHub issue #21. Also add tests for this. - Make it possible for FileChannel methods that don't use the channel's position (they have a parameter for a specific file position) to proceed concurrently with other FileChannel methods, which is specified as legal in the FileChannel docs: "Other operations, in particular those that take an explicit position, may proceed concurrently". ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=91435386
Diffstat (limited to 'jimfs/src')
-rw-r--r--jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java369
-rw-r--r--jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java127
2 files changed, 353 insertions, 143 deletions
diff --git a/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java b/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
index bc52103..b240a0c 100644
--- a/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
+++ b/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java
@@ -36,12 +36,14 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.OpenOption;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
/**
* A {@link FileChannel} implementation that reads and writes to a {@link RegularFile} object. The
@@ -55,13 +57,12 @@ import javax.annotation.Nullable;
final class JimfsFileChannel extends FileChannel {
/**
- * Thread that is currently doing an interruptible blocking operation; that is, doing something
- * that requires acquiring the file's lock. Since a thread has to already have this channel's
- * lock to do that, there can only be one such thread at a time. This thread must be interrupted
- * if the channel is closed by another thread.
+ * Set of threads that are currently doing an interruptible blocking operation; that is, doing
+ * something that requires acquiring the file's lock. These threads must be interrupted if the
+ * channel is closed by another thread.
*/
- @Nullable
- private volatile Thread blockingThread;
+ @GuardedBy("blockingThreads")
+ private final Set<Thread> blockingThreads = new HashSet<Thread>();
private final RegularFile file;
private final FileSystemState fileSystemState;
@@ -70,6 +71,7 @@ final class JimfsFileChannel extends FileChannel {
private final boolean write;
private final boolean append;
+ @GuardedBy("this")
private long position;
public JimfsFileChannel(
@@ -110,11 +112,20 @@ final class JimfsFileChannel extends FileChannel {
}
/**
- * Begins a blocking operation, making the operation interruptible.
+ * Begins a blocking operation, making the operation interruptible. Returns {@code true} if the
+ * channel was open and the thread was added as a blocking thread; returns {@code false} if the
+ * channel was closed.
*/
- private void beginBlocking() {
+ private boolean beginBlocking() {
begin();
- blockingThread = Thread.currentThread();
+ synchronized (blockingThreads) {
+ if (isOpen()) {
+ blockingThreads.add(Thread.currentThread());
+ return true;
+ }
+
+ return false;
+ }
}
/**
@@ -122,7 +133,9 @@ final class JimfsFileChannel extends FileChannel {
* or if the channel was closed from another thread.
*/
private void endBlocking(boolean completed) throws AsynchronousCloseException {
- blockingThread = null;
+ synchronized (blockingThreads) {
+ blockingThreads.remove(Thread.currentThread());
+ }
end(completed);
}
@@ -132,24 +145,22 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkReadable();
+ int read = 0; // will definitely either be assigned or an exception will be thrown
+
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return 0; // AsynchronousCloseException will be thrown
}
-
file.readLock().lockInterruptibly();
try {
- int read = file.read(position, dst);
+ read = file.read(position, dst);
if (read != -1) {
position += read;
}
-
file.updateAccessTime();
completed = true;
- return read;
} finally {
file.readLock().unlock();
}
@@ -158,10 +169,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return read;
}
@Override
@@ -172,24 +182,22 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkReadable();
+ long read = 0; // will definitely either be assigned or an exception will be thrown
+
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return 0; // AsynchronousCloseException will be thrown
}
-
file.readLock().lockInterruptibly();
try {
- long read = file.read(position, buffers);
+ read = file.read(position, buffers);
if (read != -1) {
position += read;
}
-
file.updateAccessTime();
completed = true;
- return read;
} finally {
file.readLock().unlock();
}
@@ -198,10 +206,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return read;
}
@Override
@@ -210,25 +217,23 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkWritable();
+ int written = 0; // will definitely either be assigned or an exception will be thrown
+
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return 0; // AsynchronousCloseException will be thrown
}
-
file.writeLock().lockInterruptibly();
try {
if (append) {
position = file.size();
}
- int written = file.write(position, src);
+ written = file.write(position, src);
position += written;
-
file.updateModifiedTime();
completed = true;
- return written;
} finally {
file.writeLock().unlock();
}
@@ -237,10 +242,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return written;
}
@Override
@@ -251,25 +255,23 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkWritable();
+ long written = 0; // will definitely either be assigned or an exception will be thrown
+
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return 0; // AsynchronousCloseException will be thrown
}
-
file.writeLock().lockInterruptibly();
try {
if (append) {
position = file.size();
}
- long written = file.write(position, buffers);
+ written = file.write(position, buffers);
position += written;
-
file.updateModifiedTime();
completed = true;
- return written;
} finally {
file.writeLock().unlock();
}
@@ -278,19 +280,32 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return written;
}
@Override
public long position() throws IOException {
checkOpen();
+ long pos;
+
synchronized (this) {
- return position;
+ boolean completed = false;
+ try {
+ begin(); // don't call beginBlocking() because this method doesn't block
+ if (!isOpen()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ pos = this.position;
+ completed = true;
+ } finally {
+ end(completed);
+ }
}
+
+ return pos;
}
@Override
@@ -299,7 +314,17 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
synchronized (this) {
- this.position = newPosition;
+ boolean completed = false;
+ try {
+ begin(); // don't call beginBlocking() because this method doesn't block
+ if (!isOpen()) {
+ return this; // AsynchronousCloseException will be thrown
+ }
+ this.position = newPosition;
+ completed = true;
+ } finally {
+ end(completed);
+ }
}
return this;
@@ -308,7 +333,28 @@ final class JimfsFileChannel extends FileChannel {
@Override
public long size() throws IOException {
checkOpen();
- return file.size();
+
+ long size = 0; // will definitely either be assigned or an exception will be thrown
+
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ size = file.sizeWithoutLocking();
+ completed = true;
+ } finally {
+ file.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+
+ return size;
}
@Override
@@ -320,21 +366,17 @@ final class JimfsFileChannel extends FileChannel {
synchronized (this) {
boolean completed = false;
try {
- beginBlocking();
- if (!isOpen()) {
+ if (!beginBlocking()) {
return this; // AsynchronousCloseException will be thrown
}
-
file.writeLock().lockInterruptibly();
try {
file.truncate(size);
if (position > size) {
position = size;
}
-
file.updateModifiedTime();
completed = true;
- return this;
} finally {
file.writeLock().unlock();
}
@@ -343,16 +385,24 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return this;
}
@Override
public void force(boolean metaData) throws IOException {
checkOpen();
- // do nothing... writes are all synchronous anyway
+
+ // nothing to do since writes are all direct to the storage
+ // however, we should handle the thread being interrupted anyway
+ boolean completed = false;
+ try {
+ begin();
+ completed = true;
+ } finally {
+ end(completed);
+ }
}
@Override
@@ -363,32 +413,29 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkReadable();
- synchronized (this) {
- boolean completed = false;
- try {
- beginBlocking();
- if (!isOpen()) {
- return 0; // AsynchronousCloseException will be thrown
- }
+ long transferred = 0; // will definitely either be assigned or an exception will be thrown
- file.readLock().lockInterruptibly();
- try {
- long transferred = file.transferTo(position, count, target);
- file.updateAccessTime();
- completed = true;
- return transferred;
- } finally {
- file.readLock().unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ // no need to synchronize here; this method does not make use of the channel's position
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ transferred = file.transferTo(position, count, target);
+ file.updateAccessTime();
+ completed = true;
} finally {
- endBlocking(completed);
+ file.readLock().unlock();
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
}
+
+ return transferred;
}
@Override
@@ -399,29 +446,45 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkWritable();
- synchronized (this) {
- boolean completed = false;
- try {
- beginBlocking();
- if (!isOpen()) {
- return 0; // AsynchronousCloseException will be thrown
- }
+ long transferred = 0; // will definitely either be assigned or an exception will be thrown
- file.writeLock().lockInterruptibly();
+ if (append) {
+ // synchronize because appending does update the channel's position
+ synchronized (this) {
+ boolean completed = false;
try {
- if (append) {
- position = file.size();
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
}
- long transferred = file.transferFrom(src, position, count);
-
- if (append) {
+ file.writeLock().lockInterruptibly();
+ try {
+ position = file.sizeWithoutLocking();
+ transferred = file.transferFrom(src, position, count);
this.position = position + transferred;
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
}
-
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+ } else {
+ // don't synchronize because the channel's position is not involved
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ transferred = file.transferFrom(src, position, count);
file.updateModifiedTime();
completed = true;
- return transferred;
} finally {
file.writeLock().unlock();
}
@@ -430,10 +493,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return transferred;
}
@Override
@@ -443,32 +505,29 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkReadable();
- synchronized (this) {
- boolean completed = false;
- try {
- beginBlocking();
- if (!isOpen()) {
- return 0; // AsynchronousCloseException will be thrown
- }
+ int read = 0; // will definitely either be assigned or an exception will be thrown
- file.readLock().lockInterruptibly();
- try {
- int read = file.read(position, dst);
- file.updateAccessTime();
- completed = true;
- return read;
- } finally {
- file.readLock().unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ // no need to synchronize here; this method does not make use of the channel's position
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.readLock().lockInterruptibly();
+ try {
+ read = file.read(position, dst);
+ file.updateAccessTime();
+ completed = true;
} finally {
- endBlocking(completed);
+ file.readLock().unlock();
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
}
+
+ return read;
}
@Override
@@ -478,29 +537,45 @@ final class JimfsFileChannel extends FileChannel {
checkOpen();
checkWritable();
- synchronized (this) {
- boolean completed = false;
- try {
- beginBlocking();
- if (!isOpen()) {
- return 0; // AsynchronousCloseException will be thrown
- }
+ int written = 0; // will definitely either be assigned or an exception will be thrown
- file.writeLock().lockInterruptibly();
+ if (append) {
+ // synchronize because appending does update the channel's position
+ synchronized (this) {
+ boolean completed = false;
try {
- if (append) {
- position = file.sizeWithoutLocking();
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
}
- int written = file.write(position, src);
-
- if (append) {
+ file.writeLock().lockInterruptibly();
+ try {
+ position = file.sizeWithoutLocking();
+ written = file.write(position, src);
this.position = position + written;
+ file.updateModifiedTime();
+ completed = true;
+ } finally {
+ file.writeLock().unlock();
}
-
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ endBlocking(completed);
+ }
+ }
+ } else {
+ // don't synchronize because the channel's position is not involved
+ boolean completed = false;
+ try {
+ if (!beginBlocking()) {
+ return 0; // AsynchronousCloseException will be thrown
+ }
+ file.writeLock().lockInterruptibly();
+ try {
+ written = file.write(position, src);
file.updateModifiedTime();
completed = true;
- return written;
} finally {
file.writeLock().unlock();
}
@@ -509,10 +584,9 @@ final class JimfsFileChannel extends FileChannel {
} finally {
endBlocking(completed);
}
-
- // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException
- throw new AssertionError();
}
+
+ return written;
}
@Override
@@ -533,7 +607,14 @@ final class JimfsFileChannel extends FileChannel {
checkWritable();
}
- return new FakeFileLock(this, position, size, shared);
+ boolean completed = false;
+ try {
+ begin();
+ completed = true;
+ return new FakeFileLock(this, position, size, shared);
+ } finally {
+ end(completed);
+ }
}
@Override
@@ -544,11 +625,13 @@ final class JimfsFileChannel extends FileChannel {
@Override
protected void implCloseChannel() {
- // interrupt the current blocking thread, if any, causing it to throw ClosedByInterruptException
+ // interrupt the current blocking threads, if any, causing them to throw
+ // ClosedByInterruptException
try {
- final Thread thread = blockingThread;
- if (thread != null) {
- thread.interrupt();
+ synchronized (blockingThreads) {
+ for (Thread thread : blockingThreads) {
+ thread.interrupt();
+ }
}
} finally {
fileSystemState.unregister(this);
diff --git a/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java b/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java
index 2eb0743..756364a 100644
--- a/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java
+++ b/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java
@@ -871,4 +871,131 @@ public class JimfsFileChannelTest {
return futures;
}
+
+ /**
+ * Tests that the methods on the default FileChannel that support InterruptibleChannel behavior
+ * also support it on JimfsFileChannel, by just interrupting the thread before calling the
+ * method.
+ */
+ @Test
+ public void testInterruptedThreads() throws IOException {
+ final ByteBuffer buf = ByteBuffer.allocate(10);
+ final ByteBuffer[] bufArray = { buf };
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.size();
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.position();
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.position(0);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.write(buf);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.write(bufArray, 0, 1);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.read(buf);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.read(bufArray, 0, 1);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.write(buf, 0);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.read(buf, 0);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.transferTo(0, 1, channel(regularFile(10), READ, WRITE));
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.transferFrom(channel(regularFile(10), READ, WRITE), 0, 1);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.force(true);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.truncate(0);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.lock(0, 1, true);
+ }
+ });
+
+ assertClosedByInterrupt(new FileChannelMethod() {
+ @Override public void call(FileChannel channel) throws IOException {
+ channel.tryLock(0, 1, true);
+ }
+ });
+
+ // the map() method always throws UnsupportedOperationException; it doesn't make sense for it
+ // to try to handle thread interruption
+ }
+
+ private interface FileChannelMethod {
+ void call(FileChannel channel) throws IOException;
+ }
+
+ /**
+ * Asserts that when the given operation is run on an interrupted thread,
+ * {@code ClosedByInterruptException} is thrown, the channel is closed and the thread is no
+ * longer interrupted.
+ */
+ private static void assertClosedByInterrupt(FileChannelMethod method) throws IOException {
+ FileChannel channel = channel(regularFile(10), READ, WRITE);
+ Thread.currentThread().interrupt();
+ try {
+ method.call(channel);
+ fail("expected the method to throw ClosedByInterruptException");
+ } catch (ClosedByInterruptException expected) {
+ assertFalse("expected the channel to be closed", channel.isOpen());
+ assertTrue("expected the thread to still be interrupted", Thread.interrupted());
+ } finally {
+ Thread.interrupted(); // ensure the thread isn't interrupted when this method returns
+ }
+ }
}