diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ActorBenchmark.scala new file mode 100644 index 0000000000..a59866e3c5 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/ActorBenchmark.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.actor + +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ +import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.annotation.tailrec +import BenchmarkActors._ +import scala.concurrent.duration._ + +object ActorBenchmark { + // Constants because they are used in annotations + final val threads = 8 // update according to cpu + final val numMessagesPerActorPair = 1000000 // messages per actor pair + + final val numActors = 512 + final val totalMessages = numMessagesPerActorPair * numActors / 2 +} + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1) +@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1) +class ActorBenchmark { + import ActorBenchmark._ + + @Param(Array("50")) + var tpt = 0 + + @Param(Array("50")) + var batchSize = 0 + + @Param(Array("akka.actor.ManyToOneArrayMailbox")) // @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox")) + var mailbox = "" + + @Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher")) + var dispatcher = "" + + implicit var system: ActorSystem = _ + + @Setup(Level.Trial) + def setup(): Unit = { + + requireRightNumberOfCores(threads) + + system = ActorSystem("ActorBenchmark", ConfigFactory.parseString( + s""" + akka.actor { + + default-mailbox.mailbox-capacity = 512 + + fjp-dispatcher { + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = $threads + parallelism-factor = 1.0 + parallelism-max = $threads + } + throughput = $tpt + mailbox-type = "$mailbox" + } + affinity-dispatcher { + executor = "affinity-pool-executor" + affinity-pool-executor { + parallelism-min = $threads + parallelism-factor = 1.0 + parallelism-max = $threads + task-queue-size = 512 + idle-cpu-level = 5 + fair-work-distribution.threshold = 2048 + } + throughput = $tpt + mailbox-type = "$mailbox" + } + } + """ + )) + } + + @TearDown(Level.Trial) + def shutdown(): Unit = { + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) + } + + @Benchmark + @OperationsPerInvocation(totalMessages) + def echo(): Unit = + benchmarkEchoActors(numMessagesPerActorPair, numActors, dispatcher, batchSize, timeout) + +} + diff --git a/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala b/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala index 618ae35868..8d3d1e9f86 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala @@ -16,8 +16,8 @@ object BenchmarkActors { case object Message case object Stop - class PingPong(val messages: Int, latch: CountDownLatch) extends Actor { - var left = messages / 2 + class PingPong(val messagesPerPair: Int, latch: CountDownLatch) extends Actor { + var left = messagesPerPair / 2 def receive = { case Message => @@ -32,7 +32,51 @@ object BenchmarkActors { } object PingPong { - def props(messages: Int, latch: CountDownLatch) = Props(new PingPong(messages, latch)) + def props(messagesPerPair: Int, latch: CountDownLatch) = Props(new PingPong(messagesPerPair, latch)) + } + + class Echo extends Actor { + def receive = { + case Message => + sender() ! Message + } + } + + object EchoSender { + def props(messagesPerPair: Int, latch: CountDownLatch, batchSize: Int): Props = + Props(new EchoSender(messagesPerPair, latch, batchSize)) + } + + class EchoSender(messagesPerPair: Int, latch: CountDownLatch, batchSize: Int) extends Actor { + private val echo = context.actorOf(Props[Echo].withDispatcher(context.props.dispatcher), "echo") + + private var left = messagesPerPair / 2 + private var batch = 0 + + def receive = { + case Message => + batch -= 1 + if (batch <= 0) { + if (!sendBatch()) { + latch.countDown() + context.stop(self) + } + } + } + + private def sendBatch(): Boolean = { + if (left > 0) { + var i = 0 + while (i < batchSize) { + echo ! Message + i += 1 + } + left -= batchSize + batch = batchSize + true + } else + false + } } class Pipe(next: Option[ActorRef]) extends Actor { @@ -71,6 +115,21 @@ object BenchmarkActors { } } + private def startEchoActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String, + batchSize: Int)(implicit system: ActorSystem) = { + + val fullPathToDispatcher = "akka.actor." + dispatcher + val latch = new CountDownLatch(numPairs) + val actors = (1 to numPairs).map { _ => + system.actorOf(EchoSender.props(messagesPerPair, latch, batchSize).withDispatcher(fullPathToDispatcher)) + }.toVector + (actors, latch) + } + + private def initiateEchoPairs(refs: Vector[ActorRef]) = { + refs.foreach(_ ! Message) + } + def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = { val durationMicros = (System.nanoTime() - startNanoTime) / 1000 println(f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " + @@ -93,6 +152,16 @@ object BenchmarkActors { printProgress(totalNumMessages, numActors, startNanoTime) } + def benchmarkEchoActors(numMessagesPerActorPair: Int, numActors: Int, dispatcher: String, batchSize: Int, shutdownTimeout: Duration)(implicit system: ActorSystem): Unit = { + val numPairs = numActors / 2 + val totalNumMessages = numPairs * numMessagesPerActorPair + val (actors, latch) = startEchoActorPairs(numMessagesPerActorPair, numPairs, dispatcher, batchSize) + val startNanoTime = System.nanoTime() + initiateEchoPairs(actors) + latch.await(shutdownTimeout.toSeconds, TimeUnit.SECONDS) + printProgress(totalNumMessages, numActors, startNanoTime) + } + def tearDownSystem()(implicit system: ActorSystem): Unit = { system.terminate() Await.ready(system.whenTerminated, timeout) diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ManyToOneArrayMailbox.scala b/akka-bench-jmh/src/main/scala/akka/actor/ManyToOneArrayMailbox.scala new file mode 100644 index 0000000000..f605317e03 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/ManyToOneArrayMailbox.scala @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.actor + +import akka.dispatch.MailboxType +import akka.dispatch.ProducesMessageQueue +import akka.dispatch.BoundedNodeMessageQueue +import com.typesafe.config.Config +import akka.dispatch.MessageQueue +import akka.dispatch.BoundedMessageQueueSemantics +import scala.concurrent.duration.Duration +import akka.dispatch.Envelope +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue +import scala.annotation.tailrec + +/** + * ManyToOneArrayMailbox is a high-performance, multiple-producer single-consumer, bounded MailboxType, + * Noteworthy is that it discards overflow as DeadLetters. + * + * It can't have multiple consumers, which rules out using it with BalancingPool (BalancingDispatcher) for instance. + * + * NOTE: ManyToOneArrayMailbox does not use `mailbox-push-timeout-time` as it is non-blocking. + */ +case class ManyToOneArrayMailbox(val capacity: Int) extends MailboxType with ProducesMessageQueue[BoundedNodeMessageQueue] { + + def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity")) + + if (capacity < 0) throw new IllegalArgumentException("The capacity for ManyToOneArrayMailbox can not be negative") + + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = + new ManyToOneArrayMessageQueue(capacity) +} + +/** + * Lock-free bounded non-blocking multiple-producer single-consumer queue. + * Discards overflowing messages into DeadLetters. + * Allocation free, using `org.agrona.concurrent.ManyToOneConcurrentArrayQueue`. + */ +class ManyToOneArrayMessageQueue(capacity: Int) extends MessageQueue with BoundedMessageQueueSemantics { + final def pushTimeOut: Duration = Duration.Undefined + + private val queue = new ManyToOneConcurrentArrayQueue[Envelope](capacity) + + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = + if (!queue.add(handle)) + receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( + DeadLetter(handle.message, handle.sender, receiver), handle.sender + ) + + final def dequeue(): Envelope = queue.poll() + + final def numberOfMessages: Int = queue.size() + + final def hasMessages: Boolean = !queue.isEmpty() + + @tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { + val envelope = dequeue() + if (envelope ne null) { + deadLetters.enqueue(owner, envelope) + cleanUp(owner, deadLetters) + } + } +}