Merge pull request #22560 from akka/wip-18262-fjp-patriknw

Embed FJP
This commit is contained in:
Patrik Nordwall 2017-03-16 22:25:39 +01:00 committed by GitHub
commit bece5e3fd0
17 changed files with 7535 additions and 117 deletions

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,121 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.dispatch.forkjoin;
/**
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
* This class is subclassable solely for the sake of adding
* functionality -- there are no overridable methods dealing with
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
* in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*
* This class just maintains links to its pool and WorkQueue. The
* pool field is set immediately upon construction, but the
* workQueue field is not set until a call to registerWorker
* completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the
* owning thread.
*/
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
/**
* Returns the pool hosting this thread.
*
* @return the pool
*/
public ForkJoinPool getPool() {
return pool;
}
/**
* Returns the index number of this thread in its pool. The
* returned value ranges from zero to the maximum number of
* threads (minus one) that have ever been created in the pool.
* This method may be useful for applications that track status or
* collect results per-worker rather than per-task.
*
* @return the index number
*/
public int getPoolIndex() {
return workQueue.poolIndex;
}
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
* invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
}
/**
* Performs cleanup associated with termination of this worker
* thread. If you override this method, you must invoke
* {@code super.onTermination} at the end of the overridden method.
*
* @param exception the exception causing this thread to abort due
* to an unrecoverable error, or {@code null} if completed normally
*/
protected void onTermination(Throwable exception) {
}
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,164 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.dispatch.forkjoin;
/**
* A recursive resultless {@link ForkJoinTask}. This class
* establishes conventions to parameterize resultless actions as
* {@code Void} {@code ForkJoinTask}s. Because {@code null} is the
* only valid value of type {@code Void}, methods such as {@code join}
* always return {@code null} upon completion.
*
* <p><b>Sample Usages.</b> Here is a simple but complete ForkJoin
* sort that sorts a given {@code long[]} array:
*
* <pre> {@code
* static class SortTask extends RecursiveAction {
* final long[] array; final int lo, hi;
* SortTask(long[] array, int lo, int hi) {
* this.array = array; this.lo = lo; this.hi = hi;
* }
* SortTask(long[] array) { this(array, 0, array.length); }
* protected void compute() {
* if (hi - lo < THRESHOLD)
* sortSequentially(lo, hi);
* else {
* int mid = (lo + hi) >>> 1;
* invokeAll(new SortTask(array, lo, mid),
* new SortTask(array, mid, hi));
* merge(lo, mid, hi);
* }
* }
* // implementation details follow:
* final static int THRESHOLD = 1000;
* void sortSequentially(int lo, int hi) {
* Arrays.sort(array, lo, hi);
* }
* void merge(int lo, int mid, int hi) {
* long[] buf = Arrays.copyOfRange(array, lo, mid);
* for (int i = 0, j = lo, k = mid; i < buf.length; j++)
* array[j] = (k == hi || buf[i] < array[k]) ?
* buf[i++] : array[k++];
* }
* }}</pre>
*
* You could then sort {@code anArray} by creating {@code new
* SortTask(anArray)} and invoking it in a ForkJoinPool. As a more
* concrete simple example, the following task increments each element
* of an array:
* <pre> {@code
* class IncrementTask extends RecursiveAction {
* final long[] array; final int lo, hi;
* IncrementTask(long[] array, int lo, int hi) {
* this.array = array; this.lo = lo; this.hi = hi;
* }
* protected void compute() {
* if (hi - lo < THRESHOLD) {
* for (int i = lo; i < hi; ++i)
* array[i]++;
* }
* else {
* int mid = (lo + hi) >>> 1;
* invokeAll(new IncrementTask(array, lo, mid),
* new IncrementTask(array, mid, hi));
* }
* }
* }}</pre>
*
* <p>The following example illustrates some refinements and idioms
* that may lead to better performance: RecursiveActions need not be
* fully recursive, so long as they maintain the basic
* divide-and-conquer approach. Here is a class that sums the squares
* of each element of a double array, by subdividing out only the
* right-hand-sides of repeated divisions by two, and keeping track of
* them with a chain of {@code next} references. It uses a dynamic
* threshold based on method {@code getSurplusQueuedTaskCount}, but
* counterbalances potential excess partitioning by directly
* performing leaf actions on unstolen tasks rather than further
* subdividing.
*
* <pre> {@code
* double sumOfSquares(ForkJoinPool pool, double[] array) {
* int n = array.length;
* Applyer a = new Applyer(array, 0, n, null);
* pool.invoke(a);
* return a.result;
* }
*
* class Applyer extends RecursiveAction {
* final double[] array;
* final int lo, hi;
* double result;
* Applyer next; // keeps track of right-hand-side tasks
* Applyer(double[] array, int lo, int hi, Applyer next) {
* this.array = array; this.lo = lo; this.hi = hi;
* this.next = next;
* }
*
* double atLeaf(int l, int h) {
* double sum = 0;
* for (int i = l; i < h; ++i) // perform leftmost base step
* sum += array[i] * array[i];
* return sum;
* }
*
* protected void compute() {
* int l = lo;
* int h = hi;
* Applyer right = null;
* while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
* int mid = (l + h) >>> 1;
* right = new Applyer(array, mid, h, right);
* right.fork();
* h = mid;
* }
* double sum = atLeaf(l, h);
* while (right != null) {
* if (right.tryUnfork()) // directly calculate if not stolen
* sum += right.atLeaf(right.lo, right.hi);
* else {
* right.join();
* sum += right.result;
* }
* right = right.next;
* }
* result = sum;
* }
* }}</pre>
*
* @since 1.7
* @author Doug Lea
*/
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
/**
* The main computation performed by this task.
*/
protected abstract void compute();
/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() { return null; }
/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) { }
/**
* Implements execution conventions for RecursiveActions.
*/
protected final boolean exec() {
compute();
return true;
}
}

