Updating to latest version of FJP

This commit is contained in:
Viktor Klang 2012-01-27 19:57:24 +01:00
parent b045383a72
commit dcde34f8a0

View file

@ -6,8 +6,6 @@
package akka.jsr166y;
import akka.util.Unsafe;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -25,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
import akka.util.Unsafe;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@ -197,20 +196,22 @@ public class ForkJoinPool extends AbstractExecutorService {
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* for work-stealing (this would contaminate lifo/fifo
* processing). Instead, we loosely associate (via hashing)
* submission queues with submitting threads, and randomly scan
* these queues as well when looking for work. In essence,
* submitters act like workers except that they never take tasks,
* and they are multiplexed on to a finite number of shared work
* queues. However, classes are set up so that future extensions
* could allow submitters to optionally help perform tasks as
* well. Pool submissions from internal workers are also allowed,
* but use randomized rather than thread-hashed queue indices to
* avoid imbalance. Insertion of tasks in shared mode requires a
* lock (mainly to protect in the case of resizing) but we use
* only a simple spinlock (using bits in field runState), because
* submitters encountering a busy queue try or create others so
* never block.
* processing). Instead, we loosely associate submission queues
* with submitting threads, using a form of hashing. The
* ThreadLocal Submitter class contains a value initially used as
* a hash code for choosing existing queues, but may be randomly
* repositioned upon contention with other submitters. In
* essence, submitters act like workers except that they never
* take tasks, and they are multiplexed on to a finite number of
* shared work queues. However, classes are set up so that future
* extensions could allow submitters to optionally help perform
* tasks as well. Pool submissions from internal workers are also
* allowed, but use randomized rather than thread-hashed queue
* indices to avoid imbalance. Insertion of tasks in shared mode
* requires a lock (mainly to protect in the case of resizing) but
* we use only a simple spinlock (using bits in field runState),
* because submitters encountering a busy queue try or create
* others so never block.
*
* Management.
* ==========
@ -1087,27 +1088,58 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
* Computes a hash code for the given thread. This method is
* expected to provide higher-quality hash codes than those using
* method hashCode().
<<<<<<< ForkJoinPool.java
* Per-thread records for (typically non-FJ) threads that submit
* to pools. Cureently holds only psuedo-random seed / index that
* is used to chose submission queues in method doSubmit. In the
* future, this may incorporate a means to implement different
* task rejection and resubmission policies.
*/
static final int hashThread(Thread t) {
long id = (t == null) ? 0L : t.getId(); // Use MurmurHash of thread id
int h = (int)id ^ (int)(id >>> 32);
h ^= h >>> 16;
h *= 0x85ebca6b;
h ^= h >>> 13;
h *= 0xc2b2ae35;
return h ^ (h >>> 16);
static final class Submitter {
int seed; // seed for random submission queue selection
// Heuristic padding to ameliorate unfortunate memory placements
int p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
Submitter() {
// Use identityHashCode, forced negative, for seed
seed = System.identityHashCode(Thread.currentThread()) | (1 << 31);
}
/**
* Computes next value for random probes. Like method
* WorkQueue.nextSeed, this is manually inlined in several
* usages to avoid writes inside busy loops.
*/
final int nextSeed() {
int r = seed;
r ^= r << 13;
r ^= r >>> 17;
return seed = r ^= r << 5;
}
}
/** ThreadLocal class for Submitters */
static final class ThreadSubmitter extends ThreadLocal<Submitter> {
public Submitter initialValue() { return new Submitter(); }
}
/**
* Per-thread submission bookeeping. Shared across all pools
* to reduce ThreadLocal pollution and because random motion
* to avoid contention in one pool is likely to hold for others.
*/
static final ThreadSubmitter submitters = new ThreadSubmitter();
/**
* Top-level runloop for workers
*/
final void runWorker(ForkJoinWorkerThread wt) {
// Initialize queue array and seed in this thread
WorkQueue w = wt.workQueue;
w.growArray(false); // Initialize queue array and seed in this thread
w.seed = hashThread(Thread.currentThread()) | (1 << 31); // force < 0
w.growArray(false);
// Same initial hash as Submitters
w.seed = System.identityHashCode(Thread.currentThread()) | (1 << 31);
do {} while (w.runTask(scan(w)));
}
@ -1220,6 +1252,37 @@ public class ForkJoinPool extends AbstractExecutorService {
U.throwException(ex);
}
/**
* Tries to add and register a new queue at the given index.
*
* @param idx the workQueues array index to register the queue
* @return the queue, or null if could not add because could
* not acquire lock or idx is unusable
*/
private WorkQueue tryAddSharedQueue(int idx) {
WorkQueue q = null;
ReentrantLock lock = this.lock;
if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) {
// create queue outside of lock but only if apparently free
WorkQueue nq = new WorkQueue(null, SHARED_QUEUE);
if (lock.tryLock()) {
try {
WorkQueue[] ws = workQueues;
if (ws != null && idx < ws.length) {
if ((q = ws[idx]) == null) {
int rs; // update runState seq
ws[idx] = q = nq;
runState = (((rs = runState) & SHUTDOWN) |
((rs + RS_SEQ) & ~SHUTDOWN));
}
}
} finally {
lock.unlock();
}
}
}
return q;
}
// Maintaining ctl counts
@ -1322,73 +1385,35 @@ public class ForkJoinPool extends AbstractExecutorService {
// Submissions
/**
* Unless shutting down, adds the given task to some submission
* queue; using a randomly chosen queue index if the caller is a
* ForkJoinWorkerThread, else one based on caller thread's hash
* code. If no queue exists at the index, one is created. If the
* queue is busy, another is chosen by sweeping through the queues
* array.
* Unless shutting down, adds the given task to a submission queue
* at submitter's current queue index. If no queue exists at the
* index, one is created unless pool lock is busy. If the queue
* and/or lock are busy, another index is randomly chosen.
*/
private void doSubmit(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
Thread t = Thread.currentThread();
int r = ((t instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.nextSeed() : hashThread(t));
for (;;) {
Submitter s = submitters.get();
for (int r = s.seed;;) {
WorkQueue q; int k;
int rs = runState, m = rs & SMASK;
int j = r &= (m & ~1); // even numbered queues
WorkQueue[] ws = workQueues;
if (rs < 0 || ws == null)
throw new RejectedExecutionException(); // shutting down
if (ws.length > m) { // consistency check
for (WorkQueue q;;) { // circular sweep
if (((q = ws[j]) != null ||
(q = tryAddSharedQueue(j)) != null) &&
q.trySharedPush(task)) {
signalWork();
return;
}
if ((j = (j + 2) & m) == r) {
Thread.yield(); // all queues busy
break;
}
}
if (rs < 0 || ws == null) // shutting down
throw new RejectedExecutionException();
if (ws.length > m && // k must be at index
((q = ws[k = (r << 1) & m]) != null ||
(q = tryAddSharedQueue(k)) != null) &&
q.trySharedPush(task)) {
signalWork();
return;
}
r ^= r << 13; // xorshift seed to new position
r ^= r >>> 17;
if (((s.seed = r ^= r << 5) & m) == 0)
Thread.yield(); // occasionally yield if busy
}
}
/**
* Tries to add and register a new queue at the given index.
*
* @param idx the workQueues array index to register the queue
* @return the queue, or null if could not add because could
* not acquire lock or idx is unusable
*/
private WorkQueue tryAddSharedQueue(int idx) {
WorkQueue q = null;
ReentrantLock lock = this.lock;
if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) {
// create queue outside of lock but only if apparently free
WorkQueue nq = new WorkQueue(null, SHARED_QUEUE);
if (lock.tryLock()) {
try {
WorkQueue[] ws = workQueues;
if (ws != null && idx < ws.length) {
if ((q = ws[idx]) == null) {
int rs; // update runState seq
ws[idx] = q = nq;
runState = (((rs = runState) & SHUTDOWN) |
((rs + RS_SEQ) & ~SHUTDOWN));
}
}
} finally {
lock.unlock();
}
}
}
return q;
}
// Scanning for tasks
@ -2627,4 +2652,5 @@ public class ForkJoinPool extends AbstractExecutorService {
private static sun.misc.Unsafe getUnsafe() {
return Unsafe.instance;
}
}