diff --git a/akka-actor-tests/src/test/scala/akka/util/ImmutableIntMapSpec.scala b/akka-actor-tests/src/test/scala/akka/util/ImmutableIntMapSpec.scala new file mode 100644 index 0000000000..298b9f2d81 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/util/ImmutableIntMapSpec.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.util +import org.scalatest.Matchers +import org.scalatest.WordSpec + +import scala.util.Random + +class ImmutableIntMapSpec extends WordSpec with Matchers { + + "ImmutableIntMap" must { + + "have no entries when empty" in { + val empty = ImmutableIntMap.empty + empty.size should be(0) + empty.keysIterator.toList should be(Nil) + } + + "add and get entries" in { + val m1 = ImmutableIntMap.empty.updated(10, 10) + m1.keysIterator.toList should be(List(10)) + m1.keysIterator.map(m1.get).toList should be(List(10)) + + val m2 = m1.updated(20, 20) + m2.keysIterator.toList should be(List(10, 20)) + m2.keysIterator.map(m2.get).toList should be(List(10, 20)) + + val m3 = m1.updated(5, 5) + m3.keysIterator.toList should be(List(5, 10)) + m3.keysIterator.map(m3.get).toList should be(List(5, 10)) + + val m4 = m2.updated(5, 5) + m4.keysIterator.toList should be(List(5, 10, 20)) + m4.keysIterator.map(m4.get).toList should be(List(5, 10, 20)) + + val m5 = m4.updated(15, 15) + m5.keysIterator.toList should be(List(5, 10, 15, 20)) + m5.keysIterator.map(m5.get).toList should be(List(5, 10, 15, 20)) + } + + "replace entries" in { + val m1 = ImmutableIntMap.empty.updated(10, 10).updated(10, 11) + m1.keysIterator.map(m1.get).toList should be(List(11)) + + val m2 = m1.updated(20, 20).updated(30, 30) + .updated(20, 21).updated(30, 31) + m2.keysIterator.map(m2.get).toList should be(List(11, 21, 31)) + } + + "update if absent" in { + val m1 = ImmutableIntMap.empty.updated(10, 10).updated(20, 11) + m1.updateIfAbsent(10, 15) should be(ImmutableIntMap.empty.updated(10, 10).updated(20, 11)) + m1.updateIfAbsent(30, 12) should be(ImmutableIntMap.empty.updated(10, 10).updated(20, 11).updated(30, 12)) + } + + "have toString" in { + ImmutableIntMap.empty.toString should be("ImmutableIntMap()") + ImmutableIntMap.empty.updated(10, 10).toString should be("ImmutableIntMap(10 -> 10)") + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).toString should be( + "ImmutableIntMap(10 -> 10, 20 -> 20)") + } + + "have equals and hashCode" in { + ImmutableIntMap.empty.updated(10, 10) should be(ImmutableIntMap.empty.updated(10, 10)) + ImmutableIntMap.empty.updated(10, 10).hashCode should be( + ImmutableIntMap.empty.updated(10, 10).hashCode) + + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) should be( + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30)) + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30).hashCode should be( + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30).hashCode) + + ImmutableIntMap.empty.updated(10, 10).updated(20, 20) should not be ImmutableIntMap.empty.updated(10, 10) + + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) should not be + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 31) + + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) should not be + ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(31, 30) + + ImmutableIntMap.empty should be(ImmutableIntMap.empty) + ImmutableIntMap.empty.hashCode should be(ImmutableIntMap.empty.hashCode) + } + + "remove entries" in { + val m1 = ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) + + val m2 = m1.remove(10) + m2.keysIterator.map(m2.get).toList should be(List(20, 30)) + + val m3 = m1.remove(20) + m3.keysIterator.map(m3.get).toList should be(List(10, 30)) + + val m4 = m1.remove(30) + m4.keysIterator.map(m4.get).toList should be(List(10, 20)) + + m1.remove(5) should be(m1) + + m1.remove(10).remove(20).remove(30) should be(ImmutableIntMap.empty) + } + + "get None when entry doesn't exist" in { + val m1 = ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) + m1.get(5) should be(Int.MinValue) + m1.get(15) should be(Int.MinValue) + m1.get(25) should be(Int.MinValue) + m1.get(35) should be(Int.MinValue) + } + + "contain keys" in { + val m1 = ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) + m1.contains(10) should be(true) + m1.contains(20) should be(true) + m1.contains(30) should be(true) + m1.contains(5) should be(false) + m1.contains(25) should be(false) + } + + "have correct behavior for random operations" in { + val seed = System.nanoTime() + val rnd = new Random(seed) + + var longMap = ImmutableIntMap.empty + var reference = Map.empty[Long, Int] + + def verify(): Unit = { + val m = longMap.keysIterator.map(key ⇒ key → longMap.get(key)).toMap + + m should be(reference) + } + + (1 to 1000).foreach { i ⇒ + withClue(s"seed=$seed, iteration=$i") { + val key = rnd.nextInt(100) + val value = rnd.nextPrintableChar() + rnd.nextInt(3) match { + case 0 | 1 ⇒ + longMap = longMap.updated(key, value) + reference = reference.updated(key, value) + case 2 ⇒ + longMap = longMap.remove(key) + reference = reference - key + } + verify() + } + } + } + + } +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 6d4039c416..b810e4e64b 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -335,6 +335,7 @@ akka { # - "default-executor" requires a "default-executor" section # - "fork-join-executor" requires a "fork-join-executor" section # - "thread-pool-executor" requires a "thread-pool-executor" section + # - "affinity-pool-executor" requires an "affinity-pool-executor" section # - A FQCN of a class extending ExecutorServiceConfigurator executor = "default-executor" @@ -350,6 +351,69 @@ akka { fallback = "fork-join-executor" } + # This will be used if you have set "executor = "affinity-pool-executor"" + # Underlying thread pool implementation is akka.dispatch.affinity.AffinityPool. + # This executor is classified as "ApiMayChange". + affinity-pool-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 4 + + # The parallelism factor is used to determine thread pool size using the + # following formula: ceil(available processors * factor). Resulting size + # is then bounded by the parallelism-min and parallelism-max values. + parallelism-factor = 0.8 + + # Max number of threads to cap factor-based parallelism number to. + parallelism-max = 64 + + # Each worker in the pool uses a separate bounded MPSC queue. This value + # indicates the upper bound of the queue. Whenever an attempt to enqueue + # a task is made and the queue does not have capacity to accomodate + # the task, the rejection handler created by the factory specified + # in "rejection-handler-factory" is invoked. + task-queue-size = 512 + + # FQCN of the Rejection handler used in the pool. + # Must have an empty public constructor constructor with and needs + # to implement akka.actor.affinity.RejectionHandlerFactory. + rejection-handler-factory = "akka.dispatch.affinity.DefaultRejectionHandlerFactory" + + # Level of CPU time used, on a scale between 1 and 10, during backoff/idle. + # The tradeoff is that to have low latency more CPU time must be used to be + # able to react quickly on incoming messages or send as fast as possible after + # backoff backpressure. + # Level 1 strongly prefer low CPU consumption over low latency. + # Level 10 strongly prefer low latency over low CPU consumption. + idle-cpu-level = 5 + + # Internally the AffinityPool uses two methods to determine which task + # queue to allocate a Runnable to: + # - map based - maintains a round robin counter and a map of Runnable + # hashcodes to queues that they have been associated with. This ensures + # maximum fairness in terms of work distribution, meaning that each worker + # will get approximately equal amount of mailboxes to execute. This is suitable + # in cases where we have a small number of actors that will be scheduled on + # the pool and we want to ensure the maximum possible utilization of the + # available threads. + # - hash based - the task - queue in which the runnable should go is determined + # by using an uniformly distributed int to int hash function which uses the + # hash code of the Runnable as an input. This is preferred in situations where we + # have enough number of distinct actors to ensure statistically uniform + # distribution of work across threads or we are ready to sacrifice the + # former for the added benefit of avoiding map look-ups. + # + # The value serves as a threshold which determines the point at which the + # pool switches from the first to the second work distribution schemes. + # For example, if the value is set to 128, the pool can observe up to + # 128 unique actors and schedule their mailboxes using the map based + # approach. Once this number is reached the pool switches to hash based + # task distribution mode. If the value is set to 0, the map based + # work distribution approach is disabled and only the hash based is + # used irrespective of the number of unique actors. Valid range is + # 0 to 2048 (inclusive) + fair-work-distribution-threshold = 128 + } + # This will be used if you have set "executor = "fork-join-executor"" # Underlying thread pool implementation is akka.dispatch.forkjoin.ForkJoinPool fork-join-executor { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 285337dc4d..28dd212522 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -8,11 +8,13 @@ import java.util.concurrent._ import java.{ util ⇒ ju } import akka.actor._ +import akka.dispatch.affinity.AffinityPoolConfigurator import akka.dispatch.sysmsg._ import akka.event.EventStream import akka.event.Logging.{ Debug, Error, LogEventException } import akka.util.{ Index, Unsafe } import com.typesafe.config.Config + import scala.annotation.tailrec import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor } import scala.concurrent.duration.{ Duration, FiniteDuration } @@ -327,6 +329,8 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: def configurator(executor: String): ExecutorServiceConfigurator = executor match { case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) + case "affinity-pool-executor" ⇒ new AffinityPoolConfigurator(config.getConfig("affinity-pool-executor"), prerequisites) + case fqcn ⇒ val args = List( classOf[Config] → config, diff --git a/akka-actor/src/main/scala/akka/dispatch/affinity/AffinityPool.scala b/akka-actor/src/main/scala/akka/dispatch/affinity/AffinityPool.scala new file mode 100644 index 0000000000..a53019df1f --- /dev/null +++ b/akka-actor/src/main/scala/akka/dispatch/affinity/AffinityPool.scala @@ -0,0 +1,423 @@ +/** + * Copyright (C) 2016-2017 Lightbend Inc. + */ + +package akka.dispatch.affinity + +import java.lang.invoke.MethodHandles +import java.lang.invoke.MethodType.methodType +import java.util +import java.util.Collections +import java.util.concurrent.TimeUnit.MICROSECONDS +import java.util.concurrent._ +import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } +import java.util.concurrent.locks.{ Lock, LockSupport, ReentrantLock } + +import akka.dispatch._ +import akka.util.Helpers.Requiring +import com.typesafe.config.Config + +import scala.annotation.tailrec +import java.lang.Integer.reverseBytes + +import akka.annotation.InternalApi +import akka.annotation.ApiMayChange +import akka.util.ImmutableIntMap +import akka.util.OptionVal + +import scala.collection.mutable +import scala.util.control.NonFatal + +/** + * An [[ExecutorService]] implementation which pins actor to particular threads + * and guaranteed that an actor's [[Mailbox]] will e run on the thread it used + * it used to run. In situations where we see a lot of cache ping pong, this + * might lead to significant performance improvements. + * + * INTERNAL API + */ +@InternalApi +@ApiMayChange +private[akka] class AffinityPool( + parallelism: Int, + affinityGroupSize: Int, + tf: ThreadFactory, + idleCpuLevel: Int, + fairDistributionThreshold: Int, + rejectionHandler: RejectionHandler) + extends AbstractExecutorService { + + if (parallelism <= 0) + throw new IllegalArgumentException("Size of pool cannot be less or equal to 0") + + // Held while starting/shutting down workers/pool in order to make + // the operations linear and enforce atomicity. An example of that would be + // adding a worker. We want the creation of the worker, addition + // to the set and starting to worker to be an atomic action. Using + // a concurrent set would not give us that + private val bookKeepingLock = new ReentrantLock() + + // condition used for awaiting termination + private val terminationCondition = bookKeepingLock.newCondition() + + // indicates the current state of the pool + @volatile final private var poolState: PoolState = Running + + private final val workQueues = Array.fill(parallelism)(new BoundedTaskQueue(affinityGroupSize)) + private final val workers = mutable.Set[ThreadPoolWorker]() + + // a counter that gets incremented every time a task is queued + private val executionCounter: AtomicInteger = new AtomicInteger(0) + // maps a runnable to an index of a worker queue + private val runnableToWorkerQueueIndex = new AtomicReference(ImmutableIntMap.empty) + + private def locked[T](l: Lock)(body: ⇒ T) = { + l.lock() + try { + body + } finally { + l.unlock() + } + } + + private def getQueueForRunnable(command: Runnable) = { + + val runnableHash = command.hashCode() + + def sbhash(i: Int) = reverseBytes(i * 0x9e3775cd) * 0x9e3775cd + + def getNext = executionCounter.incrementAndGet() % parallelism + + def updateIfAbsentAndGetQueueIndex( + workerQueueIndex: AtomicReference[ImmutableIntMap], + runnableHash: Int, queueIndex: ⇒ Int): Int = { + @tailrec + def updateIndex(): Unit = { + val prev = workerQueueIndex.get() + if (!runnableToWorkerQueueIndex.compareAndSet(prev, prev.updateIfAbsent(runnableHash, queueIndex))) { + updateIndex() + } + } + updateIndex() + workerQueueIndex.get().get(runnableHash) // can safely call get.. + } + + val workQueueIndex = + if (fairDistributionThreshold == 0 || runnableToWorkerQueueIndex.get().size > fairDistributionThreshold) + Math.abs(sbhash(runnableHash)) % parallelism + else + updateIfAbsentAndGetQueueIndex(runnableToWorkerQueueIndex, runnableHash, getNext) + + workQueues(workQueueIndex) + } + + //fires up initial workers + locked(bookKeepingLock) { + workQueues.foreach(q ⇒ addWorker(workers, q)) + } + + private def addWorker(workers: mutable.Set[ThreadPoolWorker], q: BoundedTaskQueue): Unit = { + locked(bookKeepingLock) { + val worker = new ThreadPoolWorker(q, new IdleStrategy(idleCpuLevel)) + workers.add(worker) + worker.startWorker() + } + } + + private def tryEnqueue(command: Runnable) = getQueueForRunnable(command).add(command) + + /** + * Each worker should go through that method while terminating. + * In turn each worker is responsible for modifying the pool + * state accordingly. For example if this is the last worker + * and the queue is empty and we are in a ShuttingDown state + * the worker can transition the pool to ShutDown and attempt + * termination + * + * Furthermore, if this worker has experienced abrupt termination + * due to an exception being thrown in user code, the worker is + * responsible for adding one more worker to compensate for its + * own termination + * + */ + private def onWorkerExit(w: ThreadPoolWorker, abruptTermination: Boolean): Unit = + locked(bookKeepingLock) { + workers.remove(w) + if (workers.isEmpty && !abruptTermination && poolState >= ShuttingDown) { + poolState = ShutDown // transition to shutdown and try to transition to termination + attemptPoolTermination() + } + if (abruptTermination && poolState == Running) + addWorker(workers, w.q) + } + + override def execute(command: Runnable): Unit = { + if (command == null) + throw new NullPointerException + if (!(poolState == Running && tryEnqueue(command))) + rejectionHandler.reject(command, this) + } + + override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = { + + // recurse until pool is terminated or time out reached + @tailrec + def awaitTermination(nanos: Long): Boolean = { + if (poolState == Terminated) true + else if (nanos <= 0) false + else awaitTermination(terminationCondition.awaitNanos(nanos)) + } + + locked(bookKeepingLock) { + // need to hold the lock to avoid monitor exception + awaitTermination(unit.toNanos(timeout)) + } + + } + + private def attemptPoolTermination() = + locked(bookKeepingLock) { + if (workers.isEmpty && poolState == ShutDown) { + poolState = Terminated + terminationCondition.signalAll() + } + } + + override def shutdownNow(): util.List[Runnable] = + locked(bookKeepingLock) { + poolState = ShutDown + workers.foreach(_.stop()) + attemptPoolTermination() + // like in the FJ executor, we do not provide facility to obtain tasks that were in queue + Collections.emptyList[Runnable]() + } + + override def shutdown(): Unit = + locked(bookKeepingLock) { + poolState = ShuttingDown + // interrupts only idle workers.. so others can process their queues + workers.foreach(_.stopIfIdle()) + attemptPoolTermination() + } + + override def isShutdown: Boolean = poolState == ShutDown + + override def isTerminated: Boolean = poolState == Terminated + + // Following are auxiliary class and trait definitions + + private sealed trait PoolState extends Ordered[PoolState] { + def order: Int + override def compare(that: PoolState): Int = this.order compareTo that.order + } + + // accepts new tasks and processes tasks that are enqueued + private case object Running extends PoolState { + override val order: Int = 0 + } + + // does not accept new tasks, processes tasks that are in the queue + private case object ShuttingDown extends PoolState { + override def order: Int = 1 + } + + // does not accept new tasks, does not process tasks in queue + private case object ShutDown extends PoolState { + override def order: Int = 2 + } + + // all threads have been stopped, does not process tasks and does not accept new ones + private case object Terminated extends PoolState { + override def order: Int = 3 + } + + private final class IdleStrategy(val idleCpuLevel: Int) { + + private val maxSpins = 1100 * idleCpuLevel - 1000 + private val maxYields = 5 * idleCpuLevel + private val minParkPeriodNs = 1 + private val maxParkPeriodNs = MICROSECONDS.toNanos(280 - 30 * idleCpuLevel) + + private sealed trait State + private case object NotIdle extends State + private case object Spinning extends State + private case object Yielding extends State + private case object Parking extends State + + private var state: State = NotIdle + private var spins = 0L + private var yields = 0L + private var parkPeriodNs = 0L + + private val onSpinWaitMethodHandle = + try + OptionVal.Some(MethodHandles.lookup.findStatic(classOf[Thread], "onSpinWait", methodType(classOf[Unit]))) + catch { + case NonFatal(_) ⇒ OptionVal.None + } + + def idle(): Unit = { + state match { + case NotIdle ⇒ + state = Spinning + spins += 1 + case Spinning ⇒ + onSpinWaitMethodHandle match { + case OptionVal.Some(m) ⇒ m.invokeExact() + case OptionVal.None ⇒ + } + spins += 1 + if (spins > maxSpins) { + state = Yielding + yields = 0 + } + case Yielding ⇒ + yields += 1 + if (yields > maxYields) { + state = Parking + parkPeriodNs = minParkPeriodNs + } else Thread.`yield`() + case Parking ⇒ + LockSupport.parkNanos(parkPeriodNs) + parkPeriodNs = Math.min(parkPeriodNs << 1, maxParkPeriodNs) + } + } + + def reset(): Unit = { + spins = 0 + yields = 0 + state = NotIdle + } + + } + + private final class BoundedTaskQueue(capacity: Int) extends AbstractBoundedNodeQueue[Runnable](capacity) + + private final class ThreadPoolWorker(val q: BoundedTaskQueue, val idleStrategy: IdleStrategy) extends Runnable { + + private sealed trait WorkerState + private case object NotStarted extends WorkerState + private case object InExecution extends WorkerState + private case object Idle extends WorkerState + + val thread: Thread = tf.newThread(this) + @volatile private var workerState: WorkerState = NotStarted + + def startWorker(): Unit = { + workerState = Idle + thread.start() + } + + private def runCommand(command: Runnable) = { + workerState = InExecution + try + command.run() + finally + workerState = Idle + } + + override def run(): Unit = { + + /** + * Determines whether the worker can keep running or not. + * In order to continue polling for tasks three conditions + * need to be satisfied: + * + * 1) pool state is less than Shutting down or queue + * is not empty (e.g pool state is ShuttingDown but there are still messages to process) + * + * 2) the thread backing up this worker has not been interrupted + * + * 3) We are not in ShutDown state (in which we should not be processing any enqueued tasks) + */ + def shouldKeepRunning = + (poolState < ShuttingDown || !q.isEmpty) && + !Thread.interrupted() && + poolState != ShutDown + + var abruptTermination = true + try { + while (shouldKeepRunning) { + val c = q.poll() + if (c ne null) { + runCommand(c) + idleStrategy.reset() + } else // if not wait for a bit + idleStrategy.idle() + } + abruptTermination = false // if we have reached here, our termination is not due to an exception + } finally { + onWorkerExit(this, abruptTermination) + } + } + + def stop() = + if (!thread.isInterrupted && workerState != NotStarted) + thread.interrupt() + + def stopIfIdle() = + if (workerState == Idle) + stop() + } + +} + +/** + * INTERNAL API + */ +@InternalApi +@ApiMayChange +private[akka] final class AffinityPoolConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ExecutorServiceConfigurator(config, prerequisites) { + + private final val MaxfairDistributionThreshold = 2048 + + private val poolSize = ThreadPoolConfig.scaledPoolSize( + config.getInt("parallelism-min"), + config.getDouble("parallelism-factor"), + config.getInt("parallelism-max")) + private val taskQueueSize = config.getInt("task-queue-size") + + private val idleCpuLevel = config.getInt("idle-cpu-level").requiring(level ⇒ + 1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10") + + private val fairDistributionThreshold = config.getInt("fair-work-distribution-threshold").requiring(thr ⇒ + 0 <= thr && thr <= MaxfairDistributionThreshold, s"idle-cpu-level must be between 1 and $MaxfairDistributionThreshold") + + private val rejectionHandlerFCQN = config.getString("rejection-handler-factory") + + private val rejectionHandlerFactory = prerequisites.dynamicAccess + .createInstanceFor[RejectionHandlerFactory](rejectionHandlerFCQN, Nil).recover({ + case exception ⇒ throw new IllegalArgumentException( + s"Cannot instantiate RejectionHandlerFactory (rejection-handler-factory = $rejectionHandlerFCQN),make sure it has an accessible empty constructor", + exception) + }).get + + override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = + new ExecutorServiceFactory { + override def createExecutorService: ExecutorService = + new AffinityPool(poolSize, taskQueueSize, threadFactory, idleCpuLevel, fairDistributionThreshold, rejectionHandlerFactory.create()) + } +} + +trait RejectionHandler { + def reject(command: Runnable, service: ExecutorService) +} + +trait RejectionHandlerFactory { + def create(): RejectionHandler +} + +/** + * INTERNAL API + */ +@InternalApi +@ApiMayChange +private[akka] final class DefaultRejectionHandlerFactory extends RejectionHandlerFactory { + private class DefaultRejectionHandler extends RejectionHandler { + override def reject(command: Runnable, service: ExecutorService): Unit = + throw new RejectedExecutionException(s"Task ${command.toString} rejected from ${service.toString}") + } + override def create(): RejectionHandler = new DefaultRejectionHandler() +} + diff --git a/akka-actor/src/main/scala/akka/util/ImmutableIntMap.scala b/akka-actor/src/main/scala/akka/util/ImmutableIntMap.scala new file mode 100644 index 0000000000..8b981c28b9 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/ImmutableIntMap.scala @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.util +import java.util.Arrays + +import akka.annotation.InternalApi + +import scala.annotation.tailrec + +/** + * INTERNAL API + */ +@InternalApi private[akka] object ImmutableIntMap { + val empty: ImmutableIntMap = + new ImmutableIntMap(Array.emptyIntArray, Array.empty) + + private final val MaxScanLength = 10 +} + +/** + * INTERNAL API + * Specialized Map for primitive `Int` keys to avoid allocations (boxing). + * Keys and values are backed by arrays and lookup is performed with binary + * search. It's intended for rather small (<1000) maps. + */ +@InternalApi private[akka] final class ImmutableIntMap private ( + private val keys: Array[Int], private val values: Array[Int]) { + + final val size: Int = keys.length + + /** + * Worst case `O(log n)`, allocation free. + */ + def get(key: Int): Int = { + val i = Arrays.binarySearch(keys, key) + if (i >= 0) values(i) + else Int.MinValue // cant use null, cant use OptionVal, other option is to throw an exception... + } + + /** + * Worst case `O(log n)`, allocation free. + */ + def contains(key: Int): Boolean = { + Arrays.binarySearch(keys, key) >= 0 + } + + def updateIfAbsent(key: Int, value: ⇒ Int): ImmutableIntMap = { + if (contains(key)) + this + else + updated(key, value) + } + + /** + * Worst case `O(log n)`, creates new `ImmutableIntMap` + * with copies of the internal arrays for the keys and + * values. + */ + def updated(key: Int, value: Int): ImmutableIntMap = { + if (size == 0) + new ImmutableIntMap(Array(key), Array(value)) + else { + val i = Arrays.binarySearch(keys, key) + if (i >= 0) { + // existing key, replace value + val newValues = new Array[Int](values.length) + System.arraycopy(values, 0, newValues, 0, values.length) + newValues(i) = value + new ImmutableIntMap(keys, newValues) + } else { + // insert the entry at the right position, and keep the arrays sorted + val j = -(i + 1) + val newKeys = new Array[Int](size + 1) + System.arraycopy(keys, 0, newKeys, 0, j) + newKeys(j) = key + System.arraycopy(keys, j, newKeys, j + 1, keys.length - j) + + val newValues = new Array[Int](size + 1) + System.arraycopy(values, 0, newValues, 0, j) + newValues(j) = value + System.arraycopy(values, j, newValues, j + 1, values.length - j) + + new ImmutableIntMap(newKeys, newValues) + } + } + } + + def remove(key: Int): ImmutableIntMap = { + val i = Arrays.binarySearch(keys, key) + if (i >= 0) { + if (size == 1) + ImmutableIntMap.empty + else { + val newKeys = new Array[Int](size - 1) + System.arraycopy(keys, 0, newKeys, 0, i) + System.arraycopy(keys, i + 1, newKeys, i, keys.length - i - 1) + + val newValues = new Array[Int](size - 1) + System.arraycopy(values, 0, newValues, 0, i) + System.arraycopy(values, i + 1, newValues, i, values.length - i - 1) + + new ImmutableIntMap(newKeys, newValues) + } + } else + this + } + + /** + * All keys + */ + def keysIterator: Iterator[Int] = + keys.iterator + + override def toString: String = + keysIterator.map(key ⇒ s"$key -> ${get(key)}").mkString("ImmutableIntMap(", ", ", ")") + + override def hashCode: Int = { + var result = HashCode.SEED + result = HashCode.hash(result, keys) + result = HashCode.hash(result, values) + result + } + + override def equals(obj: Any): Boolean = obj match { + case other: ImmutableIntMap ⇒ + if (other eq this) true + else if (size != other.size) false + else if (size == 0 && other.size == 0) true + else { + @tailrec def check(i: Int): Boolean = { + if (i < 0) true + else if (keys(i) == other.keys(i) && values(i) == other.values(i)) + check(i - 1) // recur, next elem + else false + } + check(size - 1) + } + case _ ⇒ false + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolComparativeBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolComparativeBenchmark.scala new file mode 100644 index 0000000000..84e9d64c93 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolComparativeBenchmark.scala @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.actor + +import java.util.concurrent.TimeUnit + +import akka.actor.BenchmarkActors._ +import akka.actor.ForkJoinActorBenchmark.cores +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1) +@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1) +class AffinityPoolComparativeBenchmark { + + @Param(Array("1")) + var throughPut = 0 + + @Param(Array("affinity-dispatcher", "default-fj-dispatcher", "fixed-size-dispatcher")) + var dispatcher = "" + + @Param(Array("SingleConsumerOnlyUnboundedMailbox")) //"default" + var mailbox = "" + + final val numThreads, numActors = 8 + final val numMessagesPerActorPair = 2000000 + final val totalNumberOfMessages = numMessagesPerActorPair * (numActors / 2) + + implicit var system: ActorSystem = _ + + @Setup(Level.Trial) + def setup(): Unit = { + + requireRightNumberOfCores(cores) + + val mailboxConf = mailbox match { + case "default" => "" + case "SingleConsumerOnlyUnboundedMailbox" => + s"""default-mailbox.mailbox-type = "${classOf[akka.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}"""" + } + + system = ActorSystem("AffinityPoolComparativeBenchmark", ConfigFactory.parseString( + s"""| akka { + | log-dead-letters = off + | actor { + | default-fj-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = $numThreads + | parallelism-factor = 1.0 + | parallelism-max = $numThreads + | } + | throughput = $throughPut + | } + | + | fixed-size-dispatcher { + | executor = "thread-pool-executor" + | thread-pool-executor { + | fixed-pool-size = $numThreads + | } + | throughput = $throughPut + | } + | + | affinity-dispatcher { + | executor = "affinity-pool-executor" + | affinity-pool-executor { + | parallelism-min = $numThreads + | parallelism-factor = 1.0 + | parallelism-max = $numThreads + | task-queue-size = 512 + | idle-cpu-level = 5 + | fair-work-distribution-threshold = 2048 + | } + | throughput = $throughPut + | } + | $mailboxConf + | } + | } + """.stripMargin + )) + } + + @TearDown(Level.Trial) + def shutdown(): Unit = tearDownSystem() + + @Benchmark + @OperationsPerInvocation(totalNumberOfMessages) + def pingPong(): Unit = benchmarkPingPongActors(numMessagesPerActorPair, numActors, dispatcher, throughPut, timeout) +} diff --git a/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolIdleCPULevelBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolIdleCPULevelBenchmark.scala new file mode 100644 index 0000000000..e264a4f45b --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolIdleCPULevelBenchmark.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.actor + +import java.util.concurrent.TimeUnit + +import akka.actor.BenchmarkActors._ +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1) +@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1) +class AffinityPoolIdleCPULevelBenchmark { + + final val numThreads, numActors = 8 + final val numMessagesPerActorPair = 2000000 + final val totalNumberOfMessages = numMessagesPerActorPair * (numActors / 2) + + implicit var system: ActorSystem = _ + + @Param(Array("1", "3", "5", "7", "10")) + var idleCPULevel = "" + + @Param(Array("25")) + var throughPut = 0 + + @Setup(Level.Trial) + def setup(): Unit = { + + requireRightNumberOfCores(numThreads) + + system = ActorSystem("AffinityPoolWaitingStrategyBenchmark", ConfigFactory.parseString( + s""" | akka { + | log-dead-letters = off + | actor { + | affinity-dispatcher { + | executor = "affinity-pool-executor" + | affinity-pool-executor { + | parallelism-min = $numThreads + | parallelism-factor = 1.0 + | parallelism-max = $numThreads + | task-queue-size = 512 + | idle-cpu-level = $idleCPULevel + | fair-work-distribution-threshold = 2048 + | } + | throughput = $throughPut + | } + | + | } + | } + """.stripMargin + )) + } + + @TearDown(Level.Trial) + def shutdown(): Unit = tearDownSystem() + + @Benchmark + @OutputTimeUnit(TimeUnit.NANOSECONDS) + @OperationsPerInvocation(8000000) + def pingPong(): Unit = benchmarkPingPongActors(numMessagesPerActorPair, numActors, "affinity-dispatcher", throughPut, timeout) + +} diff --git a/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolRequestResponseBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolRequestResponseBenchmark.scala new file mode 100644 index 0000000000..dc8df65d04 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/AffinityPoolRequestResponseBenchmark.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.actor + +import java.util.concurrent.{ CountDownLatch, TimeUnit } + +import akka.actor.BenchmarkActors._ +import akka.actor.ForkJoinActorBenchmark.cores +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1) +@Measurement(iterations = 10, time = 20, timeUnit = TimeUnit.SECONDS, batchSize = 1) +class AffinityPoolRequestResponseBenchmark { + + @Param(Array("1", "5", "50")) + var throughPut = 0 + + @Param(Array("affinity-dispatcher", "default-fj-dispatcher", "fixed-size-dispatcher")) + var dispatcher = "" + + @Param(Array("SingleConsumerOnlyUnboundedMailbox")) //"default" + var mailbox = "" + + final val numThreads, numActors = 8 + final val numQueriesPerActor = 400000 + final val totalNumberOfMessages = numQueriesPerActor * numActors + final val numUsersInDB = 300000 + + implicit var system: ActorSystem = _ + + var actors: Vector[(ActorRef, ActorRef)] = null + var latch: CountDownLatch = null + + @Setup(Level.Trial) + def setup(): Unit = { + + requireRightNumberOfCores(cores) + + val mailboxConf = mailbox match { + case "default" => "" + case "SingleConsumerOnlyUnboundedMailbox" => + s"""default-mailbox.mailbox-type = "${classOf[akka.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}"""" + } + + system = ActorSystem("AffinityPoolComparativeBenchmark", ConfigFactory.parseString( + s"""| akka { + | log-dead-letters = off + | actor { + | default-fj-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = $numThreads + | parallelism-factor = 1.0 + | parallelism-max = $numThreads + | } + | throughput = $throughPut + | } + | + | fixed-size-dispatcher { + | executor = "thread-pool-executor" + | thread-pool-executor { + | fixed-pool-size = $numThreads + | } + | throughput = $throughPut + | } + | + | affinity-dispatcher { + | executor = "affinity-pool-executor" + | affinity-pool-executor { + | parallelism-min = $numThreads + | parallelism-factor = 1.0 + | parallelism-max = $numThreads + | task-queue-size = 512 + | idle-cpu-level = 5 + | fair-work-distribution-threshold = 2048 + | } + | throughput = $throughPut + | } + | $mailboxConf + | } + | } + """.stripMargin + )) + } + + @TearDown(Level.Trial) + def shutdown(): Unit = tearDownSystem() + + @Setup(Level.Invocation) + def setupActors(): Unit = { + val (_actors, _latch) = RequestResponseActors.startUserQueryActorPairs(numActors, numQueriesPerActor, numUsersInDB, dispatcher) + actors = _actors + latch = _latch + } + + @Benchmark + @OperationsPerInvocation(totalNumberOfMessages) + def queryUserServiceActor(): Unit = { + val startNanoTime = System.nanoTime() + RequestResponseActors.initiateQuerySimulation(actors, throughPut * 2) + latch.await(BenchmarkActors.timeout.toSeconds, TimeUnit.SECONDS) + BenchmarkActors.printProgress(totalNumberOfMessages, numActors, startNanoTime) + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala b/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala new file mode 100644 index 0000000000..618ae35868 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.actor + +import java.util.concurrent.{ CountDownLatch, TimeUnit } + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.concurrent.duration._ + +object BenchmarkActors { + + val timeout = 30.seconds + + case object Message + case object Stop + + class PingPong(val messages: Int, latch: CountDownLatch) extends Actor { + var left = messages / 2 + def receive = { + case Message => + + if (left == 0) { + latch.countDown() + context stop self + } + + sender() ! Message + left -= 1 + } + } + + object PingPong { + def props(messages: Int, latch: CountDownLatch) = Props(new PingPong(messages, latch)) + } + + class Pipe(next: Option[ActorRef]) extends Actor { + def receive = { + case Message => + if (next.isDefined) next.get forward Message + case Stop => + context stop self + if (next.isDefined) next.get forward Stop + } + } + + object Pipe { + def props(next: Option[ActorRef]) = Props(new Pipe(next)) + } + + private def startPingPongActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String)(implicit system: ActorSystem) = { + val fullPathToDispatcher = "akka.actor." + dispatcher + val latch = new CountDownLatch(numPairs * 2) + val actors = for { + i <- (1 to numPairs).toVector + } yield { + val ping = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher)) + val pong = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher)) + (ping, pong) + } + (actors, latch) + } + + private def initiatePingPongForPairs(refs: Vector[(ActorRef, ActorRef)], inFlight: Int) = { + for { + (ping, pong) <- refs + _ <- 1 to inFlight + } { + ping.tell(Message, pong) + } + } + + def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = { + val durationMicros = (System.nanoTime() - startNanoTime) / 1000 + println(f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " + + f"${totalMessages.toDouble / durationMicros}%,.2f M msg/s") + } + + def requireRightNumberOfCores(numCores: Int) = + require( + Runtime.getRuntime.availableProcessors == numCores, + s"Update the cores constant to ${Runtime.getRuntime.availableProcessors}" + ) + + def benchmarkPingPongActors(numMessagesPerActorPair: Int, numActors: Int, dispatcher: String, throughPut: Int, shutdownTimeout: Duration)(implicit system: ActorSystem): Unit = { + val numPairs = numActors / 2 + val totalNumMessages = numPairs * numMessagesPerActorPair + val (actors, latch) = startPingPongActorPairs(numMessagesPerActorPair, numPairs, dispatcher) + val startNanoTime = System.nanoTime() + initiatePingPongForPairs(actors, inFlight = throughPut * 2) + latch.await(shutdownTimeout.toSeconds, TimeUnit.SECONDS) + printProgress(totalNumMessages, numActors, startNanoTime) + } + + def tearDownSystem()(implicit system: ActorSystem): Unit = { + system.terminate() + Await.ready(system.whenTerminated, timeout) + } + +} + diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala index ca8f5529a9..93f1d1d1a5 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala @@ -6,46 +6,61 @@ package akka.actor import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ -import scala.concurrent.duration._ import java.util.concurrent.TimeUnit import scala.concurrent.Await import scala.annotation.tailrec +import BenchmarkActors._ +import scala.concurrent.duration._ @State(Scope.Benchmark) @BenchmarkMode(Array(Mode.Throughput)) @Fork(1) @Threads(1) @Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1) -@Measurement(iterations = 20) +@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1) class ForkJoinActorBenchmark { import ForkJoinActorBenchmark._ - @Param(Array("5")) + @Param(Array("5", "25", "50")) var tpt = 0 - @Param(Array("1")) + @Param(Array(coresStr)) // coresStr, cores2xStr, cores4xStr var threads = "" + @Param(Array("SingleConsumerOnlyUnboundedMailbox")) //"default" + var mailbox = "" + implicit var system: ActorSystem = _ @Setup(Level.Trial) def setup(): Unit = { + + requireRightNumberOfCores(cores) + + val mailboxConf = mailbox match { + case "default" => "" + case "SingleConsumerOnlyUnboundedMailbox" => + s"""default-mailbox.mailbox-type = "${classOf[akka.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}"""" + } + system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString( - s"""| akka { - | log-dead-letters = off - | actor { - | default-dispatcher { - | executor = "fork-join-executor" - | fork-join-executor { - | parallelism-min = 1 - | parallelism-factor = $threads - | parallelism-max = 64 - | } - | throughput = $tpt - | } - | } - | } - """.stripMargin + s""" + akka { + log-dead-letters = off + actor { + default-dispatcher { + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = $threads + parallelism-factor = 1 + parallelism-max = $threads + } + throughput = $tpt + } + $mailboxConf + } + } + """ )) } @@ -55,110 +70,31 @@ class ForkJoinActorBenchmark { Await.ready(system.whenTerminated, 15.seconds) } - var pingPongActors: Vector[(ActorRef, ActorRef)] = null - var pingPongLessActorsThanCoresActors: Vector[(ActorRef, ActorRef)] = null - var pingPongSameNumberOfActorsAsCoresActors: Vector[(ActorRef, ActorRef)] = null - var pingPongMoreActorsThanCoresActors: Vector[(ActorRef, ActorRef)] = null - - @Setup(Level.Invocation) - def setupActors(): Unit = { - pingPongActors = startActors(1) - pingPongLessActorsThanCoresActors = startActors(lessThanCoresActorPairs) - pingPongSameNumberOfActorsAsCoresActors = startActors(cores / 2) - pingPongMoreActorsThanCoresActors = startActors(moreThanCoresActorPairs) - } - - @TearDown(Level.Invocation) - def tearDownActors(): Unit = { - stopActors(pingPongActors) - stopActors(pingPongLessActorsThanCoresActors) - stopActors(pingPongSameNumberOfActorsAsCoresActors) - stopActors(pingPongMoreActorsThanCoresActors) - } - - def startActors(n: Int): Vector[(ActorRef, ActorRef)] = { - for { - i <- (1 to n).toVector - } yield { - val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong]) - val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong]) - (ping, pong) - } - } - - def stopActors(refs: Vector[(ActorRef, ActorRef)]): Unit = { - if (refs ne null) { - refs.foreach { - case (ping, pong) => - system.stop(ping) - system.stop(pong) - } - awaitTerminated(refs) - } - } - - def awaitTerminated(refs: Vector[(ActorRef, ActorRef)]): Unit = { - if (refs ne null) refs.foreach { - case (ping, pong) => - val p = TestProbe() - p.watch(ping) - p.expectTerminated(ping, timeout) - p.watch(pong) - p.expectTerminated(pong, timeout) - } - } - - def sendMessage(refs: Vector[(ActorRef, ActorRef)], inFlight: Int): Unit = { - for { - (ping, pong) <- refs - _ <- 1 to inFlight - } { - ping.tell(Message, pong) - } - } + @Benchmark + @OperationsPerInvocation(totalMessagesTwoActors) + def pingPong(): Unit = benchmarkPingPongActors(messages, twoActors, "default-dispatcher", tpt, timeout) @Benchmark - @Measurement(timeUnit = TimeUnit.MILLISECONDS) - @OperationsPerInvocation(messages) - def pingPong(): Unit = { - // only one message in flight - sendMessage(pingPongActors, inFlight = 1) - awaitTerminated(pingPongActors) - } - - @Benchmark - @Measurement(timeUnit = TimeUnit.MILLISECONDS) @OperationsPerInvocation(totalMessagesLessThanCores) - def pingPongLessActorsThanCores(): Unit = { - sendMessage(pingPongLessActorsThanCoresActors, inFlight = 2 * tpt) - awaitTerminated(pingPongLessActorsThanCoresActors) - } + def pingPongLessActorsThanCores(): Unit = benchmarkPingPongActors(messages, lessThanCoresActors, "default-dispatcher", tpt, timeout) @Benchmark - @Measurement(timeUnit = TimeUnit.MILLISECONDS) @OperationsPerInvocation(totalMessagesSameAsCores) - def pingPongSameNumberOfActorsAsCores(): Unit = { - sendMessage(pingPongSameNumberOfActorsAsCoresActors, inFlight = 2 * tpt) - awaitTerminated(pingPongSameNumberOfActorsAsCoresActors) - } + def pingPongSameNumberOfActorsAsCores(): Unit = benchmarkPingPongActors(messages, sameAsCoresActors, "default-dispatcher", tpt, timeout) @Benchmark - @Measurement(timeUnit = TimeUnit.MILLISECONDS) @OperationsPerInvocation(totalMessagesMoreThanCores) - def pingPongMoreActorsThanCores(): Unit = { - sendMessage(pingPongMoreActorsThanCoresActors, inFlight = 2 * tpt) - awaitTerminated(pingPongMoreActorsThanCoresActors) - } + def pingPongMoreActorsThanCores(): Unit = benchmarkPingPongActors(messages, moreThanCoresActors, "default-dispatcher", tpt, timeout) // @Benchmark // @Measurement(timeUnit = TimeUnit.MILLISECONDS) // @OperationsPerInvocation(messages) def floodPipe(): Unit = { - val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None)) - val middle = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(end))) - val penultimate = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(middle))) - val beginning = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(penultimate))) + val end = system.actorOf(Props(classOf[Pipe], None)) + val middle = system.actorOf(Props(classOf[Pipe], Some(end))) + val penultimate = system.actorOf(Props(classOf[Pipe], Some(middle))) + val beginning = system.actorOf(Props(classOf[Pipe], Some(penultimate))) val p = TestProbe() p.watch(end) @@ -178,39 +114,23 @@ class ForkJoinActorBenchmark { } object ForkJoinActorBenchmark { - case object Stop - case object Message - final val timeout = 15.seconds - final val messages = 400000 + final val messages = 2000000 // messages per actor pair + // Constants because they are used in annotations // update according to cpu final val cores = 8 - // 2 actors per - final val moreThanCoresActorPairs = cores * 2 - final val lessThanCoresActorPairs = (cores / 2) - 1 - final val totalMessagesMoreThanCores = moreThanCoresActorPairs * messages - final val totalMessagesLessThanCores = lessThanCoresActorPairs * messages - final val totalMessagesSameAsCores = cores * messages + final val coresStr = "8" + final val cores2xStr = "16" + final val cores4xStr = "24" - class Pipe(next: Option[ActorRef]) extends Actor { - def receive = { - case Message => - if (next.isDefined) next.get forward Message - case Stop => - context stop self - if (next.isDefined) next.get forward Stop - } - } - class PingPong extends Actor { - var left = messages / 2 - def receive = { - case Message => + final val twoActors = 2 + final val moreThanCoresActors = cores * 2 + final val lessThanCoresActors = cores / 2 + final val sameAsCoresActors = cores - if (left <= 1) - context stop self + final val totalMessagesTwoActors = messages + final val totalMessagesMoreThanCores = (moreThanCoresActors * messages) / 2 + final val totalMessagesLessThanCores = (lessThanCoresActors * messages) / 2 + final val totalMessagesSameAsCores = (sameAsCoresActors * messages) / 2 - sender() ! Message - left -= 1 - } - } -} +} \ No newline at end of file diff --git a/akka-bench-jmh/src/main/scala/akka/actor/RequestResponseActors.scala b/akka-bench-jmh/src/main/scala/akka/actor/RequestResponseActors.scala new file mode 100644 index 0000000000..dd7e0f8e41 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/RequestResponseActors.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.actor + +import java.util.concurrent.CountDownLatch + +import scala.collection.mutable +import scala.util.Random + +object RequestResponseActors { + + case class Request(userId: Int) + case class User(userId: Int, firstName: String, lastName: String, ssn: Int, friends: Seq[Int]) + + class UserQueryActor(latch: CountDownLatch, numQueries: Int, numUsersInDB: Int) extends Actor { + + private var left = numQueries + private val receivedUsers: mutable.Map[Int, User] = mutable.Map() + private val randGenerator = new Random() + + override def receive: Receive = { + case u: User => { + receivedUsers.put(u.userId, u) + if (left == 0) { + latch.countDown() + context stop self + } else { + sender() ! Request(randGenerator.nextInt(numUsersInDB)) + } + left -= 1 + } + } + } + + object UserQueryActor { + def props(latch: CountDownLatch, numQueries: Int, numUsersInDB: Int) = { + Props(new UserQueryActor(latch, numQueries, numUsersInDB)) + } + } + + class UserServiceActor(userDb: Map[Int, User], latch: CountDownLatch, numQueries: Int) extends Actor { + private var left = numQueries + def receive = { + case Request(id) => + userDb.get(id) match { + case Some(u) => sender() ! u + case None => + } + if (left == 0) { + latch.countDown() + context stop self + } + left -= 1 + } + + } + + object UserServiceActor { + def props(latch: CountDownLatch, numQueries: Int, numUsersInDB: Int) = { + val r = new Random() + val users = for { + id <- 0 until numUsersInDB + firstName = r.nextString(5) + lastName = r.nextString(7) + ssn = r.nextInt() + friendIds = for { _ <- 0 until 5 } yield r.nextInt(numUsersInDB) + } yield id -> User(id, firstName, lastName, ssn, friendIds) + Props(new UserServiceActor(users.toMap, latch, numQueries)) + } + } + + def startUserQueryActorPairs(numActors: Int, numQueriesPerActor: Int, numUsersInDBPerActor: Int, dispatcher: String)(implicit system: ActorSystem) = { + val fullPathToDispatcher = "akka.actor." + dispatcher + val latch = new CountDownLatch(numActors) + val actorsPairs = for { + i <- (1 to (numActors / 2)).toVector + userQueryActor = system.actorOf(UserQueryActor.props(latch, numQueriesPerActor, numUsersInDBPerActor).withDispatcher(fullPathToDispatcher)) + userServiceActor = system.actorOf(UserServiceActor.props(latch, numQueriesPerActor, numUsersInDBPerActor).withDispatcher(fullPathToDispatcher)) + } yield (userQueryActor, userServiceActor) + (actorsPairs, latch) + } + + def initiateQuerySimulation(requestResponseActorPairs: Seq[(ActorRef, ActorRef)], inFlight: Int) = { + for { + (queryActor, serviceActor) <- requestResponseActorPairs + i <- 1 to inFlight + } { + serviceActor.tell(Request(i), queryActor) + } + } + +} diff --git a/akka-docs/src/main/paradox/scala/dispatchers.md b/akka-docs/src/main/paradox/scala/dispatchers.md index 4931946d6d..2741175c3b 100644 --- a/akka-docs/src/main/paradox/scala/dispatchers.md +++ b/akka-docs/src/main/paradox/scala/dispatchers.md @@ -143,6 +143,13 @@ Another example that uses the thread pool based on the number of cores (e.g. for @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) {#my-thread-pool-dispatcher-config } +A different kind of dispatcher that uses an affinity pool may increase throughput in cases where there is relatively small +number of actors that maintain some internal state. The affinity pool tries its best to ensure that an actor is always +scheduled to run on the same thread. This actor to thread pinning aims to decrease CPU cache misses which can result +in significant throughput improvement. + +@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #affinity-pool-dispatcher-config } + Configuring a `PinnedDispatcher`: diff --git a/akka-docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala index 0e71951987..7af5de2967 100644 --- a/akka-docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala @@ -92,6 +92,28 @@ object DispatcherDocSpec { } //#my-thread-pool-dispatcher-config + //#affinity-pool-dispatcher-config + affinity-pool-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "affinity-pool-executor" + # Configuration for the thread pool + affinity-pool-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 8 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 1 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 16 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 + } + //#affinity-pool-dispatcher-config + //#fixed-pool-size-dispatcher-config blocking-io-dispatcher { type = Dispatcher @@ -294,6 +316,14 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { //#defining-pinned-dispatcher } + "defining affinity-pool dispatcher" in { + val context = system + //#defining-affinity-pool-dispatcher + val myActor = + context.actorOf(Props[MyActor].withDispatcher("affinity-pool-dispatcher"), "myactor4") + //#defining-affinity-pool-dispatcher + } + "looking up a dispatcher" in { //#lookup // for use with Futures, Scheduler, etc. diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d985fa4457..f0b27cd551 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -75,7 +75,6 @@ object Dependencies { val aeronDriver = "io.aeron" % "aeron-driver" % aeronVersion // ApacheV2 val aeronClient = "io.aeron" % "aeron-client" % aeronVersion // ApacheV2 - object Docs { val sprayJson = "io.spray" %% "spray-json" % "1.3.3" % "test" val gson = "com.google.code.gson" % "gson" % "2.8.0" % "test"