aboutsummaryrefslogtreecommitdiff
path: root/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java')
-rw-r--r--src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java553
1 files changed, 553 insertions, 0 deletions
diff --git a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
new file mode 100644
index 000000000..5f158660c
--- /dev/null
+++ b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.lang3.concurrent;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.AbstractLangTest;
+import org.apache.commons.lang3.ThreadUtils;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for TimedSemaphore.
+ */
+public class TimedSemaphoreTest extends AbstractLangTest {
+ /** Constant for the time period. */
+ private static final long PERIOD_MILLIS = 500;
+
+ private static final Duration DURATION = Duration.ofMillis(PERIOD_MILLIS);
+
+ /** Constant for the time unit. */
+ private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
+
+ /** Constant for the default limit. */
+ private static final int LIMIT = 10;
+
+ /**
+ * Tests creating a new instance.
+ */
+ @Test
+ public void testInit() {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ EasyMock.replay(service);
+ final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
+ LIMIT);
+ EasyMock.verify(service);
+ assertEquals(service, semaphore.getExecutorService(), "Wrong service");
+ assertEquals(PERIOD_MILLIS, semaphore.getPeriod(), "Wrong period");
+ assertEquals(UNIT, semaphore.getUnit(), "Wrong unit");
+ assertEquals(0, semaphore.getLastAcquiresPerPeriod(), "Statistic available");
+ assertEquals(0.0, semaphore.getAverageCallsPerPeriod(), .05, "Average available");
+ assertFalse(semaphore.isShutdown(), "Already shutdown");
+ assertEquals(LIMIT, semaphore.getLimit(), "Wrong limit");
+ }
+
+ /**
+ * Tries to create an instance with a negative period. This should cause an
+ * exception.
+ */
+ @Test
+ public void testInitInvalidPeriod() {
+ assertThrows(IllegalArgumentException.class, () -> new TimedSemaphore(0L, UNIT, LIMIT));
+ }
+
+ /**
+ * Tests whether a default executor service is created if no service is
+ * provided.
+ */
+ @Test
+ public void testInitDefaultService() {
+ final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
+ final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore
+ .getExecutorService();
+ assertFalse(exec.getContinueExistingPeriodicTasksAfterShutdownPolicy(), "Wrong periodic task policy");
+ assertFalse(exec.getExecuteExistingDelayedTasksAfterShutdownPolicy(), "Wrong delayed task policy");
+ assertFalse(exec.isShutdown(), "Already shutdown");
+ semaphore.shutdown();
+ }
+
+ /**
+ * Tests starting the timer.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testStartTimer() throws InterruptedException {
+ final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS,
+ UNIT, LIMIT);
+ final ScheduledFuture<?> future = semaphore.startTimer();
+ assertNotNull(future, "No future returned");
+ ThreadUtils.sleepQuietly(DURATION);
+ final int trials = 10;
+ int count = 0;
+ do {
+ Thread.sleep(PERIOD_MILLIS);
+ assertFalse(count++ > trials, "endOfPeriod() not called!");
+ } while (semaphore.getPeriodEnds() <= 0);
+ semaphore.shutdown();
+ }
+
+ /**
+ * Tests the shutdown() method if the executor belongs to the semaphore. In
+ * this case it has to be shut down.
+ */
+ @Test
+ public void testShutdownOwnExecutor() {
+ final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
+ semaphore.shutdown();
+ assertTrue(semaphore.isShutdown(), "Not shutdown");
+ assertTrue(semaphore.getExecutorService().isShutdown(), "Executor not shutdown");
+ }
+
+ /**
+ * Tests the shutdown() method for a shared executor service before a task
+ * was started. This should do pretty much nothing.
+ */
+ @Test
+ public void testShutdownSharedExecutorNoTask() {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ EasyMock.replay(service);
+ final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
+ LIMIT);
+ semaphore.shutdown();
+ assertTrue(semaphore.isShutdown(), "Not shutdown");
+ EasyMock.verify(service);
+ }
+
+ /**
+ * Prepares an executor service mock to expect the start of the timer.
+ *
+ * @param service the mock
+ * @param future the future
+ */
+ private void prepareStartTimer(final ScheduledExecutorService service,
+ final ScheduledFuture<?> future) {
+ service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock
+ .eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT));
+ EasyMock.expectLastCall().andReturn(future);
+ }
+
+ /**
+ * Tests the shutdown() method for a shared executor after the task was
+ * started. In this case the task must be canceled.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testShutdownSharedExecutorTask() throws InterruptedException {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+ prepareStartTimer(service, future);
+ EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
+ EasyMock.replay(service, future);
+ final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
+ PERIOD_MILLIS, UNIT, LIMIT);
+ semaphore.acquire();
+ semaphore.shutdown();
+ assertTrue(semaphore.isShutdown(), "Not shutdown");
+ EasyMock.verify(service, future);
+ }
+
+ /**
+ * Tests multiple invocations of the shutdown() method.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testShutdownMultipleTimes() throws InterruptedException {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+ prepareStartTimer(service, future);
+ EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
+ EasyMock.replay(service, future);
+ final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
+ PERIOD_MILLIS, UNIT, LIMIT);
+ semaphore.acquire();
+ for (int i = 0; i < 10; i++) {
+ semaphore.shutdown();
+ }
+ EasyMock.verify(service, future);
+ }
+
+ /**
+ * Tests the acquire() method if a limit is set.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testAcquireLimit() throws InterruptedException {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+ prepareStartTimer(service, future);
+ EasyMock.replay(service, future);
+ final int count = 10;
+ final CountDownLatch latch = new CountDownLatch(count - 1);
+ final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, 1);
+ final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count,
+ count - 1);
+ semaphore.setLimit(count - 1);
+
+ // start a thread that calls the semaphore count times
+ t.start();
+ latch.await();
+ // now the semaphore's limit should be reached and the thread blocked
+ assertEquals(count - 1, semaphore.getAcquireCount(), "Wrong semaphore count");
+
+ // this wakes up the thread, it should call the semaphore once more
+ semaphore.endOfPeriod();
+ t.join();
+ assertEquals(1, semaphore.getAcquireCount(), "Wrong semaphore count (2)");
+ assertEquals(count - 1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire() count");
+ EasyMock.verify(service, future);
+ }
+
+ /**
+ * Tests the acquire() method if more threads are involved than the limit.
+ * This method starts a number of threads that all invoke the semaphore. The
+ * semaphore's limit is set to 1, so in each period only a single thread can
+ * acquire the semaphore.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testAcquireMultipleThreads() throws InterruptedException {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+ prepareStartTimer(service, future);
+ EasyMock.replay(service, future);
+ final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
+ PERIOD_MILLIS, UNIT, 1);
+ semaphore.latch = new CountDownLatch(1);
+ final int count = 10;
+ final SemaphoreThread[] threads = new SemaphoreThread[count];
+ for (int i = 0; i < count; i++) {
+ threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
+ threads[i].start();
+ }
+ for (int i = 0; i < count; i++) {
+ semaphore.latch.await();
+ assertEquals(1, semaphore.getAcquireCount(), "Wrong count");
+ semaphore.latch = new CountDownLatch(1);
+ semaphore.endOfPeriod();
+ assertEquals(1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire count");
+ }
+ for (int i = 0; i < count; i++) {
+ threads[i].join();
+ }
+ EasyMock.verify(service, future);
+ }
+
+ /**
+ * Tests the acquire() method if no limit is set. A test thread is started
+ * that calls the semaphore a large number of times. Even if the semaphore's
+ * period does not end, the thread should never block.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testAcquireNoLimit() throws InterruptedException {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+ prepareStartTimer(service, future);
+ EasyMock.replay(service, future);
+ final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
+ PERIOD_MILLIS, UNIT, TimedSemaphore.NO_LIMIT);
+ final int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+ final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
+ t.start();
+ latch.await();
+ EasyMock.verify(service, future);
+ }
+
+ /**
+ * Tries to call acquire() after shutdown(). This should cause an exception.
+ */
+ @Test
+ public void testPassAfterShutdown() {
+ final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
+ semaphore.shutdown();
+ assertThrows(IllegalStateException.class, semaphore::acquire);
+ }
+
+ /**
+ * Tests a bigger number of invocations that span multiple periods. The
+ * period is set to a very short time. A background thread calls the
+ * semaphore a large number of times. While it runs at last one end of a
+ * period should be reached.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testAcquireMultiplePeriods() throws InterruptedException {
+ final int count = 1000;
+ final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(
+ PERIOD_MILLIS / 10, TimeUnit.MILLISECONDS, 1);
+ semaphore.setLimit(count / 4);
+ final CountDownLatch latch = new CountDownLatch(count);
+ final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
+ t.start();
+ latch.await();
+ semaphore.shutdown();
+ assertTrue(semaphore.getPeriodEnds() > 0, "End of period not reached");
+ }
+
+ /**
+ * Tests the methods for statistics.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testGetAverageCallsPerPeriod() throws InterruptedException {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+ prepareStartTimer(service, future);
+ EasyMock.replay(service, future);
+ final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
+ LIMIT);
+ semaphore.acquire();
+ semaphore.endOfPeriod();
+ assertEquals(1.0, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (1)");
+ semaphore.acquire();
+ semaphore.acquire();
+ semaphore.endOfPeriod();
+ assertEquals(1.5, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (2)");
+ EasyMock.verify(service, future);
+ }
+
+ /**
+ * Tests whether the available non-blocking calls can be queried.
+ *
+ * @throws InterruptedException so we don't have to catch it
+ */
+ @Test
+ public void testGetAvailablePermits() throws InterruptedException {
+ final ScheduledExecutorService service = EasyMock
+ .createMock(ScheduledExecutorService.class);
+ final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+ prepareStartTimer(service, future);
+ EasyMock.replay(service, future);
+ final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
+ LIMIT);
+ for (int i = 0; i < LIMIT; i++) {
+ assertEquals(LIMIT - i, semaphore.getAvailablePermits(), "Wrong available count at " + i);
+ semaphore.acquire();
+ }
+ semaphore.endOfPeriod();
+ assertEquals(LIMIT, semaphore.getAvailablePermits(), "Wrong available count in new period");
+ EasyMock.verify(service, future);
+ }
+
+ /**
+ * Tests the tryAcquire() method. It is checked whether the semaphore can be acquired
+ * by a bunch of threads the expected number of times and not more.
+ */
+ @Test
+ public void testTryAcquire() throws InterruptedException {
+ final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, TimeUnit.SECONDS,
+ LIMIT);
+ final TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
+ final CountDownLatch latch = new CountDownLatch(1);
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new TryAcquireThread(semaphore, latch);
+ threads[i].start();
+ }
+
+ latch.countDown();
+ int permits = 0;
+ for (final TryAcquireThread t : threads) {
+ t.join();
+ if (t.acquired) {
+ permits++;
+ }
+ }
+ assertEquals(LIMIT, permits, "Wrong number of permits granted");
+ }
+
+ /**
+ * Tries to call tryAcquire() after shutdown(). This should cause an exception.
+ */
+ @Test
+ public void testTryAcquireAfterShutdown() {
+ final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
+ semaphore.shutdown();
+ assertThrows(IllegalStateException.class, semaphore::tryAcquire);
+ }
+
+ /**
+ * A specialized implementation of {@code TimedSemaphore} that is easier to
+ * test.
+ */
+ private static class TimedSemaphoreTestImpl extends TimedSemaphore {
+ /** A mock scheduled future. */
+ ScheduledFuture<?> schedFuture;
+
+ /** A latch for synchronizing with the main thread. */
+ volatile CountDownLatch latch;
+
+ /** Counter for the endOfPeriod() invocations. */
+ private int periodEnds;
+
+ TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit,
+ final int limit) {
+ super(timePeriod, timeUnit, limit);
+ }
+
+ TimedSemaphoreTestImpl(final ScheduledExecutorService service,
+ final long timePeriod, final TimeUnit timeUnit, final int limit) {
+ super(service, timePeriod, timeUnit, limit);
+ }
+
+ /**
+ * Returns the number of invocations of the endOfPeriod() method.
+ *
+ * @return the endOfPeriod() invocations
+ */
+ int getPeriodEnds() {
+ synchronized (this) {
+ return periodEnds;
+ }
+ }
+
+ /**
+ * Invokes the latch if one is set.
+ *
+ * @throws InterruptedException because it is declared that way in TimedSemaphore
+ */
+ @Override
+ public synchronized void acquire() throws InterruptedException {
+ super.acquire();
+ if (latch != null) {
+ latch.countDown();
+ }
+ }
+
+ /**
+ * Counts the number of invocations.
+ */
+ @Override
+ protected synchronized void endOfPeriod() {
+ super.endOfPeriod();
+ periodEnds++;
+ }
+
+ /**
+ * Either returns the mock future or calls the super method.
+ */
+ @Override
+ protected ScheduledFuture<?> startTimer() {
+ return schedFuture != null ? schedFuture : super.startTimer();
+ }
+ }
+
+ /**
+ * A test thread class that will be used by tests for triggering the
+ * semaphore. The thread calls the semaphore a configurable number of times.
+ * When this is done, it can notify the main thread.
+ */
+ private static class SemaphoreThread extends Thread {
+ /** The semaphore. */
+ private final TimedSemaphore semaphore;
+
+ /** A latch for communication with the main thread. */
+ private final CountDownLatch latch;
+
+ /** The number of acquire() calls. */
+ private final int count;
+
+ /** The number of invocations of the latch. */
+ private final int latchCount;
+
+ SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) {
+ semaphore = b;
+ latch = l;
+ count = c;
+ latchCount = lc;
+ }
+
+ /**
+ * Calls acquire() on the semaphore for the specified number of times.
+ * Optionally the latch will also be triggered to synchronize with the
+ * main test thread.
+ */
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < count; i++) {
+ semaphore.acquire();
+
+ if (i < latchCount) {
+ latch.countDown();
+ }
+ }
+ } catch (final InterruptedException iex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * A test thread class which invokes {@code tryAcquire()} on the test semaphore and
+ * records the return value.
+ */
+ private static class TryAcquireThread extends Thread {
+ /** The semaphore. */
+ private final TimedSemaphore semaphore;
+
+ /** A latch for communication with the main thread. */
+ private final CountDownLatch latch;
+
+ /** Flag whether a permit could be acquired. */
+ private boolean acquired;
+
+ TryAcquireThread(final TimedSemaphore s, final CountDownLatch l) {
+ semaphore = s;
+ latch = l;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (latch.await(10, TimeUnit.SECONDS)) {
+ acquired = semaphore.tryAcquire();
+ }
+ } catch (final InterruptedException iex) {
+ // ignore
+ }
+ }
+ }
+}