From 30423e3b845afb92a8d5250cb37ae8bcd13ea677 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 6 Apr 2018 08:56:34 +0200 Subject: [PATCH] Ported ActorBenchmark to typed (#24815) --- .../actor/typed/TypedActorBenchmark.scala | 106 ++++++++++++++++++ .../actor/typed/TypedBenchmarkActors.scala | 106 ++++++++++++++++++ 2 files changed, 212 insertions(+) create mode 100644 akka-bench-jmh/src/main/scala/akka/actor/typed/TypedActorBenchmark.scala create mode 100644 akka-bench-jmh/src/main/scala/akka/actor/typed/TypedBenchmarkActors.scala diff --git a/akka-bench-jmh/src/main/scala/akka/actor/typed/TypedActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/typed/TypedActorBenchmark.scala new file mode 100644 index 0000000000..a05cee427b --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/typed/TypedActorBenchmark.scala @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ + +package akka.actor.typed + +import java.util.concurrent.{ CountDownLatch, TimeUnit } + +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.actor.typed.scaladsl.AskPattern._ + +object TypedActorBenchmark { + // Constants because they are used in annotations + final val threads = 8 // update according to cpu + final val numMessagesPerActorPair = 1000000 // messages per actor pair + + final val numActors = 512 + final val totalMessages = numMessagesPerActorPair * numActors / 2 + final val timeout = 30.seconds +} + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1) +@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1) +class TypedActorBenchmark { + import TypedActorBenchmark._ + import TypedBenchmarkActors._ + + @Param(Array("50")) + var tpt = 0 + + @Param(Array("50")) + var batchSize = 0 + + @Param(Array("akka.actor.ManyToOneArrayMailbox")) // @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox")) + var mailbox = "" + + @Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher")) + var dispatcher = "" + + implicit var system: ActorSystem[Start] = _ + + implicit val askTimeout = akka.util.Timeout(timeout) + implicit def scheduler = system.scheduler + + @Setup(Level.Trial) + def setup(): Unit = { + akka.actor.BenchmarkActors.requireRightNumberOfCores(threads) + system = ActorSystem( + TypedBenchmarkActors.echoActorsSupervisor(numMessagesPerActorPair, numActors, dispatcher, batchSize, timeout), + "TypedActorBenchmark", + ConfigFactory.parseString( + s""" + akka.actor { + + default-mailbox.mailbox-capacity = 512 + + fjp-dispatcher { + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = $threads + parallelism-factor = 1.0 + parallelism-max = $threads + } + throughput = $tpt + mailbox-type = "$mailbox" + } + affinity-dispatcher { + executor = "affinity-pool-executor" + affinity-pool-executor { + parallelism-min = $threads + parallelism-factor = 1.0 + parallelism-max = $threads + task-queue-size = 512 + idle-cpu-level = 5 + fair-work-distribution.threshold = 2048 + } + throughput = $tpt + mailbox-type = "$mailbox" + } + } + """ + )) + } + + @TearDown(Level.Trial) + def shutdown(): Unit = { + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) + } + + @Benchmark + @OperationsPerInvocation(totalMessages) + def echo(): Unit = { + Await.result(system ? Start, timeout) + } + +} + diff --git a/akka-bench-jmh/src/main/scala/akka/actor/typed/TypedBenchmarkActors.scala b/akka-bench-jmh/src/main/scala/akka/actor/typed/TypedBenchmarkActors.scala new file mode 100644 index 0000000000..9dc367dbca --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/typed/TypedBenchmarkActors.scala @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.actor.typed + +import java.util.concurrent.{ CountDownLatch, TimeUnit } + +import akka.Done +import akka.actor.typed.scaladsl.{ Behaviors, MutableBehavior } +import akka.actor.typed.scaladsl.{ ActorContext ⇒ SActorContext } + +import scala.concurrent.duration._ + +object TypedBenchmarkActors { + + // to avoid benchmark to be dominated by allocations of message + // we pass the respondTo actor ref into the behavior + final case object Message + + private def echoBehavior(respondTo: ActorRef[Message.type]): Behavior[Message.type] = Behaviors.receive { (ctx, msg) ⇒ + respondTo ! Message + Behaviors.same + } + + private def echoSender(messagesPerPair: Int, onDone: ActorRef[Done], batchSize: Int, childProps: Props): Behavior[Message.type] = + Behaviors.setup { ctx ⇒ + val echo = ctx.spawn(echoBehavior(ctx.self), "echo", childProps) + var left = messagesPerPair / 2 + var batch = 0 + + def sendBatch(): Boolean = { + if (left > 0) { + var i = 0 + while (i < batchSize) { + echo ! Message + i += 1 + } + left -= batchSize + batch = batchSize + true + } else + false + } + + Behaviors.receiveMessage { msg ⇒ + batch -= 1 + if (batch <= 0 && !sendBatch()) { + onDone ! Done + Behaviors.stopped + } else { + Behaviors.same + } + } + } + + case class Start(respondTo: ActorRef[Completed]) + case class Completed(startNanoTime: Long) + + def echoActorsSupervisor(numMessagesPerActorPair: Int, numActors: Int, dispatcher: String, batchSize: Int, + shutdownTimeout: FiniteDuration): Behavior[Start] = + Behaviors.receive { (ctx, msg) ⇒ + msg match { + case Start(respondTo) ⇒ + // note: no protection against accidentally running bench sessions in paralell + val sessionBehavior = startEchoBenchSession(numMessagesPerActorPair, numActors, dispatcher, batchSize, respondTo) + ctx.spawnAnonymous(sessionBehavior) + Behaviors.same + } + } + + private def startEchoBenchSession(messagesPerPair: Int, numActors: Int, dispatcher: String, + batchSize: Int, respondTo: ActorRef[Completed]): Behavior[Unit] = { + + val numPairs = numActors / 2 + + Behaviors.setup[Any] { ctx ⇒ + val props = Props.empty.withDispatcherFromConfig("akka.actor." + dispatcher) + val pairs = (1 to numPairs).map { _ ⇒ + ctx.spawnAnonymous(echoSender(messagesPerPair, ctx.self.narrow[Done], batchSize, props), props) + } + val startNanoTime = System.nanoTime() + pairs.foreach(_ ! Message) + var interactionsLeft = numPairs + Behaviors.receiveMessage { + case Done ⇒ + interactionsLeft -= 1 + if (interactionsLeft == 0) { + val totalNumMessages = numPairs * messagesPerPair + printProgress(totalNumMessages, numActors, startNanoTime) + respondTo ! Completed(startNanoTime) + Behaviors.stopped + } else { + Behaviors.same + } + + } + }.narrow[Unit] + } + + private def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = { + val durationMicros = (System.nanoTime() - startNanoTime) / 1000 + println(f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " + + f"${totalMessages.toDouble / durationMicros}%,.2f M msg/s") + } +}