Add AffinityPool which pins actors to threads (#23104)

Zahari Dichev 2017-07-07 14:36:26 +03:00 committed by Patrik Nordwall
parent fe2bf91659
commit 4d45064296
14 changed files with 1345 additions and 139 deletions

View file

@ -0,0 +1,151 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util
import org.scalatest.Matchers
import org.scalatest.WordSpec
import scala.util.Random
class ImmutableIntMapSpec extends WordSpec with Matchers {
"ImmutableIntMap" must {
"have no entries when empty" in {
val empty = ImmutableIntMap.empty
empty.size should be(0)
empty.keysIterator.toList should be(Nil)
}
"add and get entries" in {
val m1 = ImmutableIntMap.empty.updated(10, 10)
m1.keysIterator.toList should be(List(10))
m1.keysIterator.map(m1.get).toList should be(List(10))
val m2 = m1.updated(20, 20)
m2.keysIterator.toList should be(List(10, 20))
m2.keysIterator.map(m2.get).toList should be(List(10, 20))
val m3 = m1.updated(5, 5)
m3.keysIterator.toList should be(List(5, 10))
m3.keysIterator.map(m3.get).toList should be(List(5, 10))
val m4 = m2.updated(5, 5)
m4.keysIterator.toList should be(List(5, 10, 20))
m4.keysIterator.map(m4.get).toList should be(List(5, 10, 20))
val m5 = m4.updated(15, 15)
m5.keysIterator.toList should be(List(5, 10, 15, 20))
m5.keysIterator.map(m5.get).toList should be(List(5, 10, 15, 20))
}
"replace entries" in {
val m1 = ImmutableIntMap.empty.updated(10, 10).updated(10, 11)
m1.keysIterator.map(m1.get).toList should be(List(11))
val m2 = m1.updated(20, 20).updated(30, 30)
.updated(20, 21).updated(30, 31)
m2.keysIterator.map(m2.get).toList should be(List(11, 21, 31))
}
"update if absent" in {
val m1 = ImmutableIntMap.empty.updated(10, 10).updated(20, 11)
m1.updateIfAbsent(10, 15) should be(ImmutableIntMap.empty.updated(10, 10).updated(20, 11))
m1.updateIfAbsent(30, 12) should be(ImmutableIntMap.empty.updated(10, 10).updated(20, 11).updated(30, 12))
}
"have toString" in {
ImmutableIntMap.empty.toString should be("ImmutableIntMap()")
ImmutableIntMap.empty.updated(10, 10).toString should be("ImmutableIntMap(10 -> 10)")
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).toString should be(
"ImmutableIntMap(10 -> 10, 20 -> 20)")
}
"have equals and hashCode" in {
ImmutableIntMap.empty.updated(10, 10) should be(ImmutableIntMap.empty.updated(10, 10))
ImmutableIntMap.empty.updated(10, 10).hashCode should be(
ImmutableIntMap.empty.updated(10, 10).hashCode)
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) should be(
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30))
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30).hashCode should be(
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30).hashCode)
ImmutableIntMap.empty.updated(10, 10).updated(20, 20) should not be ImmutableIntMap.empty.updated(10, 10)
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) should not be
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 31)
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30) should not be
ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(31, 30)
ImmutableIntMap.empty should be(ImmutableIntMap.empty)
ImmutableIntMap.empty.hashCode should be(ImmutableIntMap.empty.hashCode)
}
"remove entries" in {
val m1 = ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30)
val m2 = m1.remove(10)
m2.keysIterator.map(m2.get).toList should be(List(20, 30))
val m3 = m1.remove(20)
m3.keysIterator.map(m3.get).toList should be(List(10, 30))
val m4 = m1.remove(30)
m4.keysIterator.map(m4.get).toList should be(List(10, 20))
m1.remove(5) should be(m1)
m1.remove(10).remove(20).remove(30) should be(ImmutableIntMap.empty)
}
"get None when entry doesn't exist" in {
val m1 = ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30)
m1.get(5) should be(Int.MinValue)
m1.get(15) should be(Int.MinValue)
m1.get(25) should be(Int.MinValue)
m1.get(35) should be(Int.MinValue)
}
"contain keys" in {
val m1 = ImmutableIntMap.empty.updated(10, 10).updated(20, 20).updated(30, 30)
m1.contains(10) should be(true)
m1.contains(20) should be(true)
m1.contains(30) should be(true)
m1.contains(5) should be(false)
m1.contains(25) should be(false)
}
"have correct behavior for random operations" in {
val seed = System.nanoTime()
val rnd = new Random(seed)
var longMap = ImmutableIntMap.empty
var reference = Map.empty[Long, Int]
def verify(): Unit = {
val m = longMap.keysIterator.map(key ⇒ key → longMap.get(key)).toMap
m should be(reference)
}
(1 to 1000).foreach { i ⇒
withClue(s"seed=$seed, iteration=$i") {
val key = rnd.nextInt(100)
val value = rnd.nextPrintableChar()
rnd.nextInt(3) match {
case 0 | 1 ⇒
longMap = longMap.updated(key, value)
reference = reference.updated(key, value)
case 2 ⇒
longMap = longMap.remove(key)
reference = reference - key
}
verify()
}
}
}
}
}

View file

@ -335,6 +335,7 @@ akka {
# - "default-executor" requires a "default-executor" section # - "default-executor" requires a "default-executor" section
# - "fork-join-executor" requires a "fork-join-executor" section # - "fork-join-executor" requires a "fork-join-executor" section
# - "thread-pool-executor" requires a "thread-pool-executor" section # - "thread-pool-executor" requires a "thread-pool-executor" section
# - "affinity-pool-executor" requires an "affinity-pool-executor" section
# - A FQCN of a class extending ExecutorServiceConfigurator # - A FQCN of a class extending ExecutorServiceConfigurator
executor = "default-executor" executor = "default-executor"
@ -350,6 +351,69 @@ akka {
fallback = "fork-join-executor" fallback = "fork-join-executor"
} }
# This will be used if you have set "executor = "affinity-pool-executor""
# Underlying thread pool implementation is akka.dispatch.affinity.AffinityPool.
# This executor is classified as "ApiMayChange".
affinity-pool-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 4
# The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size
# is then bounded by the parallelism-min and parallelism-max values.
parallelism-factor = 0.8
# Max number of threads to cap factor-based parallelism number to.
parallelism-max = 64
# Each worker in the pool uses a separate bounded MPSC queue. This value
# indicates the upper bound of the queue. Whenever an attempt to enqueue
# a task is made and the queue does not have capacity to accommodate
# the task, the rejection handler created by the factory specified
# in "rejection-handler-factory" is invoked.
task-queue-size = 512
# FQCN of the rejection handler used in the pool.
# Must have an empty public constructor and needs to implement
# akka.dispatch.affinity.RejectionHandlerFactory.
rejection-handler-factory = "akka.dispatch.affinity.DefaultRejectionHandlerFactory"
# Level of CPU time used, on a scale between 1 and 10, during backoff/idle.
# The tradeoff is that to achieve low latency more CPU time must be used
# so that the pool can react quickly to new work after a period of backoff.
# Level 1 strongly prefers low CPU consumption over low latency.
# Level 10 strongly prefers low latency over low CPU consumption.
idle-cpu-level = 5
# Internally the AffinityPool uses two methods to determine which task
# queue to allocate a Runnable to:
# - map based - maintains a round robin counter and a map of Runnable
# hashcodes to queues that they have been associated with. This ensures
# maximum fairness in terms of work distribution, meaning that each worker
# will get an approximately equal number of mailboxes to execute. This is suitable
# in cases where we have a small number of actors that will be scheduled on
# the pool and we want to ensure the maximum possible utilization of the
# available threads.
# - hash based - the task queue in which the runnable should go is determined
# by a uniformly distributed int-to-int hash function which uses the
# hash code of the Runnable as input. This is preferred in situations where we
# have a large enough number of distinct actors to ensure a statistically uniform
# distribution of work across threads, or where we are ready to sacrifice the
# former for the added benefit of avoiding map look-ups.
#
# The value serves as a threshold which determines the point at which the
# pool switches from the first to the second work distribution scheme.
# For example, if the value is set to 128, the pool can observe up to
# 128 unique actors and schedule their mailboxes using the map based
# approach. Once this number is reached the pool switches to hash based
# task distribution mode. If the value is set to 0, the map based
# work distribution approach is disabled and only the hash based is
# used irrespective of the number of unique actors. Valid range is
# 0 to 2048 (inclusive)
fair-work-distribution-threshold = 128
}
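The selection rule that this threshold controls can be sketched as follows. This is a simplified paraphrase, not the committed code (the full version is AffinityPool.getQueueForRunnable further down in this commit), and observedAssignments and nextRoundRobinIndex are illustrative names:

// Simplified sketch of how a mailbox (Runnable) is mapped to a worker queue.
// At or below the threshold the pool remembers an explicit round-robin
// assignment per Runnable; above it (or when the threshold is 0) it falls
// back to pure hashing of the Runnable's hash code.
def selectQueueIndex(
  runnableHash: Int, parallelism: Int, fairDistributionThreshold: Int,
  observedAssignments: Map[Int, Int], nextRoundRobinIndex: () => Int): Int =
  if (fairDistributionThreshold == 0 || observedAssignments.size > fairDistributionThreshold)
    math.abs(Integer.reverseBytes(runnableHash * 0x9e3775cd) * 0x9e3775cd) % parallelism // hash based
  else
    observedAssignments.getOrElse(runnableHash, nextRoundRobinIndex() % parallelism) // map based

The real pool also records the chosen index for newly observed Runnables atomically in an ImmutableIntMap; the sketch omits that bookkeeping.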
# This will be used if you have set "executor = "fork-join-executor""
# Underlying thread pool implementation is akka.dispatch.forkjoin.ForkJoinPool
fork-join-executor {

View file

@ -8,11 +8,13 @@ import java.util.concurrent._
import java.{ util ⇒ ju }
import akka.actor._
import akka.dispatch.affinity.AffinityPoolConfigurator
import akka.dispatch.sysmsg._
import akka.event.EventStream
import akka.event.Logging.{ Debug, Error, LogEventException }
import akka.util.{ Index, Unsafe }
import com.typesafe.config.Config
import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.concurrent.duration.{ Duration, FiniteDuration }
@ -327,6 +329,8 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case "affinity-pool-executor" ⇒ new AffinityPoolConfigurator(config.getConfig("affinity-pool-executor"), prerequisites)
case fqcn ⇒
val args = List(
classOf[Config] → config,

View file

@ -0,0 +1,423 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.dispatch.affinity
import java.lang.invoke.MethodHandles
import java.lang.invoke.MethodType.methodType
import java.util
import java.util.Collections
import java.util.concurrent.TimeUnit.MICROSECONDS
import java.util.concurrent._
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference }
import java.util.concurrent.locks.{ Lock, LockSupport, ReentrantLock }
import akka.dispatch._
import akka.util.Helpers.Requiring
import com.typesafe.config.Config
import scala.annotation.tailrec
import java.lang.Integer.reverseBytes
import akka.annotation.InternalApi
import akka.annotation.ApiMayChange
import akka.util.ImmutableIntMap
import akka.util.OptionVal
import scala.collection.mutable
import scala.util.control.NonFatal
/**
* An [[ExecutorService]] implementation which pins actors to particular threads
* and guarantees that an actor's [[Mailbox]] will be run on the thread it
* previously ran on. In situations where we see a lot of cache ping-pong, this
* might lead to significant performance improvements.
*
* INTERNAL API
*/
@InternalApi
@ApiMayChange
private[akka] class AffinityPool(
parallelism: Int,
affinityGroupSize: Int,
tf: ThreadFactory,
idleCpuLevel: Int,
fairDistributionThreshold: Int,
rejectionHandler: RejectionHandler)
extends AbstractExecutorService {
if (parallelism <= 0)
throw new IllegalArgumentException("Size of pool cannot be less than or equal to 0")
// Held while starting/shutting down workers/pool in order to make
// the operations linear and enforce atomicity. An example of that would be
// adding a worker. We want the creation of the worker, addition
// to the set and starting the worker to be an atomic action. Using
// a concurrent set would not give us that
private val bookKeepingLock = new ReentrantLock()
// condition used for awaiting termination
private val terminationCondition = bookKeepingLock.newCondition()
// indicates the current state of the pool
@volatile final private var poolState: PoolState = Running
private final val workQueues = Array.fill(parallelism)(new BoundedTaskQueue(affinityGroupSize))
private final val workers = mutable.Set[ThreadPoolWorker]()
// a counter that gets incremented every time a task is queued
private val executionCounter: AtomicInteger = new AtomicInteger(0)
// maps a runnable to an index of a worker queue
private val runnableToWorkerQueueIndex = new AtomicReference(ImmutableIntMap.empty)
private def locked[T](l: Lock)(body: T) = {
l.lock()
try {
body
} finally {
l.unlock()
}
}
private def getQueueForRunnable(command: Runnable) = {
val runnableHash = command.hashCode()
def sbhash(i: Int) = reverseBytes(i * 0x9e3775cd) * 0x9e3775cd
def getNext = executionCounter.incrementAndGet() % parallelism
def updateIfAbsentAndGetQueueIndex(
workerQueueIndex: AtomicReference[ImmutableIntMap],
runnableHash: Int, queueIndex: Int): Int = {
@tailrec
def updateIndex(): Unit = {
val prev = workerQueueIndex.get()
if (!runnableToWorkerQueueIndex.compareAndSet(prev, prev.updateIfAbsent(runnableHash, queueIndex))) {
updateIndex()
}
}
updateIndex()
workerQueueIndex.get().get(runnableHash) // can safely call get..
}
val workQueueIndex =
if (fairDistributionThreshold == 0 || runnableToWorkerQueueIndex.get().size > fairDistributionThreshold)
Math.abs(sbhash(runnableHash)) % parallelism
else
updateIfAbsentAndGetQueueIndex(runnableToWorkerQueueIndex, runnableHash, getNext)
workQueues(workQueueIndex)
}
//fires up initial workers
locked(bookKeepingLock) {
workQueues.foreach(q ⇒ addWorker(workers, q))
}
private def addWorker(workers: mutable.Set[ThreadPoolWorker], q: BoundedTaskQueue): Unit = {
locked(bookKeepingLock) {
val worker = new ThreadPoolWorker(q, new IdleStrategy(idleCpuLevel))
workers.add(worker)
worker.startWorker()
}
}
private def tryEnqueue(command: Runnable) = getQueueForRunnable(command).add(command)
/**
* Each worker should go through that method while terminating.
* In turn each worker is responsible for modifying the pool
* state accordingly. For example if this is the last worker
* and the queue is empty and we are in a ShuttingDown state
* the worker can transition the pool to ShutDown and attempt
* termination
*
* Furthermore, if this worker has experienced abrupt termination
* due to an exception being thrown in user code, the worker is
* responsible for adding one more worker to compensate for its
* own termination
*
*/
private def onWorkerExit(w: ThreadPoolWorker, abruptTermination: Boolean): Unit =
locked(bookKeepingLock) {
workers.remove(w)
if (workers.isEmpty && !abruptTermination && poolState >= ShuttingDown) {
poolState = ShutDown // transition to shutdown and try to transition to termination
attemptPoolTermination()
}
if (abruptTermination && poolState == Running)
addWorker(workers, w.q)
}
override def execute(command: Runnable): Unit = {
if (command == null)
throw new NullPointerException
if (!(poolState == Running && tryEnqueue(command)))
rejectionHandler.reject(command, this)
}
override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
// recurse until pool is terminated or time out reached
@tailrec
def awaitTermination(nanos: Long): Boolean = {
if (poolState == Terminated) true
else if (nanos <= 0) false
else awaitTermination(terminationCondition.awaitNanos(nanos))
}
locked(bookKeepingLock) {
// need to hold the lock to avoid monitor exception
awaitTermination(unit.toNanos(timeout))
}
}
private def attemptPoolTermination() =
locked(bookKeepingLock) {
if (workers.isEmpty && poolState == ShutDown) {
poolState = Terminated
terminationCondition.signalAll()
}
}
override def shutdownNow(): util.List[Runnable] =
locked(bookKeepingLock) {
poolState = ShutDown
workers.foreach(_.stop())
attemptPoolTermination()
// like in the FJ executor, we do not provide facility to obtain tasks that were in queue
Collections.emptyList[Runnable]()
}
override def shutdown(): Unit =
locked(bookKeepingLock) {
poolState = ShuttingDown
// interrupts only idle workers.. so others can process their queues
workers.foreach(_.stopIfIdle())
attemptPoolTermination()
}
override def isShutdown: Boolean = poolState == ShutDown
override def isTerminated: Boolean = poolState == Terminated
// Following are auxiliary class and trait definitions
private sealed trait PoolState extends Ordered[PoolState] {
def order: Int
override def compare(that: PoolState): Int = this.order compareTo that.order
}
// accepts new tasks and processes tasks that are enqueued
private case object Running extends PoolState {
override val order: Int = 0
}
// does not accept new tasks, processes tasks that are in the queue
private case object ShuttingDown extends PoolState {
override def order: Int = 1
}
// does not accept new tasks, does not process tasks in queue
private case object ShutDown extends PoolState {
override def order: Int = 2
}
// all threads have been stopped, does not process tasks and does not accept new ones
private case object Terminated extends PoolState {
override def order: Int = 3
}
private final class IdleStrategy(val idleCpuLevel: Int) {
private val maxSpins = 1100 * idleCpuLevel - 1000
private val maxYields = 5 * idleCpuLevel
private val minParkPeriodNs = 1
private val maxParkPeriodNs = MICROSECONDS.toNanos(280 - 30 * idleCpuLevel)
private sealed trait State
private case object NotIdle extends State
private case object Spinning extends State
private case object Yielding extends State
private case object Parking extends State
private var state: State = NotIdle
private var spins = 0L
private var yields = 0L
private var parkPeriodNs = 0L
private val onSpinWaitMethodHandle =
try
OptionVal.Some(MethodHandles.lookup.findStatic(classOf[Thread], "onSpinWait", methodType(classOf[Unit])))
catch {
case NonFatal(_) ⇒ OptionVal.None
}
def idle(): Unit = {
state match {
case NotIdle ⇒
state = Spinning
spins += 1
case Spinning ⇒
onSpinWaitMethodHandle match {
case OptionVal.Some(m) ⇒ m.invokeExact()
case OptionVal.None ⇒
}
spins += 1
if (spins > maxSpins) {
state = Yielding
yields = 0
}
case Yielding ⇒
yields += 1
if (yields > maxYields) {
state = Parking
parkPeriodNs = minParkPeriodNs
} else Thread.`yield`()
case Parking ⇒
LockSupport.parkNanos(parkPeriodNs)
parkPeriodNs = Math.min(parkPeriodNs << 1, maxParkPeriodNs)
}
}
def reset(): Unit = {
spins = 0
yields = 0
state = NotIdle
}
}
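To put concrete numbers on the idle-cpu-level formulas above: the default level 5 gives maxSpins = 1100 * 5 - 1000 = 4500, maxYields = 25 and a park period that doubles from 1 ns up to 280 - 30 * 5 = 130 microseconds, whereas level 1 stops spinning after 100 spins, yields at most 5 times and parks for up to 250 microseconds. Higher levels therefore spend more time busy-spinning and yielding before falling back to parking, trading CPU for wake-up latency.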
private final class BoundedTaskQueue(capacity: Int) extends AbstractBoundedNodeQueue[Runnable](capacity)
private final class ThreadPoolWorker(val q: BoundedTaskQueue, val idleStrategy: IdleStrategy) extends Runnable {
private sealed trait WorkerState
private case object NotStarted extends WorkerState
private case object InExecution extends WorkerState
private case object Idle extends WorkerState
val thread: Thread = tf.newThread(this)
@volatile private var workerState: WorkerState = NotStarted
def startWorker(): Unit = {
workerState = Idle
thread.start()
}
private def runCommand(command: Runnable) = {
workerState = InExecution
try
command.run()
finally
workerState = Idle
}
override def run(): Unit = {
/**
* Determines whether the worker can keep running or not.
* In order to continue polling for tasks three conditions
* need to be satisfied:
*
* 1) pool state is less than Shutting down or queue
* is not empty (e.g. the pool state is ShuttingDown but there are still messages to process)
*
* 2) the thread backing this worker has not been interrupted
*
* 3) We are not in ShutDown state (in which we should not be processing any enqueued tasks)
*/
def shouldKeepRunning =
(poolState < ShuttingDown || !q.isEmpty) &&
!Thread.interrupted() &&
poolState != ShutDown
var abruptTermination = true
try {
while (shouldKeepRunning) {
val c = q.poll()
if (c ne null) {
runCommand(c)
idleStrategy.reset()
} else // if not wait for a bit
idleStrategy.idle()
}
abruptTermination = false // if we have reached here, our termination is not due to an exception
} finally {
onWorkerExit(this, abruptTermination)
}
}
def stop() =
if (!thread.isInterrupted && workerState != NotStarted)
thread.interrupt()
def stopIfIdle() =
if (workerState == Idle)
stop()
}
}
/**
* INTERNAL API
*/
@InternalApi
@ApiMayChange
private[akka] final class AffinityPoolConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
private final val MaxfairDistributionThreshold = 2048
private val poolSize = ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max"))
private val taskQueueSize = config.getInt("task-queue-size")
private val idleCpuLevel = config.getInt("idle-cpu-level").requiring(level ⇒
1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10")
private val fairDistributionThreshold = config.getInt("fair-work-distribution-threshold").requiring(thr ⇒
0 <= thr && thr <= MaxfairDistributionThreshold, s"fair-work-distribution-threshold must be between 0 and $MaxfairDistributionThreshold")
private val rejectionHandlerFQCN = config.getString("rejection-handler-factory")
private val rejectionHandlerFactory = prerequisites.dynamicAccess
.createInstanceFor[RejectionHandlerFactory](rejectionHandlerFQCN, Nil).recover({
case exception ⇒ throw new IllegalArgumentException(
s"Cannot instantiate RejectionHandlerFactory (rejection-handler-factory = $rejectionHandlerFQCN), make sure it has an accessible empty constructor",
exception)
}).get
override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
new ExecutorServiceFactory {
override def createExecutorService: ExecutorService =
new AffinityPool(poolSize, taskQueueSize, threadFactory, idleCpuLevel, fairDistributionThreshold, rejectionHandlerFactory.create())
}
}
trait RejectionHandler {
def reject(command: Runnable, service: ExecutorService)
}
trait RejectionHandlerFactory {
def create(): RejectionHandler
}
/**
* INTERNAL API
*/
@InternalApi
@ApiMayChange
private[akka] final class DefaultRejectionHandlerFactory extends RejectionHandlerFactory {
private class DefaultRejectionHandler extends RejectionHandler {
override def reject(command: Runnable, service: ExecutorService): Unit =
throw new RejectedExecutionException(s"Task ${command.toString} rejected from ${service.toString}")
}
override def create(): RejectionHandler = new DefaultRejectionHandler()
}
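As a usage sketch of the RejectionHandlerFactory extension point defined above (the class and package names here are hypothetical and not part of this commit), a handler that runs rejected tasks on the calling thread instead of throwing could look like this, wired in via rejection-handler-factory = "com.example.CallerRunsRejectionHandlerFactory" in the affinity-pool-executor section:

package com.example
import java.util.concurrent.{ ExecutorService, RejectedExecutionException }
import akka.dispatch.affinity.{ RejectionHandler, RejectionHandlerFactory }

// Hypothetical example, not part of this commit: degrade to caller-runs
// behaviour when a worker queue is full instead of failing the dispatch.
final class CallerRunsRejectionHandlerFactory extends RejectionHandlerFactory {
  override def create(): RejectionHandler = new RejectionHandler {
    override def reject(command: Runnable, service: ExecutorService): Unit =
      if (!service.isShutdown) command.run() // run on the caller's thread
      else throw new RejectedExecutionException(s"Task $command rejected, pool $service has been shut down")
  }
}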

View file

@ -0,0 +1,141 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util
import java.util.Arrays
import akka.annotation.InternalApi
import scala.annotation.tailrec
/**
* INTERNAL API
*/
@InternalApi private[akka] object ImmutableIntMap {
val empty: ImmutableIntMap =
new ImmutableIntMap(Array.emptyIntArray, Array.empty)
private final val MaxScanLength = 10
}
/**
* INTERNAL API
* Specialized Map for primitive `Int` keys to avoid allocations (boxing).
* Keys and values are backed by arrays and lookup is performed with binary
* search. It's intended for rather small (<1000) maps.
*/
@InternalApi private[akka] final class ImmutableIntMap private (
private val keys: Array[Int], private val values: Array[Int]) {
final val size: Int = keys.length
/**
* Worst case `O(log n)`, allocation free.
*/
def get(key: Int): Int = {
val i = Arrays.binarySearch(keys, key)
if (i >= 0) values(i)
else Int.MinValue // can't use null, can't use OptionVal, the other option is to throw an exception...
}
/**
* Worst case `O(log n)`, allocation free.
*/
def contains(key: Int): Boolean = {
Arrays.binarySearch(keys, key) >= 0
}
def updateIfAbsent(key: Int, value: Int): ImmutableIntMap = {
if (contains(key))
this
else
updated(key, value)
}
/**
* Worst case `O(log n)`, creates new `ImmutableIntMap`
* with copies of the internal arrays for the keys and
* values.
*/
def updated(key: Int, value: Int): ImmutableIntMap = {
if (size == 0)
new ImmutableIntMap(Array(key), Array(value))
else {
val i = Arrays.binarySearch(keys, key)
if (i >= 0) {
// existing key, replace value
val newValues = new Array[Int](values.length)
System.arraycopy(values, 0, newValues, 0, values.length)
newValues(i) = value
new ImmutableIntMap(keys, newValues)
} else {
// insert the entry at the right position, and keep the arrays sorted
val j = -(i + 1)
val newKeys = new Array[Int](size + 1)
System.arraycopy(keys, 0, newKeys, 0, j)
newKeys(j) = key
System.arraycopy(keys, j, newKeys, j + 1, keys.length - j)
val newValues = new Array[Int](size + 1)
System.arraycopy(values, 0, newValues, 0, j)
newValues(j) = value
System.arraycopy(values, j, newValues, j + 1, values.length - j)
new ImmutableIntMap(newKeys, newValues)
}
}
}
def remove(key: Int): ImmutableIntMap = {
val i = Arrays.binarySearch(keys, key)
if (i >= 0) {
if (size == 1)
ImmutableIntMap.empty
else {
val newKeys = new Array[Int](size - 1)
System.arraycopy(keys, 0, newKeys, 0, i)
System.arraycopy(keys, i + 1, newKeys, i, keys.length - i - 1)
val newValues = new Array[Int](size - 1)
System.arraycopy(values, 0, newValues, 0, i)
System.arraycopy(values, i + 1, newValues, i, values.length - i - 1)
new ImmutableIntMap(newKeys, newValues)
}
} else
this
}
/**
* All keys
*/
def keysIterator: Iterator[Int] =
keys.iterator
override def toString: String =
keysIterator.map(key ⇒ s"$key -> ${get(key)}").mkString("ImmutableIntMap(", ", ", ")")
override def hashCode: Int = {
var result = HashCode.SEED
result = HashCode.hash(result, keys)
result = HashCode.hash(result, values)
result
}
override def equals(obj: Any): Boolean = obj match {
case other: ImmutableIntMap ⇒
if (other eq this) true
else if (size != other.size) false
else if (size == 0 && other.size == 0) true
else {
@tailrec def check(i: Int): Boolean = {
if (i < 0) true
else if (keys(i) == other.keys(i) && values(i) == other.values(i))
check(i - 1) // recur, next elem
else false
}
check(size - 1)
}
case _ ⇒ false
}
}

View file

@ -0,0 +1,94 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import java.util.concurrent.TimeUnit
import akka.actor.BenchmarkActors._
import akka.actor.ForkJoinActorBenchmark.cores
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class AffinityPoolComparativeBenchmark {
@Param(Array("1"))
var throughPut = 0
@Param(Array("affinity-dispatcher", "default-fj-dispatcher", "fixed-size-dispatcher"))
var dispatcher = ""
@Param(Array("SingleConsumerOnlyUnboundedMailbox")) //"default"
var mailbox = ""
final val numThreads, numActors = 8
final val numMessagesPerActorPair = 2000000
final val totalNumberOfMessages = numMessagesPerActorPair * (numActors / 2)
implicit var system: ActorSystem = _
@Setup(Level.Trial)
def setup(): Unit = {
requireRightNumberOfCores(cores)
val mailboxConf = mailbox match {
case "default" => ""
case "SingleConsumerOnlyUnboundedMailbox" =>
s"""default-mailbox.mailbox-type = "${classOf[akka.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}""""
}
system = ActorSystem("AffinityPoolComparativeBenchmark", ConfigFactory.parseString(
s"""| akka {
| log-dead-letters = off
| actor {
| default-fj-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = $numThreads
| parallelism-factor = 1.0
| parallelism-max = $numThreads
| }
| throughput = $throughPut
| }
|
| fixed-size-dispatcher {
| executor = "thread-pool-executor"
| thread-pool-executor {
| fixed-pool-size = $numThreads
| }
| throughput = $throughPut
| }
|
| affinity-dispatcher {
| executor = "affinity-pool-executor"
| affinity-pool-executor {
| parallelism-min = $numThreads
| parallelism-factor = 1.0
| parallelism-max = $numThreads
| task-queue-size = 512
| idle-cpu-level = 5
| fair-work-distribution-threshold = 2048
| }
| throughput = $throughPut
| }
| $mailboxConf
| }
| }
""".stripMargin
))
}
@TearDown(Level.Trial)
def shutdown(): Unit = tearDownSystem()
@Benchmark
@OperationsPerInvocation(totalNumberOfMessages)
def pingPong(): Unit = benchmarkPingPongActors(numMessagesPerActorPair, numActors, dispatcher, throughPut, timeout)
}

View file

@ -0,0 +1,68 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import java.util.concurrent.TimeUnit
import akka.actor.BenchmarkActors._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class AffinityPoolIdleCPULevelBenchmark {
final val numThreads, numActors = 8
final val numMessagesPerActorPair = 2000000
final val totalNumberOfMessages = numMessagesPerActorPair * (numActors / 2)
implicit var system: ActorSystem = _
@Param(Array("1", "3", "5", "7", "10"))
var idleCPULevel = ""
@Param(Array("25"))
var throughPut = 0
@Setup(Level.Trial)
def setup(): Unit = {
requireRightNumberOfCores(numThreads)
system = ActorSystem("AffinityPoolWaitingStrategyBenchmark", ConfigFactory.parseString(
s""" | akka {
| log-dead-letters = off
| actor {
| affinity-dispatcher {
| executor = "affinity-pool-executor"
| affinity-pool-executor {
| parallelism-min = $numThreads
| parallelism-factor = 1.0
| parallelism-max = $numThreads
| task-queue-size = 512
| idle-cpu-level = $idleCPULevel
| fair-work-distribution-threshold = 2048
| }
| throughput = $throughPut
| }
|
| }
| }
""".stripMargin
))
}
@TearDown(Level.Trial)
def shutdown(): Unit = tearDownSystem()
@Benchmark
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@OperationsPerInvocation(8000000)
def pingPong(): Unit = benchmarkPingPongActors(numMessagesPerActorPair, numActors, "affinity-dispatcher", throughPut, timeout)
}

View file

@ -0,0 +1,110 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.actor.BenchmarkActors._
import akka.actor.ForkJoinActorBenchmark.cores
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, time = 20, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class AffinityPoolRequestResponseBenchmark {
@Param(Array("1", "5", "50"))
var throughPut = 0
@Param(Array("affinity-dispatcher", "default-fj-dispatcher", "fixed-size-dispatcher"))
var dispatcher = ""
@Param(Array("SingleConsumerOnlyUnboundedMailbox")) //"default"
var mailbox = ""
final val numThreads, numActors = 8
final val numQueriesPerActor = 400000
final val totalNumberOfMessages = numQueriesPerActor * numActors
final val numUsersInDB = 300000
implicit var system: ActorSystem = _
var actors: Vector[(ActorRef, ActorRef)] = null
var latch: CountDownLatch = null
@Setup(Level.Trial)
def setup(): Unit = {
requireRightNumberOfCores(cores)
val mailboxConf = mailbox match {
case "default" => ""
case "SingleConsumerOnlyUnboundedMailbox" =>
s"""default-mailbox.mailbox-type = "${classOf[akka.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}""""
}
system = ActorSystem("AffinityPoolComparativeBenchmark", ConfigFactory.parseString(
s"""| akka {
| log-dead-letters = off
| actor {
| default-fj-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = $numThreads
| parallelism-factor = 1.0
| parallelism-max = $numThreads
| }
| throughput = $throughPut
| }
|
| fixed-size-dispatcher {
| executor = "thread-pool-executor"
| thread-pool-executor {
| fixed-pool-size = $numThreads
| }
| throughput = $throughPut
| }
|
| affinity-dispatcher {
| executor = "affinity-pool-executor"
| affinity-pool-executor {
| parallelism-min = $numThreads
| parallelism-factor = 1.0
| parallelism-max = $numThreads
| task-queue-size = 512
| idle-cpu-level = 5
| fair-work-distribution-threshold = 2048
| }
| throughput = $throughPut
| }
| $mailboxConf
| }
| }
""".stripMargin
))
}
@TearDown(Level.Trial)
def shutdown(): Unit = tearDownSystem()
@Setup(Level.Invocation)
def setupActors(): Unit = {
val (_actors, _latch) = RequestResponseActors.startUserQueryActorPairs(numActors, numQueriesPerActor, numUsersInDB, dispatcher)
actors = _actors
latch = _latch
}
@Benchmark
@OperationsPerInvocation(totalNumberOfMessages)
def queryUserServiceActor(): Unit = {
val startNanoTime = System.nanoTime()
RequestResponseActors.initiateQuerySimulation(actors, throughPut * 2)
latch.await(BenchmarkActors.timeout.toSeconds, TimeUnit.SECONDS)
BenchmarkActors.printProgress(totalNumberOfMessages, numActors, startNanoTime)
}
}

View file

@ -0,0 +1,102 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
object BenchmarkActors {
val timeout = 30.seconds
case object Message
case object Stop
class PingPong(val messages: Int, latch: CountDownLatch) extends Actor {
var left = messages / 2
def receive = {
case Message =>
if (left == 0) {
latch.countDown()
context stop self
}
sender() ! Message
left -= 1
}
}
object PingPong {
def props(messages: Int, latch: CountDownLatch) = Props(new PingPong(messages, latch))
}
class Pipe(next: Option[ActorRef]) extends Actor {
def receive = {
case Message =>
if (next.isDefined) next.get forward Message
case Stop =>
context stop self
if (next.isDefined) next.get forward Stop
}
}
object Pipe {
def props(next: Option[ActorRef]) = Props(new Pipe(next))
}
private def startPingPongActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String)(implicit system: ActorSystem) = {
val fullPathToDispatcher = "akka.actor." + dispatcher
val latch = new CountDownLatch(numPairs * 2)
val actors = for {
i <- (1 to numPairs).toVector
} yield {
val ping = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher))
val pong = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher))
(ping, pong)
}
(actors, latch)
}
private def initiatePingPongForPairs(refs: Vector[(ActorRef, ActorRef)], inFlight: Int) = {
for {
(ping, pong) <- refs
_ <- 1 to inFlight
} {
ping.tell(Message, pong)
}
}
def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = {
val durationMicros = (System.nanoTime() - startNanoTime) / 1000
println(f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " +
f"${totalMessages.toDouble / durationMicros}%,.2f M msg/s")
}
def requireRightNumberOfCores(numCores: Int) =
require(
Runtime.getRuntime.availableProcessors == numCores,
s"Update the cores constant to ${Runtime.getRuntime.availableProcessors}"
)
def benchmarkPingPongActors(numMessagesPerActorPair: Int, numActors: Int, dispatcher: String, throughPut: Int, shutdownTimeout: Duration)(implicit system: ActorSystem): Unit = {
val numPairs = numActors / 2
val totalNumMessages = numPairs * numMessagesPerActorPair
val (actors, latch) = startPingPongActorPairs(numMessagesPerActorPair, numPairs, dispatcher)
val startNanoTime = System.nanoTime()
initiatePingPongForPairs(actors, inFlight = throughPut * 2)
latch.await(shutdownTimeout.toSeconds, TimeUnit.SECONDS)
printProgress(totalNumMessages, numActors, startNanoTime)
}
def tearDownSystem()(implicit system: ActorSystem): Unit = {
system.terminate()
Await.ready(system.whenTerminated, timeout)
}
}

View file

@ -6,46 +6,61 @@ package akka.actor
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
-import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.annotation.tailrec
import BenchmarkActors._
import scala.concurrent.duration._
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
-@Measurement(iterations = 20)
@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class ForkJoinActorBenchmark {
import ForkJoinActorBenchmark._
-@Param(Array("5"))
@Param(Array("5", "25", "50"))
var tpt = 0
-@Param(Array("1"))
@Param(Array(coresStr)) // coresStr, cores2xStr, cores4xStr
var threads = ""
@Param(Array("SingleConsumerOnlyUnboundedMailbox")) //"default"
var mailbox = ""
implicit var system: ActorSystem = _
@Setup(Level.Trial)
def setup(): Unit = {
requireRightNumberOfCores(cores)
val mailboxConf = mailbox match {
case "default" => ""
case "SingleConsumerOnlyUnboundedMailbox" =>
s"""default-mailbox.mailbox-type = "${classOf[akka.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}""""
}
system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString(
-s"""| akka {
-| log-dead-letters = off
-| actor {
-| default-dispatcher {
-| executor = "fork-join-executor"
-| fork-join-executor {
-| parallelism-min = 1
-| parallelism-factor = $threads
-| parallelism-max = 64
-| }
-| throughput = $tpt
-| }
-| }
-| }
-""".stripMargin
s"""
akka {
log-dead-letters = off
actor {
default-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = $threads
parallelism-factor = 1
parallelism-max = $threads
}
throughput = $tpt
}
$mailboxConf
}
}
"""
))
}
@ -55,110 +70,31 @@ class ForkJoinActorBenchmark {
Await.ready(system.whenTerminated, 15.seconds)
}
-var pingPongActors: Vector[(ActorRef, ActorRef)] = null
-var pingPongLessActorsThanCoresActors: Vector[(ActorRef, ActorRef)] = null
-var pingPongSameNumberOfActorsAsCoresActors: Vector[(ActorRef, ActorRef)] = null
-var pingPongMoreActorsThanCoresActors: Vector[(ActorRef, ActorRef)] = null
-@Setup(Level.Invocation)
-def setupActors(): Unit = {
-pingPongActors = startActors(1)
-pingPongLessActorsThanCoresActors = startActors(lessThanCoresActorPairs)
-pingPongSameNumberOfActorsAsCoresActors = startActors(cores / 2)
-pingPongMoreActorsThanCoresActors = startActors(moreThanCoresActorPairs)
-}
-@TearDown(Level.Invocation)
-def tearDownActors(): Unit = {
-stopActors(pingPongActors)
-stopActors(pingPongLessActorsThanCoresActors)
-stopActors(pingPongSameNumberOfActorsAsCoresActors)
-stopActors(pingPongMoreActorsThanCoresActors)
-}
-def startActors(n: Int): Vector[(ActorRef, ActorRef)] = {
-for {
-i <- (1 to n).toVector
-} yield {
-val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
-val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
-(ping, pong)
-}
-}
-def stopActors(refs: Vector[(ActorRef, ActorRef)]): Unit = {
-if (refs ne null) {
-refs.foreach {
-case (ping, pong) =>
-system.stop(ping)
-system.stop(pong)
-}
-awaitTerminated(refs)
-}
-}
-def awaitTerminated(refs: Vector[(ActorRef, ActorRef)]): Unit = {
-if (refs ne null) refs.foreach {
-case (ping, pong) =>
-val p = TestProbe()
-p.watch(ping)
-p.expectTerminated(ping, timeout)
-p.watch(pong)
-p.expectTerminated(pong, timeout)
-}
-}
-def sendMessage(refs: Vector[(ActorRef, ActorRef)], inFlight: Int): Unit = {
-for {
-(ping, pong) <- refs
-_ <- 1 to inFlight
-} {
-ping.tell(Message, pong)
-}
-}
@Benchmark
@OperationsPerInvocation(totalMessagesTwoActors)
def pingPong(): Unit = benchmarkPingPongActors(messages, twoActors, "default-dispatcher", tpt, timeout)
-@Benchmark
-@Measurement(timeUnit = TimeUnit.MILLISECONDS)
-@OperationsPerInvocation(messages)
-def pingPong(): Unit = {
-// only one message in flight
-sendMessage(pingPongActors, inFlight = 1)
-awaitTerminated(pingPongActors)
-}
@Benchmark
-@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(totalMessagesLessThanCores)
-def pingPongLessActorsThanCores(): Unit = {
-sendMessage(pingPongLessActorsThanCoresActors, inFlight = 2 * tpt)
-awaitTerminated(pingPongLessActorsThanCoresActors)
-}
def pingPongLessActorsThanCores(): Unit = benchmarkPingPongActors(messages, lessThanCoresActors, "default-dispatcher", tpt, timeout)
@Benchmark
-@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(totalMessagesSameAsCores)
-def pingPongSameNumberOfActorsAsCores(): Unit = {
-sendMessage(pingPongSameNumberOfActorsAsCoresActors, inFlight = 2 * tpt)
-awaitTerminated(pingPongSameNumberOfActorsAsCoresActors)
-}
def pingPongSameNumberOfActorsAsCores(): Unit = benchmarkPingPongActors(messages, sameAsCoresActors, "default-dispatcher", tpt, timeout)
@Benchmark
-@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(totalMessagesMoreThanCores)
-def pingPongMoreActorsThanCores(): Unit = {
-sendMessage(pingPongMoreActorsThanCoresActors, inFlight = 2 * tpt)
-awaitTerminated(pingPongMoreActorsThanCoresActors)
-}
def pingPongMoreActorsThanCores(): Unit = benchmarkPingPongActors(messages, moreThanCoresActors, "default-dispatcher", tpt, timeout)
// @Benchmark
// @Measurement(timeUnit = TimeUnit.MILLISECONDS)
// @OperationsPerInvocation(messages)
def floodPipe(): Unit = {
-val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None))
-val middle = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(end)))
-val penultimate = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(middle)))
-val beginning = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(penultimate)))
val end = system.actorOf(Props(classOf[Pipe], None))
val middle = system.actorOf(Props(classOf[Pipe], Some(end)))
val penultimate = system.actorOf(Props(classOf[Pipe], Some(middle)))
val beginning = system.actorOf(Props(classOf[Pipe], Some(penultimate)))
val p = TestProbe()
p.watch(end)
@ -178,39 +114,23 @@ class ForkJoinActorBenchmark {
}
object ForkJoinActorBenchmark {
-case object Stop
-case object Message
-final val timeout = 15.seconds
-final val messages = 400000
-// Constants because they are used in annotations
final val messages = 2000000 // messages per actor pair
// update according to cpu
final val cores = 8
-// 2 actors per
-final val moreThanCoresActorPairs = cores * 2
-final val lessThanCoresActorPairs = (cores / 2) - 1
-final val totalMessagesMoreThanCores = moreThanCoresActorPairs * messages
-final val totalMessagesLessThanCores = lessThanCoresActorPairs * messages
-final val totalMessagesSameAsCores = cores * messages
final val coresStr = "8"
final val cores2xStr = "16"
final val cores4xStr = "24"
final val twoActors = 2
final val moreThanCoresActors = cores * 2
final val lessThanCoresActors = cores / 2
final val sameAsCoresActors = cores
final val totalMessagesTwoActors = messages
final val totalMessagesMoreThanCores = (moreThanCoresActors * messages) / 2
final val totalMessagesLessThanCores = (lessThanCoresActors * messages) / 2
final val totalMessagesSameAsCores = (sameAsCoresActors * messages) / 2
-class Pipe(next: Option[ActorRef]) extends Actor {
-def receive = {
-case Message =>
-if (next.isDefined) next.get forward Message
-case Stop =>
-context stop self
-if (next.isDefined) next.get forward Stop
-}
-}
-class PingPong extends Actor {
-var left = messages / 2
-def receive = {
-case Message =>
-if (left <= 1)
-context stop self
-sender() ! Message
-left -= 1
-}
-}
-}
}

View file

@ -0,0 +1,93 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import java.util.concurrent.CountDownLatch
import scala.collection.mutable
import scala.util.Random
object RequestResponseActors {
case class Request(userId: Int)
case class User(userId: Int, firstName: String, lastName: String, ssn: Int, friends: Seq[Int])
class UserQueryActor(latch: CountDownLatch, numQueries: Int, numUsersInDB: Int) extends Actor {
private var left = numQueries
private val receivedUsers: mutable.Map[Int, User] = mutable.Map()
private val randGenerator = new Random()
override def receive: Receive = {
case u: User => {
receivedUsers.put(u.userId, u)
if (left == 0) {
latch.countDown()
context stop self
} else {
sender() ! Request(randGenerator.nextInt(numUsersInDB))
}
left -= 1
}
}
}
object UserQueryActor {
def props(latch: CountDownLatch, numQueries: Int, numUsersInDB: Int) = {
Props(new UserQueryActor(latch, numQueries, numUsersInDB))
}
}
class UserServiceActor(userDb: Map[Int, User], latch: CountDownLatch, numQueries: Int) extends Actor {
private var left = numQueries
def receive = {
case Request(id) =>
userDb.get(id) match {
case Some(u) => sender() ! u
case None =>
}
if (left == 0) {
latch.countDown()
context stop self
}
left -= 1
}
}
object UserServiceActor {
def props(latch: CountDownLatch, numQueries: Int, numUsersInDB: Int) = {
val r = new Random()
val users = for {
id <- 0 until numUsersInDB
firstName = r.nextString(5)
lastName = r.nextString(7)
ssn = r.nextInt()
friendIds = for { _ <- 0 until 5 } yield r.nextInt(numUsersInDB)
} yield id -> User(id, firstName, lastName, ssn, friendIds)
Props(new UserServiceActor(users.toMap, latch, numQueries))
}
}
def startUserQueryActorPairs(numActors: Int, numQueriesPerActor: Int, numUsersInDBPerActor: Int, dispatcher: String)(implicit system: ActorSystem) = {
val fullPathToDispatcher = "akka.actor." + dispatcher
val latch = new CountDownLatch(numActors)
val actorsPairs = for {
i <- (1 to (numActors / 2)).toVector
userQueryActor = system.actorOf(UserQueryActor.props(latch, numQueriesPerActor, numUsersInDBPerActor).withDispatcher(fullPathToDispatcher))
userServiceActor = system.actorOf(UserServiceActor.props(latch, numQueriesPerActor, numUsersInDBPerActor).withDispatcher(fullPathToDispatcher))
} yield (userQueryActor, userServiceActor)
(actorsPairs, latch)
}
def initiateQuerySimulation(requestResponseActorPairs: Seq[(ActorRef, ActorRef)], inFlight: Int) = {
for {
(queryActor, serviceActor) <- requestResponseActorPairs
i <- 1 to inFlight
} {
serviceActor.tell(Request(i), queryActor)
}
}
}

View file

@ -143,6 +143,13 @@ Another example that uses the thread pool based on the number of cores (e.g. for
<!--same config text for Scala & Java-->
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) {#my-thread-pool-dispatcher-config }
A different kind of dispatcher that uses an affinity pool may increase throughput in cases where there is a relatively small
number of actors that maintain some internal state. The affinity pool tries its best to ensure that an actor is always
scheduled to run on the same thread. This actor-to-thread pinning aims to decrease CPU cache misses, which can result
in a significant throughput improvement.
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #affinity-pool-dispatcher-config }
Configuring a `PinnedDispatcher`:
<!--same config text for Scala & Java-->

View file

@ -92,6 +92,28 @@ object DispatcherDocSpec {
}
//#my-thread-pool-dispatcher-config
//#affinity-pool-dispatcher-config
affinity-pool-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "affinity-pool-executor"
# Configuration for the thread pool
affinity-pool-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 8
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 1
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 16
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
//#affinity-pool-dispatcher-config
//#fixed-pool-size-dispatcher-config
blocking-io-dispatcher {
type = Dispatcher
@ -294,6 +316,14 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
//#defining-pinned-dispatcher
}
"defining affinity-pool dispatcher" in {
val context = system
//#defining-affinity-pool-dispatcher
val myActor =
context.actorOf(Props[MyActor].withDispatcher("affinity-pool-dispatcher"), "myactor4")
//#defining-affinity-pool-dispatcher
}
"looking up a dispatcher" in { "looking up a dispatcher" in {
//#lookup //#lookup
// for use with Futures, Scheduler, etc. // for use with Futures, Scheduler, etc.

View file

@ -75,7 +75,6 @@ object Dependencies {
val aeronDriver = "io.aeron" % "aeron-driver" % aeronVersion // ApacheV2
val aeronClient = "io.aeron" % "aeron-client" % aeronVersion // ApacheV2
object Docs {
val sprayJson = "io.spray" %% "spray-json" % "1.3.3" % "test"
val gson = "com.google.code.gson" % "gson" % "2.8.0" % "test"