Typed version of the ForkJoinActorBenchmark #25986

This commit is contained in:
Johan Andrén 2019-03-20 14:52:00 +01:00 committed by GitHub
parent f8be7ddb6f
commit 322b07a2fd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 220 additions and 2 deletions

View file

@ -39,7 +39,7 @@ class TypedActorBenchmark {
@Param(Array("50"))
var batchSize = 0
@Param(Array("akka.actor.ManyToOneArrayMailbox")) // @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox"))
@Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox"))
var mailbox = ""
@Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher"))

View file

@ -4,8 +4,13 @@
package akka.actor.typed
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import akka.Done
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
object TypedBenchmarkActors {
@ -110,7 +115,84 @@ object TypedBenchmarkActors {
.narrow[Unit]
}
private def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = {
sealed trait PingPongCommand
case class StartPingPong(
messagesPerPair: Int,
numActors: Int,
dispatcher: String,
throughPut: Int,
shutdownTimeout: Duration,
replyTo: ActorRef[PingPongStarted])
extends PingPongCommand
case class PingPongStarted(completedLatch: CountDownLatch, startNanoTime: Long, totalNumMessages: Int)
case object Stop extends PingPongCommand
def benchmarkPingPongSupervisor(): Behavior[PingPongCommand] = {
Behaviors.setup { ctx =>
Behaviors.receiveMessage {
case StartPingPong(numMessagesPerActorPair, numActors, dispatcher, throughput, _, replyTo) =>
val numPairs = numActors / 2
val totalNumMessages = numPairs * numMessagesPerActorPair
val (actors, latch) = startPingPongActorPairs(ctx, numMessagesPerActorPair, numPairs, dispatcher)
val startNanoTime = System.nanoTime()
replyTo ! PingPongStarted(latch, startNanoTime, totalNumMessages)
initiatePingPongForPairs(actors, inFlight = throughput * 2)
Behaviors.same
case Stop =>
ctx.children.foreach(ctx.stop _)
Behaviors.same
}
}
}
private def initiatePingPongForPairs(refs: Vector[(ActorRef[Message], ActorRef[Message])], inFlight: Int): Unit = {
for {
(ping, pong) <- refs
val message = Message(pong) // just allocate once
_ <- 1 to inFlight
} {
ping ! message
}
}
private def startPingPongActorPairs(
ctx: ActorContext[_],
messagesPerPair: Int,
numPairs: Int,
dispatcher: String): (Vector[(ActorRef[Message], ActorRef[Message])], CountDownLatch) = {
val fullPathToDispatcher = "akka.actor." + dispatcher
val latch = new CountDownLatch(numPairs * 2)
val pingPongBehavior = newPingPongBehavior(messagesPerPair, latch)
val pingPongProps = Props.empty.withDispatcherFromConfig(fullPathToDispatcher)
val actors = for {
i <- (1 to numPairs).toVector
} yield {
val ping = ctx.spawnAnonymous(pingPongBehavior, pingPongProps)
val pong = ctx.spawnAnonymous(pingPongBehavior, pingPongProps)
(ping, pong)
}
(actors, latch)
}
case class Message(replyTo: ActorRef[Message])
private def newPingPongBehavior(messagesPerPair: Int, latch: CountDownLatch): Behavior[Message] =
Behaviors.setup { ctx =>
var left = messagesPerPair / 2
val pong = Message(ctx.self) // we re-use a single pong to avoid alloc on each msg
Behaviors.receiveMessage[Message] {
case Message(replyTo) =>
replyTo ! pong
if (left == 0) {
latch.countDown()
Behaviors.stopped // note that this will likely lead to dead letters
} else {
left -= 1
Behaviors.same
}
}
}
def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = {
val durationMicros = (System.nanoTime() - startNanoTime) / 1000
println(
f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " +

View file

@ -0,0 +1,136 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
import java.util.concurrent.TimeUnit
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.Level
import org.openjdk.jmh.annotations.OperationsPerInvocation
import org.openjdk.jmh.annotations.Param
import org.openjdk.jmh.annotations.Setup
import akka.actor.typed.scaladsl.AskPattern._
import org.openjdk.jmh.annotations.BenchmarkMode
import org.openjdk.jmh.annotations.Fork
import org.openjdk.jmh.annotations.Measurement
import org.openjdk.jmh.annotations.Mode
import org.openjdk.jmh.annotations.Scope
import org.openjdk.jmh.annotations.State
import org.openjdk.jmh.annotations.TearDown
import org.openjdk.jmh.annotations.Threads
import org.openjdk.jmh.annotations.Warmup
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.Future
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class TypedForkJoinActorBenchmark {
import TypedForkJoinActorBenchmark._
import TypedBenchmarkActors._
@Param(Array("50"))
var tpt = 0
@Param(Array(coresStr)) // coresStr, cores2xStr, cores4xStr
var threads = ""
@Param(
Array(
"akka.dispatch.UnboundedMailbox",
"akka.dispatch.SingleConsumerOnlyUnboundedMailbox",
"akka.actor.ManyToOneArrayMailbox",
"akka.actor.JCToolsMailbox"))
var mailbox = ""
implicit var system: ActorSystem[PingPongCommand] = _
@Setup(Level.Trial)
def setup(): Unit = {
akka.actor.BenchmarkActors.requireRightNumberOfCores(cores)
system = ActorSystem(
TypedBenchmarkActors.benchmarkPingPongSupervisor(),
"TypedForkJoinActorBenchmark",
ConfigFactory.parseString(s"""
akka.actor {
default-mailbox.mailbox-capacity = 512
fjp-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = $threads
parallelism-factor = 1.0
parallelism-max = $threads
}
throughput = $tpt
mailbox-type = "$mailbox"
}
}
"""))
}
@Benchmark
@OperationsPerInvocation(totalMessagesLessThanCores)
def pingPongLessActorsThanCores(): Unit =
runPingPongBench(messages, lessThanCoresActors, "fjp-dispatcher", tpt)
@Benchmark
@OperationsPerInvocation(totalMessagesSameAsCores)
def pingPongSameNumberOfActorsAsCores(): Unit =
runPingPongBench(messages, sameAsCoresActors, "fjp-dispatcher", tpt)
@Benchmark
@OperationsPerInvocation(totalMessagesMoreThanCores)
def pingPongMoreActorsThanCores(): Unit =
runPingPongBench(messages, moreThanCoresActors, "fjp-dispatcher", tpt)
def runPingPongBench(numMessages: Int, numActors: Int, dispatcher: String, tpt: Int): Unit = {
val response: Future[PingPongStarted] =
system.ask[PingPongStarted](ref => StartPingPong(numMessages, numActors, dispatcher, tpt, timeout, ref))(
timeout,
system.scheduler)
val started = Await.result(response, timeout)
started.completedLatch.await(timeout.toSeconds, TimeUnit.SECONDS)
printProgress(started.totalNumMessages, numActors, started.startNanoTime)
system ! Stop
}
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
}
object TypedForkJoinActorBenchmark {
final val messages = 2000000 // messages per actor pair
val timeout = 30.seconds
// Constants because they are used in annotations
// update according to cpu
final val cores = 8
final val coresStr = "8"
final val cores2xStr = "16"
final val cores4xStr = "24"
final val twoActors = 2
final val moreThanCoresActors = cores * 2
final val lessThanCoresActors = cores / 2
final val sameAsCoresActors = cores
final val totalMessagesTwoActors = messages
final val totalMessagesMoreThanCores = (moreThanCoresActors * messages) / 2
final val totalMessagesLessThanCores = (lessThanCoresActors * messages) / 2
final val totalMessagesSameAsCores = (sameAsCoresActors * messages) / 2
}