diff options
-rw-r--r-- | jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java | 369 | ||||
-rw-r--r-- | jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java | 127 |
2 files changed, 353 insertions, 143 deletions
diff --git a/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java b/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java index bc52103..b240a0c 100644 --- a/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java +++ b/jimfs/src/main/java/com/google/common/jimfs/JimfsFileChannel.java @@ -36,12 +36,14 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.OpenOption; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; /** * A {@link FileChannel} implementation that reads and writes to a {@link RegularFile} object. The @@ -55,13 +57,12 @@ import javax.annotation.Nullable; final class JimfsFileChannel extends FileChannel { /** - * Thread that is currently doing an interruptible blocking operation; that is, doing something - * that requires acquiring the file's lock. Since a thread has to already have this channel's - * lock to do that, there can only be one such thread at a time. This thread must be interrupted - * if the channel is closed by another thread. + * Set of threads that are currently doing an interruptible blocking operation; that is, doing + * something that requires acquiring the file's lock. These threads must be interrupted if the + * channel is closed by another thread. */ - @Nullable - private volatile Thread blockingThread; + @GuardedBy("blockingThreads") + private final Set<Thread> blockingThreads = new HashSet<Thread>(); private final RegularFile file; private final FileSystemState fileSystemState; @@ -70,6 +71,7 @@ final class JimfsFileChannel extends FileChannel { private final boolean write; private final boolean append; + @GuardedBy("this") private long position; public JimfsFileChannel( @@ -110,11 +112,20 @@ final class JimfsFileChannel extends FileChannel { } /** - * Begins a blocking operation, making the operation interruptible. + * Begins a blocking operation, making the operation interruptible. Returns {@code true} if the + * channel was open and the thread was added as a blocking thread; returns {@code false} if the + * channel was closed. */ - private void beginBlocking() { + private boolean beginBlocking() { begin(); - blockingThread = Thread.currentThread(); + synchronized (blockingThreads) { + if (isOpen()) { + blockingThreads.add(Thread.currentThread()); + return true; + } + + return false; + } } /** @@ -122,7 +133,9 @@ final class JimfsFileChannel extends FileChannel { * or if the channel was closed from another thread. */ private void endBlocking(boolean completed) throws AsynchronousCloseException { - blockingThread = null; + synchronized (blockingThreads) { + blockingThreads.remove(Thread.currentThread()); + } end(completed); } @@ -132,24 +145,22 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); checkReadable(); + int read = 0; // will definitely either be assigned or an exception will be thrown + synchronized (this) { boolean completed = false; try { - beginBlocking(); - if (!isOpen()) { + if (!beginBlocking()) { return 0; // AsynchronousCloseException will be thrown } - file.readLock().lockInterruptibly(); try { - int read = file.read(position, dst); + read = file.read(position, dst); if (read != -1) { position += read; } - file.updateAccessTime(); completed = true; - return read; } finally { file.readLock().unlock(); } @@ -158,10 +169,9 @@ final class JimfsFileChannel extends FileChannel { } finally { endBlocking(completed); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); } + + return read; } @Override @@ -172,24 +182,22 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); checkReadable(); + long read = 0; // will definitely either be assigned or an exception will be thrown + synchronized (this) { boolean completed = false; try { - beginBlocking(); - if (!isOpen()) { + if (!beginBlocking()) { return 0; // AsynchronousCloseException will be thrown } - file.readLock().lockInterruptibly(); try { - long read = file.read(position, buffers); + read = file.read(position, buffers); if (read != -1) { position += read; } - file.updateAccessTime(); completed = true; - return read; } finally { file.readLock().unlock(); } @@ -198,10 +206,9 @@ final class JimfsFileChannel extends FileChannel { } finally { endBlocking(completed); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); } + + return read; } @Override @@ -210,25 +217,23 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); checkWritable(); + int written = 0; // will definitely either be assigned or an exception will be thrown + synchronized (this) { boolean completed = false; try { - beginBlocking(); - if (!isOpen()) { + if (!beginBlocking()) { return 0; // AsynchronousCloseException will be thrown } - file.writeLock().lockInterruptibly(); try { if (append) { position = file.size(); } - int written = file.write(position, src); + written = file.write(position, src); position += written; - file.updateModifiedTime(); completed = true; - return written; } finally { file.writeLock().unlock(); } @@ -237,10 +242,9 @@ final class JimfsFileChannel extends FileChannel { } finally { endBlocking(completed); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); } + + return written; } @Override @@ -251,25 +255,23 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); checkWritable(); + long written = 0; // will definitely either be assigned or an exception will be thrown + synchronized (this) { boolean completed = false; try { - beginBlocking(); - if (!isOpen()) { + if (!beginBlocking()) { return 0; // AsynchronousCloseException will be thrown } - file.writeLock().lockInterruptibly(); try { if (append) { position = file.size(); } - long written = file.write(position, buffers); + written = file.write(position, buffers); position += written; - file.updateModifiedTime(); completed = true; - return written; } finally { file.writeLock().unlock(); } @@ -278,19 +280,32 @@ final class JimfsFileChannel extends FileChannel { } finally { endBlocking(completed); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); } + + return written; } @Override public long position() throws IOException { checkOpen(); + long pos; + synchronized (this) { - return position; + boolean completed = false; + try { + begin(); // don't call beginBlocking() because this method doesn't block + if (!isOpen()) { + return 0; // AsynchronousCloseException will be thrown + } + pos = this.position; + completed = true; + } finally { + end(completed); + } } + + return pos; } @Override @@ -299,7 +314,17 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); synchronized (this) { - this.position = newPosition; + boolean completed = false; + try { + begin(); // don't call beginBlocking() because this method doesn't block + if (!isOpen()) { + return this; // AsynchronousCloseException will be thrown + } + this.position = newPosition; + completed = true; + } finally { + end(completed); + } } return this; @@ -308,7 +333,28 @@ final class JimfsFileChannel extends FileChannel { @Override public long size() throws IOException { checkOpen(); - return file.size(); + + long size = 0; // will definitely either be assigned or an exception will be thrown + + boolean completed = false; + try { + if (!beginBlocking()) { + return 0; // AsynchronousCloseException will be thrown + } + file.readLock().lockInterruptibly(); + try { + size = file.sizeWithoutLocking(); + completed = true; + } finally { + file.readLock().unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + endBlocking(completed); + } + + return size; } @Override @@ -320,21 +366,17 @@ final class JimfsFileChannel extends FileChannel { synchronized (this) { boolean completed = false; try { - beginBlocking(); - if (!isOpen()) { + if (!beginBlocking()) { return this; // AsynchronousCloseException will be thrown } - file.writeLock().lockInterruptibly(); try { file.truncate(size); if (position > size) { position = size; } - file.updateModifiedTime(); completed = true; - return this; } finally { file.writeLock().unlock(); } @@ -343,16 +385,24 @@ final class JimfsFileChannel extends FileChannel { } finally { endBlocking(completed); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); } + + return this; } @Override public void force(boolean metaData) throws IOException { checkOpen(); - // do nothing... writes are all synchronous anyway + + // nothing to do since writes are all direct to the storage + // however, we should handle the thread being interrupted anyway + boolean completed = false; + try { + begin(); + completed = true; + } finally { + end(completed); + } } @Override @@ -363,32 +413,29 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); checkReadable(); - synchronized (this) { - boolean completed = false; - try { - beginBlocking(); - if (!isOpen()) { - return 0; // AsynchronousCloseException will be thrown - } + long transferred = 0; // will definitely either be assigned or an exception will be thrown - file.readLock().lockInterruptibly(); - try { - long transferred = file.transferTo(position, count, target); - file.updateAccessTime(); - completed = true; - return transferred; - } finally { - file.readLock().unlock(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + // no need to synchronize here; this method does not make use of the channel's position + boolean completed = false; + try { + if (!beginBlocking()) { + return 0; // AsynchronousCloseException will be thrown + } + file.readLock().lockInterruptibly(); + try { + transferred = file.transferTo(position, count, target); + file.updateAccessTime(); + completed = true; } finally { - endBlocking(completed); + file.readLock().unlock(); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + endBlocking(completed); } + + return transferred; } @Override @@ -399,29 +446,45 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); checkWritable(); - synchronized (this) { - boolean completed = false; - try { - beginBlocking(); - if (!isOpen()) { - return 0; // AsynchronousCloseException will be thrown - } + long transferred = 0; // will definitely either be assigned or an exception will be thrown - file.writeLock().lockInterruptibly(); + if (append) { + // synchronize because appending does update the channel's position + synchronized (this) { + boolean completed = false; try { - if (append) { - position = file.size(); + if (!beginBlocking()) { + return 0; // AsynchronousCloseException will be thrown } - long transferred = file.transferFrom(src, position, count); - - if (append) { + file.writeLock().lockInterruptibly(); + try { + position = file.sizeWithoutLocking(); + transferred = file.transferFrom(src, position, count); this.position = position + transferred; + file.updateModifiedTime(); + completed = true; + } finally { + file.writeLock().unlock(); } - + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + endBlocking(completed); + } + } + } else { + // don't synchronize because the channel's position is not involved + boolean completed = false; + try { + if (!beginBlocking()) { + return 0; // AsynchronousCloseException will be thrown + } + file.writeLock().lockInterruptibly(); + try { + transferred = file.transferFrom(src, position, count); file.updateModifiedTime(); completed = true; - return transferred; } finally { file.writeLock().unlock(); } @@ -430,10 +493,9 @@ final class JimfsFileChannel extends FileChannel { } finally { endBlocking(completed); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); } + + return transferred; } @Override @@ -443,32 +505,29 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); checkReadable(); - synchronized (this) { - boolean completed = false; - try { - beginBlocking(); - if (!isOpen()) { - return 0; // AsynchronousCloseException will be thrown - } + int read = 0; // will definitely either be assigned or an exception will be thrown - file.readLock().lockInterruptibly(); - try { - int read = file.read(position, dst); - file.updateAccessTime(); - completed = true; - return read; - } finally { - file.readLock().unlock(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + // no need to synchronize here; this method does not make use of the channel's position + boolean completed = false; + try { + if (!beginBlocking()) { + return 0; // AsynchronousCloseException will be thrown + } + file.readLock().lockInterruptibly(); + try { + read = file.read(position, dst); + file.updateAccessTime(); + completed = true; } finally { - endBlocking(completed); + file.readLock().unlock(); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + endBlocking(completed); } + + return read; } @Override @@ -478,29 +537,45 @@ final class JimfsFileChannel extends FileChannel { checkOpen(); checkWritable(); - synchronized (this) { - boolean completed = false; - try { - beginBlocking(); - if (!isOpen()) { - return 0; // AsynchronousCloseException will be thrown - } + int written = 0; // will definitely either be assigned or an exception will be thrown - file.writeLock().lockInterruptibly(); + if (append) { + // synchronize because appending does update the channel's position + synchronized (this) { + boolean completed = false; try { - if (append) { - position = file.sizeWithoutLocking(); + if (!beginBlocking()) { + return 0; // AsynchronousCloseException will be thrown } - int written = file.write(position, src); - - if (append) { + file.writeLock().lockInterruptibly(); + try { + position = file.sizeWithoutLocking(); + written = file.write(position, src); this.position = position + written; + file.updateModifiedTime(); + completed = true; + } finally { + file.writeLock().unlock(); } - + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + endBlocking(completed); + } + } + } else { + // don't synchronize because the channel's position is not involved + boolean completed = false; + try { + if (!beginBlocking()) { + return 0; // AsynchronousCloseException will be thrown + } + file.writeLock().lockInterruptibly(); + try { + written = file.write(position, src); file.updateModifiedTime(); completed = true; - return written; } finally { file.writeLock().unlock(); } @@ -509,10 +584,9 @@ final class JimfsFileChannel extends FileChannel { } finally { endBlocking(completed); } - - // if InterruptedException is caught, endBlocking will throw ClosedByInterruptException - throw new AssertionError(); } + + return written; } @Override @@ -533,7 +607,14 @@ final class JimfsFileChannel extends FileChannel { checkWritable(); } - return new FakeFileLock(this, position, size, shared); + boolean completed = false; + try { + begin(); + completed = true; + return new FakeFileLock(this, position, size, shared); + } finally { + end(completed); + } } @Override @@ -544,11 +625,13 @@ final class JimfsFileChannel extends FileChannel { @Override protected void implCloseChannel() { - // interrupt the current blocking thread, if any, causing it to throw ClosedByInterruptException + // interrupt the current blocking threads, if any, causing them to throw + // ClosedByInterruptException try { - final Thread thread = blockingThread; - if (thread != null) { - thread.interrupt(); + synchronized (blockingThreads) { + for (Thread thread : blockingThreads) { + thread.interrupt(); + } } } finally { fileSystemState.unregister(this); diff --git a/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java b/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java index 2eb0743..756364a 100644 --- a/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java +++ b/jimfs/src/test/java/com/google/common/jimfs/JimfsFileChannelTest.java @@ -871,4 +871,131 @@ public class JimfsFileChannelTest { return futures; } + + /** + * Tests that the methods on the default FileChannel that support InterruptibleChannel behavior + * also support it on JimfsFileChannel, by just interrupting the thread before calling the + * method. + */ + @Test + public void testInterruptedThreads() throws IOException { + final ByteBuffer buf = ByteBuffer.allocate(10); + final ByteBuffer[] bufArray = { buf }; + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.size(); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.position(); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.position(0); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.write(buf); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.write(bufArray, 0, 1); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.read(buf); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.read(bufArray, 0, 1); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.write(buf, 0); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.read(buf, 0); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.transferTo(0, 1, channel(regularFile(10), READ, WRITE)); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.transferFrom(channel(regularFile(10), READ, WRITE), 0, 1); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.force(true); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.truncate(0); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.lock(0, 1, true); + } + }); + + assertClosedByInterrupt(new FileChannelMethod() { + @Override public void call(FileChannel channel) throws IOException { + channel.tryLock(0, 1, true); + } + }); + + // the map() method always throws UnsupportedOperationException; it doesn't make sense for it + // to try to handle thread interruption + } + + private interface FileChannelMethod { + void call(FileChannel channel) throws IOException; + } + + /** + * Asserts that when the given operation is run on an interrupted thread, + * {@code ClosedByInterruptException} is thrown, the channel is closed and the thread is no + * longer interrupted. + */ + private static void assertClosedByInterrupt(FileChannelMethod method) throws IOException { + FileChannel channel = channel(regularFile(10), READ, WRITE); + Thread.currentThread().interrupt(); + try { + method.call(channel); + fail("expected the method to throw ClosedByInterruptException"); + } catch (ClosedByInterruptException expected) { + assertFalse("expected the channel to be closed", channel.isOpen()); + assertTrue("expected the thread to still be interrupted", Thread.interrupted()); + } finally { + Thread.interrupted(); // ensure the thread isn't interrupted when this method returns + } + } } |