View file

@ -0,0 +1,68 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.dispatch.forkjoin;
/**
* A recursive result-bearing {@link ForkJoinTask}.
*
* <p>For a classic example, here is a task computing Fibonacci numbers:
*
* <pre> {@code
* class Fibonacci extends RecursiveTask<Integer> {
* final int n;
* Fibonacci(int n) { this.n = n; }
* Integer compute() {
* if (n <= 1)
* return n;
* Fibonacci f1 = new Fibonacci(n - 1);
* f1.fork();
* Fibonacci f2 = new Fibonacci(n - 2);
* return f2.compute() + f1.join();
* }
* }}</pre>
*
* However, besides being a dumb way to compute Fibonacci functions
* (there is a simple fast linear algorithm that you'd use in
* practice), this is likely to perform poorly because the smallest
* subtasks are too small to be worthwhile splitting up. Instead, as
* is the case for nearly all fork/join applications, you'd pick some
* minimum granularity size (for example 10 here) for which you always
* sequentially solve rather than subdividing.
*
* @since 1.7
* @author Doug Lea
*/
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
/**
* The result of the computation.
*/
V result;
/**
* The main computation performed by this task.
*/
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}
}

View file

@ -0,0 +1,197 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.dispatch.forkjoin;
import java.util.Random;
/**
* A random number generator isolated to the current thread. Like the
* global {@link java.util.Random} generator used by the {@link
* java.lang.Math} class, a {@code ThreadLocalRandom} is initialized
* with an internally generated seed that may not otherwise be
* modified. When applicable, use of {@code ThreadLocalRandom} rather
* than shared {@code Random} objects in concurrent programs will
* typically encounter much less overhead and contention. Use of
* {@code ThreadLocalRandom} is particularly appropriate when multiple
* tasks (for example, each a {@link ForkJoinTask}) use random numbers
* in parallel in thread pools.
*
* <p>Usages of this class should typically be of the form:
* {@code ThreadLocalRandom.current().nextX(...)} (where
* {@code X} is {@code Int}, {@code Long}, etc).
* When all usages are of this form, it is never possible to
* accidentally share a {@code ThreadLocalRandom} across multiple threads.
*
* <p>This class also provides additional commonly used bounded random
* generation methods.
*
* @since 1.7
* @author Doug Lea
*/
public class ThreadLocalRandom extends Random {
// same constants as Random, but must be redeclared because private
private static final long multiplier = 0x5DEECE66DL;
private static final long addend = 0xBL;
private static final long mask = (1L << 48) - 1;
/**
* The random seed. We can't use super.seed.
*/
private long rnd;
/**
* Initialization flag to permit calls to setSeed to succeed only
* while executing the Random constructor. We can't allow others
* since it would cause setting seed in one part of a program to
* unintentionally impact other usages by the thread.
*/
boolean initialized;
// Padding to help avoid memory contention among seed updates in
// different TLRs in the common case that they are located near
// each other.
private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
/**
* The actual ThreadLocal
*/
private static final ThreadLocal<ThreadLocalRandom> localRandom =
new ThreadLocal<ThreadLocalRandom>() {
protected ThreadLocalRandom initialValue() {
return new ThreadLocalRandom();
}
};
/**
* Constructor called only by localRandom.initialValue.
*/
ThreadLocalRandom() {
super();
initialized = true;
}
/**
* Returns the current thread's {@code ThreadLocalRandom}.
*
* @return the current thread's {@code ThreadLocalRandom}
*/
public static ThreadLocalRandom current() {
return localRandom.get();
}
/**
* Throws {@code UnsupportedOperationException}. Setting seeds in
* this generator is not supported.
*
* @throws UnsupportedOperationException always
*/
public void setSeed(long seed) {
if (initialized)
throw new UnsupportedOperationException();
rnd = (seed ^ multiplier) & mask;
}
protected int next(int bits) {
rnd = (rnd * multiplier + addend) & mask;
return (int) (rnd >>> (48-bits));
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @throws IllegalArgumentException if least greater than or equal
* to bound
* @return the next value
*/
public int nextInt(int least, int bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextInt(bound - least) + least;
}
/**
* Returns a pseudorandom, uniformly distributed value
* between 0 (inclusive) and the specified value (exclusive).
*
* @param n the bound on the random number to be returned. Must be
* positive.
* @return the next value
* @throws IllegalArgumentException if n is not positive
*/
public long nextLong(long n) {
if (n <= 0)
throw new IllegalArgumentException("n must be positive");
// Divide n by two until small enough for nextInt. On each
// iteration (at most 31 of them but usually much less),
// randomly choose both whether to include high bit in result
// (offset) and whether to continue with the lower vs upper
// half (which makes a difference only if odd).
long offset = 0;
while (n >= Integer.MAX_VALUE) {
int bits = next(2);
long half = n >>> 1;
long nextn = ((bits & 2) == 0) ? half : n - half;
if ((bits & 1) == 0)
offset += n - nextn;
n = nextn;
}
return offset + nextInt((int) n);
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public long nextLong(long least, long bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextLong(bound - least) + least;
}
/**
* Returns a pseudorandom, uniformly distributed {@code double} value
* between 0 (inclusive) and the specified value (exclusive).
*
* @param n the bound on the random number to be returned. Must be
* positive.
* @return the next value
* @throws IllegalArgumentException if n is not positive
*/
public double nextDouble(double n) {
if (n <= 0)
throw new IllegalArgumentException("n must be positive");
return nextDouble() * n;
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public double nextDouble(double least, double bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextDouble() * (bound - least) + least;
}
private static final long serialVersionUID = -5851777807851030925L;
}

View file

@ -0,0 +1,133 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.dispatch.forkjoin;
import java.util.concurrent.*;
/**
* A {@link BlockingQueue} in which producers may wait for consumers
* to receive elements. A {@code TransferQueue} may be useful for
* example in message passing applications in which producers
* sometimes (using method {@link #transfer}) await receipt of
* elements by consumers invoking {@code take} or {@code poll}, while
* at other times enqueue elements (via method {@code put}) without
* waiting for receipt.
* {@linkplain #tryTransfer(Object) Non-blocking} and
* {@linkplain #tryTransfer(Object,long,TimeUnit) time-out} versions of
* {@code tryTransfer} are also available.
* A {@code TransferQueue} may also be queried, via {@link
* #hasWaitingConsumer}, whether there are any threads waiting for
* items, which is a converse analogy to a {@code peek} operation.
*
* <p>Like other blocking queues, a {@code TransferQueue} may be
* capacity bounded. If so, an attempted transfer operation may
* initially block waiting for available space, and/or subsequently
* block waiting for reception by a consumer. Note that in a queue
* with zero capacity, such as {@link SynchronousQueue}, {@code put}
* and {@code transfer} are effectively synonymous.
*
* <p>This interface is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.7
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/
public interface TransferQueue<E> extends BlockingQueue<E> {
/**
* Transfers the element to a waiting consumer immediately, if possible.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* otherwise returning {@code false} without enqueuing the element.
*
* @param e the element to transfer
* @return {@code true} if the element was transferred, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean tryTransfer(E e);
/**
* Transfers the element to a consumer, waiting if necessary to do so.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* else waits until the element is received by a consumer.
*
* @param e the element to transfer
* @throws InterruptedException if interrupted while waiting,
* in which case the element is not left enqueued
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void transfer(E e) throws InterruptedException;
/**
* Transfers the element to a consumer if it is possible to do so
* before the timeout elapses.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* else waits until the element is received by a consumer,
* returning {@code false} if the specified wait time elapses
* before the element can be transferred.
*
* @param e the element to transfer
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before completion,
* in which case the element is not left enqueued
* @throws InterruptedException if interrupted while waiting,
* in which case the element is not left enqueued
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Returns {@code true} if there is at least one consumer waiting
* to receive an element via {@link #take} or
* timed {@link #poll(long,TimeUnit) poll}.
* The return value represents a momentary state of affairs.
*
* @return {@code true} if there is at least one waiting consumer
*/
boolean hasWaitingConsumer();
/**
* Returns an estimate of the number of consumers waiting to
* receive elements via {@link #take} or timed
* {@link #poll(long,TimeUnit) poll}. The return value is an
* approximation of a momentary state of affairs, that may be
* inaccurate if consumers have completed or given up waiting.
* The value may be useful for monitoring and heuristics, but
* not for synchronization control. Implementations of this
* method are likely to be noticeably slower than those for
* {@link #hasWaitingConsumer}.
*
* @return the number of consumers waiting to receive elements
*/
int getWaitingConsumerCount();
}

View file

@ -0,0 +1,28 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
/**
* Preview versions of classes targeted for Java 7. Includes a
* fine-grained parallel computation framework: ForkJoinTasks and
* their related support classes provide a very efficient basis for
* obtaining platform-independent parallel speed-ups of
* computation-intensive operations. They are not a full substitute
* for the kinds of arbitrary processing supported by Executors or
* Threads. However, when applicable, they typically provide
* significantly greater performance on multiprocessor platforms.
*
* <p>Candidates for fork/join processing mainly include those that
* can be expressed using parallel divide-and-conquer techniques: To
* solve a problem, break it in two (or more) parts, and then solve
* those parts in parallel, continuing on in this way until the
* problem is too small to be broken up, so is solved directly. The
* underlying <em>work-stealing</em> framework makes subtasks
* available to other threads (normally one per CPU), that help
* complete the tasks. In general, the most efficient ForkJoinTasks
* are those that directly implement this algorithmic design pattern.
*/
package akka.dispatch.forkjoin;

View file

@ -351,7 +351,7 @@ akka {
}
# This will be used if you have set "executor = "fork-join-executor""
# Underlying thread pool implementation is scala.concurrent.forkjoin.ForkJoinPool
# Underlying thread pool implementation is akka.dispatch.forkjoin.ForkJoinPool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 8

View file

@ -16,7 +16,6 @@ import com.typesafe.config.Config
import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinTask }
import scala.util.control.NonFatal
final case class Envelope private (val message: Any, val sender: ActorRef)
@ -380,94 +379,6 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
threadPoolConfig.createExecutorServiceFactory(id, threadFactory)
}
object ForkJoinExecutorConfigurator {
/**
* INTERNAL AKKA USAGE ONLY
*/
final class AkkaForkJoinPool(
parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
asyncMode: Boolean)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics {
def this(
parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true)
override def execute(r: Runnable): Unit =
if (r ne null)
super.execute((if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]])
else
throw new NullPointerException("Runnable was null")
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
/**
* INTERNAL AKKA USAGE ONLY
*/
@SerialVersionUID(1L)
final class AkkaForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
override def getRawResult(): Unit = ()
override def setRawResult(unit: Unit): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {
case ie: InterruptedException
Thread.currentThread.interrupt()
false
case anything: Throwable
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null
case some some.uncaughtException(t, anything)
}
throw anything
}
}
}
class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
import ForkJoinExecutorConfigurator._
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory correct
case x throw new IllegalStateException("The prerequisites for the ForkJoinExecutorConfigurator is a ForkJoinPool.ForkJoinWorkerThreadFactory!")
}
class ForkJoinExecutorServiceFactory(
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean) extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true)
def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
}
final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val tf = threadFactory match {
case m: MonitorableThreadFactory
// add the dispatcher id to the thread names
m.withName(m.name + "-" + id)
case other other
}
val asyncMode = config.getString("task-peeking-mode") match {
case "FIFO" true
case "LIFO" false
case unsupported throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. " +
""""task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
}
new ForkJoinExecutorServiceFactory(
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode)
}
}
class DefaultExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites, fallback: ExecutorServiceConfigurator) extends ExecutorServiceConfigurator(config, prerequisites) {
val provider: ExecutorServiceFactoryProvider =
prerequisites.defaultExecutionContext match {

View file

@ -0,0 +1,97 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.dispatch
import akka.dispatch.forkjoin.{ ForkJoinPool, ForkJoinTask }
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ExecutorService
import com.typesafe.config.Config
object ForkJoinExecutorConfigurator {
/**
* INTERNAL AKKA USAGE ONLY
*/
final class AkkaForkJoinPool(
parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
asyncMode: Boolean)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics {
def this(
parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true)
override def execute(r: Runnable): Unit =
if (r ne null)
super.execute((if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]])
else
throw new NullPointerException("Runnable was null")
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
/**
* INTERNAL AKKA USAGE ONLY
*/
@SerialVersionUID(1L)
final class AkkaForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
override def getRawResult(): Unit = ()
override def setRawResult(unit: Unit): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {
case ie: InterruptedException
Thread.currentThread.interrupt()
false
case anything: Throwable
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null
case some some.uncaughtException(t, anything)
}
throw anything
}
}
}
class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
import ForkJoinExecutorConfigurator._
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory correct
case x throw new IllegalStateException("The prerequisites for the ForkJoinExecutorConfigurator is a ForkJoinPool.ForkJoinWorkerThreadFactory!")
}
class ForkJoinExecutorServiceFactory(
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean) extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true)
def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
}
final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val tf = threadFactory match {
case m: MonitorableThreadFactory
// add the dispatcher id to the thread names
m.withName(m.name + "-" + id)
case other other
}
val asyncMode = config.getString("task-peeking-mode") match {
case "FIFO" true
case "LIFO" false
case unsupported throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. " +
""""task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
}
new ForkJoinExecutorServiceFactory(
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode)
}
}

