aboutsummaryrefslogtreecommitdiff
path: root/jimfs/src/main
diff options
context:
space:
mode:
authorColin Decker <cgdecker@google.com>2013-11-05 18:15:07 -0500
committerColin Decker <cgdecker@google.com>2013-11-05 18:17:13 -0500
commit3abd95a268ae109c9a132377030d0911d2f0fb37 (patch)
tree6016c4b89371d2b890ac335157e2976a9aa0d072 /jimfs/src/main
parentb6f29dbe704f40d442469890ce7c0dde4668379d (diff)
downloadjimfs-3abd95a268ae109c9a132377030d0911d2f0fb37.tar.gz
Simplify AbstractWatchService implementation a bit.
Diffstat (limited to 'jimfs/src/main')
-rw-r--r--jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java76
-rw-r--r--jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java1
2 files changed, 29 insertions, 48 deletions
diff --git a/jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java b/jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java
index b0757f6..35090a6 100644
--- a/jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java
+++ b/jimfs/src/main/java/com/google/jimfs/internal/AbstractWatchService.java
@@ -36,13 +36,11 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
@@ -56,20 +54,16 @@ import javax.annotation.Nullable;
*/
abstract class AbstractWatchService implements WatchService {
- private final TransferQueue<WatchKey> queue = new LinkedTransferQueue<>();
+ private final BlockingQueue<WatchKey> queue = new LinkedBlockingQueue<>();
private final WatchKey poison = new Key(this, null, ImmutableSet.<WatchEvent.Kind<?>>of());
- /**
- * This lock is used to ensure that no thread may block on poll with timeout or take and miss
- * being notified that the service has been closed. Each thread acquires a read lock before
- * blocking and the close() method transfers the poison to blocked queue readers until it manages
- * to acquire the write lock, at which point it's guaranteed there will be no more blocked
- * consumers.
- */
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
private final AtomicBoolean open = new AtomicBoolean(true);
+ /**
+ * Registers the given watchable with this service, returning a new watch key for it. This
+ * implementation just checks that the service is open and creates a key; subclasses may override
+ * it to do other things as well.
+ */
public Key register(Watchable watchable, Iterable<? extends WatchEvent.Kind<?>> eventTypes)
throws IOException {
checkOpen();
@@ -84,7 +78,10 @@ abstract class AbstractWatchService implements WatchService {
return open.get();
}
- void enqueue(Key key) {
+ /**
+ * Enqueues the given key.
+ */
+ final void enqueue(Key key) {
if (isOpen()) {
queue.add(key);
}
@@ -101,47 +98,43 @@ abstract class AbstractWatchService implements WatchService {
return ImmutableList.copyOf(queue);
}
+ @Nullable
@Override
public WatchKey poll() {
checkOpen();
return check(queue.poll());
}
+ @Nullable
@Override
public WatchKey poll(long timeout, TimeUnit unit) throws InterruptedException {
checkOpen();
- lock.readLock().lock();
- try {
- checkOpen(); // check again because it's possible the lock blocked for close() to complete
- return check(queue.poll(timeout, unit));
- } finally {
- lock.readLock().unlock();
- }
+ return check(queue.poll(timeout, unit));
}
@Override
public WatchKey take() throws InterruptedException {
checkOpen();
- lock.readLock().lock();
- try {
- checkOpen(); // check again because it's possible the lock blocked for close() to complete
- return check(queue.take());
- } finally {
- lock.readLock().unlock();
- }
+ return check(queue.take());
}
/**
* Returns the given key, throwing an exception if it's the poison.
*/
- private WatchKey check(WatchKey key) {
+ @Nullable
+ private WatchKey check(@Nullable WatchKey key) {
if (key == poison) {
+ // ensure other blocking threads get the poison
+ queue.offer(poison);
throw new ClosedWatchServiceException();
}
return key;
}
- private void checkOpen() {
+ /**
+ * Checks that the watch service is open, throwing {@link ClosedWatchServiceException} if not.
+ */
+ protected final void checkOpen() {
if (!open.get()) {
throw new ClosedWatchServiceException();
}
@@ -150,22 +143,8 @@ abstract class AbstractWatchService implements WatchService {
@Override
public void close() {
if (open.compareAndSet(true, false)) {
- // TODO(cgdecker): If there's a better way to guarantee that no thread is blocked on the
- // queue after this is closed I'd love to know
-
- // Attempt to acquire the write lock... each time we fail, there may be threads blocked
- // on the queue (if none are blocked, they will be blocked soon)
- while (!lock.writeLock().tryLock()) {
- // so transfer the poison to each blocked thread and attempt to acquire the lock again
- boolean poisonTransferred;
- do {
- poisonTransferred = queue.tryTransfer(poison);
- } while (poisonTransferred);
- }
-
- // the write lock has been acquired, so no threads are blocking on the queue
- // can now just unlock... any thread blocking for a read lock will now see this is closed
- lock.writeLock().unlock();
+ queue.clear();
+ queue.offer(poison);
}
}
@@ -176,8 +155,9 @@ abstract class AbstractWatchService implements WatchService {
private final Kind<T> kind;
private final int count;
- private final @Nullable
- T context;
+
+ @Nullable
+ private final T context;
public Event(Kind<T> kind, int count, @Nullable T context) {
this.kind = checkNotNull(kind);
diff --git a/jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java b/jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java
index b5c4941..36ea5df 100644
--- a/jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java
+++ b/jimfs/src/main/java/com/google/jimfs/internal/PollingWatchService.java
@@ -155,6 +155,7 @@ final class PollingWatchService extends AbstractWatchService {
super.close();
synchronized (this) {
+ // synchronize to ensure no new
for (Key key : snapshots.keySet()) {
key.cancel();
}