Ported ActorBenchmark to typed (#24815)

This commit is contained in:
Johan Andrén 2018-04-06 08:56:34 +02:00 committed by GitHub
parent dd884117c1
commit 30423e3b84
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 212 additions and 0 deletions

View file

@ -0,0 +1,106 @@
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.typed.scaladsl.AskPattern._
object TypedActorBenchmark {
// Constants because they are used in annotations
final val threads = 8 // update according to cpu
final val numMessagesPerActorPair = 1000000 // messages per actor pair
final val numActors = 512
final val totalMessages = numMessagesPerActorPair * numActors / 2
final val timeout = 30.seconds
}
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class TypedActorBenchmark {
import TypedActorBenchmark._
import TypedBenchmarkActors._
@Param(Array("50"))
var tpt = 0
@Param(Array("50"))
var batchSize = 0
@Param(Array("akka.actor.ManyToOneArrayMailbox")) // @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox"))
var mailbox = ""
@Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher"))
var dispatcher = ""
implicit var system: ActorSystem[Start] = _
implicit val askTimeout = akka.util.Timeout(timeout)
implicit def scheduler = system.scheduler
@Setup(Level.Trial)
def setup(): Unit = {
akka.actor.BenchmarkActors.requireRightNumberOfCores(threads)
system = ActorSystem(
TypedBenchmarkActors.echoActorsSupervisor(numMessagesPerActorPair, numActors, dispatcher, batchSize, timeout),
"TypedActorBenchmark",
ConfigFactory.parseString(
s"""
akka.actor {
default-mailbox.mailbox-capacity = 512
fjp-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = $threads
parallelism-factor = 1.0
parallelism-max = $threads
}
throughput = $tpt
mailbox-type = "$mailbox"
}
affinity-dispatcher {
executor = "affinity-pool-executor"
affinity-pool-executor {
parallelism-min = $threads
parallelism-factor = 1.0
parallelism-max = $threads
task-queue-size = 512
idle-cpu-level = 5
fair-work-distribution.threshold = 2048
}
throughput = $tpt
mailbox-type = "$mailbox"
}
}
"""
))
}
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
@Benchmark
@OperationsPerInvocation(totalMessages)
def echo(): Unit = {
Await.result(system ? Start, timeout)
}
}

View file

@ -0,0 +1,106 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.Done
import akka.actor.typed.scaladsl.{ Behaviors, MutableBehavior }
import akka.actor.typed.scaladsl.{ ActorContext SActorContext }
import scala.concurrent.duration._
object TypedBenchmarkActors {
// to avoid benchmark to be dominated by allocations of message
// we pass the respondTo actor ref into the behavior
final case object Message
private def echoBehavior(respondTo: ActorRef[Message.type]): Behavior[Message.type] = Behaviors.receive { (ctx, msg)
respondTo ! Message
Behaviors.same
}
private def echoSender(messagesPerPair: Int, onDone: ActorRef[Done], batchSize: Int, childProps: Props): Behavior[Message.type] =
Behaviors.setup { ctx
val echo = ctx.spawn(echoBehavior(ctx.self), "echo", childProps)
var left = messagesPerPair / 2
var batch = 0
def sendBatch(): Boolean = {
if (left > 0) {
var i = 0
while (i < batchSize) {
echo ! Message
i += 1
}
left -= batchSize
batch = batchSize
true
} else
false
}
Behaviors.receiveMessage { msg
batch -= 1
if (batch <= 0 && !sendBatch()) {
onDone ! Done
Behaviors.stopped
} else {
Behaviors.same
}
}
}
case class Start(respondTo: ActorRef[Completed])
case class Completed(startNanoTime: Long)
def echoActorsSupervisor(numMessagesPerActorPair: Int, numActors: Int, dispatcher: String, batchSize: Int,
shutdownTimeout: FiniteDuration): Behavior[Start] =
Behaviors.receive { (ctx, msg)
msg match {
case Start(respondTo)
// note: no protection against accidentally running bench sessions in paralell
val sessionBehavior = startEchoBenchSession(numMessagesPerActorPair, numActors, dispatcher, batchSize, respondTo)
ctx.spawnAnonymous(sessionBehavior)
Behaviors.same
}
}
private def startEchoBenchSession(messagesPerPair: Int, numActors: Int, dispatcher: String,
batchSize: Int, respondTo: ActorRef[Completed]): Behavior[Unit] = {
val numPairs = numActors / 2
Behaviors.setup[Any] { ctx
val props = Props.empty.withDispatcherFromConfig("akka.actor." + dispatcher)
val pairs = (1 to numPairs).map { _
ctx.spawnAnonymous(echoSender(messagesPerPair, ctx.self.narrow[Done], batchSize, props), props)
}
val startNanoTime = System.nanoTime()
pairs.foreach(_ ! Message)
var interactionsLeft = numPairs
Behaviors.receiveMessage {
case Done
interactionsLeft -= 1
if (interactionsLeft == 0) {
val totalNumMessages = numPairs * messagesPerPair
printProgress(totalNumMessages, numActors, startNanoTime)
respondTo ! Completed(startNanoTime)
Behaviors.stopped
} else {
Behaviors.same
}
}
}.narrow[Unit]
}
private def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = {
val durationMicros = (System.nanoTime() - startNanoTime) / 1000
println(f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " +
f"${totalMessages.toDouble / durationMicros}%,.2f M msg/s")
}
}