View file

@ -17,7 +17,7 @@ import com.typesafe.config.Config
import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.forkjoin.ForkJoinTask
import akka.dispatch.forkjoin.ForkJoinTask
import scala.util.control.NonFatal
/**

View file

@ -7,7 +7,7 @@ package akka.dispatch
import java.util.Collection
import scala.concurrent.{ BlockContext, CanAwait }
import scala.concurrent.duration.Duration
import scala.concurrent.forkjoin._
import akka.dispatch.forkjoin._
import java.util.concurrent.{
ArrayBlockingQueue,
BlockingQueue,

View file

@ -9,6 +9,7 @@ import org.openjdk.jmh.annotations._
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.annotation.tailrec
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@ -19,10 +20,10 @@ import scala.concurrent.Await
class ForkJoinActorBenchmark {
import ForkJoinActorBenchmark._
@Param(Array("1", "5"))
@Param(Array("5"))
var tpt = 0
@Param(Array("1", "4"))
@Param(Array("1"))
var threads = ""
implicit var system: ActorSystem = _
@ -54,25 +55,104 @@ class ForkJoinActorBenchmark {
Await.ready(system.whenTerminated, 15.seconds)
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(messages)
def pingPong(): Unit = {
val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
var pingPongActors: Vector[(ActorRef, ActorRef)] = null
var pingPongLessActorsThanCoresActors: Vector[(ActorRef, ActorRef)] = null
var pingPongSameNumberOfActorsAsCoresActors: Vector[(ActorRef, ActorRef)] = null
var pingPongMoreActorsThanCoresActors: Vector[(ActorRef, ActorRef)] = null
ping.tell(message, pong)
@Setup(Level.Invocation)
def setupActors(): Unit = {
pingPongActors = startActors(1)
pingPongLessActorsThanCoresActors = startActors(lessThanCoresActorPairs)
pingPongSameNumberOfActorsAsCoresActors = startActors(cores / 2)
pingPongMoreActorsThanCoresActors = startActors(moreThanCoresActorPairs)
}
val p = TestProbe()
p.watch(ping)
p.expectTerminated(ping, timeout)
p.watch(pong)
p.expectTerminated(pong, timeout)
@TearDown(Level.Invocation)
def tearDownActors(): Unit = {
stopActors(pingPongActors)
stopActors(pingPongLessActorsThanCoresActors)
stopActors(pingPongSameNumberOfActorsAsCoresActors)
stopActors(pingPongMoreActorsThanCoresActors)
}
def startActors(n: Int): Vector[(ActorRef, ActorRef)] = {
for {
i <- (1 to n).toVector
} yield {
val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
(ping, pong)
}
}
def stopActors(refs: Vector[(ActorRef, ActorRef)]): Unit = {
if (refs ne null) {
refs.foreach {
case (ping, pong) =>
system.stop(ping)
system.stop(pong)
}
awaitTerminated(refs)
}
}
def awaitTerminated(refs: Vector[(ActorRef, ActorRef)]): Unit = {
if (refs ne null) refs.foreach {
case (ping, pong) =>
val p = TestProbe()
p.watch(ping)
p.expectTerminated(ping, timeout)
p.watch(pong)
p.expectTerminated(pong, timeout)
}
}
def sendMessage(refs: Vector[(ActorRef, ActorRef)], inFlight: Int): Unit = {
for {
(ping, pong) <- refs
_ <- 1 to inFlight
} {
ping.tell(Message, pong)
}
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(messages)
def pingPong(): Unit = {
// only one message in flight
sendMessage(pingPongActors, inFlight = 1)
awaitTerminated(pingPongActors)
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(totalMessagesLessThanCores)
def pingPongLessActorsThanCores(): Unit = {
sendMessage(pingPongLessActorsThanCoresActors, inFlight = 2 * tpt)
awaitTerminated(pingPongLessActorsThanCoresActors)
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(totalMessagesSameAsCores)
def pingPongSameNumberOfActorsAsCores(): Unit = {
sendMessage(pingPongSameNumberOfActorsAsCoresActors, inFlight = 2 * tpt)
awaitTerminated(pingPongSameNumberOfActorsAsCoresActors)
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(totalMessagesMoreThanCores)
def pingPongMoreActorsThanCores(): Unit = {
sendMessage(pingPongMoreActorsThanCoresActors, inFlight = 2 * tpt)
awaitTerminated(pingPongMoreActorsThanCoresActors)
}
// @Benchmark
// @Measurement(timeUnit = TimeUnit.MILLISECONDS)
// @OperationsPerInvocation(messages)
def floodPipe(): Unit = {
val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None))
@ -83,43 +163,53 @@ class ForkJoinActorBenchmark {
val p = TestProbe()
p.watch(end)
def send(left: Int): Unit =
@tailrec def send(left: Int): Unit =
if (left > 0) {
beginning ! message
beginning ! Message
send(left - 1)
}
send(messages / 4) // we have 4 actors in the pipeline
beginning ! stop
beginning ! Stop
p.expectTerminated(end, timeout)
}
}
object ForkJoinActorBenchmark {
final val stop = "stop"
final val message = "message"
case object Stop
case object Message
final val timeout = 15.seconds
final val messages = 400000
// update according to cpu
final val cores = 8
// 2 actors per
final val moreThanCoresActorPairs = cores * 2
final val lessThanCoresActorPairs = (cores / 2) - 1
final val totalMessagesMoreThanCores = moreThanCoresActorPairs * messages
final val totalMessagesLessThanCores = lessThanCoresActorPairs * messages
final val totalMessagesSameAsCores = cores * messages
class Pipe(next: Option[ActorRef]) extends Actor {
def receive = {
case m @ `message` =>
if (next.isDefined) next.get forward m
case s @ `stop` =>
case Message =>
if (next.isDefined) next.get forward Message
case Stop =>
context stop self
if (next.isDefined) next.get forward s
if (next.isDefined) next.get forward Stop
}
}
class PingPong extends Actor {
var left = messages / 2
def receive = {
case `message` =>
case Message =>
if (left <= 1)
context stop self
sender() ! message
sender() ! Message
left -= 1
}
}

View file

@ -240,6 +240,17 @@ New::
override def get(system: ActorSystem): MyExtension = super.get(system)
}
Actor Mailbox
=============
Scala 2.12 is using the standard JDK8 ForkJoinPool, which may cause performance regression for
some Actor messaging scenarios. Therefore we have embedded the ForkJoinPool from Scala 2.11 in
Akka. This breaks binary backwards compatibility for custom ``Mailbox`` implementations that were
compiled with Akka 2.4.
This is only a problem if you are using a library that provides a custom ``Mailbox`` implementation.
The change is source compatible and such library should be recompiled and released for Akka 2.5.
Streams
=======

View file

@ -99,6 +99,22 @@ object MiMa extends AutoPlugin {
val bcIssuesBetween24and25 = Seq(
// ##22269 GSet as delta-CRDT
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GSet.this"), // constructor supplied by companion object
// # 18262 embed FJP, Mailbox extends ForkJoinTask
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#ForkJoinExecutorServiceFactory.threadFactory"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#ForkJoinExecutorServiceFactory.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#ForkJoinExecutorServiceFactory.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator.validate"),
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask"),
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.MonitorableThreadFactory"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.MonitorableThreadFactory.newThread"),
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#AkkaForkJoinPool.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.ForkJoinExecutorConfigurator#AkkaForkJoinPool.this"),
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.Mailbox"),
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.BalancingDispatcher$SharingMailbox"),
ProblemFilters.exclude[MissingTypesProblem]("akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.dispatch.MonitorableThreadFactory#AkkaForkJoinWorkerThread.this"),
// #21875 delta-CRDT
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GCounter.this"),