new actor benchmark (#23489)

* and array based mailbox in bench project
This commit is contained in:
Patrik Nordwall 2017-09-11 16:57:57 +02:00 committed by Konrad `ktoso` Malawski
parent 207b43a139
commit fa4ce6bbe7
3 changed files with 234 additions and 3 deletions

View file

@ -0,0 +1,98 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.annotation.tailrec
import BenchmarkActors._
import scala.concurrent.duration._
object ActorBenchmark {
// Constants because they are used in annotations
final val threads = 8 // update according to cpu
final val numMessagesPerActorPair = 1000000 // messages per actor pair
final val numActors = 512
final val totalMessages = numMessagesPerActorPair * numActors / 2
}
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class ActorBenchmark {
import ActorBenchmark._
@Param(Array("50"))
var tpt = 0
@Param(Array("50"))
var batchSize = 0
@Param(Array("akka.actor.ManyToOneArrayMailbox")) // @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox"))
var mailbox = ""
@Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher"))
var dispatcher = ""
implicit var system: ActorSystem = _
@Setup(Level.Trial)
def setup(): Unit = {
requireRightNumberOfCores(threads)
system = ActorSystem("ActorBenchmark", ConfigFactory.parseString(
s"""
akka.actor {
default-mailbox.mailbox-capacity = 512
fjp-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = $threads
parallelism-factor = 1.0
parallelism-max = $threads
}
throughput = $tpt
mailbox-type = "$mailbox"
}
affinity-dispatcher {
executor = "affinity-pool-executor"
affinity-pool-executor {
parallelism-min = $threads
parallelism-factor = 1.0
parallelism-max = $threads
task-queue-size = 512
idle-cpu-level = 5
fair-work-distribution.threshold = 2048
}
throughput = $tpt
mailbox-type = "$mailbox"
}
}
"""
))
}
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
@Benchmark
@OperationsPerInvocation(totalMessages)
def echo(): Unit =
benchmarkEchoActors(numMessagesPerActorPair, numActors, dispatcher, batchSize, timeout)
}

View file

@ -16,8 +16,8 @@ object BenchmarkActors {
case object Message
case object Stop
class PingPong(val messages: Int, latch: CountDownLatch) extends Actor {
var left = messages / 2
class PingPong(val messagesPerPair: Int, latch: CountDownLatch) extends Actor {
var left = messagesPerPair / 2
def receive = {
case Message =>
@ -32,7 +32,51 @@ object BenchmarkActors {
}
object PingPong {
def props(messages: Int, latch: CountDownLatch) = Props(new PingPong(messages, latch))
def props(messagesPerPair: Int, latch: CountDownLatch) = Props(new PingPong(messagesPerPair, latch))
}
class Echo extends Actor {
def receive = {
case Message =>
sender() ! Message
}
}
object EchoSender {
def props(messagesPerPair: Int, latch: CountDownLatch, batchSize: Int): Props =
Props(new EchoSender(messagesPerPair, latch, batchSize))
}
class EchoSender(messagesPerPair: Int, latch: CountDownLatch, batchSize: Int) extends Actor {
private val echo = context.actorOf(Props[Echo].withDispatcher(context.props.dispatcher), "echo")
private var left = messagesPerPair / 2
private var batch = 0
def receive = {
case Message =>
batch -= 1
if (batch <= 0) {
if (!sendBatch()) {
latch.countDown()
context.stop(self)
}
}
}
private def sendBatch(): Boolean = {
if (left > 0) {
var i = 0
while (i < batchSize) {
echo ! Message
i += 1
}
left -= batchSize
batch = batchSize
true
} else
false
}
}
class Pipe(next: Option[ActorRef]) extends Actor {
@ -71,6 +115,21 @@ object BenchmarkActors {
}
}
private def startEchoActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String,
batchSize: Int)(implicit system: ActorSystem) = {
val fullPathToDispatcher = "akka.actor." + dispatcher
val latch = new CountDownLatch(numPairs)
val actors = (1 to numPairs).map { _ =>
system.actorOf(EchoSender.props(messagesPerPair, latch, batchSize).withDispatcher(fullPathToDispatcher))
}.toVector
(actors, latch)
}
private def initiateEchoPairs(refs: Vector[ActorRef]) = {
refs.foreach(_ ! Message)
}
def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = {
val durationMicros = (System.nanoTime() - startNanoTime) / 1000
println(f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " +
@ -93,6 +152,16 @@ object BenchmarkActors {
printProgress(totalNumMessages, numActors, startNanoTime)
}
def benchmarkEchoActors(numMessagesPerActorPair: Int, numActors: Int, dispatcher: String, batchSize: Int, shutdownTimeout: Duration)(implicit system: ActorSystem): Unit = {
val numPairs = numActors / 2
val totalNumMessages = numPairs * numMessagesPerActorPair
val (actors, latch) = startEchoActorPairs(numMessagesPerActorPair, numPairs, dispatcher, batchSize)
val startNanoTime = System.nanoTime()
initiateEchoPairs(actors)
latch.await(shutdownTimeout.toSeconds, TimeUnit.SECONDS)
printProgress(totalNumMessages, numActors, startNanoTime)
}
def tearDownSystem()(implicit system: ActorSystem): Unit = {
system.terminate()
Await.ready(system.whenTerminated, timeout)

View file

@ -0,0 +1,64 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import akka.dispatch.MailboxType
import akka.dispatch.ProducesMessageQueue
import akka.dispatch.BoundedNodeMessageQueue
import com.typesafe.config.Config
import akka.dispatch.MessageQueue
import akka.dispatch.BoundedMessageQueueSemantics
import scala.concurrent.duration.Duration
import akka.dispatch.Envelope
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import scala.annotation.tailrec
/**
* ManyToOneArrayMailbox is a high-performance, multiple-producer single-consumer, bounded MailboxType,
* Noteworthy is that it discards overflow as DeadLetters.
*
* It can't have multiple consumers, which rules out using it with BalancingPool (BalancingDispatcher) for instance.
*
* NOTE: ManyToOneArrayMailbox does not use `mailbox-push-timeout-time` as it is non-blocking.
*/
case class ManyToOneArrayMailbox(val capacity: Int) extends MailboxType with ProducesMessageQueue[BoundedNodeMessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"))
if (capacity < 0) throw new IllegalArgumentException("The capacity for ManyToOneArrayMailbox can not be negative")
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new ManyToOneArrayMessageQueue(capacity)
}
/**
* Lock-free bounded non-blocking multiple-producer single-consumer queue.
* Discards overflowing messages into DeadLetters.
* Allocation free, using `org.agrona.concurrent.ManyToOneConcurrentArrayQueue`.
*/
class ManyToOneArrayMessageQueue(capacity: Int) extends MessageQueue with BoundedMessageQueueSemantics {
final def pushTimeOut: Duration = Duration.Undefined
private val queue = new ManyToOneConcurrentArrayQueue[Envelope](capacity)
final def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (!queue.add(handle))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(handle.message, handle.sender, receiver), handle.sender
)
final def dequeue(): Envelope = queue.poll()
final def numberOfMessages: Int = queue.size()
final def hasMessages: Boolean = !queue.isEmpty()
@tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
val envelope = dequeue()
if (envelope ne null) {
deadLetters.enqueue(owner, envelope)
cleanUp(owner, deadLetters)
}
}
}