aboutsummaryrefslogtreecommitdiff
path: root/src/share/classes/java/util/concurrent
diff options
context:
space:
mode:
authordl <none@none>2014-09-05 10:54:28 +0200
committerdl <none@none>2014-09-05 10:54:28 +0200
commitec6e43136420e3c382e7982bf134d7b0f0f3bb27 (patch)
tree27cf84317503f24614ee5e49ca345ce1872ccb19 /src/share/classes/java/util/concurrent
parentb0a5d922d782225e5742bea4a47fa9480e381763 (diff)
downloadjdk8u_jdk-ec6e43136420e3c382e7982bf134d7b0f0f3bb27.tar.gz
8056248: Improve ForkJoin thread throttling
Reviewed-by: psandoz, martin, chegar
Diffstat (limited to 'src/share/classes/java/util/concurrent')
-rw-r--r--src/share/classes/java/util/concurrent/ForkJoinPool.java2852
-rw-r--r--src/share/classes/java/util/concurrent/ForkJoinTask.java187
-rw-r--r--src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java11
3 files changed, 1566 insertions, 1484 deletions
diff --git a/src/share/classes/java/util/concurrent/ForkJoinPool.java b/src/share/classes/java/util/concurrent/ForkJoinPool.java
index 9f5e14914d..8b6ed6deac 100644
--- a/src/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -49,6 +49,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.security.AccessControlContext;
import java.security.ProtectionDomain;
import java.security.Permissions;
@@ -80,9 +81,9 @@ import java.security.Permissions;
*
* <p>For applications that require separate or custom pools, a {@code
* ForkJoinPool} may be constructed with a given target parallelism
- * level; by default, equal to the number of available processors. The
- * pool attempts to maintain enough active (or available) threads by
- * dynamically adding, suspending, or resuming internal worker
+ * level; by default, equal to the number of available processors.
+ * The pool attempts to maintain enough active (or available) threads
+ * by dynamically adding, suspending, or resuming internal worker
* threads, even if some tasks are stalled waiting to join others.
* However, no such adjustments are guaranteed in the face of blocked
* I/O or other unmanaged synchronization. The nested {@link
@@ -178,7 +179,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* that may be stolen by other workers. Preference rules give
* first priority to processing tasks from their own queues (LIFO
* or FIFO, depending on mode), then to randomized FIFO steals of
- * tasks in other queues.
+ * tasks in other queues. This framework began as a vehicle for
+ * supporting tree-structured parallelism using work-stealing.
+ * Over time, its scalability advantages led to extensions and
+ * changes to better support more diverse usage contexts. Because
+ * most internal methods and nested classes are interrelated,
+ * their main rationale and descriptions are presented here;
+ * individual methods and nested classes contain only brief
+ * comments about details.
*
* WorkQueues
* ==========
@@ -198,201 +206,318 @@ public class ForkJoinPool extends AbstractExecutorService {
* (http://research.sun.com/scalable/pubs/index.html) and
* "Idempotent work stealing" by Michael, Saraswat, and Vechev,
* PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
- * See also "Correct and Efficient Work-Stealing for Weak Memory
- * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
- * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
- * analysis of memory ordering (atomic, volatile etc) issues. The
- * main differences ultimately stem from GC requirements that we
- * null out taken slots as soon as we can, to maintain as small a
- * footprint as possible even in programs generating huge numbers
- * of tasks. To accomplish this, we shift the CAS arbitrating pop
- * vs poll (steal) from being on the indices ("base" and "top") to
- * the slots themselves. So, both a successful pop and poll
- * mainly entail a CAS of a slot from non-null to null. Because
- * we rely on CASes of references, we do not need tag bits on base
- * or top. They are simple ints as used in any circular
+ * The main differences ultimately stem from GC requirements that
+ * we null out taken slots as soon as we can, to maintain as small
+ * a footprint as possible even in programs generating huge
+ * numbers of tasks. To accomplish this, we shift the CAS
+ * arbitrating pop vs poll (steal) from being on the indices
+ * ("base" and "top") to the slots themselves.
+ *
+ * Adding tasks then takes the form of a classic array push(task):
+ * q.array[q.top] = task; ++q.top;
+ *
+ * (The actual code needs to null-check and size-check the array,
+ * properly fence the accesses, and possibly signal waiting
+ * workers to start scanning -- see below.) Both a successful pop
+ * and poll mainly entail a CAS of a slot from non-null to null.
+ *
+ * The pop operation (always performed by owner) is:
+ * if ((base != top) and
+ * (the task at top slot is not null) and
+ * (CAS slot to null))
+ * decrement top and return task;
+ *
+ * And the poll operation (usually by a stealer) is
+ * if ((base != top) and
+ * (the task at base slot is not null) and
+ * (base has not changed) and
+ * (CAS slot to null))
+ * increment base and return task;
+ *
+ * Because we rely on CASes of references, we do not need tag bits
+ * on base or top. They are simple ints as used in any circular
* array-based queue (see for example ArrayDeque). Updates to the
- * indices must still be ordered in a way that guarantees that top
- * == base means the queue is empty, but otherwise may err on the
- * side of possibly making the queue appear nonempty when a push,
- * pop, or poll have not fully committed. Note that this means
- * that the poll operation, considered individually, is not
- * wait-free. One thief cannot successfully continue until another
- * in-progress one (or, if previously empty, a push) completes.
- * However, in the aggregate, we ensure at least probabilistic
+ * indices guarantee that top == base means the queue is empty,
+ * but otherwise may err on the side of possibly making the queue
+ * appear nonempty when a push, pop, or poll have not fully
+ * committed. (Method isEmpty() checks the case of a partially
+ * completed removal of the last element.) Because of this, the
+ * poll operation, considered individually, is not wait-free. One
+ * thief cannot successfully continue until another in-progress
+ * one (or, if previously empty, a push) completes. However, in
+ * the aggregate, we ensure at least probabilistic
* non-blockingness. If an attempted steal fails, a thief always
* chooses a different random victim target to try next. So, in
* order for one thief to progress, it suffices for any
* in-progress poll or new push on any empty queue to
* complete. (This is why we normally use method pollAt and its
* variants that try once at the apparent base index, else
- * consider alternative actions, rather than method poll.)
- *
- * This approach also enables support of a user mode in which local
- * task processing is in FIFO, not LIFO order, simply by using
- * poll rather than pop. This can be useful in message-passing
- * frameworks in which tasks are never joined. However neither
- * mode considers affinities, loads, cache localities, etc, so
- * rarely provide the best possible performance on a given
- * machine, but portably provide good throughput by averaging over
- * these factors. (Further, even if we did try to use such
- * information, we do not usually have a basis for exploiting it.
- * For example, some sets of tasks profit from cache affinities,
- * but others are harmed by cache pollution effects.)
+ * consider alternative actions, rather than method poll, which
+ * retries.)
+ *
+ * This approach also enables support of a user mode in which
+ * local task processing is in FIFO, not LIFO order, simply by
+ * using poll rather than pop. This can be useful in
+ * message-passing frameworks in which tasks are never joined.
+ * However neither mode considers affinities, loads, cache
+ * localities, etc, so rarely provide the best possible
+ * performance on a given machine, but portably provide good
+ * throughput by averaging over these factors. Further, even if
+ * we did try to use such information, we do not usually have a
+ * basis for exploiting it. For example, some sets of tasks
+ * profit from cache affinities, but others are harmed by cache
+ * pollution effects. Additionally, even though it requires
+ * scanning, long-term throughput is often best using random
+ * selection rather than directed selection policies, so cheap
+ * randomization of sufficient quality is used whenever
+ * applicable. Various Marsaglia XorShifts (some with different
+ * shift constants) are inlined at use points.
*
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
- * for work-stealing (this would contaminate lifo/fifo
- * processing). Instead, we randomly associate submission queues
+ * by workers. Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
* ThreadLocalRandom probe value serves as a hash code for
* choosing existing queues, and may be randomly repositioned upon
* contention with other submitters. In essence, submitters act
* like workers except that they are restricted to executing local
* tasks that they submitted (or in the case of CountedCompleters,
- * others with the same root task). However, because most
- * shared/external queue operations are more expensive than
- * internal, and because, at steady state, external submitters
- * will compete for CPU with workers, ForkJoinTask.join and
- * related methods disable them from repeatedly helping to process
- * tasks if all workers are active. Insertion of tasks in shared
+ * others with the same root task). Insertion of tasks in shared
* mode requires a lock (mainly to protect in the case of
- * resizing) but we use only a simple spinlock (using bits in
- * field qlock), because submitters encountering a busy queue move
- * on to try or create other queues -- they block only when
- * creating and registering new queues.
+ * resizing) but we use only a simple spinlock (using field
+ * qlock), because submitters encountering a busy queue move on to
+ * try or create other queues -- they block only when creating and
+ * registering new queues. Additionally, "qlock" saturates to an
+ * unlockable value (-1) at shutdown. Unlocking still can be and
+ * is performed by cheaper ordered writes of "qlock" in successful
+ * cases, but uses CAS in unsuccessful cases.
*
* Management
* ==========
*
* The main throughput advantages of work-stealing stem from
* decentralized control -- workers mostly take tasks from
- * themselves or each other. We cannot negate this in the
- * implementation of other management responsibilities. The main
- * tactic for avoiding bottlenecks is packing nearly all
- * essentially atomic control state into two volatile variables
- * that are by far most often read (not written) as status and
- * consistency checks.
- *
- * Field "ctl" contains 64 bits holding all the information needed
- * to atomically decide to add, inactivate, enqueue (on an event
+ * themselves or each other, at rates that can exceed a billion
+ * per second. The pool itself creates, activates (enables
+ * scanning for and running tasks), deactivates, blocks, and
+ * terminates threads, all with minimal central information.
+ * There are only a few properties that we can globally track or
+ * maintain, so we pack them into a small number of variables,
+ * often maintaining atomicity without blocking or locking.
+ * Nearly all essentially atomic control state is held in two
+ * volatile variables that are by far most often read (not
+ * written) as status and consistency checks. (Also, field
+ * "config" holds unchanging configuration state.)
+ *
+ * Field "ctl" contains 64 bits holding information needed to
+ * atomically decide to add, inactivate, enqueue (on an event
* queue), dequeue, and/or re-activate workers. To enable this
* packing, we restrict maximum parallelism to (1<<15)-1 (which is
* far in excess of normal operating range) to allow ids, counts,
* and their negations (used for thresholding) to fit into 16bit
- * fields.
- *
- * Field "plock" is a form of sequence lock with a saturating
- * shutdown bit (similarly for per-queue "qlocks"), mainly
- * protecting updates to the workQueues array, as well as to
- * enable shutdown. When used as a lock, it is normally only very
- * briefly held, so is nearly always available after at most a
- * brief spin, but we use a monitor-based backup strategy to
- * block when needed.
+ * subfields.
+ *
+ * Field "runState" holds lockable state bits (STARTED, STOP, etc)
+ * also protecting updates to the workQueues array. When used as
+ * a lock, it is normally held only for a few instructions (the
+ * only exceptions are one-time array initialization and uncommon
+ * resizing), so is nearly always available after at most a brief
+ * spin. But to be extra-cautious, after spinning, method
+ * awaitRunStateLock (called only if an initial CAS fails) uses
+ * wait/notify mechanics on a builtin monitor to block when
+ * (rarely) needed. This would be a terrible idea for a highly
+ * contended lock, but most pools run without the lock ever
+ * contending after the spin limit, so this works fine as a more
+ * conservative alternative. Because we don't otherwise have an
+ * internal Object to use as a monitor, the "stealCounter" (an
+ * AtomicLong) is used when available (it too must be lazily
+ * initialized; see externalSubmit).
+ *
+ * Usages of "runState" vs "ctl" interact in only one case:
+ * deciding to add a worker thread (see tryAddWorker), in which
+ * case the ctl CAS is performed while the lock is held.
*
* Recording WorkQueues. WorkQueues are recorded in the
- * "workQueues" array that is created upon first use and expanded
- * if necessary. Updates to the array while recording new workers
- * and unrecording terminated ones are protected from each other
- * by a lock but the array is otherwise concurrently readable, and
- * accessed directly. To simplify index-based operations, the
- * array size is always a power of two, and all readers must
- * tolerate null slots. Worker queues are at odd indices. Shared
- * (submission) queues are at even indices, up to a maximum of 64
- * slots, to limit growth even if array needs to expand to add
- * more workers. Grouping them together in this way simplifies and
- * speeds up task scanning.
+ * "workQueues" array. The array is created upon first use (see
+ * externalSubmit) and expanded if necessary. Updates to the
+ * array while recording new workers and unrecording terminated
+ * ones are protected from each other by the runState lock, but
+ * the array is otherwise concurrently readable, and accessed
+ * directly. We also ensure that reads of the array reference
+ * itself never become too stale. To simplify index-based
+ * operations, the array size is always a power of two, and all
+ * readers must tolerate null slots. Worker queues are at odd
+ * indices. Shared (submission) queues are at even indices, up to
+ * a maximum of 64 slots, to limit growth even if array needs to
+ * expand to add more workers. Grouping them together in this way
+ * simplifies and speeds up task scanning.
*
* All worker thread creation is on-demand, triggered by task
* submissions, replacement of terminated workers, and/or
* compensation for blocked workers. However, all other support
* code is set up to work with other policies. To ensure that we
- * do not hold on to worker references that would prevent GC, ALL
+ * do not hold on to worker references that would prevent GC, all
* accesses to workQueues are via indices into the workQueues
* array (which is one source of some of the messy code
* constructions here). In essence, the workQueues array serves as
- * a weak reference mechanism. Thus for example the wait queue
- * field of ctl stores indices, not references. Access to the
- * workQueues in associated methods (for example signalWork) must
- * both index-check and null-check the IDs. All such accesses
- * ignore bad IDs by returning out early from what they are doing,
- * since this can only be associated with termination, in which
- * case it is OK to give up. All uses of the workQueues array
- * also check that it is non-null (even if previously
- * non-null). This allows nulling during termination, which is
- * currently not necessary, but remains an option for
- * resource-revocation-based shutdown schemes. It also helps
- * reduce JIT issuance of uncommon-trap code, which tends to
- * unnecessarily complicate control flow in some methods.
- *
- * Event Queuing. Unlike HPC work-stealing frameworks, we cannot
- * let workers spin indefinitely scanning for tasks when none can
- * be found immediately, and we cannot start/resume workers unless
- * there appear to be tasks available. On the other hand, we must
- * quickly prod them into action when new tasks are submitted or
- * generated. In many usages, ramp-up time to activate workers is
- * the main limiting factor in overall performance (this is
- * compounded at program start-up by JIT compilation and
- * allocation). So we try to streamline this as much as possible.
- * We park/unpark workers after placing in an event wait queue
- * when they cannot find work. This "queue" is actually a simple
- * Treiber stack, headed by the "id" field of ctl, plus a 15bit
- * counter value (that reflects the number of times a worker has
- * been inactivated) to avoid ABA effects (we need only as many
- * version numbers as worker threads). Successors are held in
- * field WorkQueue.nextWait. Queuing deals with several intrinsic
- * races, mainly that a task-producing thread can miss seeing (and
- * signalling) another thread that gave up looking for work but
- * has not yet entered the wait queue. We solve this by requiring
- * a full sweep of all workers (via repeated calls to method
- * scan()) both before and after a newly waiting worker is added
- * to the wait queue. Because enqueued workers may actually be
- * rescanning rather than waiting, we set and clear the "parker"
+ * a weak reference mechanism. Thus for example the stack top
+ * subfield of ctl stores indices, not references.
+ *
+ * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
+ * cannot let workers spin indefinitely scanning for tasks when
+ * none can be found immediately, and we cannot start/resume
+ * workers unless there appear to be tasks available. On the
+ * other hand, we must quickly prod them into action when new
+ * tasks are submitted or generated. In many usages, ramp-up time
+ * to activate workers is the main limiting factor in overall
+ * performance, which is compounded at program start-up by JIT
+ * compilation and allocation. So we streamline this as much as
+ * possible.
+ *
+ * The "ctl" field atomically maintains active and total worker
+ * counts as well as a queue to place waiting threads so they can
+ * be located for signalling. Active counts also play the role of
+ * quiescence indicators, so are decremented when workers believe
+ * that there are no more tasks to execute. The "queue" is
+ * actually a form of Treiber stack. A stack is ideal for
+ * activating threads in most-recently used order. This improves
+ * performance and locality, outweighing the disadvantages of
+ * being prone to contention and inability to release a worker
+ * unless it is topmost on stack. We park/unpark workers after
+ * pushing on the idle worker stack (represented by the lower
+ * 32bit subfield of ctl) when they cannot find work. The top
+ * stack state holds the value of the "scanState" field of the
+ * worker: its index and status, plus a version counter that, in
+ * addition to the count subfields (also serving as version
+ * stamps), provides protection against Treiber stack ABA effects.
+ *
+ * Field scanState is used by both workers and the pool to manage
+ * and track whether a worker is INACTIVE (possibly blocked
+ * waiting for a signal), or SCANNING for tasks (when neither
+ * holds, it is busy running tasks). When a worker is inactivated, its
+ * scanState field is set, and is prevented from executing tasks,
+ * even though it must scan once for them to avoid queuing
+ * races. Note that scanState updates lag queue CAS releases so
+ * usage requires care. When queued, the lower 16 bits of
+ * scanState must hold its pool index. So we place the index there
+ * upon initialization (see registerWorker) and otherwise keep it
+ * there or restore it when necessary.
+ *
+ * Memory ordering. See "Correct and Efficient Work-Stealing for
+ * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
+ * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
+ * analysis of memory ordering requirements in work-stealing
+ * algorithms similar to the one used here. We usually need
+ * stronger than minimal ordering because we must sometimes signal
+ * workers, requiring Dekker-like full-fences to avoid lost
+ * signals. Arranging for enough ordering without expensive
+ * over-fencing requires tradeoffs among the supported means of
+ * expressing access constraints. The most central operations,
+ * taking from queues and updating ctl state, require full-fence
+ * CAS. Array slots are read using the emulation of volatiles
+ * provided by Unsafe. Access from other threads to WorkQueue
+ * base, top, and array requires a volatile load of the first of
+ * any of these read. We use the convention of declaring the
+ * "base" index volatile, and always read it before other fields.
+ * The owner thread must ensure ordered updates, so writes use
+ * ordered intrinsics unless they can piggyback on those for other
+ * writes. Similar conventions and rationales hold for other
+ * WorkQueue fields (such as "currentSteal") that are only written
+ * by owners but observed by others.
+ *
+ * Creating workers. To create a worker, we pre-increment total
+ * count (serving as a reservation), and attempt to construct a
+ * ForkJoinWorkerThread via its factory. Upon construction, the
+ * new thread invokes registerWorker, where it constructs a
+ * WorkQueue and is assigned an index in the workQueues array
+ * (expanding the array if necessary). The thread is then
+ * started. Upon any exception across these steps, or null return
+ * from factory, deregisterWorker adjusts counts and records
+ * accordingly. If a null return, the pool continues running with
+ * fewer than the target number of workers. If exceptional, the
+ * exception is propagated, generally to some external caller.
+ * Worker index assignment avoids the bias in scanning that would
+ * occur if entries were sequentially packed starting at the front
+ * of the workQueues array. We treat the array as a simple
+ * power-of-two hash table, expanding as needed. The seedIndex
+ * increment ensures no collisions until a resize is needed or a
+ * worker is deregistered and replaced, and thereafter keeps
+ * probability of collision low. We cannot use
+ * ThreadLocalRandom.getProbe() for similar purposes here because
+ * the thread has not started yet, but do so for creating
+ * submission queues for existing external threads.
+ *
+ * Deactivation and waiting. Queuing encounters several intrinsic
+ * races; most notably that a task-producing thread can miss
+ * seeing (and signalling) another thread that gave up looking for
+ * work but has not yet entered the wait queue. When a worker
+ * cannot find a task to steal, it deactivates and enqueues. Very
+ * often, the lack of tasks is transient due to GC or OS
+ * scheduling. To reduce false-alarm deactivation, scanners
+ * compute checksums of queue states during sweeps. (The
+ * stability checks used here and elsewhere are probabilistic
+ * variants of snapshot techniques -- see Herlihy & Shavit.)
+ * Workers give up and try to deactivate only after the sum is
+ * stable across scans. Further, to avoid missed signals, they
+ * repeat this scanning process after successful enqueuing until
+ * again stable. In this state, the worker cannot take/run a task
+ * it sees until it is released from the queue, so the worker
+ * itself eventually tries to release itself or any successor (see
+ * tryRelease). Otherwise, upon an empty scan, a deactivated
+ * worker uses an adaptive local spin construction (see awaitWork)
+ * before blocking (via park). Note the unusual conventions about
+ * Thread.interrupts surrounding parking and other blocking:
+ * Because interrupts are used solely to alert threads to check
+ * termination, which is checked anyway upon blocking, we clear
+ * status (using Thread.interrupted) before any call to park, so
+ * that park does not immediately return due to status being set
+ * via some other unrelated call to interrupt in user code.
+ *
+ * Signalling and activation. Workers are created or activated
+ * only when there appears to be at least one task they might be
+ * able to find and execute. Upon push (either by a worker or an
+ * external submission) to a previously (possibly) empty queue,
+ * workers are signalled if idle, or created if fewer exist than
+ * the given parallelism level. These primary signals are
+ * buttressed by others whenever other threads remove a task from
+ * a queue and notice that there are other tasks there as well.
+ * On most platforms, signalling (unpark) overhead time is
+ * noticeably long, and the time between signalling a thread and
+ * it actually making progress can be very noticeably long, so it
+ * is worth offloading these delays from critical paths as much as
+ * possible. Also, because inactive workers are often rescanning
+ * or spinning rather than blocking, we set and clear the "parker"
* field of WorkQueues to reduce unnecessary calls to unpark.
* (This requires a secondary recheck to avoid missed signals.)
- * Note the unusual conventions about Thread.interrupts
- * surrounding parking and other blocking: Because interrupts are
- * used solely to alert threads to check termination, which is
- * checked anyway upon blocking, we clear status (using
- * Thread.interrupted) before any call to park, so that park does
- * not immediately return due to status being set via some other
- * unrelated call to interrupt in user code.
- *
- * Signalling. We create or wake up workers only when there
- * appears to be at least one task they might be able to find and
- * execute. When a submission is added or another worker adds a
- * task to a queue that has fewer than two tasks, they signal
- * waiting workers (or trigger creation of new ones if fewer than
- * the given parallelism level -- signalWork). These primary
- * signals are buttressed by others whenever other threads remove
- * a task from a queue and notice that there are other tasks there
- * as well. So in general, pools will be over-signalled. On most
- * platforms, signalling (unpark) overhead time is noticeably
- * long, and the time between signalling a thread and it actually
- * making progress can be very noticeably long, so it is worth
- * offloading these delays from critical paths as much as
- * possible. Additionally, workers spin-down gradually, by staying
- * alive so long as they see the ctl state changing. Similar
- * stability-sensing techniques are also used before blocking in
- * awaitJoin and helpComplete.
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
- * time out and terminate if the pool has remained quiescent for a
- * given period -- a short period if there are more threads than
- * parallelism, longer as the number of threads decreases. This
- * will slowly propagate, eventually terminating all workers after
- * periods of non-use.
- *
- * Shutdown and Termination. A call to shutdownNow atomically sets
- * a plock bit and then (non-atomically) sets each worker's
- * qlock status, cancels all unprocessed tasks, and wakes up
- * all waiting workers. Detecting whether termination should
- * commence after a non-abrupt shutdown() call requires more work
- * and bookkeeping. We need consensus about quiescence (i.e., that
- * there is no more work). The active count provides a primary
- * indication but non-abrupt shutdown still requires a rechecking
- * scan for any workers that are inactive but not queued.
+ * time out and terminate (see awaitWork) if the pool has remained
+ * quiescent for period IDLE_TIMEOUT, increasing the period as the
+ * number of threads decreases, eventually removing all workers.
+ * Also, when more than two spare threads exist, excess threads
+ * are immediately terminated at the next quiescent point.
+ * (Padding by two avoids hysteresis.)
+ *
+ * Shutdown and Termination. A call to shutdownNow invokes
+ * tryTerminate to atomically set a runState bit. The calling
+ * thread, as well as every other worker thereafter terminating,
+ * helps terminate others by setting their (qlock) status,
+ * cancelling their unprocessed tasks, and waking them up, doing
+ * so repeatedly until stable (but with a loop bounded by the
+ * number of workers). Calls to non-abrupt shutdown() preface
+ * this by checking whether termination should commence. This
+ * relies primarily on the active count bits of "ctl" maintaining
+ * consensus -- tryTerminate is called from awaitWork whenever
+ * quiescent. However, external submitters do not take part in
+ * this consensus. So, tryTerminate sweeps through queues (until
+ * stable) to ensure lack of in-flight submissions and workers
+ * about to process them before triggering the "STOP" phase of
+ * termination. (Note: there is an intrinsic conflict if
+ * helpQuiescePool is called when shutdown is enabled. Both wait
+ * for quiescence, but tryTerminate is biased to not trigger until
+ * helpQuiescePool completes.)
+ *
*
* Joining Tasks
* =============
@@ -403,9 +528,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* just let them block (as in Thread.join). We also cannot just
* reassign the joiner's run-time stack with another and replace
* it later, which would be a form of "continuation", that even if
- * possible is not necessarily a good idea since we sometimes need
- * both an unblocked task and its continuation to progress.
- * Instead we combine two tactics:
+ * possible is not necessarily a good idea since we may need both
+ * an unblocked task and its continuation to progress. Instead we
+ * combine two tactics:
*
* Helping: Arranging for the joiner to execute some task that it
* would be running if the steal had not occurred.
@@ -425,16 +550,16 @@ public class ForkJoinPool extends AbstractExecutorService {
* The ManagedBlocker extension API can't use helping so relies
* only on compensation in method awaitBlocker.
*
- * The algorithm in tryHelpStealer entails a form of "linear"
- * helping: Each worker records (in field currentSteal) the most
- * recent task it stole from some other worker. Plus, it records
- * (in field currentJoin) the task it is currently actively
- * joining. Method tryHelpStealer uses these markers to try to
- * find a worker to help (i.e., steal back a task from and execute
- * it) that could hasten completion of the actively joined task.
- * In essence, the joiner executes a task that would be on its own
- * local deque had the to-be-joined task not been stolen. This may
- * be seen as a conservative variant of the approach in Wagner &
+ * The algorithm in helpStealer entails a form of "linear
+ * helping". Each worker records (in field currentSteal) the most
+ * recent task it stole from some other worker (or a submission).
+ * It also records (in field currentJoin) the task it is currently
+ * actively joining. Method helpStealer uses these markers to try
+ * to find a worker to help (i.e., steal back a task from and
+ * execute it) that could hasten completion of the actively joined
+ * task. Thus, the joiner executes a task that would be on its
+ * own local deque had the to-be-joined task not been stolen. This
+ * is a conservative variant of the approach described in Wagner &
* Calder "Leapfrogging: a portable technique for implementing
* efficient futures" SIGPLAN Notices, 1993
* (http://portal.acm.org/citation.cfm?id=155354). It differs in
@@ -452,37 +577,40 @@ public class ForkJoinPool extends AbstractExecutorService {
* which means that we miss links in the chain during long-lived
* tasks, GC stalls etc (which is OK since blocking in such cases
* is usually a good idea). (4) We bound the number of attempts
- * to find work (see MAX_HELP) and fall back to suspending the
+ * to find work using checksums and fall back to suspending the
* worker and if necessary replacing it with another.
*
- * Helping actions for CountedCompleters are much simpler: Method
- * helpComplete can take and execute any task with the same root
- * as the task being waited on. However, this still entails some
- * traversal of completer chains, so is less efficient than using
- * CountedCompleters without explicit joins.
- *
- * It is impossible to keep exactly the target parallelism number
- * of threads running at any given time. Determining the
- * existence of conservatively safe helping targets, the
- * availability of already-created spares, and the apparent need
- * to create new spares are all racy, so we rely on multiple
- * retries of each. Compensation in the apparent absence of
- * helping opportunities is challenging to control on JVMs, where
- * GC and other activities can stall progress of tasks that in
- * turn stall out many other dependent tasks, without us being
- * able to determine whether they will ever require compensation.
- * Even though work-stealing otherwise encounters little
- * degradation in the presence of more threads than cores,
- * aggressively adding new threads in such cases entails risk of
- * unwanted positive feedback control loops in which more threads
- * cause more dependent stalls (as well as delayed progress of
- * unblocked threads to the point that we know they are available)
- * leading to more situations requiring more threads, and so
- * on. This aspect of control can be seen as an (analytically
- * intractable) game with an opponent that may choose the worst
- * (for us) active thread to stall at any time. We take several
- * precautions to bound losses (and thus bound gains), mainly in
- * methods tryCompensate and awaitJoin.
+ * Helping actions for CountedCompleters do not require tracking
+ * currentJoins: Method helpComplete takes and executes any task
+ * with the same root as the task being waited on (preferring
+ * local pops to non-local polls). However, this still entails
+ * some traversal of completer chains, so is less efficient than
+ * using CountedCompleters without explicit joins.
+ *
+ * Compensation does not aim to keep exactly the target
+ * parallelism number of unblocked threads running at any given
+ * time. Some previous versions of this class employed immediate
+ * compensations for any blocked join. However, in practice, the
+ * vast majority of blockages are transient byproducts of GC and
+ * other JVM or OS activities that are made worse by replacement.
+ * Currently, compensation is attempted only after validating that
+ * all purportedly active threads are processing tasks by checking
+ * field WorkQueue.scanState, which eliminates most false
+ * positives. Also, compensation is bypassed (tolerating fewer
+ * threads) in the most common case in which it is rarely
+ * beneficial: when a worker with an empty queue (thus no
+ * continuation tasks) blocks on a join and there still remain
+ * enough threads to ensure liveness.
+ *
+ * The compensation mechanism may be bounded. Bounds for the
+ * commonPool (see commonMaxSpares) better enable JVMs to cope
+ * with programming errors and abuse before running out of
+ * resources to do so. In other cases, users may supply factories
+ * that limit thread construction. The effects of bounding in this
+ * pool (like all others) are imprecise. Total worker counts are
+ * decremented when threads deregister, not when they exit and
+ * resources are reclaimed by the JVM and OS. So the number of
+ * simultaneously live threads may transiently exceed bounds.
*
* Common Pool
* ===========
@@ -492,34 +620,52 @@ public class ForkJoinPool extends AbstractExecutorService {
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
* allocation. Most bootstrapping occurs within method
- * fullExternalPush during the first submission to the pool.
+ * externalSubmit during the first submission to the pool.
*
* When external threads submit to the common pool, they can
- * perform subtask processing (see externalHelpJoin and related
- * methods). This caller-helps policy makes it sensible to set
- * common pool parallelism level to one (or more) less than the
- * total number of available cores, or even zero for pure
- * caller-runs. We do not need to record whether external
- * submissions are to the common pool -- if not, externalHelpJoin
- * returns quickly (at the most helping to signal some common pool
- * workers). These submitters would otherwise be blocked waiting
- * for completion, so the extra effort (with liberally sprinkled
- * task status checks) in inapplicable cases amounts to an odd
- * form of limited spin-wait before blocking in ForkJoinTask.join.
+ * perform subtask processing (see externalHelpComplete and
+ * related methods) upon joins. This caller-helps policy makes it
+ * sensible to set common pool parallelism level to one (or more)
+ * less than the total number of available cores, or even zero for
+ * pure caller-runs. We do not need to record whether external
+ * submissions are to the common pool -- if not, external help
+ * methods return quickly. These submitters would otherwise be
+ * blocked waiting for completion, so the extra effort (with
+ * liberally sprinkled task status checks) in inapplicable cases
+ * amounts to an odd form of limited spin-wait before blocking in
+ * ForkJoinTask.join.
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
* InnocuousForkJoinWorkerThread when there is a SecurityManager
* present. These workers have no permissions set, do not belong
* to any user-defined ThreadGroup, and erase all ThreadLocals
- * after executing any top-level task (see WorkQueue.runTask). The
- * associated mechanics (mainly in ForkJoinWorkerThread) may be
- * JVM-dependent and must access particular Thread class fields to
- * achieve this effect.
+ * after executing any top-level task (see WorkQueue.runTask).
+ * The associated mechanics (mainly in ForkJoinWorkerThread) may
+ * be JVM-dependent and must access particular Thread class fields
+ * to achieve this effect.
*
* Style notes
* ===========
*
+ * Memory ordering relies mainly on Unsafe intrinsics that carry
+ * the further responsibility of explicitly performing null- and
+ * bounds-checks otherwise carried out implicitly by JVMs. This
+ * can be awkward and ugly, but also reflects the need to control
+ * outcomes across the unusual cases that arise in very racy code
+ * with very few invariants. So these explicit checks would exist
+ * in some form anyway. All fields are read into locals before
+ * use, and null-checked if they are references. This is usually
+ * done in a "C"-like style of listing declarations at the heads
+ * of methods or blocks, and using inline assignments on first
+ * encounter. Array bounds-checks are usually performed by
+ * masking with array.length-1, which relies on the invariant that
+ * these arrays are created with positive lengths, which is itself
+ * paranoically checked. Nearly all explicit checks lead to
+ * bypass/return, not exception throws, because they may
+ * legitimately arise due to cancellation/revocation during
+ * shutdown.
+ *
* There is a lot of representation-level coupling among classes
* ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
* fields of WorkQueue maintain data structures managed by
@@ -527,22 +673,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* trying to reduce this, since any associated future changes in
* representations will need to be accompanied by algorithmic
* changes anyway. Several methods intrinsically sprawl because
- * they must accumulate sets of consistent reads of volatiles held
- * in local variables. Methods signalWork() and scan() are the
- * main bottlenecks, so are especially heavily
- * micro-optimized/mangled. There are lots of inline assignments
- * (of form "while ((local = field) != 0)") which are usually the
- * simplest way to ensure the required read orderings (which are
- * sometimes critical). This leads to a "C"-like style of listing
- * declarations of these locals at the heads of methods or blocks.
- * There are several occurrences of the unusual "do {} while
- * (!cas...)" which is the simplest way to force an update of a
- * CAS'ed variable. There are also other coding oddities (including
- * several unnecessary-looking hoisted null checks) that help
- * some methods perform reasonably even when interpreted (not
- * compiled).
- *
- * The order of declarations in this file is:
+ * they must accumulate sets of consistent reads of fields held in
+ * local variables. There are also other coding oddities
+ * (including several unnecessary-looking hoisted null checks)
+ * that help some methods perform reasonably even when interpreted
+ * (not compiled).
+ *
+ * The order of declarations in this file is (with a few exceptions):
* (1) Static utility functions
* (2) Nested (static) classes
* (3) Static fields
@@ -609,56 +746,37 @@ public class ForkJoinPool extends AbstractExecutorService {
public final boolean exec() { return true; }
}
+ // Constants shared across ForkJoinPool and WorkQueue
+
+ // Bounds
+ static final int SMASK = 0xffff; // short bits == max index
+ static final int MAX_CAP = 0x7fff; // max #workers - 1
+ static final int EVENMASK = 0xfffe; // even short bits
+ static final int SQMASK = 0x007e; // max 64 (even) slots
+
+ // Masks and units for WorkQueue.scanState and ctl sp subfield
+ static final int SCANNING = 1; // false when running tasks
+ static final int INACTIVE = 1 << 31; // must be negative
+ static final int SS_SEQ = 1 << 16; // version count
+
+ // Mode bits for ForkJoinPool.config and WorkQueue.config
+ static final int MODE_MASK = 0xffff << 16; // top half of int
+ static final int LIFO_QUEUE = 0;
+ static final int FIFO_QUEUE = 1 << 16;
+ static final int SHARED_QUEUE = 1 << 31; // must be negative
+
/**
* Queues supporting work-stealing as well as external task
- * submission. See above for main rationale and algorithms.
- * Implementation relies heavily on "Unsafe" intrinsics
- * and selective use of "volatile":
- *
- * Field "base" is the index (mod array.length) of the least valid
- * queue slot, which is always the next position to steal (poll)
- * from if nonempty. Reads and writes require volatile orderings
- * but not CAS, because updates are only performed after slot
- * CASes.
- *
- * Field "top" is the index (mod array.length) of the next queue
- * slot to push to or pop from. It is written only by owner thread
- * for push, or under lock for external/shared push, and accessed
- * by other threads only after reading (volatile) base. Both top
- * and base are allowed to wrap around on overflow, but (top -
- * base) (or more commonly -(base - top) to force volatile read of
- * base before top) still estimates size. The lock ("qlock") is
- * forced to -1 on termination, causing all further lock attempts
- * to fail. (Note: we don't need CAS for termination state because
- * upon pool shutdown, all shared-queues will stop being used
- * anyway.) Nearly all lock bodies are set up so that exceptions
- * within lock bodies are "impossible" (modulo JVM errors that
- * would cause failure anyway.)
- *
- * The array slots are read and written using the emulation of
- * volatiles/atomics provided by Unsafe. Insertions must in
- * general use putOrderedObject as a form of releasing store to
- * ensure that all writes to the task object are ordered before
- * its publication in the queue. All removals entail a CAS to
- * null. The array is always a power of two. To ensure safety of
- * Unsafe array operations, all accesses perform explicit null
- * checks and implicit bounds checks via power-of-two masking.
- *
- * In addition to basic queuing support, this class contains
- * fields described elsewhere to control execution. It turns out
- * to work better memory-layout-wise to include them in this class
- * rather than a separate class.
- *
+ * submission. See above for descriptions and algorithms.
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
* do not want multiple WorkQueue instances or multiple queue
- * arrays sharing cache lines. (It would be best for queue objects
- * and their arrays to share, but there is nothing available to
- * help arrange that). The @Contended annotation alerts JVMs to
- * try to keep instances apart.
+ * arrays sharing cache lines. The @Contended annotation alerts
+ * JVMs to try to keep instances apart.
*/
@sun.misc.Contended
static final class WorkQueue {
+
/**
* Capacity of work-stealing queue array upon initialization.
* Must be a power of two; at least 4, but should be larger to
@@ -679,13 +797,13 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
- volatile int eventCount; // encoded inactivation count; < 0 if inactive
- int nextWait; // encoded record of next event waiter
+ // Instance fields
+ volatile int scanState; // versioned, <0: inactive; odd:scanning
+ int stackPred; // pool stack (ctl) predecessor
int nsteals; // number of steals
- int hint; // steal index hint
- short poolIndex; // index of this queue in pool
- final short mode; // 0: lifo, > 0: fifo, < 0: shared
- volatile int qlock; // 1: locked, -1: terminate; else 0
+ int hint; // randomization and stealer index hint
+ int config; // pool index and mode
+ volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
@@ -693,19 +811,23 @@ public class ForkJoinPool extends AbstractExecutorService {
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
- ForkJoinTask<?> currentSteal; // current non-local task being executed
+ volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
- WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
- int seed) {
+ WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
- this.mode = (short)mode;
- this.hint = seed; // store initial seed for runWorker
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
/**
+ * Returns an exportable index (used by ForkJoinWorkerThread).
+ */
+ final int getPoolIndex() {
+ return (config & 0xffff) >>> 1; // ignore odd/even tag bit
+ }
+
+ /**
* Returns the approximate number of tasks in the queue.
*/
final int queueSize() {
@@ -719,12 +841,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* near-empty queue has at least one unclaimed task.
*/
final boolean isEmpty() {
- ForkJoinTask<?>[] a; int m, s;
- int n = base - (s = top);
- return (n >= 0 ||
- (n == -1 &&
- ((a = array) == null ||
- (m = a.length - 1) < 0 ||
+ ForkJoinTask<?>[] a; int n, m, s;
+ return ((n = base - (s = top)) >= 0 ||
+ (n == -1 && // possibly one task
+ ((a = array) == null || (m = a.length - 1) < 0 ||
U.getObject
(a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
}
@@ -738,12 +858,15 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
- int s = top, n;
+ int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
- int m = a.length - 1;
+ int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
- if ((n = (top = s + 1) - base) <= 2)
- (p = pool).signalWork(p.workQueues, this);
+ U.putOrderedInt(this, QTOP, s + 1);
+ if ((n = s - b) <= 1) {
+ if ((p = pool) != null)
+ p.signalWork(p.workQueues, this);
+ }
else if (n >= m)
growArray();
}
@@ -764,7 +887,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
(t = top) - (b = base) > 0) {
int mask = size - 1;
- do {
+ do { // emulate poll from old array, push to new array
ForkJoinTask<?> x;
int oldj = ((b & oldMask) << ASHIFT) + ABASE;
int j = ((b & mask) << ASHIFT) + ABASE;
@@ -789,7 +912,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
break;
if (U.compareAndSwapObject(a, j, t, null)) {
- top = s;
+ U.putOrderedInt(this, QTOP, s);
return t;
}
}
@@ -800,7 +923,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Takes a task in FIFO order if b is base of queue and a task
* can be claimed without contention. Specialized versions
- * appear in ForkJoinPool methods scan and tryHelpStealer.
+ * appear in ForkJoinPool methods scan and helpStealer.
*/
final ForkJoinTask<?> pollAt(int b) {
ForkJoinTask<?> t; ForkJoinTask<?>[] a;
@@ -808,7 +931,7 @@ public class ForkJoinPool extends AbstractExecutorService {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
base == b && U.compareAndSwapObject(a, j, t, null)) {
- U.putOrderedInt(this, QBASE, b + 1);
+ base = b + 1;
return t;
}
}
@@ -823,16 +946,15 @@ public class ForkJoinPool extends AbstractExecutorService {
while ((b = base) - top < 0 && (a = array) != null) {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
- if (t != null) {
- if (U.compareAndSwapObject(a, j, t, null)) {
- U.putOrderedInt(this, QBASE, b + 1);
- return t;
+ if (base == b) {
+ if (t != null) {
+ if (U.compareAndSwapObject(a, j, t, null)) {
+ base = b + 1;
+ return t;
+ }
}
- }
- else if (base == b) {
- if (b + 1 == top)
+ else if (b + 1 == top) // now empty
break;
- Thread.yield(); // wait for lagging update (very rare)
}
}
return null;
@@ -842,7 +964,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
- return mode == 0 ? pop() : poll();
+ return (config & FIFO_QUEUE) == 0 ? pop() : poll();
}
/**
@@ -852,7 +974,7 @@ public class ForkJoinPool extends AbstractExecutorService {
ForkJoinTask<?>[] a = array; int m;
if (a == null || (m = a.length - 1) < 0)
return null;
- int i = mode == 0 ? top - 1 : base;
+ int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
int j = ((i & m) << ASHIFT) + ABASE;
return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
}
@@ -860,13 +982,13 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Pops the given task only if it is at the current top.
* (A shared version is available only via FJP.tryExternalUnpush)
- */
+ */
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
if ((a = array) != null && (s = top) != base &&
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
- top = s;
+ U.putOrderedInt(this, QTOP, s);
return true;
}
return false;
@@ -876,9 +998,16 @@ public class ForkJoinPool extends AbstractExecutorService {
* Removes and cancels all known tasks, ignoring any exceptions.
*/
final void cancelAll() {
- ForkJoinTask.cancelIgnoringExceptions(currentJoin);
- ForkJoinTask.cancelIgnoringExceptions(currentSteal);
- for (ForkJoinTask<?> t; (t = poll()) != null; )
+ ForkJoinTask<?> t;
+ if ((t = currentJoin) != null) {
+ currentJoin = null;
+ ForkJoinTask.cancelIgnoringExceptions(t);
+ }
+ if ((t = currentSteal) != null) {
+ currentSteal = null;
+ ForkJoinTask.cancelIgnoringExceptions(t);
+ }
+ while ((t = poll()) != null)
ForkJoinTask.cancelIgnoringExceptions(t);
}
@@ -893,167 +1022,186 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Executes a top-level task and any local tasks remaining
- * after execution.
+ * Removes and executes all local tasks. If FIFO, invokes
+ * pollAndExecAll. Otherwise implements a specialized pop loop
+ * to exec until empty.
*/
- final void runTask(ForkJoinTask<?> task) {
- if ((currentSteal = task) != null) {
- ForkJoinWorkerThread thread;
- task.doExec();
- ForkJoinTask<?>[] a = array;
- int md = mode;
- ++nsteals;
- currentSteal = null;
- if (md != 0)
- pollAndExecAll();
- else if (a != null) {
- int s, m = a.length - 1;
- ForkJoinTask<?> t;
- while ((s = top - 1) - base >= 0 &&
- (t = (ForkJoinTask<?>)U.getAndSetObject
- (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
- top = s;
+ final void execLocalTasks() {
+ int b = base, m, s;
+ ForkJoinTask<?>[] a = array;
+ if (b - (s = top - 1) <= 0 && a != null &&
+ (m = a.length - 1) >= 0) {
+ if ((config & FIFO_QUEUE) == 0) {
+ for (ForkJoinTask<?> t;;) {
+ if ((t = (ForkJoinTask<?>)U.getAndSetObject
+ (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
+ break;
+ U.putOrderedInt(this, QTOP, s);
t.doExec();
+ if (base - (s = top - 1) > 0)
+ break;
}
}
- if ((thread = owner) != null) // no need to do in finally clause
+ else
+ pollAndExecAll();
+ }
+ }
+
+ /**
+ * Executes the given task and any remaining local tasks.
+ */
+ final void runTask(ForkJoinTask<?> task) {
+ if (task != null) {
+ scanState &= ~SCANNING; // mark as busy
+ (currentSteal = task).doExec();
+ U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
+ execLocalTasks();
+ ForkJoinWorkerThread thread = owner;
+ if (++nsteals < 0) // collect on overflow
+ transferStealCount(pool);
+ scanState |= SCANNING;
+ if (thread != null)
thread.afterTopLevelExec();
}
}
/**
- * If present, removes from queue and executes the given task,
- * or any other cancelled task. Returns (true) on any CAS
- * or consistency check failure so caller can retry.
- *
- * @return false if no progress can be made, else true
+ * Adds steal count to pool stealCounter if it exists, and resets.
*/
- final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
- boolean stat;
- ForkJoinTask<?>[] a; int m, s, b, n;
- if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
- (n = (s = top) - (b = base)) > 0) {
- boolean removed = false, empty = true;
- stat = true;
- for (ForkJoinTask<?> t;;) { // traverse from s to b
- long j = ((--s & m) << ASHIFT) + ABASE;
- t = (ForkJoinTask<?>)U.getObject(a, j);
- if (t == null) // inconsistent length
- break;
- else if (t == task) {
- if (s + 1 == top) { // pop
- if (!U.compareAndSwapObject(a, j, task, null))
- break;
- top = s;
- removed = true;
- }
- else if (base == b) // replace with proxy
- removed = U.compareAndSwapObject(a, j, task,
- new EmptyTask());
- break;
- }
- else if (t.status >= 0)
- empty = false;
- else if (s + 1 == top) { // pop and throw away
- if (U.compareAndSwapObject(a, j, t, null))
- top = s;
- break;
- }
- if (--n == 0) {
- if (!empty && base == b)
- stat = false;
- break;
- }
- }
- if (removed)
- task.doExec();
+ final void transferStealCount(ForkJoinPool p) {
+ AtomicLong sc;
+ if (p != null && (sc = p.stealCounter) != null) {
+ int s = nsteals;
+ nsteals = 0; // if negative, correct for overflow
+ sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
}
- else
- stat = false;
- return stat;
}
/**
- * Tries to poll for and execute the given task or any other
- * task in its CountedCompleter computation.
+ * If present, removes from queue and executes the given task,
+ * or any other cancelled task. Used only by awaitJoin.
+ *
+ * @return true if queue empty and task not known to be done
*/
- final boolean pollAndExecCC(CountedCompleter<?> root) {
- ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
- if ((b = base) - top < 0 && (a = array) != null) {
- long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
- if ((o = U.getObjectVolatile(a, j)) == null)
- return true; // retry
- if (o instanceof CountedCompleter) {
- for (t = (CountedCompleter<?>)o, r = t;;) {
- if (r == root) {
- if (base == b &&
- U.compareAndSwapObject(a, j, t, null)) {
- U.putOrderedInt(this, QBASE, b + 1);
- t.doExec();
+ final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
+ ForkJoinTask<?>[] a; int m, s, b, n;
+ if ((a = array) != null && (m = a.length - 1) >= 0 &&
+ task != null) {
+ while ((n = (s = top) - (b = base)) > 0) {
+ for (ForkJoinTask<?> t;;) { // traverse from s to b
+ long j = ((--s & m) << ASHIFT) + ABASE;
+ if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
+ return s + 1 == top; // shorter than expected
+ else if (t == task) {
+ boolean removed = false;
+ if (s + 1 == top) { // pop
+ if (U.compareAndSwapObject(a, j, task, null)) {
+ U.putOrderedInt(this, QTOP, s);
+ removed = true;
+ }
}
- return true;
+ else if (base == b) // replace with proxy
+ removed = U.compareAndSwapObject(
+ a, j, task, new EmptyTask());
+ if (removed)
+ task.doExec();
+ break;
}
- else if ((r = r.completer) == null)
- break; // not part of root computation
+ else if (t.status < 0 && s + 1 == top) {
+ if (U.compareAndSwapObject(a, j, t, null))
+ U.putOrderedInt(this, QTOP, s);
+ break; // was cancelled
+ }
+ if (--n == 0)
+ return false;
}
+ if (task.status < 0)
+ return false;
}
}
- return false;
+ return true;
}
/**
- * Tries to pop and execute the given task or any other task
- * in its CountedCompleter computation.
+ * Pops task if in the same CC computation as the given task,
+ * in either shared or owned mode. Used only by helpComplete.
*/
- final boolean externalPopAndExecCC(CountedCompleter<?> root) {
- ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
+ final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
+ int s; ForkJoinTask<?>[] a; Object o;
if (base - (s = top) < 0 && (a = array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
- if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
- for (t = (CountedCompleter<?>)o, r = t;;) {
- if (r == root) {
- if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
- if (top == s && array == a &&
- U.compareAndSwapObject(a, j, t, null)) {
- top = s - 1;
- qlock = 0;
- t.doExec();
+ if ((o = U.getObjectVolatile(a, j)) != null &&
+ (o instanceof CountedCompleter)) {
+ CountedCompleter<?> t = (CountedCompleter<?>)o;
+ for (CountedCompleter<?> r = t;;) {
+ if (r == task) {
+ if (mode < 0) { // must lock
+ if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
+ if (top == s && array == a &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ U.putOrderedInt(this, QTOP, s - 1);
+ U.putOrderedInt(this, QLOCK, 0);
+ return t;
+ }
+ U.compareAndSwapInt(this, QLOCK, 1, 0);
}
- else
- qlock = 0;
}
- return true;
+ else if (U.compareAndSwapObject(a, j, t, null)) {
+ U.putOrderedInt(this, QTOP, s - 1);
+ return t;
+ }
+ break;
}
- else if ((r = r.completer) == null)
+ else if ((r = r.completer) == null) // try parent
break;
}
}
}
- return false;
+ return null;
}
/**
- * Internal version
+ * Steals and runs a task in the same CC computation as the
+ * given task if one exists and can be taken without
+ * contention. Otherwise returns a checksum/control value for
+ * use by method helpComplete.
+ *
+ * @return 1 if successful, 2 if retryable (lost to another
+ * stealer), -1 if non-empty but no matching task found, else
+ * the base index, forced negative.
*/
- final boolean internalPopAndExecCC(CountedCompleter<?> root) {
- ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
- if (base - (s = top) < 0 && (a = array) != null) {
- long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
- if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
- for (t = (CountedCompleter<?>)o, r = t;;) {
- if (r == root) {
- if (U.compareAndSwapObject(a, j, t, null)) {
- top = s - 1;
+ final int pollAndExecCC(CountedCompleter<?> task) {
+ int b, h; ForkJoinTask<?>[] a; Object o;
+ if ((b = base) - top >= 0 || (a = array) == null)
+ h = b | Integer.MIN_VALUE; // to sense movement on re-poll
+ else {
+ long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
+ if ((o = U.getObjectVolatile(a, j)) == null)
+ h = 2; // retryable
+ else if (!(o instanceof CountedCompleter))
+ h = -1; // unmatchable
+ else {
+ CountedCompleter<?> t = (CountedCompleter<?>)o;
+ for (CountedCompleter<?> r = t;;) {
+ if (r == task) {
+ if (base == b &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ base = b + 1;
t.doExec();
+ h = 1; // success
}
- return true;
+ else
+ h = 2; // lost CAS
+ break;
}
- else if ((r = r.completer) == null)
+ else if ((r = r.completer) == null) {
+ h = -1; // unmatched
break;
+ }
}
}
}
- return false;
+ return h;
}
/**
@@ -1061,28 +1209,31 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final boolean isApparentlyUnblocked() {
Thread wt; Thread.State s;
- return (eventCount >= 0 &&
+ return (scanState >= 0 &&
(wt = owner) != null &&
(s = wt.getState()) != Thread.State.BLOCKED &&
s != Thread.State.WAITING &&
s != Thread.State.TIMED_WAITING);
}
- // Unsafe mechanics
+ // Unsafe mechanics. Note that some are (and must be) the same as in FJP
private static final sun.misc.Unsafe U;
- private static final long QBASE;
+ private static final int ABASE;
+ private static final int ASHIFT;
+ private static final long QTOP;
private static final long QLOCK;
- private static final int ABASE;
- private static final int ASHIFT;
+ private static final long QCURRENTSTEAL;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
- Class<?> k = WorkQueue.class;
+ Class<?> wk = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
- QBASE = U.objectFieldOffset
- (k.getDeclaredField("base"));
+ QTOP = U.objectFieldOffset
+ (wk.getDeclaredField("top"));
QLOCK = U.objectFieldOffset
- (k.getDeclaredField("qlock"));
+ (wk.getDeclaredField("qlock"));
+ QCURRENTSTEAL = U.objectFieldOffset
+ (wk.getDeclaredField("currentSteal"));
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
@@ -1126,6 +1277,11 @@ public class ForkJoinPool extends AbstractExecutorService {
static final int commonParallelism;
/**
+ * Limit on spare thread construction in tryCompensate.
+ */
+ private static int commonMaxSpares;
+
+ /**
* Sequence number for creating workerNamePrefix.
*/
private static int poolNumberSequence;
@@ -1138,7 +1294,7 @@ public class ForkJoinPool extends AbstractExecutorService {
return ++poolNumberSequence;
}
- // static constants
+ // static configuration constants
/**
* Initial timeout value (in nanoseconds) for the thread
@@ -1148,27 +1304,32 @@ public class ForkJoinPool extends AbstractExecutorService {
* aggressive shrinkage during most transient stalls (long GCs
* etc).
*/
- private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
+ private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
/**
- * Timeout value when there are more threads than parallelism level
+ * Tolerance for idle timeouts, to cope with timer undershoots
*/
- private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L;
+ private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
/**
- * Tolerance for idle timeouts, to cope with timer undershoots
+ * The initial value for commonMaxSpares during static
+ * initialization. The value is far in excess of normal
+ * requirements, but also far short of MAX_CAP and typical
+ * OS thread limits, so allows JVMs to catch misuse/abuse
+ * before running out of resources needed to do so.
*/
- private static final long TIMEOUT_SLOP = 2000000L;
+ private static final int DEFAULT_COMMON_MAX_SPARES = 256;
/**
- * The maximum stolen->joining link depth allowed in method
- * tryHelpStealer. Must be a power of two. Depths for legitimate
- * chains are unbounded, but we use a fixed constant to avoid
- * (otherwise unchecked) cycles and to bound staleness of
- * traversal parameters at the expense of sometimes blocking when
- * we could be helping.
+ * Number of times to spin-wait before blocking. The spins (in
+ * awaitRunStateLock and awaitWork) currently use randomized
+ * spins. If/when MWAIT-like intrinsics become available, they
+ * may allow quieter spinning. The value of SPINS must be a power
+ * of two, at least 4. The current value causes spinning for a
+ * small fraction of typical context-switch times, well worthwhile
+ * given the typical likelihoods that blocking is not necessary.
*/
- private static final int MAX_HELP = 64;
+ private static final int SPINS = 1 << 11;
/**
* Increment for seed generators. See class ThreadLocal for
@@ -1177,209 +1338,212 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final int SEED_INCREMENT = 0x9e3779b9;
/*
- * Bits and masks for control variables
- *
- * Field ctl is a long packed with:
- * AC: Number of active running workers minus target parallelism (16 bits)
- * TC: Number of total workers minus target parallelism (16 bits)
- * ST: true if pool is terminating (1 bit)
- * EC: the wait count of top waiting thread (15 bits)
- * ID: poolIndex of top of Treiber stack of waiters (16 bits)
- *
- * When convenient, we can extract the upper 32 bits of counts and
- * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
- * (int)ctl. The ec field is never accessed alone, but always
- * together with id and st. The offsets of counts by the target
- * parallelism and the positionings of fields makes it possible to
- * perform the most common checks via sign tests of fields: When
- * ac is negative, there are not enough active workers, when tc is
- * negative, there are not enough total workers, and when e is
- * negative, the pool is terminating. To deal with these possibly
- * negative fields, we use casts in and out of "short" and/or
- * signed shifts to maintain signedness.
- *
- * When a thread is queued (inactivated), its eventCount field is
- * set negative, which is the only way to tell if a worker is
- * prevented from executing tasks, even though it must continue to
- * scan for them to avoid queuing races. Note however that
- * eventCount updates lag releases so usage requires care.
- *
- * Field plock is an int packed with:
- * SHUTDOWN: true if shutdown is enabled (1 bit)
- * SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits)
- * SIGNAL: set when threads may be waiting on the lock (1 bit)
- *
- * The sequence number enables simple consistency checks:
- * Staleness of read-only operations on the workQueues array can
- * be checked by comparing plock before vs after the reads.
- */
-
- // bit positions/shifts for fields
+ * Bits and masks for field ctl, packed with four 16-bit subfields:
+ * AC: Number of active running workers minus target parallelism
+ * TC: Number of total workers minus target parallelism
+ * SS: version count and status of top waiting thread
+ * ID: poolIndex of top of Treiber stack of waiters
+ *
+ * When convenient, we can extract the lower 32 stack top bits
+ * (including version bits) as sp=(int)ctl. The offsets of counts
+ * by the target parallelism and the positionings of fields make
+ * it possible to perform the most common checks via sign tests of
+ * fields: When ac is negative, there are not enough active
+ * workers, when tc is negative, there are not enough total
+ * workers. When sp is non-zero, there are waiting workers. To
+ * deal with possibly negative fields, we use casts in and out of
+ * "short" and/or signed shifts to maintain signedness.
+ *
+ * Because it occupies uppermost bits, we can add one active count
+ * using getAndAddLong of AC_UNIT, rather than CAS, when returning
+ * from a blocked join. Other updates entail multiple subfields
+ * and masking, requiring CAS.
+ */
+
+ // Lower and upper word masks
+ private static final long SP_MASK = 0xffffffffL;
+ private static final long UC_MASK = ~SP_MASK;
+
+ // Active counts
private static final int AC_SHIFT = 48;
+ private static final long AC_UNIT = 0x0001L << AC_SHIFT;
+ private static final long AC_MASK = 0xffffL << AC_SHIFT;
+
+ // Total counts
private static final int TC_SHIFT = 32;
- private static final int ST_SHIFT = 31;
- private static final int EC_SHIFT = 16;
-
- // bounds
- private static final int SMASK = 0xffff; // short bits
- private static final int MAX_CAP = 0x7fff; // max #workers - 1
- private static final int EVENMASK = 0xfffe; // even short bits
- private static final int SQMASK = 0x007e; // max 64 (even) slots
- private static final int SHORT_SIGN = 1 << 15;
- private static final int INT_SIGN = 1 << 31;
-
- // masks
- private static final long STOP_BIT = 0x0001L << ST_SHIFT;
- private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
- private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;
-
- // units for incrementing and decrementing
- private static final long TC_UNIT = 1L << TC_SHIFT;
- private static final long AC_UNIT = 1L << AC_SHIFT;
-
- // masks and units for dealing with u = (int)(ctl >>> 32)
- private static final int UAC_SHIFT = AC_SHIFT - 32;
- private static final int UTC_SHIFT = TC_SHIFT - 32;
- private static final int UAC_MASK = SMASK << UAC_SHIFT;
- private static final int UTC_MASK = SMASK << UTC_SHIFT;
- private static final int UAC_UNIT = 1 << UAC_SHIFT;
- private static final int UTC_UNIT = 1 << UTC_SHIFT;
-
- // masks and units for dealing with e = (int)ctl
- private static final int E_MASK = 0x7fffffff; // no STOP_BIT
- private static final int E_SEQ = 1 << EC_SHIFT;
-
- // plock bits
- private static final int SHUTDOWN = 1 << 31;
- private static final int PL_LOCK = 2;
- private static final int PL_SIGNAL = 1;
- private static final int PL_SPINS = 1 << 8;
-
- // access mode for WorkQueue
- static final int LIFO_QUEUE = 0;
- static final int FIFO_QUEUE = 1;
- static final int SHARED_QUEUE = -1;
+ private static final long TC_UNIT = 0x0001L << TC_SHIFT;
+ private static final long TC_MASK = 0xffffL << TC_SHIFT;
+ private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
+
+ // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
+ private static final int RSLOCK = 1;
+ private static final int RSIGNAL = 1 << 1;
+ private static final int STARTED = 1 << 2;
+ private static final int STOP = 1 << 29;
+ private static final int TERMINATED = 1 << 30;
+ private static final int SHUTDOWN = 1 << 31;
// Instance fields
- volatile long stealCount; // collects worker counts
- volatile long ctl; // main pool control
- volatile int plock; // shutdown status and seqLock
- volatile int indexSeed; // worker/submitter index seed
- final short parallelism; // parallelism level
- final short mode; // LIFO/FIFO
- WorkQueue[] workQueues; // main registry
+ volatile long ctl; // main pool control
+ volatile int runState; // lockable status
+ final int config; // parallelism, mode
+ int indexSeed; // to generate worker index
+ volatile WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
- final UncaughtExceptionHandler ueh; // per-worker UEH
- final String workerNamePrefix; // to create worker name string
+ final UncaughtExceptionHandler ueh; // per-worker UEH
+ final String workerNamePrefix; // to create worker name string
+ volatile AtomicLong stealCounter; // also used as sync monitor
/**
- * Acquires the plock lock to protect worker array and related
- * updates. This method is called only if an initial CAS on plock
- * fails. This acts as a spinlock for normal cases, but falls back
- * to builtin monitor to block when (rarely) needed. This would be
- * a terrible idea for a highly contended lock, but works fine as
- * a more conservative alternative to a pure spinlock.
+ * Acquires the runState lock; returns current (locked) runState.
*/
- private int acquirePlock() {
- int spins = PL_SPINS, ps, nps;
- for (;;) {
- if (((ps = plock) & PL_LOCK) == 0 &&
- U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
- return nps;
- else if (spins >= 0) {
- if (ThreadLocalRandom.nextSecondarySeed() >= 0)
+ private int lockRunState() {
+ int rs;
+ return ((((rs = runState) & RSLOCK) != 0 ||
+ !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
+ awaitRunStateLock() : rs);
+ }
+
+ /**
+ * Spins and/or blocks until runstate lock is available. See
+ * above for explanation.
+ */
+ private int awaitRunStateLock() {
+ Object lock;
+ boolean wasInterrupted = false;
+ for (int spins = SPINS, r = 0, rs, ns;;) {
+ if (((rs = runState) & RSLOCK) == 0) {
+ if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
+ if (wasInterrupted) {
+ try {
+ Thread.currentThread().interrupt();
+ } catch (SecurityException ignore) {
+ }
+ }
+ return ns;
+ }
+ }
+ else if (r == 0)
+ r = ThreadLocalRandom.nextSecondarySeed();
+ else if (spins > 0) {
+ r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
+ if (r >= 0)
--spins;
}
- else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
- synchronized (this) {
- if ((plock & PL_SIGNAL) != 0) {
+ else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
+ Thread.yield(); // initialization race
+ else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
+ synchronized (lock) {
+ if ((runState & RSIGNAL) != 0) {
try {
- wait();
+ lock.wait();
} catch (InterruptedException ie) {
- try {
- Thread.currentThread().interrupt();
- } catch (SecurityException ignore) {
- }
+ if (!(Thread.currentThread() instanceof
+ ForkJoinWorkerThread))
+ wasInterrupted = true;
}
}
else
- notifyAll();
+ lock.notifyAll();
}
}
}
}
/**
- * Unlocks and signals any thread waiting for plock. Called only
- * when CAS of seq value for unlock fails.
+ * Unlocks and sets runState to newRunState.
+ *
+ * @param oldRunState a value returned from lockRunState
+ * @param newRunState the next value (must have lock bit clear).
*/
- private void releasePlock(int ps) {
- plock = ps;
- synchronized (this) { notifyAll(); }
+ private void unlockRunState(int oldRunState, int newRunState) {
+ if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
+ Object lock = stealCounter;
+ runState = newRunState; // clears RSIGNAL bit
+ if (lock != null)
+ synchronized (lock) { lock.notifyAll(); }
+ }
}
+ // Creating, registering and deregistering workers
+
/**
- * Tries to create and start one worker if fewer than target
- * parallelism level exist. Adjusts counts etc on failure.
+ * Tries to construct and start one worker. Assumes that total
+ * count has already been incremented as a reservation. Invokes
+ * deregisterWorker on any failure.
+ *
+ * @return true if successful
*/
- private void tryAddWorker() {
- long c; int u, e;
- while ((u = (int)((c = ctl) >>> 32)) < 0 &&
- (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
- long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
- ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- ForkJoinWorkerThreadFactory fac;
- Throwable ex = null;
- ForkJoinWorkerThread wt = null;
- try {
- if ((fac = factory) != null &&
- (wt = fac.newThread(this)) != null) {
- wt.start();
- break;
- }
- } catch (Throwable rex) {
- ex = rex;
- }
- deregisterWorker(wt, ex);
- break;
+ private boolean createWorker() {
+ ForkJoinWorkerThreadFactory fac = factory;
+ Throwable ex = null;
+ ForkJoinWorkerThread wt = null;
+ try {
+ if (fac != null && (wt = fac.newThread(this)) != null) {
+ wt.start();
+ return true;
}
+ } catch (Throwable rex) {
+ ex = rex;
}
+ deregisterWorker(wt, ex);
+ return false;
}
- // Registering and deregistering workers
+ /**
+ * Tries to add one worker, incrementing ctl counts before doing
+ * so, relying on createWorker to back out on failure.
+ *
+ * @param c incoming ctl value, with total count negative and no
+ * idle workers. On CAS failure, c is refreshed and retried if
+ * this holds (otherwise, a new worker is not needed).
+ */
+ private void tryAddWorker(long c) {
+ boolean add = false;
+ do {
+ long nc = ((AC_MASK & (c + AC_UNIT)) |
+ (TC_MASK & (c + TC_UNIT)));
+ if (ctl == c) {
+ int rs, stop; // check if terminating
+ if ((stop = (rs = lockRunState()) & STOP) == 0)
+ add = U.compareAndSwapLong(this, CTL, c, nc);
+ unlockRunState(rs, rs & ~RSLOCK);
+ if (stop != 0)
+ break;
+ if (add) {
+ createWorker();
+ break;
+ }
+ }
+ } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
+ }
/**
- * Callback from ForkJoinWorkerThread to establish and record its
- * WorkQueue. To avoid scanning bias due to packing entries in
- * front of the workQueues array, we treat the array as a simple
- * power-of-two hash table using per-thread seed as hash,
- * expanding as needed.
+ * Callback from ForkJoinWorkerThread constructor to establish and
+ * record its WorkQueue.
*
* @param wt the worker thread
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
- UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
- wt.setDaemon(true);
+ UncaughtExceptionHandler handler;
+ wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
- do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
- s += SEED_INCREMENT) ||
- s == 0); // skip 0
- WorkQueue w = new WorkQueue(this, wt, mode, s);
- if (((ps = plock) & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
+ WorkQueue w = new WorkQueue(this, wt);
+ int i = 0; // assign a pool index
+ int mode = config & MODE_MASK;
+ int rs = lockRunState();
try {
- if ((ws = workQueues) != null) { // skip if shutting down
- int n = ws.length, m = n - 1;
- int r = (s << 1) | 1; // use odd-numbered indices
- if (ws[r &= m] != null) { // collision
- int probes = 0; // step by approx half size
+ WorkQueue[] ws; int n; // skip if no array
+ if ((ws = workQueues) != null && (n = ws.length) > 0) {
+ int s = indexSeed += SEED_INCREMENT; // unlikely to collide
+ int m = n - 1;
+ i = ((s << 1) | 1) & m; // odd-numbered indices
+ if (ws[i] != null) { // collision
+ int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
- while (ws[r = (r + step) & m] != null) {
+ while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
@@ -1387,15 +1551,15 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
}
- w.poolIndex = (short)r;
- w.eventCount = r; // volatile write orders
- ws[r] = w;
+ w.hint = s; // use as random seed
+ w.config = i | mode;
+ w.scanState = i; // publication fence
+ ws[i] = w;
}
} finally {
- if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
- releasePlock(nps);
+ unlockRunState(rs, rs & ~RSLOCK);
}
- wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
+ wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
@@ -1411,384 +1575,322 @@ public class ForkJoinPool extends AbstractExecutorService {
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {
- int ps;
- w.qlock = -1; // ensure set
- U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
- if (((ps = plock) & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
- try {
- int idx = w.poolIndex;
- WorkQueue[] ws = workQueues;
- if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
- ws[idx] = null;
- } finally {
- if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
- releasePlock(nps);
- }
- }
-
- long c; // adjust ctl counts
+ WorkQueue[] ws; // remove index from array
+ int idx = w.config & SMASK;
+ int rs = lockRunState();
+ if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
+ ws[idx] = null;
+ unlockRunState(rs, rs & ~RSLOCK);
+ }
+ long c; // decrement counts
do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
- ((c - TC_UNIT) & TC_MASK) |
- (c & ~(AC_MASK|TC_MASK)))));
-
- if (!tryTerminate(false, false) && w != null && w.array != null) {
- w.cancelAll(); // cancel remaining tasks
- WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
- while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
- if (e > 0) { // activate or create replacement
- if ((ws = workQueues) == null ||
- (i = e & SMASK) >= ws.length ||
- (v = ws[i]) == null)
- break;
- long nc = (((long)(v.nextWait & E_MASK)) |
- ((long)(u + UAC_UNIT) << 32));
- if (v.eventCount != (e | INT_SIGN))
- break;
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- v.eventCount = (e + E_SEQ) & E_MASK;
- if ((p = v.parker) != null)
- U.unpark(p);
- break;
- }
- }
- else {
- if ((short)u < 0)
- tryAddWorker();
+ (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
+ (TC_MASK & (c - TC_UNIT)) |
+ (SP_MASK & c))));
+ if (w != null) {
+ w.qlock = -1; // ensure set
+ w.transferStealCount(this);
+ w.cancelAll(); // cancel remaining tasks
+ }
+ for (;;) { // possibly replace
+ WorkQueue[] ws; int m, sp;
+ if (tryTerminate(false, false) || w == null || w.array == null ||
+ (runState & STOP) != 0 || (ws = workQueues) == null ||
+ (m = ws.length - 1) < 0) // already terminating
+ break;
+ if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
+ if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
- }
}
+ else if (ex != null && (c & ADD_WORKER) != 0L) {
+ tryAddWorker(c); // create replacement
+ break;
+ }
+ else // don't need replacement
+ break;
}
- if (ex == null) // help clean refs on way out
+ if (ex == null) // help clean on way out
ForkJoinTask.helpExpungeStaleExceptions();
- else // rethrow
+ else // rethrow
ForkJoinTask.rethrow(ex);
}
- // Submissions
-
- /**
- * Unless shutting down, adds the given task to a submission queue
- * at submitter's current queue index (modulo submission
- * range). Only the most common path is directly handled in this
- * method. All others are relayed to fullExternalPush.
- *
- * @param task the task. Caller must ensure non-null.
- */
- final void externalPush(ForkJoinTask<?> task) {
- WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
- int r = ThreadLocalRandom.getProbe();
- int ps = plock;
- WorkQueue[] ws = workQueues;
- if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
- (q = ws[m & r & SQMASK]) != null && r != 0 &&
- U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
- if ((a = q.array) != null &&
- (am = a.length - 1) > (n = (s = q.top) - q.base)) {
- int j = ((am & s) << ASHIFT) + ABASE;
- U.putOrderedObject(a, j, task);
- q.top = s + 1; // push on to deque
- q.qlock = 0;
- if (n <= 1)
- signalWork(ws, q);
- return;
- }
- q.qlock = 0;
- }
- fullExternalPush(task);
- }
-
- /**
- * Full version of externalPush. This method is called, among
- * other times, upon the first submission of the first task to the
- * pool, so must perform secondary initialization. It also
- * detects first submission by an external thread by looking up
- * its ThreadLocal, and creates a new shared queue if the one at
- * index if empty or contended. The plock lock body must be
- * exception-free (so no try/finally) so we optimistically
- * allocate new queues outside the lock and throw them away if
- * (very rarely) not needed.
- *
- * Secondary initialization occurs when plock is zero, to create
- * workQueue array and set plock to a valid value. This lock body
- * must also be exception-free. Because the plock seq value can
- * eventually wrap around zero, this method harmlessly fails to
- * reinitialize if workQueues exists, while still advancing plock.
- */
- private void fullExternalPush(ForkJoinTask<?> task) {
- int r;
- if ((r = ThreadLocalRandom.getProbe()) == 0) {
- ThreadLocalRandom.localInit();
- r = ThreadLocalRandom.getProbe();
- }
- for (;;) {
- WorkQueue[] ws; WorkQueue q; int ps, m, k;
- boolean move = false;
- if ((ps = plock) < 0)
- throw new RejectedExecutionException();
- else if (ps == 0 || (ws = workQueues) == null ||
- (m = ws.length - 1) < 0) { // initialize workQueues
- int p = parallelism; // find power of two table size
- int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
- n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
- n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
- WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
- new WorkQueue[n] : null);
- if (((ps = plock) & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- if (((ws = workQueues) == null || ws.length == 0) && nws != null)
- workQueues = nws;
- int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
- if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
- releasePlock(nps);
- }
- else if ((q = ws[k = r & m & SQMASK]) != null) {
- if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
- ForkJoinTask<?>[] a = q.array;
- int s = q.top;
- boolean submitted = false;
- try { // locked version of push
- if ((a != null && a.length > s + 1 - q.base) ||
- (a = q.growArray()) != null) { // must presize
- int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
- U.putOrderedObject(a, j, task);
- q.top = s + 1;
- submitted = true;
- }
- } finally {
- q.qlock = 0; // unlock
- }
- if (submitted) {
- signalWork(ws, q);
- return;
- }
- }
- move = true; // move on failure
- }
- else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
- q = new WorkQueue(this, null, SHARED_QUEUE, r);
- q.poolIndex = (short)k;
- if (((ps = plock) & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
- ws[k] = q;
- int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
- if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
- releasePlock(nps);
- }
- else
- move = true; // move if busy
- if (move)
- r = ThreadLocalRandom.advanceProbe(r);
- }
- }
-
- // Maintaining ctl counts
-
- /**
- * Increments active count; mainly called upon return from blocking.
- */
- final void incrementActiveCount() {
- long c;
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, ((c & ~AC_MASK) |
- ((c & AC_MASK) + AC_UNIT))));
- }
+ // Signalling
/**
* Tries to create or activate a worker if too few are active.
*
* @param ws the worker array to use to find signallees
- * @param q if non-null, the queue holding tasks to be processed
+ * @param q a WorkQueue --if non-null, don't retry if now empty
*/
final void signalWork(WorkQueue[] ws, WorkQueue q) {
- for (;;) {
- long c; int e, u, i; WorkQueue w; Thread p;
- if ((u = (int)((c = ctl) >>> 32)) >= 0)
- break;
- if ((e = (int)c) <= 0) {
- if ((short)u < 0)
- tryAddWorker();
+ long c; int sp, i; WorkQueue v; Thread p;
+ while ((c = ctl) < 0L) { // too few active
+ if ((sp = (int)c) == 0) { // no idle workers
+ if ((c & ADD_WORKER) != 0L) // too few workers
+ tryAddWorker(c);
break;
}
- if (ws == null || ws.length <= (i = e & SMASK) ||
- (w = ws[i]) == null)
+ if (ws == null) // unstarted/terminated
+ break;
+ if (ws.length <= (i = sp & SMASK)) // terminated
break;
- long nc = (((long)(w.nextWait & E_MASK)) |
- ((long)(u + UAC_UNIT)) << 32);
- int ne = (e + E_SEQ) & E_MASK;
- if (w.eventCount == (e | INT_SIGN) &&
- U.compareAndSwapLong(this, CTL, c, nc)) {
- w.eventCount = ne;
- if ((p = w.parker) != null)
+ if ((v = ws[i]) == null) // terminating
+ break;
+ int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
+ int d = sp - v.scanState; // screen CAS
+ long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
+ if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
+ v.scanState = vs; // activate v
+ if ((p = v.parker) != null)
U.unpark(p);
break;
}
- if (q != null && q.base >= q.top)
+ if (q != null && q.base == q.top) // no more work
break;
}
}
+ /**
+ * Signals and releases worker v if it is top of idle worker
+ * stack. This performs a one-shot version of signalWork only if
+ * there is (apparently) at least one idle worker.
+ *
+ * @param c incoming ctl value
+ * @param v if non-null, a worker
+ * @param inc the increment to active count (zero when compensating)
+ * @return true if successful
+ */
+ private boolean tryRelease(long c, WorkQueue v, long inc) {
+ int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
+ if (v != null && v.scanState == sp) { // v is at top of stack
+ long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
+ if (U.compareAndSwapLong(this, CTL, c, nc)) {
+ v.scanState = vs;
+ if ((p = v.parker) != null)
+ U.unpark(p);
+ return true;
+ }
+ }
+ return false;
+ }
+
// Scanning for tasks
/**
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
*/
final void runWorker(WorkQueue w) {
- w.growArray(); // allocate queue
- for (int r = w.hint; scan(w, r) == 0; ) {
+ w.growArray(); // allocate queue
+ int seed = w.hint; // initially holds randomization hint
+ int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
+ for (ForkJoinTask<?> t;;) {
+ if ((t = scan(w, r)) != null)
+ w.runTask(t);
+ else if (!awaitWork(w, r))
+ break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
/**
- * Scans for and, if found, runs one task, else possibly
- * inactivates the worker. This method operates on single reads of
- * volatile state and is designed to be re-invoked continuously,
- * in part because it returns upon detecting inconsistencies,
- * contention, or state changes that indicate possible success on
- * re-invocation.
- *
- * The scan searches for tasks across queues starting at a random
- * index, checking each at least twice. The scan terminates upon
- * either finding a non-empty queue, or completing the sweep. If
- * the worker is not inactivated, it takes and runs a task from
- * this queue. Otherwise, if not activated, it tries to activate
- * itself or some other worker by signalling. On failure to find a
- * task, returns (for retry) if pool state may have changed during
- * an empty scan, or tries to inactivate if active, else possibly
- * blocks or terminates via method awaitWork.
+ * Scans for and tries to steal a top-level task. Scans start at a
+ * random location, randomly moving on apparent contention,
+ * otherwise continuing linearly until reaching two consecutive
+ * empty passes over all queues with the same checksum (summing
+ * each base index of each queue, that moves on each steal), at
+ * which point the worker tries to inactivate and then re-scans,
+ * attempting to re-activate (itself or some other worker) if
+ * finding a task; otherwise returning null to await work. Scans
+ * otherwise touch as little memory as possible, to reduce
+ * disruption on other scanning threads.
*
* @param w the worker (via its WorkQueue)
* @param r a random seed
- * @return worker qlock status if would have waited, else 0
+ * @return a task, or null if none found
*/
- private final int scan(WorkQueue w, int r) {
+ private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
- long c = ctl; // for consistency check
- if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
- for (int j = m + m + 1, ec = w.eventCount;;) {
- WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
- if ((q = ws[(r - j) & m]) != null &&
- (b = q.base) - q.top < 0 && (a = q.array) != null) {
- long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
- if ((t = ((ForkJoinTask<?>)
- U.getObjectVolatile(a, i))) != null) {
- if (ec < 0)
- helpRelease(c, ws, w, q, b);
- else if (q.base == b &&
- U.compareAndSwapObject(a, i, t, null)) {
- U.putOrderedInt(q, QBASE, b + 1);
- if ((b + 1) - q.top < 0)
- signalWork(ws, q);
- w.runTask(t);
+ if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
+ int ss = w.scanState; // initially non-negative
+ for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
+ WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
+ int b, n; long c;
+ if ((q = ws[k]) != null) {
+ if ((n = (b = q.base) - q.top) < 0 &&
+ (a = q.array) != null) { // non-empty
+ long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
+ if ((t = ((ForkJoinTask<?>)
+ U.getObjectVolatile(a, i))) != null &&
+ q.base == b) {
+ if (ss >= 0) {
+ if (U.compareAndSwapObject(a, i, t, null)) {
+ q.base = b + 1;
+ if (n < -1) // signal others
+ signalWork(ws, q);
+ return t;
+ }
+ }
+ else if (oldSum == 0 && // try to activate
+ w.scanState < 0)
+ tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
+ if (ss < 0) // refresh
+ ss = w.scanState;
+ r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
+ origin = k = r & m; // move and rescan
+ oldSum = checkSum = 0;
+ continue;
}
- break;
+ checkSum += b;
}
- else if (--j < 0) {
- if ((ec | (e = (int)c)) < 0) // inactive or terminating
- return awaitWork(w, c, ec);
- else if (ctl == c) { // try to inactivate and enqueue
- long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
- w.nextWait = e;
- w.eventCount = ec | INT_SIGN;
- if (!U.compareAndSwapLong(this, CTL, c, nc))
- w.eventCount = ec; // back out
+ if ((k = (k + 1) & m) == origin) { // continue until stable
+ if ((ss >= 0 || (ss == (ss = w.scanState))) &&
+ oldSum == (oldSum = checkSum)) {
+ if (ss < 0 || w.qlock < 0) // already inactive
+ break;
+ int ns = ss | INACTIVE; // try to inactivate
+ long nc = ((SP_MASK & ns) |
+ (UC_MASK & ((c = ctl) - AC_UNIT)));
+ w.stackPred = (int)c; // hold prev stack top
+ U.putInt(w, QSCANSTATE, ns);
+ if (U.compareAndSwapLong(this, CTL, c, nc))
+ ss = ns;
+ else
+ w.scanState = ss; // back out
}
- break;
+ checkSum = 0;
}
}
}
- return 0;
+ return null;
}
/**
- * A continuation of scan(), possibly blocking or terminating
- * worker w. Returns without blocking if pool state has apparently
- * changed since last invocation. Also, if inactivating w has
- * caused the pool to become quiescent, checks for pool
+ * Possibly blocks worker w waiting for a task to steal, or
+ * returns false if the worker should terminate. If inactivating
+ * w has caused the pool to become quiescent, checks for pool
* termination, and, so long as this is not the only worker, waits
- * for event for up to a given duration. On timeout, if ctl has
- * not changed, terminates the worker, which will in turn wake up
+ * for up to a given duration. On timeout, if ctl has not
+ * changed, terminates the worker, which will in turn wake up
* another worker to possibly repeat this process.
*
* @param w the calling worker
- * @param c the ctl value on entry to scan
- * @param ec the worker's eventCount on entry to scan
- */
- private final int awaitWork(WorkQueue w, long c, int ec) {
- int stat, ns; long parkTime, deadline;
- if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
- !Thread.interrupted()) {
- int e = (int)c;
- int u = (int)(c >>> 32);
- int d = (u >> UAC_SHIFT) + parallelism; // active count
-
- if (e < 0 || (d <= 0 && tryTerminate(false, false)))
- stat = w.qlock = -1; // pool is terminating
- else if ((ns = w.nsteals) != 0) { // collect steals and retry
- w.nsteals = 0;
- U.getAndAddLong(this, STEALCOUNT, (long)ns);
+ * @param r a random seed (for spins)
+ * @return false if the worker should terminate
+ */
+ private boolean awaitWork(WorkQueue w, int r) {
+ if (w == null || w.qlock < 0) // w is terminating
+ return false;
+ for (int pred = w.stackPred, spins = SPINS, ss;;) {
+ if ((ss = w.scanState) >= 0)
+ break;
+ else if (spins > 0) {
+ r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
+ if (r >= 0 && --spins == 0) { // randomize spins
+ WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
+ if (pred != 0 && (ws = workQueues) != null &&
+ (j = pred & SMASK) < ws.length &&
+ (v = ws[j]) != null && // see if pred parking
+ (v.parker == null || v.scanState >= 0))
+ spins = SPINS; // continue spinning
+ }
}
- else {
- long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
- ((long)(w.nextWait & E_MASK)) | // ctl to restore
- ((long)(u + UAC_UNIT)) << 32);
- if (pc != 0L) { // timed wait if last waiter
- int dc = -(short)(c >>> TC_SHIFT);
- parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT:
- (dc + 1) * IDLE_TIMEOUT);
+ else if (w.qlock < 0) // recheck after spins
+ return false;
+ else if (!Thread.interrupted()) {
+ long c, prevctl, parkTime, deadline;
+ int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
+ if ((ac <= 0 && tryTerminate(false, false)) ||
+ (runState & STOP) != 0) // pool terminating
+ return false;
+ if (ac <= 0 && ss == (int)c) { // is last waiter
+ prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
+ int t = (short)(c >>> TC_SHIFT); // shrink excess spares
+ if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
+ return false; // else use timed wait
+ parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
else
- parkTime = deadline = 0L;
- if (w.eventCount == ec && ctl == c) {
- Thread wt = Thread.currentThread();
- U.putObject(wt, PARKBLOCKER, this);
- w.parker = wt; // emulate LockSupport.park
- if (w.eventCount == ec && ctl == c)
- U.park(false, parkTime); // must recheck before park
- w.parker = null;
- U.putObject(wt, PARKBLOCKER, null);
- if (parkTime != 0L && ctl == c &&
- deadline - System.nanoTime() <= 0L &&
- U.compareAndSwapLong(this, CTL, c, pc))
- stat = w.qlock = -1; // shrink pool
- }
+ prevctl = parkTime = deadline = 0L;
+ Thread wt = Thread.currentThread();
+ U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
+ w.parker = wt;
+ if (w.scanState < 0 && ctl == c) // recheck before park
+ U.park(false, parkTime);
+ U.putOrderedObject(w, QPARKER, null);
+ U.putObject(wt, PARKBLOCKER, null);
+ if (w.scanState >= 0)
+ break;
+ if (parkTime != 0L && ctl == c &&
+ deadline - System.nanoTime() <= 0L &&
+ U.compareAndSwapLong(this, CTL, c, prevctl))
+ return false; // shrink pool
}
}
- return stat;
+ return true;
}
+ // Joining tasks
+
/**
- * Possibly releases (signals) a worker. Called only from scan()
- * when a worker with apparently inactive status finds a non-empty
- * queue. This requires revalidating all of the associated state
- * from caller.
+ * Tries to steal and run tasks within the target's computation.
+ * Uses a variant of the top-level algorithm, restricted to tasks
+ * with the given task as ancestor: It prefers taking and running
+ * eligible tasks popped from the worker's own queue (via
+ * popCC). Otherwise it scans others, randomly moving on
+ * contention or execution, deciding to give up based on a
+ * checksum (via return codes from pollAndExecCC). The maxTasks
+ * argument supports external usages; internal calls use zero,
+ * allowing unbounded steps (external calls trap non-positive
+ * values).
+ *
+ * @param w caller
+ * @param maxTasks if non-zero, the maximum number of other tasks to run
+ * @return task status on exit
*/
- private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
- WorkQueue q, int b) {
- WorkQueue v; int e, i; Thread p;
- if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
- ws != null && ws.length > (i = e & SMASK) &&
- (v = ws[i]) != null && ctl == c) {
- long nc = (((long)(v.nextWait & E_MASK)) |
- ((long)((int)(c >>> 32) + UAC_UNIT)) << 32);
- int ne = (e + E_SEQ) & E_MASK;
- if (q != null && q.base == b && w.eventCount < 0 &&
- v.eventCount == (e | INT_SIGN) &&
- U.compareAndSwapLong(this, CTL, c, nc)) {
- v.eventCount = ne;
- if ((p = v.parker) != null)
- U.unpark(p);
+ final int helpComplete(WorkQueue w, CountedCompleter<?> task,
+ int maxTasks) {
+ WorkQueue[] ws; int s = 0, m;
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
+ task != null && w != null) {
+ int mode = w.config; // for popCC
+ int r = w.hint ^ w.top; // arbitrary seed for origin
+ int origin = r & m; // first queue to scan
+ int h = 1; // 1:ran, >1:contended, <0:hash
+ for (int k = origin, oldSum = 0, checkSum = 0;;) {
+ CountedCompleter<?> p; WorkQueue q;
+ if ((s = task.status) < 0)
+ break;
+ if (h == 1 && (p = w.popCC(task, mode)) != null) {
+ p.doExec(); // run local task
+ if (maxTasks != 0 && --maxTasks == 0)
+ break;
+ origin = k; // reset
+ oldSum = checkSum = 0;
+ }
+ else { // poll other queues
+ if ((q = ws[k]) == null)
+ h = 0;
+ else if ((h = q.pollAndExecCC(task)) < 0)
+ checkSum += h;
+ if (h > 0) {
+ if (h == 1 && maxTasks != 0 && --maxTasks == 0)
+ break;
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
+ origin = k = r & m; // move and restart
+ oldSum = checkSum = 0;
+ }
+ else if ((k = (k + 1) & m) == origin) {
+ if (oldSum == (oldSum = checkSum))
+ break;
+ checkSum = 0;
+ }
+ }
}
}
+ return s;
}
/**
@@ -1799,268 +1901,167 @@ public class ForkJoinPool extends AbstractExecutorService {
* execute tasks from. The first call to this method upon a
* waiting join will often entail scanning/search, (which is OK
* because the joiner has nothing better to do), but this method
- * leaves hints in workers to speed up subsequent calls. The
- * implementation is very branchy to cope with potential
- * inconsistencies or loops encountering chains that are stale,
- * unknown, or so long that they are likely cyclic.
+ * leaves hints in workers to speed up subsequent calls.
*
- * @param joiner the joining worker
+ * @param w caller
* @param task the task to join
- * @return 0 if no progress can be made, negative if task
- * known complete, else positive
- */
- private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
- int stat = 0, steps = 0; // bound to avoid cycles
- if (task != null && joiner != null &&
- joiner.base - joiner.top >= 0) { // hoist checks
- restart: for (;;) {
- ForkJoinTask<?> subtask = task; // current target
- for (WorkQueue j = joiner, v;;) { // v is stealer of subtask
- WorkQueue[] ws; int m, s, h;
- if ((s = task.status) < 0) {
- stat = s;
- break restart;
- }
- if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
- break restart; // shutting down
- if ((v = ws[h = (j.hint | 1) & m]) == null ||
- v.currentSteal != subtask) {
- for (int origin = h;;) { // find stealer
- if (((h = (h + 2) & m) & 15) == 1 &&
- (subtask.status < 0 || j.currentJoin != subtask))
- continue restart; // occasional staleness check
- if ((v = ws[h]) != null &&
- v.currentSteal == subtask) {
- j.hint = h; // save hint
+ */
+ private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
+ WorkQueue[] ws = workQueues;
+ int oldSum = 0, checkSum, m;
+ if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
+ task != null) {
+ do { // restart point
+ checkSum = 0; // for stability check
+ ForkJoinTask<?> subtask;
+ WorkQueue j = w, v; // v is subtask stealer
+ descent: for (subtask = task; subtask.status >= 0; ) {
+ for (int h = j.hint | 1, k = 0, i; ; k += 2) {
+ if (k > m) // can't find stealer
+ break descent;
+ if ((v = ws[i = (h + k) & m]) != null) {
+ if (v.currentSteal == subtask) {
+ j.hint = i;
break;
}
- if (h == origin)
- break restart; // cannot find stealer
+ checkSum += v.base;
}
}
- for (;;) { // help stealer or descend to its stealer
+ for (;;) { // help v or descend
ForkJoinTask<?>[] a; int b;
- if (subtask.status < 0) // surround probes with
- continue restart; // consistency checks
- if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
- int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
- ForkJoinTask<?> t =
- (ForkJoinTask<?>)U.getObjectVolatile(a, i);
- if (subtask.status < 0 || j.currentJoin != subtask ||
- v.currentSteal != subtask)
- continue restart; // stale
- stat = 1; // apparent progress
- if (v.base == b) {
- if (t == null)
- break restart;
- if (U.compareAndSwapObject(a, i, t, null)) {
- U.putOrderedInt(v, QBASE, b + 1);
- ForkJoinTask<?> ps = joiner.currentSteal;
- int jt = joiner.top;
- do {
- joiner.currentSteal = t;
- t.doExec(); // clear local tasks too
- } while (task.status >= 0 &&
- joiner.top != jt &&
- (t = joiner.pop()) != null);
- joiner.currentSteal = ps;
- break restart;
- }
- }
+ checkSum += (b = v.base);
+ ForkJoinTask<?> next = v.currentJoin;
+ if (subtask.status < 0 || j.currentJoin != subtask ||
+ v.currentSteal != subtask) // stale
+ break descent;
+ if (b - v.top >= 0 || (a = v.array) == null) {
+ if ((subtask = next) == null)
+ break descent;
+ j = v;
+ break;
}
- else { // empty -- try to descend
- ForkJoinTask<?> next = v.currentJoin;
- if (subtask.status < 0 || j.currentJoin != subtask ||
- v.currentSteal != subtask)
- continue restart; // stale
- else if (next == null || ++steps == MAX_HELP)
- break restart; // dead-end or maybe cyclic
- else {
- subtask = next;
- j = v;
- break;
+ int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
+ ForkJoinTask<?> t = ((ForkJoinTask<?>)
+ U.getObjectVolatile(a, i));
+ if (v.base == b) {
+ if (t == null) // stale
+ break descent;
+ if (U.compareAndSwapObject(a, i, t, null)) {
+ v.base = b + 1;
+ ForkJoinTask<?> ps = w.currentSteal;
+ int top = w.top;
+ do {
+ U.putOrderedObject(w, QCURRENTSTEAL, t);
+ t.doExec(); // clear local tasks too
+ } while (task.status >= 0 &&
+ w.top != top &&
+ (t = w.pop()) != null);
+ U.putOrderedObject(w, QCURRENTSTEAL, ps);
+ if (w.base != w.top)
+ return; // can't further help
}
}
}
}
- }
+ } while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
- return stat;
- }
-
- /**
- * Analog of tryHelpStealer for CountedCompleters. Tries to steal
- * and run tasks within the target's computation.
- *
- * @param task the task to join
- * @param maxTasks the maximum number of other tasks to run
- */
- final int helpComplete(WorkQueue joiner, CountedCompleter<?> task,
- int maxTasks) {
- WorkQueue[] ws; int m;
- int s = 0;
- if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
- joiner != null && task != null) {
- int j = joiner.poolIndex;
- int scans = m + m + 1;
- long c = 0L; // for stability check
- for (int k = scans; ; j += 2) {
- WorkQueue q;
- if ((s = task.status) < 0)
- break;
- else if (joiner.internalPopAndExecCC(task)) {
- if (--maxTasks <= 0) {
- s = task.status;
- break;
- }
- k = scans;
- }
- else if ((s = task.status) < 0)
- break;
- else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
- if (--maxTasks <= 0) {
- s = task.status;
- break;
- }
- k = scans;
- }
- else if (--k < 0) {
- if (c == (c = ctl))
- break;
- k = scans;
- }
- }
- }
- return s;
}
/**
* Tries to decrement active count (sometimes implicitly) and
* possibly release or create a compensating worker in preparation
- * for blocking. Fails on contention or termination. Otherwise,
- * adds a new thread if no idle workers are available and pool
- * may become starved.
- *
- * @param c the assumed ctl value
- */
- final boolean tryCompensate(long c) {
- WorkQueue[] ws = workQueues;
- int pc = parallelism, e = (int)c, m, tc;
- if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
- WorkQueue w = ws[e & m];
- if (e != 0 && w != null) {
- Thread p;
- long nc = ((long)(w.nextWait & E_MASK) |
- (c & (AC_MASK|TC_MASK)));
- int ne = (e + E_SEQ) & E_MASK;
- if (w.eventCount == (e | INT_SIGN) &&
- U.compareAndSwapLong(this, CTL, c, nc)) {
- w.eventCount = ne;
- if ((p = w.parker) != null)
- U.unpark(p);
- return true; // replace with idle worker
+ * for blocking. Returns false (retryable by caller) on
+ * contention, detected staleness, instability, or termination.
+ *
+ * @param w caller
+ */
+ private boolean tryCompensate(WorkQueue w) {
+ boolean canBlock;
+ WorkQueue[] ws; long c; int m, pc, sp;
+ if (w == null || w.qlock < 0 || // caller terminating
+ (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
+ (pc = config & SMASK) == 0) // parallelism disabled
+ canBlock = false;
+ else if ((sp = (int)(c = ctl)) != 0) // release idle worker
+ canBlock = tryRelease(c, ws[sp & m], 0L);
+ else {
+ int ac = (int)(c >> AC_SHIFT) + pc;
+ int tc = (short)(c >> TC_SHIFT) + pc;
+ int nbusy = 0; // validate saturation
+ for (int i = 0; i <= m; ++i) { // two passes of odd indices
+ WorkQueue v;
+ if ((v = ws[((i << 1) | 1) & m]) != null) {
+ if ((v.scanState & SCANNING) != 0)
+ break;
+ ++nbusy;
}
}
- else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
- (int)(c >> AC_SHIFT) + pc > 1) {
- long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
- if (U.compareAndSwapLong(this, CTL, c, nc))
- return true; // no compensation
+ if (nbusy != (tc << 1) || ctl != c)
+ canBlock = false; // unstable or stale
+ else if (tc >= pc && ac > 1 && w.isEmpty()) {
+ long nc = ((AC_MASK & (c - AC_UNIT)) |
+ (~AC_MASK & c)); // uncompensated
+ canBlock = U.compareAndSwapLong(this, CTL, c, nc);
}
- else if (tc + pc < MAX_CAP) {
- long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- ForkJoinWorkerThreadFactory fac;
- Throwable ex = null;
- ForkJoinWorkerThread wt = null;
- try {
- if ((fac = factory) != null &&
- (wt = fac.newThread(this)) != null) {
- wt.start();
- return true;
- }
- } catch (Throwable rex) {
- ex = rex;
- }
- deregisterWorker(wt, ex); // clean up and return false
- }
+ else if (tc >= MAX_CAP ||
+ (this == common && tc >= pc + commonMaxSpares))
+ throw new RejectedExecutionException(
+ "Thread limit exceeded replacing blocked worker");
+ else { // similar to tryAddWorker
+ boolean add = false; int rs; // CAS within lock
+ long nc = ((AC_MASK & c) |
+ (TC_MASK & (c + TC_UNIT)));
+ if (((rs = lockRunState()) & STOP) == 0)
+ add = U.compareAndSwapLong(this, CTL, c, nc);
+ unlockRunState(rs, rs & ~RSLOCK);
+ canBlock = add && createWorker(); // throws on exception
}
}
- return false;
+ return canBlock;
}
/**
- * Helps and/or blocks until the given task is done.
+ * Helps and/or blocks until the given task is done or the deadline passes.
*
- * @param joiner the joining worker
+ * @param w caller
* @param task the task
+ * @param deadline for timed waits, if nonzero
* @return task status on exit
*/
- final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
+ final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
- if (task != null && (s = task.status) >= 0 && joiner != null) {
- ForkJoinTask<?> prevJoin = joiner.currentJoin;
- joiner.currentJoin = task;
- do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
- (s = task.status) >= 0);
- if (s >= 0 && (task instanceof CountedCompleter))
- s = helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
- long cc = 0; // for stability checks
- while (s >= 0 && (s = task.status) >= 0) {
- if ((s = tryHelpStealer(joiner, task)) == 0 &&
- (s = task.status) >= 0) {
- if (!tryCompensate(cc))
- cc = ctl;
- else {
- if (task.trySetSignal() && (s = task.status) >= 0) {
- synchronized (task) {
- if (task.status >= 0) {
- try { // see ForkJoinTask
- task.wait(); // for explanation
- } catch (InterruptedException ie) {
- }
- }
- else
- task.notifyAll();
- }
- }
- long c; // reactivate
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl,
- ((c & ~AC_MASK) |
- ((c & AC_MASK) + AC_UNIT))));
- }
+ if (task != null && w != null) {
+ ForkJoinTask<?> prevJoin = w.currentJoin;
+ U.putOrderedObject(w, QCURRENTJOIN, task);
+ CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
+ (CountedCompleter<?>)task : null;
+ for (;;) {
+ if ((s = task.status) < 0)
+ break;
+ if (cc != null)
+ helpComplete(w, cc, 0);
+ else if (w.base == w.top || w.tryRemoveAndExec(task))
+ helpStealer(w, task);
+ if ((s = task.status) < 0)
+ break;
+ long ms, ns;
+ if (deadline == 0L)
+ ms = 0L;
+ else if ((ns = deadline - System.nanoTime()) <= 0L)
+ break;
+ else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
+ ms = 1L;
+ if (tryCompensate(w)) {
+ task.internalWait(ms);
+ U.getAndAddLong(this, CTL, AC_UNIT);
}
}
- joiner.currentJoin = prevJoin;
+ U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}
- /**
- * Stripped-down variant of awaitJoin used by timed joins. Tries
- * to help join only while there is continuous progress. (Caller
- * will then enter a timed wait.)
- *
- * @param joiner the joining worker
- * @param task the task
- */
- final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
- int s;
- if (joiner != null && task != null && (s = task.status) >= 0) {
- ForkJoinTask<?> prevJoin = joiner.currentJoin;
- joiner.currentJoin = task;
- do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
- (s = task.status) >= 0);
- if (s >= 0) {
- if (task instanceof CountedCompleter)
- helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
- do {} while (task.status >= 0 &&
- tryHelpStealer(joiner, task) > 0);
- }
- joiner.currentJoin = prevJoin;
- }
- }
+ // Specialized scanning
/**
* Returns a (probably) non-empty steal queue, if one is found
@@ -2068,19 +2069,24 @@ public class ForkJoinPool extends AbstractExecutorService {
* caller if, by the time it tries to use the queue, it is empty.
*/
private WorkQueue findNonEmptyStealQueue() {
+ WorkQueue[] ws; int m; // one-shot version of scan loop
int r = ThreadLocalRandom.nextSecondarySeed();
- for (;;) {
- int ps = plock, m; WorkQueue[] ws; WorkQueue q;
- if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
- for (int j = (m + 1) << 2; j >= 0; --j) {
- if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
- q.base - q.top < 0)
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
+ for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
+ WorkQueue q; int b;
+ if ((q = ws[k]) != null) {
+ if ((b = q.base) - q.top < 0)
return q;
+ checkSum += b;
+ }
+ if ((k = (k + 1) & m) == origin) {
+ if (oldSum == (oldSum = checkSum))
+ break;
+ checkSum = 0;
}
}
- if (plock == ps)
- return null;
}
+ return null;
}
/**
@@ -2090,35 +2096,34 @@ public class ForkJoinPool extends AbstractExecutorService {
* find tasks either.
*/
final void helpQuiescePool(WorkQueue w) {
- ForkJoinTask<?> ps = w.currentSteal;
+ ForkJoinTask<?> ps = w.currentSteal; // save context
for (boolean active = true;;) {
long c; WorkQueue q; ForkJoinTask<?> t; int b;
- while ((t = w.nextLocalTask()) != null)
- t.doExec();
+ w.execLocalTasks(); // run locals before each scan
if ((q = findNonEmptyStealQueue()) != null) {
if (!active) { // re-establish active count
active = true;
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl,
- ((c & ~AC_MASK) |
- ((c & AC_MASK) + AC_UNIT))));
+ U.getAndAddLong(this, CTL, AC_UNIT);
+ }
+ if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
+ U.putOrderedObject(w, QCURRENTSTEAL, t);
+ t.doExec();
+ if (++w.nsteals < 0)
+ w.transferStealCount(this);
}
- if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
- w.runTask(t);
}
else if (active) { // decrement active count without queuing
- long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
- if ((int)(nc >> AC_SHIFT) + parallelism == 0)
+ long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
+ if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0)
break; // bypass decrement-then-increment
if (U.compareAndSwapLong(this, CTL, c, nc))
active = false;
}
- else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
- U.compareAndSwapLong
- (this, CTL, c, ((c & ~AC_MASK) |
- ((c & AC_MASK) + AC_UNIT))))
+ else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
+ U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
break;
}
+ U.putOrderedObject(w, QCURRENTSTEAL, ps);
}
/**
@@ -2141,7 +2146,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Returns a cheap heuristic guide for task partitioning when
* programmers, frameworks, tools, or languages have little or no
- * idea about task granularity. In essence by offering this
+ * idea about task granularity. In essence, by offering this
* method, we ask users only about tradeoffs in overhead vs
* expected throughput and its variance, rather than how finely to
* partition tasks.
@@ -2179,15 +2184,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* many of these by further considering the number of "idle"
* threads, that are known to have zero queued tasks, so
* compensate by a factor of (#idle/#active) threads.
- *
- * Note: The approximation of #busy workers as #active workers is
- * not very good under current signalling scheme, and should be
- * improved.
*/
static int getSurplusQueuedTaskCount() {
Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
- int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
+ int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).
+ config & SMASK;
int n = (q = wt.workQueue).top - q.base;
int a = (int)(pool.ctl >> AC_SHIFT) + p;
return n - (a > (p >>>= 1) ? 0 :
@@ -2202,13 +2204,7 @@ public class ForkJoinPool extends AbstractExecutorService {
// Termination
/**
- * Possibly initiates and/or completes termination. The caller
- * triggering termination runs three passes through workQueues:
- * (0) Setting termination status, followed by wakeups of queued
- * workers; (1) cancelling all tasks; (2) interrupting lagging
- * threads (likely in external tasks, but possibly also blocked in
- * joins). Each pass repeats previous steps because of potential
- * lagging thread creation.
+ * Possibly initiates and/or completes termination.
*
* @param now if true, unconditionally terminate, else only
* if no work and no active workers
@@ -2216,166 +2212,256 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return true if now terminating or terminated
*/
private boolean tryTerminate(boolean now, boolean enable) {
- int ps;
- if (this == common) // cannot shut down
+ int rs;
+ if (this == common) // cannot shut down
return false;
- if ((ps = plock) >= 0) { // enable by setting plock
+ if ((rs = runState) >= 0) {
if (!enable)
return false;
- if ((ps & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
- if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
- releasePlock(nps);
- }
- for (long c;;) {
- if (((c = ctl) & STOP_BIT) != 0) { // already terminating
- if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
- synchronized (this) {
- notifyAll(); // signal when 0 workers
+ rs = lockRunState(); // enter SHUTDOWN phase
+ unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
+ }
+
+ if ((rs & STOP) == 0) {
+ if (!now) { // check quiescence
+ for (long oldSum = 0L;;) { // repeat until stable
+ WorkQueue[] ws; WorkQueue w; int m, b; long c;
+ long checkSum = ctl;
+ if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
+ return false; // still active workers
+ if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
+ break; // check queues
+ for (int i = 0; i <= m; ++i) {
+ if ((w = ws[i]) != null) {
+ if ((b = w.base) != w.top || w.scanState >= 0 ||
+ w.currentSteal != null) {
+ tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
+ return false; // arrange for recheck
+ }
+ checkSum += b;
+ if ((i & 1) == 0)
+ w.qlock = -1; // try to disable external
+ }
}
+ if (oldSum == (oldSum = checkSum))
+ break;
}
- return true;
}
- if (!now) { // check if idle & no tasks
- WorkQueue[] ws; WorkQueue w;
- if ((int)(c >> AC_SHIFT) + parallelism > 0)
- return false;
- if ((ws = workQueues) != null) {
- for (int i = 0; i < ws.length; ++i) {
- if ((w = ws[i]) != null &&
- (!w.isEmpty() ||
- ((i & 1) != 0 && w.eventCount >= 0))) {
- signalWork(ws, w);
- return false;
- }
- }
+ if ((runState & STOP) == 0) {
+ rs = lockRunState(); // enter STOP phase
+ unlockRunState(rs, (rs & ~RSLOCK) | STOP);
+ }
+ }
+
+ int pass = 0; // 3 passes to help terminate
+ for (long oldSum = 0L;;) { // or until done or stable
+ WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
+ long checkSum = ctl;
+ if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
+ (ws = workQueues) == null || (m = ws.length - 1) <= 0) {
+ if ((runState & TERMINATED) == 0) {
+ rs = lockRunState(); // done
+ unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
+ synchronized (this) { notifyAll(); } // for awaitTermination
}
+ break;
}
- if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
- for (int pass = 0; pass < 3; ++pass) {
- WorkQueue[] ws; WorkQueue w; Thread wt;
- if ((ws = workQueues) != null) {
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- if ((w = ws[i]) != null) {
- w.qlock = -1;
- if (pass > 0) {
- w.cancelAll();
- if (pass > 1 && (wt = w.owner) != null) {
- if (!wt.isInterrupted()) {
- try {
- wt.interrupt();
- } catch (Throwable ignore) {
- }
- }
- U.unpark(wt);
- }
+ for (int i = 0; i <= m; ++i) {
+ if ((w = ws[i]) != null) {
+ checkSum += w.base;
+ w.qlock = -1; // try to disable
+ if (pass > 0) {
+ w.cancelAll(); // clear queue
+ if (pass > 1 && (wt = w.owner) != null) {
+ if (!wt.isInterrupted()) {
+ try { // unblock join
+ wt.interrupt();
+ } catch (Throwable ignore) {
}
}
+ if (w.scanState < 0)
+ U.unpark(wt); // wake up
}
- // Wake up workers parked on event queue
- int i, e; long cc; Thread p;
- while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
- (i = e & SMASK) < n && i >= 0 &&
- (w = ws[i]) != null) {
- long nc = ((long)(w.nextWait & E_MASK) |
- ((cc + AC_UNIT) & AC_MASK) |
- (cc & (TC_MASK|STOP_BIT)));
- if (w.eventCount == (e | INT_SIGN) &&
- U.compareAndSwapLong(this, CTL, cc, nc)) {
- w.eventCount = (e + E_SEQ) & E_MASK;
- w.qlock = -1;
- if ((p = w.parker) != null)
- U.unpark(p);
- }
+ }
+ }
+ }
+ if (checkSum != oldSum) { // unstable
+ oldSum = checkSum;
+ pass = 0;
+ }
+ else if (pass > 3 && pass > m) // can't further help
+ break;
+ else if (++pass > 1) { // try to dequeue
+ long c; int j = 0, sp; // bound attempts
+ while (j++ <= m && (sp = (int)(c = ctl)) != 0)
+ tryRelease(c, ws[sp & m], AC_UNIT);
+ }
+ }
+ return true;
+ }
+
+ // External operations
+
+ /**
+ * Full version of externalPush, handling uncommon cases, as well
+ * as performing secondary initialization upon the first
+ * submission of the first task to the pool. It also detects
+ * first submission by an external thread and creates a new shared
+ * queue if the one at index is empty or contended.
+ *
+ * @param task the task. Caller must ensure non-null.
+ */
+ private void externalSubmit(ForkJoinTask<?> task) {
+ int r; // initialize caller's probe
+ if ((r = ThreadLocalRandom.getProbe()) == 0) {
+ ThreadLocalRandom.localInit();
+ r = ThreadLocalRandom.getProbe();
+ }
+ for (;;) {
+ WorkQueue[] ws; WorkQueue q; int rs, m, k;
+ boolean move = false;
+ if ((rs = runState) < 0) {
+ tryTerminate(false, false); // help terminate
+ throw new RejectedExecutionException();
+ }
+ else if ((rs & STARTED) == 0 || // initialize
+ ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
+ int ns = 0;
+ rs = lockRunState();
+ try {
+ if ((rs & STARTED) == 0) {
+ U.compareAndSwapObject(this, STEALCOUNTER, null,
+ new AtomicLong());
+ // create workQueues array with size a power of two
+ int p = config & SMASK; // ensure at least 2 slots
+ int n = (p > 1) ? p - 1 : 1;
+ n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
+ n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
+ workQueues = new WorkQueue[n];
+ ns = STARTED;
+ }
+ } finally {
+ unlockRunState(rs, (rs & ~RSLOCK) | ns);
+ }
+ }
+ else if ((q = ws[k = r & m & SQMASK]) != null) {
+ if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
+ ForkJoinTask<?>[] a = q.array;
+ int s = q.top;
+ boolean submitted = false; // initial submission or resizing
+ try { // locked version of push
+ if ((a != null && a.length > s + 1 - q.base) ||
+ (a = q.growArray()) != null) {
+ int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
+ U.putOrderedObject(a, j, task);
+ U.putOrderedInt(q, QTOP, s + 1);
+ submitted = true;
}
+ } finally {
+ U.compareAndSwapInt(q, QLOCK, 1, 0);
+ }
+ if (submitted) {
+ signalWork(ws, q);
+ return;
}
}
+ move = true; // move on failure
+ }
+ else if (((rs = runState) & RSLOCK) == 0) { // create new queue
+ q = new WorkQueue(this, null);
+ q.hint = r;
+ q.config = k | SHARED_QUEUE;
+ q.scanState = INACTIVE;
+ rs = lockRunState(); // publish index
+ if (rs > 0 && (ws = workQueues) != null &&
+ k < ws.length && ws[k] == null)
+ ws[k] = q; // else terminated
+ unlockRunState(rs, rs & ~RSLOCK);
}
+ else
+ move = true; // move if busy
+ if (move)
+ r = ThreadLocalRandom.advanceProbe(r);
}
}
- // external operations on common pool
+ /**
+ * Tries to add the given task to the submission queue at the
+ * submitter's current queue index. Only the (vastly) most common path
+ * is directly handled in this method, while screening for need
+ * for externalSubmit.
+ *
+ * @param task the task. Caller must ensure non-null.
+ */
+ final void externalPush(ForkJoinTask<?> task) {
+ WorkQueue[] ws; WorkQueue q; int m;
+ int r = ThreadLocalRandom.getProbe();
+ int rs = runState;
+ if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
+ (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
+ U.compareAndSwapInt(q, QLOCK, 0, 1)) {
+ ForkJoinTask<?>[] a; int am, n, s;
+ if ((a = q.array) != null &&
+ (am = a.length - 1) > (n = (s = q.top) - q.base)) {
+ int j = ((am & s) << ASHIFT) + ABASE;
+ U.putOrderedObject(a, j, task);
+ U.putOrderedInt(q, QTOP, s + 1);
+ U.putOrderedInt(q, QLOCK, 0);
+ if (n <= 1)
+ signalWork(ws, q);
+ return;
+ }
+ U.compareAndSwapInt(q, QLOCK, 1, 0);
+ }
+ externalSubmit(task);
+ }
/**
- * Returns common pool queue for a thread that has submitted at
- * least one task.
+ * Returns common pool queue for an external thread.
*/
static WorkQueue commonSubmitterQueue() {
- ForkJoinPool p; WorkQueue[] ws; int m, z;
- return ((z = ThreadLocalRandom.getProbe()) != 0 &&
- (p = common) != null &&
- (ws = p.workQueues) != null &&
+ ForkJoinPool p = common;
+ int r = ThreadLocalRandom.getProbe();
+ WorkQueue[] ws; int m;
+ return (p != null && (ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0) ?
- ws[m & z & SQMASK] : null;
+ ws[m & r & SQMASK] : null;
}
/**
- * Tries to pop the given task from submitter's queue in common pool.
+ * Performs tryUnpush for an external submitter: Finds queue,
+ * locks if apparently non-empty, validates upon locking, and
+ * adjusts top. Each check can fail but rarely does.
*/
final boolean tryExternalUnpush(ForkJoinTask<?> task) {
- WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
- WorkQueue[] ws = workQueues;
- int z = ThreadLocalRandom.getProbe();
- boolean popped = false;
- if (ws != null && (m = ws.length - 1) >= 0 &&
- (joiner = ws[z & m & SQMASK]) != null &&
- joiner.base != (s = joiner.top) &&
- (a = joiner.array) != null) {
+ WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s;
+ int r = ThreadLocalRandom.getProbe();
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
+ (w = ws[m & r & SQMASK]) != null &&
+ (a = w.array) != null && (s = w.top) != w.base) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
- if (U.getObject(a, j) == task &&
- U.compareAndSwapInt(joiner, QLOCK, 0, 1)) {
- if (joiner.top == s && joiner.array == a &&
+ if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {
+ if (w.top == s && w.array == a &&
+ U.getObject(a, j) == task &&
U.compareAndSwapObject(a, j, task, null)) {
- joiner.top = s - 1;
- popped = true;
+ U.putOrderedInt(w, QTOP, s - 1);
+ U.putOrderedInt(w, QLOCK, 0);
+ return true;
}
- joiner.qlock = 0;
+ U.compareAndSwapInt(w, QLOCK, 1, 0);
}
}
- return popped;
+ return false;
}
+ /**
+ * Performs helpComplete for an external submitter.
+ */
final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
- WorkQueue joiner; int m;
- WorkQueue[] ws = workQueues;
- int j = ThreadLocalRandom.getProbe();
- int s = 0;
- if (ws != null && (m = ws.length - 1) >= 0 &&
- (joiner = ws[j & m & SQMASK]) != null && task != null) {
- int scans = m + m + 1;
- long c = 0L; // for stability check
- j |= 1; // poll odd queues
- for (int k = scans; ; j += 2) {
- WorkQueue q;
- if ((s = task.status) < 0)
- break;
- else if (joiner.externalPopAndExecCC(task)) {
- if (--maxTasks <= 0) {
- s = task.status;
- break;
- }
- k = scans;
- }
- else if ((s = task.status) < 0)
- break;
- else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
- if (--maxTasks <= 0) {
- s = task.status;
- break;
- }
- k = scans;
- }
- else if (--k < 0) {
- if (c == (c = ctl))
- break;
- k = scans;
- }
- }
- }
- return s;
+ WorkQueue[] ws; int n;
+ int r = ThreadLocalRandom.getProbe();
+ return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
+ helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
}
// Exported methods
@@ -2447,7 +2533,7 @@ public class ForkJoinPool extends AbstractExecutorService {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
- (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
+ asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
@@ -2478,8 +2564,7 @@ public class ForkJoinPool extends AbstractExecutorService {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
- this.mode = (short)mode;
- this.parallelism = (short)parallelism;
+ this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
@@ -2624,7 +2709,7 @@ public class ForkJoinPool extends AbstractExecutorService {
// In previous versions of this class, this method constructed
// a task to run ForkJoinTask.invokeAll, but now external
// invocation of multiple tasks is at least as efficient.
- ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
+ ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
boolean done = false;
try {
@@ -2670,7 +2755,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public int getParallelism() {
int par;
- return ((par = parallelism) > 0) ? par : 1;
+ return ((par = config & SMASK) > 0) ? par : 1;
}
/**
@@ -2692,7 +2777,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of worker threads
*/
public int getPoolSize() {
- return parallelism + (short)(ctl >>> TC_SHIFT);
+ return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
}
/**
@@ -2702,7 +2787,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if this pool uses async mode
*/
public boolean getAsyncMode() {
- return mode == FIFO_QUEUE;
+ return (config & FIFO_QUEUE) != 0;
}
/**
@@ -2733,7 +2818,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of active threads
*/
public int getActiveThreadCount() {
- int r = parallelism + (int)(ctl >> AC_SHIFT);
+ int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
@@ -2749,7 +2834,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
+ return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}
/**
@@ -2764,7 +2849,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of steals
*/
public long getStealCount() {
- long count = stealCount;
+ AtomicLong sc = stealCounter;
+ long count = (sc == null) ? 0L : sc.get();
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 1; i < ws.length; i += 2) {
@@ -2894,7 +2980,8 @@ public class ForkJoinPool extends AbstractExecutorService {
public String toString() {
// Use a single pass through workQueues to collect counts
long qt = 0L, qs = 0L; int rc = 0;
- long st = stealCount;
+ AtomicLong sc = stealCounter;
+ long st = (sc == null) ? 0L : sc.get();
long c = ctl;
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
@@ -2912,16 +2999,16 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
}
- int pc = parallelism;
+ int pc = (config & SMASK);
int tc = pc + (short)(c >>> TC_SHIFT);
int ac = pc + (int)(c >> AC_SHIFT);
if (ac < 0) // ignore transient negative
ac = 0;
- String level;
- if ((c & STOP_BIT) != 0)
- level = (tc == 0) ? "Terminated" : "Terminating";
- else
- level = plock < 0 ? "Shutting down" : "Running";
+ int rs = runState;
+ String level = ((rs & TERMINATED) != 0 ? "Terminated" :
+ (rs & STOP) != 0 ? "Terminating" :
+ (rs & SHUTDOWN) != 0 ? "Shutting down" :
+ "Running");
return super.toString() +
"[" + level +
", parallelism = " + pc +
@@ -2983,9 +3070,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
- long c = ctl;
- return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) + parallelism <= 0);
+ return (runState & TERMINATED) != 0;
}
/**
@@ -3002,9 +3087,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
- long c = ctl;
- return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) + parallelism > 0);
+ int rs = runState;
+ return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
}
/**
@@ -3013,7 +3097,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if this pool has been shut down
*/
public boolean isShutdown() {
- return plock < 0;
+ return (runState & SHUTDOWN) != 0;
}
/**
@@ -3090,8 +3174,9 @@ public class ForkJoinPool extends AbstractExecutorService {
}
found = false;
for (int j = (m + 1) << 2; j >= 0; --j) {
- ForkJoinTask<?> t; WorkQueue q; int b;
- if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
+ ForkJoinTask<?> t; WorkQueue q; int b, k;
+ if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
+ (b = q.base) - q.top < 0) {
found = true;
if ((t = q.pollAt(b)) != null)
t.doExec();
@@ -3115,8 +3200,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* in {@link ForkJoinPool}s.
*
* <p>A {@code ManagedBlocker} provides two methods. Method
- * {@code isReleasable} must return {@code true} if blocking is
- * not necessary. Method {@code block} blocks the current thread
+ * {@link #isReleasable} must return {@code true} if blocking is
+ * not necessary. Method {@link #block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
* thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
@@ -3185,37 +3270,46 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Blocks in accord with the given blocker. If the current thread
- * is a {@link ForkJoinWorkerThread}, this method possibly
- * arranges for a spare thread to be activated if necessary to
- * ensure sufficient parallelism while the current thread is blocked.
+ * Runs the given possibly blocking task. When {@linkplain
+ * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
+ * method possibly arranges for a spare thread to be activated if
+ * necessary to ensure sufficient parallelism while the current
+ * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
+ *
+ * <p>This method repeatedly calls {@code blocker.isReleasable()} and
+ * {@code blocker.block()} until either method returns {@code true}.
+ * Every call to {@code blocker.block()} is preceded by a call to
+ * {@code blocker.isReleasable()} that returned {@code false}.
*
- * <p>If the caller is not a {@link ForkJoinTask}, this method is
+ * <p>If not running in a ForkJoinPool, this method is
* behaviorally equivalent to
* <pre> {@code
* while (!blocker.isReleasable())
* if (blocker.block())
- * return;
- * }</pre>
+ * break;}</pre>
*
- * If the caller is a {@code ForkJoinTask}, then the pool may
- * first be expanded to ensure parallelism, and later adjusted.
+ * If running in a ForkJoinPool, the pool may first be expanded to
+ * ensure sufficient parallelism available during the call to
+ * {@code blocker.block()}.
*
- * @param blocker the blocker
- * @throws InterruptedException if blocker.block did so
+ * @param blocker the blocker task
+ * @throws InterruptedException if {@code blocker.block()} did so
*/
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
+ ForkJoinPool p;
+ ForkJoinWorkerThread wt;
Thread t = Thread.currentThread();
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
+ if ((t instanceof ForkJoinWorkerThread) &&
+ (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
+ WorkQueue w = wt.workQueue;
while (!blocker.isReleasable()) {
- if (p.tryCompensate(p.ctl)) {
+ if (p.tryCompensate(w)) {
try {
do {} while (!blocker.isReleasable() &&
!blocker.block());
} finally {
- p.incrementActiveCount();
+ U.getAndAddLong(p, CTL, AC_UNIT);
}
break;
}
@@ -3241,15 +3335,18 @@ public class ForkJoinPool extends AbstractExecutorService {
// Unsafe mechanics
private static final sun.misc.Unsafe U;
+ private static final int ABASE;
+ private static final int ASHIFT;
private static final long CTL;
+ private static final long RUNSTATE;
+ private static final long STEALCOUNTER;
private static final long PARKBLOCKER;
- private static final int ABASE;
- private static final int ASHIFT;
- private static final long STEALCOUNT;
- private static final long PLOCK;
- private static final long INDEXSEED;
- private static final long QBASE;
+ private static final long QTOP;
private static final long QLOCK;
+ private static final long QSCANSTATE;
+ private static final long QPARKER;
+ private static final long QCURRENTSTEAL;
+ private static final long QCURRENTJOIN;
static {
// initialize field offsets for CAS etc
@@ -3258,20 +3355,26 @@ public class ForkJoinPool extends AbstractExecutorService {
Class<?> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
- STEALCOUNT = U.objectFieldOffset
- (k.getDeclaredField("stealCount"));
- PLOCK = U.objectFieldOffset
- (k.getDeclaredField("plock"));
- INDEXSEED = U.objectFieldOffset
- (k.getDeclaredField("indexSeed"));
+ RUNSTATE = U.objectFieldOffset
+ (k.getDeclaredField("runState"));
+ STEALCOUNTER = U.objectFieldOffset
+ (k.getDeclaredField("stealCounter"));
Class<?> tk = Thread.class;
PARKBLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
Class<?> wk = WorkQueue.class;
- QBASE = U.objectFieldOffset
- (wk.getDeclaredField("base"));
+ QTOP = U.objectFieldOffset
+ (wk.getDeclaredField("top"));
QLOCK = U.objectFieldOffset
(wk.getDeclaredField("qlock"));
+ QSCANSTATE = U.objectFieldOffset
+ (wk.getDeclaredField("scanState"));
+ QPARKER = U.objectFieldOffset
+ (wk.getDeclaredField("parker"));
+ QCURRENTSTEAL = U.objectFieldOffset
+ (wk.getDeclaredField("currentSteal"));
+ QCURRENTJOIN = U.objectFieldOffset
+ (wk.getDeclaredField("currentJoin"));
Class<?> ak = ForkJoinTask[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
@@ -3282,6 +3385,7 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new Error(e);
}
+ commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
@@ -3289,7 +3393,7 @@ public class ForkJoinPool extends AbstractExecutorService {
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
- int par = common.parallelism; // report 1 even if threads disabled
+ int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
diff --git a/src/share/classes/java/util/concurrent/ForkJoinTask.java b/src/share/classes/java/util/concurrent/ForkJoinTask.java
index 936bfc23a3..4439c2407d 100644
--- a/src/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -297,15 +297,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
- * Tries to set SIGNAL status unless already completed. Used by
- * ForkJoinPool. Other variants are directly incorporated into
- * externalAwaitDone etc.
+ * If not done, sets SIGNAL status and performs Object.wait(timeout).
+ * This task may or may not be done on exit. Ignores interrupts.
*
- * @return true if successful
+ * @param timeout using Object.wait conventions.
*/
- final boolean trySetSignal() {
- int s = status;
- return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
+ final void internalWait(long timeout) {
+ int s;
+ if ((s = status) >= 0 && // force completer to issue notify
+ U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ try { wait(timeout); } catch (InterruptedException ie) { }
+ else
+ notifyAll();
+ }
+ }
}
/**
@@ -313,35 +320,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return status upon completion
*/
private int externalAwaitDone() {
- int s;
- ForkJoinPool cp = ForkJoinPool.common;
- if ((s = status) >= 0) {
- if (cp != null) {
- if (this instanceof CountedCompleter)
- s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- s = doExec();
- }
- if (s >= 0 && (s = status) >= 0) {
- boolean interrupted = false;
- do {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0) {
- try {
- wait();
- } catch (InterruptedException ie) {
- interrupted = true;
- }
+ int s = ((this instanceof CountedCompleter) ? // try helping
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter<?>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
+ if (s >= 0 && (s = status) >= 0) {
+ boolean interrupted = false;
+ do {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0) {
+ try {
+ wait(0L);
+ } catch (InterruptedException ie) {
+ interrupted = true;
}
- else
- notifyAll();
}
+ else
+ notifyAll();
}
- } while ((s = status) >= 0);
- if (interrupted)
- Thread.currentThread().interrupt();
- }
+ }
+ } while ((s = status) >= 0);
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
return s;
}
@@ -351,22 +352,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
- ForkJoinPool cp = ForkJoinPool.common;
if (Thread.interrupted())
throw new InterruptedException();
- if ((s = status) >= 0 && cp != null) {
- if (this instanceof CountedCompleter)
- cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- doExec();
- }
- while ((s = status) >= 0) {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0)
- wait();
- else
- notifyAll();
+ if ((s = status) >= 0 &&
+ (s = ((this instanceof CountedCompleter) ?
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter<?>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
+ 0)) >= 0) {
+ while ((s = status) >= 0) {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ wait(0L);
+ else
+ notifyAll();
+ }
}
}
}
@@ -386,7 +387,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
- wt.pool.awaitJoin(w, this) :
+ wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
@@ -399,7 +400,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
+ (wt = (ForkJoinWorkerThread)t).pool.
+ awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
@@ -577,7 +579,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
- if (false && e.thrower != Thread.currentThread().getId()) {
+ if (e.thrower != Thread.currentThread().getId()) {
Class<? extends Throwable> ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;
@@ -587,13 +589,17 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c;
- else if (ps.length == 1 && ps[0] == Throwable.class)
- return (Throwable)(c.newInstance(ex));
+ else if (ps.length == 1 && ps[0] == Throwable.class) {
+ Throwable wx = (Throwable)c.newInstance(ex);
+ return (wx == null) ? ex : wx;
+ }
}
if (noArgCtor != null) {
Throwable wx = (Throwable)(noArgCtor.newInstance());
- wx.initCause(ex);
- return wx;
+ if (wx != null) {
+ wx.initCause(ex);
+ return wx;
+ }
}
} catch (Exception ignore) {
}
@@ -1017,67 +1023,40 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
+ int s;
+ long nanos = unit.toNanos(timeout);
if (Thread.interrupted())
throw new InterruptedException();
- // Messy in part because we measure in nanosecs, but wait in millisecs
- int s; long ms;
- long ns = unit.toNanos(timeout);
- ForkJoinPool cp;
- if ((s = status) >= 0 && ns > 0L) {
- long deadline = System.nanoTime() + ns;
- ForkJoinPool p = null;
- ForkJoinPool.WorkQueue w = null;
+ if ((s = status) >= 0 && nanos > 0L) {
+ long d = System.nanoTime() + nanos;
+ long deadline = (d == 0L) ? 1L : d; // avoid 0
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
- p = wt.pool;
- w = wt.workQueue;
- p.helpJoinOnce(w, this); // no retries on failure
- }
- else if ((cp = ForkJoinPool.common) != null) {
- if (this instanceof CountedCompleter)
- cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- doExec();
+ s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
}
- boolean canBlock = false;
- boolean interrupted = false;
- try {
- while ((s = status) >= 0) {
- if (w != null && w.qlock < 0)
- cancelIgnoringExceptions(this);
- else if (!canBlock) {
- if (p == null || p.tryCompensate(p.ctl))
- canBlock = true;
- }
- else {
- if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
- U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0) {
- try {
- wait(ms);
- } catch (InterruptedException ie) {
- if (p == null)
- interrupted = true;
- }
- }
- else
- notifyAll();
- }
+ else if ((s = ((this instanceof CountedCompleter) ?
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter<?>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ?
+ doExec() : 0)) >= 0) {
+ long ns, ms; // measure in nanosecs, but wait in millisecs
+ while ((s = status) >= 0 &&
+ (ns = deadline - System.nanoTime()) > 0L) {
+ if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
+ U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ wait(ms); // OK to throw InterruptedException
+ else
+ notifyAll();
}
- if ((s = status) < 0 || interrupted ||
- (ns = deadline - System.nanoTime()) <= 0L)
- break;
}
}
- } finally {
- if (p != null && canBlock)
- p.incrementActiveCount();
}
- if (interrupted)
- throw new InterruptedException();
}
+ if (s >= 0)
+ s = status;
if ((s &= DONE_MASK) != NORMAL) {
Throwable ex;
if (s == CANCELLED)
diff --git a/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java b/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
index 404c47cc01..8723f0aac6 100644
--- a/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
+++ b/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
@@ -66,7 +66,7 @@ public class ForkJoinWorkerThread extends Thread {
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
- * requires that we break quite a lot of encapulation (via Unsafe)
+ * requires that we break quite a lot of encapsulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
*/
@@ -118,7 +118,7 @@ public class ForkJoinWorkerThread extends Thread {
* @return the index number
*/
public int getPoolIndex() {
- return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
+ return workQueue.getPoolIndex();
}
/**
@@ -171,7 +171,7 @@ public class ForkJoinWorkerThread extends Thread {
}
/**
- * Erases ThreadLocals by nulling out Thread maps
+ * Erases ThreadLocals by nulling out Thread maps.
*/
final void eraseThreadLocals() {
U.putObject(this, THREADLOCALS, null);
@@ -246,8 +246,8 @@ public class ForkJoinWorkerThread extends Thread {
/**
* Returns a new group with the system ThreadGroup (the
- * topmost, parentless group) as parent. Uses Unsafe to
- * traverse Thread group and ThreadGroup parent fields.
+ * topmost, parent-less group) as parent. Uses Unsafe to
+ * traverse Thread.group and ThreadGroup.parent fields.
*/
private static ThreadGroup createThreadGroup() {
try {
@@ -274,4 +274,3 @@ public class ForkJoinWorkerThread extends Thread {
}
}
-