diff options
author | Colin Decker <cgdecker@google.com> | 2013-11-05 18:15:07 -0500 |
---|---|---|
committer | Colin Decker <cgdecker@google.com> | 2013-11-05 18:17:13 -0500 |
commit | 3abd95a268ae109c9a132377030d0911d2f0fb37 (patch) | |
tree | 6016c4b89371d2b890ac335157e2976a9aa0d072 | |
parent | b6f29dbe704f40d442469890ce7c0dde4668379d (diff) | |
download | jimfs-3abd95a268ae109c9a132377030d0911d2f0fb37.tar.gz |
Simplify AbstractWatchService implementation a bit.
-rw-r--r-- | jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java | 76 | ||||
-rw-r--r-- | jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java | 1 |
2 files changed, 29 insertions, 48 deletions
diff --git a/jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java b/jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java index b0757f6..35090a6 100644 --- a/jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java +++ b/jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java @@ -36,13 +36,11 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nullable; @@ -56,20 +54,16 @@ import javax.annotation.Nullable; */ abstract class AbstractWatchService implements WatchService { - private final TransferQueue<WatchKey> queue = new LinkedTransferQueue<>(); + private final BlockingQueue<WatchKey> queue = new LinkedBlockingQueue<>(); private final WatchKey poison = new Key(this, null, ImmutableSet.<WatchEvent.Kind<?>>of()); - /** - * This lock is used to ensure that no thread may block on poll with timeout or take and miss - * being notified that the service has been closed. Each thread acquires a read lock before - * blocking and the close() method transfers the poison to blocked queue readers until it manages - * to acquire the write lock, at which point it's guaranteed there will be no more blocked - * consumers. - */ - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final AtomicBoolean open = new AtomicBoolean(true); + /** + * Registers the given watchable with this service, returning a new watch key for it. This + * implementation just checks that the service is open and creates a key; subclasses may override + * it to do other things as well. + */ public Key register(Watchable watchable, Iterable<? extends WatchEvent.Kind<?>> eventTypes) throws IOException { checkOpen(); @@ -84,7 +78,10 @@ abstract class AbstractWatchService implements WatchService { return open.get(); } - void enqueue(Key key) { + /** + * Enqueues the given key. + */ + final void enqueue(Key key) { if (isOpen()) { queue.add(key); } @@ -101,47 +98,43 @@ abstract class AbstractWatchService implements WatchService { return ImmutableList.copyOf(queue); } + @Nullable @Override public WatchKey poll() { checkOpen(); return check(queue.poll()); } + @Nullable @Override public WatchKey poll(long timeout, TimeUnit unit) throws InterruptedException { checkOpen(); - lock.readLock().lock(); - try { - checkOpen(); // check again because it's possible the lock blocked for close() to complete - return check(queue.poll(timeout, unit)); - } finally { - lock.readLock().unlock(); - } + return check(queue.poll(timeout, unit)); } @Override public WatchKey take() throws InterruptedException { checkOpen(); - lock.readLock().lock(); - try { - checkOpen(); // check again because it's possible the lock blocked for close() to complete - return check(queue.take()); - } finally { - lock.readLock().unlock(); - } + return check(queue.take()); } /** * Returns the given key, throwing an exception if it's the poison. */ - private WatchKey check(WatchKey key) { + @Nullable + private WatchKey check(@Nullable WatchKey key) { if (key == poison) { + // ensure other blocking threads get the poison + queue.offer(poison); throw new ClosedWatchServiceException(); } return key; } - private void checkOpen() { + /** + * Checks that the watch service is open, throwing {@link ClosedWatchServiceException} if not. + */ + protected final void checkOpen() { if (!open.get()) { throw new ClosedWatchServiceException(); } @@ -150,22 +143,8 @@ abstract class AbstractWatchService implements WatchService { @Override public void close() { if (open.compareAndSet(true, false)) { - // TODO(cgdecker): If there's a better way to guarantee that no thread is blocked on the - // queue after this is closed I'd love to know - - // Attempt to acquire the write lock... each time we fail, there may be threads blocked - // on the queue (if none are blocked, they will be blocked soon) - while (!lock.writeLock().tryLock()) { - // so transfer the poison to each blocked thread and attempt to acquire the lock again - boolean poisonTransferred; - do { - poisonTransferred = queue.tryTransfer(poison); - } while (poisonTransferred); - } - - // the write lock has been acquired, so no threads are blocking on the queue - // can now just unlock... any thread blocking for a read lock will now see this is closed - lock.writeLock().unlock(); + queue.clear(); + queue.offer(poison); } } @@ -176,8 +155,9 @@ abstract class AbstractWatchService implements WatchService { private final Kind<T> kind; private final int count; - private final @Nullable - T context; + + @Nullable + private final T context; public Event(Kind<T> kind, int count, @Nullable T context) { this.kind = checkNotNull(kind); diff --git a/jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java b/jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java index b5c4941..36ea5df 100644 --- a/jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java +++ b/jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java @@ -155,6 +155,7 @@ final class PollingWatchService extends AbstractWatchService { super.close(); synchronized (this) { + // synchronize to ensure no new for (Key key : snapshots.keySet()) { key.cancel(); } |