Merge pull request #26893 from jrudolph/jr/remove-fjp-copy
act: remove shaded fork join pool implementation
This commit is contained in:
commit
5db334f2b1
17 changed files with 36 additions and 7303 deletions
|
|
@ -4,9 +4,10 @@
|
|||
|
||||
package akka.actor.typed.internal.routing
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.forkjoin.ThreadLocalRandom
|
||||
|
||||
/**
|
||||
* Kept in the behavior, not shared between instances, meant to be stateful.
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
|
@ -1,121 +0,0 @@
|
|||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package akka.dispatch.forkjoin;
|
||||
|
||||
/**
|
||||
* A thread managed by a {@link ForkJoinPool}, which executes
|
||||
* {@link ForkJoinTask}s.
|
||||
* This class is subclassable solely for the sake of adding
|
||||
* functionality -- there are no overridable methods dealing with
|
||||
* scheduling or execution. However, you can override initialization
|
||||
* and termination methods surrounding the main task processing loop.
|
||||
* If you do create such a subclass, you will also need to supply a
|
||||
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
|
||||
* in a {@code ForkJoinPool}.
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public class ForkJoinWorkerThread extends Thread {
|
||||
/*
|
||||
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
|
||||
* ForkJoinTasks. For explanation, see the internal documentation
|
||||
* of class ForkJoinPool.
|
||||
*
|
||||
* This class just maintains links to its pool and WorkQueue. The
|
||||
* pool field is set immediately upon construction, but the
|
||||
* workQueue field is not set until a call to registerWorker
|
||||
* completes. This leads to a visibility race, that is tolerated
|
||||
* by requiring that the workQueue field is only accessed by the
|
||||
* owning thread.
|
||||
*/
|
||||
|
||||
final ForkJoinPool pool; // the pool this thread works in
|
||||
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
|
||||
|
||||
/**
|
||||
* Creates a ForkJoinWorkerThread operating in the given pool.
|
||||
*
|
||||
* @param pool the pool this thread works in
|
||||
* @throws NullPointerException if pool is null
|
||||
*/
|
||||
protected ForkJoinWorkerThread(ForkJoinPool pool) {
|
||||
// Use a placeholder until a useful name can be set in registerWorker
|
||||
super("aForkJoinWorkerThread");
|
||||
this.pool = pool;
|
||||
this.workQueue = pool.registerWorker(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the pool hosting this thread.
|
||||
*
|
||||
* @return the pool
|
||||
*/
|
||||
public ForkJoinPool getPool() {
|
||||
return pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the index number of this thread in its pool. The
|
||||
* returned value ranges from zero to the maximum number of
|
||||
* threads (minus one) that have ever been created in the pool.
|
||||
* This method may be useful for applications that track status or
|
||||
* collect results per-worker rather than per-task.
|
||||
*
|
||||
* @return the index number
|
||||
*/
|
||||
public int getPoolIndex() {
|
||||
return workQueue.poolIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes internal state after construction but before
|
||||
* processing any tasks. If you override this method, you must
|
||||
* invoke {@code super.onStart()} at the beginning of the method.
|
||||
* Initialization requires care: Most fields must have legal
|
||||
* default values, to ensure that attempted accesses from other
|
||||
* threads work correctly even before this thread starts
|
||||
* processing tasks.
|
||||
*/
|
||||
protected void onStart() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs cleanup associated with termination of this worker
|
||||
* thread. If you override this method, you must invoke
|
||||
* {@code super.onTermination} at the end of the overridden method.
|
||||
*
|
||||
* @param exception the exception causing this thread to abort due
|
||||
* to an unrecoverable error, or {@code null} if completed normally
|
||||
*/
|
||||
protected void onTermination(Throwable exception) {
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is required to be public, but should never be
|
||||
* called explicitly. It performs the main run loop to execute
|
||||
* {@link ForkJoinTask}s.
|
||||
*/
|
||||
public void run() {
|
||||
Throwable exception = null;
|
||||
try {
|
||||
onStart();
|
||||
pool.runWorker(workQueue);
|
||||
} catch (Throwable ex) {
|
||||
exception = ex;
|
||||
} finally {
|
||||
try {
|
||||
onTermination(exception);
|
||||
} catch (Throwable ex) {
|
||||
if (exception == null)
|
||||
exception = ex;
|
||||
} finally {
|
||||
pool.deregisterWorker(this, exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load diff
|
|
@ -1,164 +0,0 @@
|
|||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package akka.dispatch.forkjoin;
|
||||
|
||||
/**
|
||||
* A recursive resultless {@link ForkJoinTask}. This class
|
||||
* establishes conventions to parameterize resultless actions as
|
||||
* {@code Void} {@code ForkJoinTask}s. Because {@code null} is the
|
||||
* only valid value of type {@code Void}, methods such as {@code join}
|
||||
* always return {@code null} upon completion.
|
||||
*
|
||||
* <p><b>Sample Usages.</b> Here is a simple but complete ForkJoin
|
||||
* sort that sorts a given {@code long[]} array:
|
||||
*
|
||||
* <pre> {@code
|
||||
* static class SortTask extends RecursiveAction {
|
||||
* final long[] array; final int lo, hi;
|
||||
* SortTask(long[] array, int lo, int hi) {
|
||||
* this.array = array; this.lo = lo; this.hi = hi;
|
||||
* }
|
||||
* SortTask(long[] array) { this(array, 0, array.length); }
|
||||
* protected void compute() {
|
||||
* if (hi - lo < THRESHOLD)
|
||||
* sortSequentially(lo, hi);
|
||||
* else {
|
||||
* int mid = (lo + hi) >>> 1;
|
||||
* invokeAll(new SortTask(array, lo, mid),
|
||||
* new SortTask(array, mid, hi));
|
||||
* merge(lo, mid, hi);
|
||||
* }
|
||||
* }
|
||||
* // implementation details follow:
|
||||
* final static int THRESHOLD = 1000;
|
||||
* void sortSequentially(int lo, int hi) {
|
||||
* Arrays.sort(array, lo, hi);
|
||||
* }
|
||||
* void merge(int lo, int mid, int hi) {
|
||||
* long[] buf = Arrays.copyOfRange(array, lo, mid);
|
||||
* for (int i = 0, j = lo, k = mid; i < buf.length; j++)
|
||||
* array[j] = (k == hi || buf[i] < array[k]) ?
|
||||
* buf[i++] : array[k++];
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* You could then sort {@code anArray} by creating {@code new
|
||||
* SortTask(anArray)} and invoking it in a ForkJoinPool. As a more
|
||||
* concrete simple example, the following task increments each element
|
||||
* of an array:
|
||||
* <pre> {@code
|
||||
* class IncrementTask extends RecursiveAction {
|
||||
* final long[] array; final int lo, hi;
|
||||
* IncrementTask(long[] array, int lo, int hi) {
|
||||
* this.array = array; this.lo = lo; this.hi = hi;
|
||||
* }
|
||||
* protected void compute() {
|
||||
* if (hi - lo < THRESHOLD) {
|
||||
* for (int i = lo; i < hi; ++i)
|
||||
* array[i]++;
|
||||
* }
|
||||
* else {
|
||||
* int mid = (lo + hi) >>> 1;
|
||||
* invokeAll(new IncrementTask(array, lo, mid),
|
||||
* new IncrementTask(array, mid, hi));
|
||||
* }
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* <p>The following example illustrates some refinements and idioms
|
||||
* that may lead to better performance: RecursiveActions need not be
|
||||
* fully recursive, so long as they maintain the basic
|
||||
* divide-and-conquer approach. Here is a class that sums the squares
|
||||
* of each element of a double array, by subdividing out only the
|
||||
* right-hand-sides of repeated divisions by two, and keeping track of
|
||||
* them with a chain of {@code next} references. It uses a dynamic
|
||||
* threshold based on method {@code getSurplusQueuedTaskCount}, but
|
||||
* counterbalances potential excess partitioning by directly
|
||||
* performing leaf actions on unstolen tasks rather than further
|
||||
* subdividing.
|
||||
*
|
||||
* <pre> {@code
|
||||
* double sumOfSquares(ForkJoinPool pool, double[] array) {
|
||||
* int n = array.length;
|
||||
* Applyer a = new Applyer(array, 0, n, null);
|
||||
* pool.invoke(a);
|
||||
* return a.result;
|
||||
* }
|
||||
*
|
||||
* class Applyer extends RecursiveAction {
|
||||
* final double[] array;
|
||||
* final int lo, hi;
|
||||
* double result;
|
||||
* Applyer next; // keeps track of right-hand-side tasks
|
||||
* Applyer(double[] array, int lo, int hi, Applyer next) {
|
||||
* this.array = array; this.lo = lo; this.hi = hi;
|
||||
* this.next = next;
|
||||
* }
|
||||
*
|
||||
* double atLeaf(int l, int h) {
|
||||
* double sum = 0;
|
||||
* for (int i = l; i < h; ++i) // perform leftmost base step
|
||||
* sum += array[i] * array[i];
|
||||
* return sum;
|
||||
* }
|
||||
*
|
||||
* protected void compute() {
|
||||
* int l = lo;
|
||||
* int h = hi;
|
||||
* Applyer right = null;
|
||||
* while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
|
||||
* int mid = (l + h) >>> 1;
|
||||
* right = new Applyer(array, mid, h, right);
|
||||
* right.fork();
|
||||
* h = mid;
|
||||
* }
|
||||
* double sum = atLeaf(l, h);
|
||||
* while (right != null) {
|
||||
* if (right.tryUnfork()) // directly calculate if not stolen
|
||||
* sum += right.atLeaf(right.lo, right.hi);
|
||||
* else {
|
||||
* right.join();
|
||||
* sum += right.result;
|
||||
* }
|
||||
* right = right.next;
|
||||
* }
|
||||
* result = sum;
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public abstract class RecursiveAction extends ForkJoinTask<Void> {
|
||||
private static final long serialVersionUID = 5232453952276485070L;
|
||||
|
||||
/**
|
||||
* The main computation performed by this task.
|
||||
*/
|
||||
protected abstract void compute();
|
||||
|
||||
/**
|
||||
* Always returns {@code null}.
|
||||
*
|
||||
* @return {@code null} always
|
||||
*/
|
||||
public final Void getRawResult() { return null; }
|
||||
|
||||
/**
|
||||
* Requires null completion value.
|
||||
*/
|
||||
protected final void setRawResult(Void mustBeNull) { }
|
||||
|
||||
/**
|
||||
* Implements execution conventions for RecursiveActions.
|
||||
*/
|
||||
protected final boolean exec() {
|
||||
compute();
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,68 +0,0 @@
|
|||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package akka.dispatch.forkjoin;
|
||||
|
||||
/**
|
||||
* A recursive result-bearing {@link ForkJoinTask}.
|
||||
*
|
||||
* <p>For a classic example, here is a task computing Fibonacci numbers:
|
||||
*
|
||||
* <pre> {@code
|
||||
* class Fibonacci extends RecursiveTask<Integer> {
|
||||
* final int n;
|
||||
* Fibonacci(int n) { this.n = n; }
|
||||
* Integer compute() {
|
||||
* if (n <= 1)
|
||||
* return n;
|
||||
* Fibonacci f1 = new Fibonacci(n - 1);
|
||||
* f1.fork();
|
||||
* Fibonacci f2 = new Fibonacci(n - 2);
|
||||
* return f2.compute() + f1.join();
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* However, besides being a dumb way to compute Fibonacci functions
|
||||
* (there is a simple fast linear algorithm that you'd use in
|
||||
* practice), this is likely to perform poorly because the smallest
|
||||
* subtasks are too small to be worthwhile splitting up. Instead, as
|
||||
* is the case for nearly all fork/join applications, you'd pick some
|
||||
* minimum granularity size (for example 10 here) for which you always
|
||||
* sequentially solve rather than subdividing.
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
|
||||
private static final long serialVersionUID = 5232453952276485270L;
|
||||
|
||||
/**
|
||||
* The result of the computation.
|
||||
*/
|
||||
V result;
|
||||
|
||||
/**
|
||||
* The main computation performed by this task.
|
||||
*/
|
||||
protected abstract V compute();
|
||||
|
||||
public final V getRawResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
protected final void setRawResult(V value) {
|
||||
result = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements execution conventions for RecursiveTask.
|
||||
*/
|
||||
protected final boolean exec() {
|
||||
result = compute();
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,197 +0,0 @@
|
|||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package akka.dispatch.forkjoin;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* A random number generator isolated to the current thread. Like the
|
||||
* global {@link java.util.Random} generator used by the {@link
|
||||
* java.lang.Math} class, a {@code ThreadLocalRandom} is initialized
|
||||
* with an internally generated seed that may not otherwise be
|
||||
* modified. When applicable, use of {@code ThreadLocalRandom} rather
|
||||
* than shared {@code Random} objects in concurrent programs will
|
||||
* typically encounter much less overhead and contention. Use of
|
||||
* {@code ThreadLocalRandom} is particularly appropriate when multiple
|
||||
* tasks (for example, each a {@link ForkJoinTask}) use random numbers
|
||||
* in parallel in thread pools.
|
||||
*
|
||||
* <p>Usages of this class should typically be of the form:
|
||||
* {@code ThreadLocalRandom.current().nextX(...)} (where
|
||||
* {@code X} is {@code Int}, {@code Long}, etc).
|
||||
* When all usages are of this form, it is never possible to
|
||||
* accidentally share a {@code ThreadLocalRandom} across multiple threads.
|
||||
*
|
||||
* <p>This class also provides additional commonly used bounded random
|
||||
* generation methods.
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public class ThreadLocalRandom extends Random {
|
||||
// same constants as Random, but must be redeclared because private
|
||||
private static final long multiplier = 0x5DEECE66DL;
|
||||
private static final long addend = 0xBL;
|
||||
private static final long mask = (1L << 48) - 1;
|
||||
|
||||
/**
|
||||
* The random seed. We can't use super.seed.
|
||||
*/
|
||||
private long rnd;
|
||||
|
||||
/**
|
||||
* Initialization flag to permit calls to setSeed to succeed only
|
||||
* while executing the Random constructor. We can't allow others
|
||||
* since it would cause setting seed in one part of a program to
|
||||
* unintentionally impact other usages by the thread.
|
||||
*/
|
||||
boolean initialized;
|
||||
|
||||
// Padding to help avoid memory contention among seed updates in
|
||||
// different TLRs in the common case that they are located near
|
||||
// each other.
|
||||
private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
|
||||
|
||||
/**
|
||||
* The actual ThreadLocal
|
||||
*/
|
||||
private static final ThreadLocal<ThreadLocalRandom> localRandom =
|
||||
new ThreadLocal<ThreadLocalRandom>() {
|
||||
protected ThreadLocalRandom initialValue() {
|
||||
return new ThreadLocalRandom();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Constructor called only by localRandom.initialValue.
|
||||
*/
|
||||
ThreadLocalRandom() {
|
||||
super();
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current thread's {@code ThreadLocalRandom}.
|
||||
*
|
||||
* @return the current thread's {@code ThreadLocalRandom}
|
||||
*/
|
||||
public static ThreadLocalRandom current() {
|
||||
return localRandom.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws {@code UnsupportedOperationException}. Setting seeds in
|
||||
* this generator is not supported.
|
||||
*
|
||||
* @throws UnsupportedOperationException always
|
||||
*/
|
||||
public void setSeed(long seed) {
|
||||
if (initialized)
|
||||
throw new UnsupportedOperationException();
|
||||
rnd = (seed ^ multiplier) & mask;
|
||||
}
|
||||
|
||||
protected int next(int bits) {
|
||||
rnd = (rnd * multiplier + addend) & mask;
|
||||
return (int) (rnd >>> (48-bits));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed value between the
|
||||
* given least value (inclusive) and bound (exclusive).
|
||||
*
|
||||
* @param least the least value returned
|
||||
* @param bound the upper bound (exclusive)
|
||||
* @throws IllegalArgumentException if least greater than or equal
|
||||
* to bound
|
||||
* @return the next value
|
||||
*/
|
||||
public int nextInt(int least, int bound) {
|
||||
if (least >= bound)
|
||||
throw new IllegalArgumentException();
|
||||
return nextInt(bound - least) + least;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed value
|
||||
* between 0 (inclusive) and the specified value (exclusive).
|
||||
*
|
||||
* @param n the bound on the random number to be returned. Must be
|
||||
* positive.
|
||||
* @return the next value
|
||||
* @throws IllegalArgumentException if n is not positive
|
||||
*/
|
||||
public long nextLong(long n) {
|
||||
if (n <= 0)
|
||||
throw new IllegalArgumentException("n must be positive");
|
||||
// Divide n by two until small enough for nextInt. On each
|
||||
// iteration (at most 31 of them but usually much less),
|
||||
// randomly choose both whether to include high bit in result
|
||||
// (offset) and whether to continue with the lower vs upper
|
||||
// half (which makes a difference only if odd).
|
||||
long offset = 0;
|
||||
while (n >= Integer.MAX_VALUE) {
|
||||
int bits = next(2);
|
||||
long half = n >>> 1;
|
||||
long nextn = ((bits & 2) == 0) ? half : n - half;
|
||||
if ((bits & 1) == 0)
|
||||
offset += n - nextn;
|
||||
n = nextn;
|
||||
}
|
||||
return offset + nextInt((int) n);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed value between the
|
||||
* given least value (inclusive) and bound (exclusive).
|
||||
*
|
||||
* @param least the least value returned
|
||||
* @param bound the upper bound (exclusive)
|
||||
* @return the next value
|
||||
* @throws IllegalArgumentException if least greater than or equal
|
||||
* to bound
|
||||
*/
|
||||
public long nextLong(long least, long bound) {
|
||||
if (least >= bound)
|
||||
throw new IllegalArgumentException();
|
||||
return nextLong(bound - least) + least;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed {@code double} value
|
||||
* between 0 (inclusive) and the specified value (exclusive).
|
||||
*
|
||||
* @param n the bound on the random number to be returned. Must be
|
||||
* positive.
|
||||
* @return the next value
|
||||
* @throws IllegalArgumentException if n is not positive
|
||||
*/
|
||||
public double nextDouble(double n) {
|
||||
if (n <= 0)
|
||||
throw new IllegalArgumentException("n must be positive");
|
||||
return nextDouble() * n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed value between the
|
||||
* given least value (inclusive) and bound (exclusive).
|
||||
*
|
||||
* @param least the least value returned
|
||||
* @param bound the upper bound (exclusive)
|
||||
* @return the next value
|
||||
* @throws IllegalArgumentException if least greater than or equal
|
||||
* to bound
|
||||
*/
|
||||
public double nextDouble(double least, double bound) {
|
||||
if (least >= bound)
|
||||
throw new IllegalArgumentException();
|
||||
return nextDouble() * (bound - least) + least;
|
||||
}
|
||||
|
||||
private static final long serialVersionUID = -5851777807851030925L;
|
||||
}
|
||||
|
|
@ -1,133 +0,0 @@
|
|||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package akka.dispatch.forkjoin;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* A {@link BlockingQueue} in which producers may wait for consumers
|
||||
* to receive elements. A {@code TransferQueue} may be useful for
|
||||
* example in message passing applications in which producers
|
||||
* sometimes (using method {@link #transfer}) await receipt of
|
||||
* elements by consumers invoking {@code take} or {@code poll}, while
|
||||
* at other times enqueue elements (via method {@code put}) without
|
||||
* waiting for receipt.
|
||||
* {@linkplain #tryTransfer(Object) Non-blocking} and
|
||||
* {@linkplain #tryTransfer(Object,long,TimeUnit) time-out} versions of
|
||||
* {@code tryTransfer} are also available.
|
||||
* A {@code TransferQueue} may also be queried, via {@link
|
||||
* #hasWaitingConsumer}, whether there are any threads waiting for
|
||||
* items, which is a converse analogy to a {@code peek} operation.
|
||||
*
|
||||
* <p>Like other blocking queues, a {@code TransferQueue} may be
|
||||
* capacity bounded. If so, an attempted transfer operation may
|
||||
* initially block waiting for available space, and/or subsequently
|
||||
* block waiting for reception by a consumer. Note that in a queue
|
||||
* with zero capacity, such as {@link SynchronousQueue}, {@code put}
|
||||
* and {@code transfer} are effectively synonymous.
|
||||
*
|
||||
* <p>This interface is a member of the
|
||||
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
|
||||
* Java Collections Framework</a>.
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
* @param <E> the type of elements held in this collection
|
||||
*/
|
||||
public interface TransferQueue<E> extends BlockingQueue<E> {
|
||||
/**
|
||||
* Transfers the element to a waiting consumer immediately, if possible.
|
||||
*
|
||||
* <p>More precisely, transfers the specified element immediately
|
||||
* if there exists a consumer already waiting to receive it (in
|
||||
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
|
||||
* otherwise returning {@code false} without enqueuing the element.
|
||||
*
|
||||
* @param e the element to transfer
|
||||
* @return {@code true} if the element was transferred, else
|
||||
* {@code false}
|
||||
* @throws ClassCastException if the class of the specified element
|
||||
* prevents it from being added to this queue
|
||||
* @throws NullPointerException if the specified element is null
|
||||
* @throws IllegalArgumentException if some property of the specified
|
||||
* element prevents it from being added to this queue
|
||||
*/
|
||||
boolean tryTransfer(E e);
|
||||
|
||||
/**
|
||||
* Transfers the element to a consumer, waiting if necessary to do so.
|
||||
*
|
||||
* <p>More precisely, transfers the specified element immediately
|
||||
* if there exists a consumer already waiting to receive it (in
|
||||
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
|
||||
* else waits until the element is received by a consumer.
|
||||
*
|
||||
* @param e the element to transfer
|
||||
* @throws InterruptedException if interrupted while waiting,
|
||||
* in which case the element is not left enqueued
|
||||
* @throws ClassCastException if the class of the specified element
|
||||
* prevents it from being added to this queue
|
||||
* @throws NullPointerException if the specified element is null
|
||||
* @throws IllegalArgumentException if some property of the specified
|
||||
* element prevents it from being added to this queue
|
||||
*/
|
||||
void transfer(E e) throws InterruptedException;
|
||||
|
||||
/**
|
||||
* Transfers the element to a consumer if it is possible to do so
|
||||
* before the timeout elapses.
|
||||
*
|
||||
* <p>More precisely, transfers the specified element immediately
|
||||
* if there exists a consumer already waiting to receive it (in
|
||||
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
|
||||
* else waits until the element is received by a consumer,
|
||||
* returning {@code false} if the specified wait time elapses
|
||||
* before the element can be transferred.
|
||||
*
|
||||
* @param e the element to transfer
|
||||
* @param timeout how long to wait before giving up, in units of
|
||||
* {@code unit}
|
||||
* @param unit a {@code TimeUnit} determining how to interpret the
|
||||
* {@code timeout} parameter
|
||||
* @return {@code true} if successful, or {@code false} if
|
||||
* the specified waiting time elapses before completion,
|
||||
* in which case the element is not left enqueued
|
||||
* @throws InterruptedException if interrupted while waiting,
|
||||
* in which case the element is not left enqueued
|
||||
* @throws ClassCastException if the class of the specified element
|
||||
* prevents it from being added to this queue
|
||||
* @throws NullPointerException if the specified element is null
|
||||
* @throws IllegalArgumentException if some property of the specified
|
||||
* element prevents it from being added to this queue
|
||||
*/
|
||||
boolean tryTransfer(E e, long timeout, TimeUnit unit)
|
||||
throws InterruptedException;
|
||||
|
||||
/**
|
||||
* Returns {@code true} if there is at least one consumer waiting
|
||||
* to receive an element via {@link #take} or
|
||||
* timed {@link #poll(long,TimeUnit) poll}.
|
||||
* The return value represents a momentary state of affairs.
|
||||
*
|
||||
* @return {@code true} if there is at least one waiting consumer
|
||||
*/
|
||||
boolean hasWaitingConsumer();
|
||||
|
||||
/**
|
||||
* Returns an estimate of the number of consumers waiting to
|
||||
* receive elements via {@link #take} or timed
|
||||
* {@link #poll(long,TimeUnit) poll}. The return value is an
|
||||
* approximation of a momentary state of affairs, that may be
|
||||
* inaccurate if consumers have completed or given up waiting.
|
||||
* The value may be useful for monitoring and heuristics, but
|
||||
* not for synchronization control. Implementations of this
|
||||
* method are likely to be noticeably slower than those for
|
||||
* {@link #hasWaitingConsumer}.
|
||||
*
|
||||
* @return the number of consumers waiting to receive elements
|
||||
*/
|
||||
int getWaitingConsumerCount();
|
||||
}
|
||||
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
/**
|
||||
* Preview versions of classes targeted for Java 7. Includes a
|
||||
* fine-grained parallel computation framework: ForkJoinTasks and
|
||||
* their related support classes provide a very efficient basis for
|
||||
* obtaining platform-independent parallel speed-ups of
|
||||
* computation-intensive operations. They are not a full substitute
|
||||
* for the kinds of arbitrary processing supported by Executors or
|
||||
* Threads. However, when applicable, they typically provide
|
||||
* significantly greater performance on multiprocessor platforms.
|
||||
*
|
||||
* <p>Candidates for fork/join processing mainly include those that
|
||||
* can be expressed using parallel divide-and-conquer techniques: To
|
||||
* solve a problem, break it in two (or more) parts, and then solve
|
||||
* those parts in parallel, continuing on in this way until the
|
||||
* problem is too small to be broken up, so is solved directly. The
|
||||
* underlying <em>work-stealing</em> framework makes subtasks
|
||||
* available to other threads (normally one per CPU), that help
|
||||
* complete the tasks. In general, the most efficient ForkJoinTasks
|
||||
* are those that directly implement this algorithmic design pattern.
|
||||
*/
|
||||
package akka.dispatch.forkjoin;
|
||||
|
|
@ -21,3 +21,24 @@ ProblemFilters.exclude[MissingClassProblem]("akka.actor.dungeon.UndefinedUidActo
|
|||
|
||||
# Protect internals against starvation #23576
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.dispatch.Dispatchers.this")
|
||||
|
||||
# Remove internal Akka fork of FJP, was meant-to-be internal
|
||||
ProblemFilters.exclude[Problem]("akka.dispatch.forkjoin.*")
|
||||
|
||||
# Consequences of removing the internal FJP is that class hierarchies changes. Many of those APIs are internal, others are not
|
||||
# but probably not meant to be used standalone so it's probably ok. The changes would probably only be observable if you
|
||||
# also referenced classes from akka.dispatch.forkjoin
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#ForkJoinExecutorServiceFactory.threadFactory")
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#ForkJoinExecutorServiceFactory.this")
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#ForkJoinExecutorServiceFactory.this")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator.validate")
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask")
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.MonitorableThreadFactory")
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.MonitorableThreadFactory.newThread")
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool")
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#AkkaForkJoinPool.this")
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#AkkaForkJoinPool.this")
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.Mailbox")
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.BalancingDispatcher$SharingMailbox")
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread")
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.MonitorableThreadFactory#AkkaForkJoinWorkerThread.this")
|
||||
|
|
|
|||
|
|
@ -443,7 +443,7 @@ akka {
|
|||
}
|
||||
|
||||
# This will be used if you have set "executor = "fork-join-executor""
|
||||
# Underlying thread pool implementation is akka.dispatch.forkjoin.ForkJoinPool
|
||||
# Underlying thread pool implementation is java.util.concurrent.ForkJoinPool
|
||||
fork-join-executor {
|
||||
# Min number of threads to cap factor-based parallelism number to
|
||||
parallelism-min = 8
|
||||
|
|
|
|||
|
|
@ -4,9 +4,7 @@
|
|||
|
||||
package akka.dispatch
|
||||
|
||||
import akka.dispatch.forkjoin.{ ForkJoinPool, ForkJoinTask }
|
||||
import java.util.concurrent.ThreadFactory
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory }
|
||||
import com.typesafe.config.Config
|
||||
|
||||
object ForkJoinExecutorConfigurator {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import com.typesafe.config.Config
|
|||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import akka.dispatch.forkjoin.ForkJoinTask
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -7,12 +7,13 @@ package akka.dispatch
|
|||
import java.util.Collection
|
||||
import scala.concurrent.{ BlockContext, CanAwait }
|
||||
import scala.concurrent.duration.Duration
|
||||
import akka.dispatch.forkjoin._
|
||||
import java.util.concurrent.{
|
||||
ArrayBlockingQueue,
|
||||
BlockingQueue,
|
||||
Callable,
|
||||
ExecutorService,
|
||||
ForkJoinPool,
|
||||
ForkJoinWorkerThread,
|
||||
LinkedBlockingQueue,
|
||||
RejectedExecutionException,
|
||||
RejectedExecutionHandler,
|
||||
|
|
|
|||
|
|
@ -145,10 +145,10 @@ java.lang.Exception: I failed!
|
|||
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
|
||||
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
|
||||
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
|
||||
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
|
||||
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
|
||||
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
|
||||
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
|
||||
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:260)
|
||||
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
|
||||
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
|
||||
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
|
||||
```
|
||||
|
||||
We see that after failure the supervised actor is stopped and immediately restarted. We also see a log entry reporting the exception that was handled, in this case, our test exception. In this example we used `preStart()` and `postStop()` hooks
|
||||
|
|
|
|||
|
|
@ -78,6 +78,11 @@ the default dispatcher has been adjusted down to `1.0` which means the number of
|
|||
@ref[Artery TCP](../remoting-artery.md) is now the default remoting implementation.
|
||||
Classic remoting has been deprecated and will be removed in `2.7.0`.
|
||||
|
||||
## Akka now uses Fork Join Pool from JDK
|
||||
|
||||
Previously, Akka contained a shaded copy of the ForkJoinPool. In benchmarks, we could not find significant benefits of
|
||||
keeping our own copy, so from Akka 2.6 on, the default FJP from the JDK will be used. The Akka FJP copy was removed.
|
||||
|
||||
<a id="classic-to-artery"></a>
|
||||
### Migrating from classic remoting to Artery
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue