First JCTools mailbox benchmark, #25044

This commit is contained in:
Patrik Nordwall 2018-05-03 10:16:56 +02:00 committed by GitHub
commit 1d3ce1734b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 22 deletions

View file

@ -35,7 +35,8 @@ class ActorBenchmark {
@Param(Array("50"))
var batchSize = 0
@Param(Array("akka.actor.ManyToOneArrayMailbox")) // @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox"))
//@Param(Array("akka.actor.ManyToOneArrayMailbox"))
@Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox", "akka.actor.JCToolsMailbox"))
var mailbox = ""
@Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher"))

View file

@ -22,13 +22,13 @@ import scala.concurrent.duration._
class ForkJoinActorBenchmark {
import ForkJoinActorBenchmark._
@Param(Array("5", "25", "50"))
@Param(Array("50"))
var tpt = 0
@Param(Array(coresStr)) // coresStr, cores2xStr, cores4xStr
var threads = ""
@Param(Array("SingleConsumerOnlyUnboundedMailbox")) //"default"
@Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox", "akka.actor.JCToolsMailbox"))
var mailbox = ""
implicit var system: ActorSystem = _
@ -38,27 +38,22 @@ class ForkJoinActorBenchmark {
requireRightNumberOfCores(cores)
val mailboxConf = mailbox match {
case "default" ""
case "SingleConsumerOnlyUnboundedMailbox"
s"""default-mailbox.mailbox-type = "${classOf[akka.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}""""
}
system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString(
s"""
akka {
log-dead-letters = off
log-dead-letters = off
default-mailbox.mailbox-capacity = 512
actor {
default-dispatcher {
fjp-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = $threads
parallelism-factor = 1
parallelism-factor = 1.0
parallelism-max = $threads
}
throughput = $tpt
mailbox-type = "$mailbox"
}
$mailboxConf
}
}
"""
@ -71,21 +66,21 @@ class ForkJoinActorBenchmark {
Await.ready(system.whenTerminated, 15.seconds)
}
@Benchmark
@OperationsPerInvocation(totalMessagesTwoActors)
def pingPong(): Unit = benchmarkPingPongActors(messages, twoActors, "default-dispatcher", tpt, timeout)
// @Benchmark
// @OperationsPerInvocation(totalMessagesTwoActors)
// def pingPong(): Unit = benchmarkPingPongActors(messages, twoActors, "fjp-dispatcher", tpt, timeout)
@Benchmark
@OperationsPerInvocation(totalMessagesLessThanCores)
def pingPongLessActorsThanCores(): Unit = benchmarkPingPongActors(messages, lessThanCoresActors, "default-dispatcher", tpt, timeout)
def pingPongLessActorsThanCores(): Unit = benchmarkPingPongActors(messages, lessThanCoresActors, "fjp-dispatcher", tpt, timeout)
@Benchmark
@OperationsPerInvocation(totalMessagesSameAsCores)
def pingPongSameNumberOfActorsAsCores(): Unit = benchmarkPingPongActors(messages, sameAsCoresActors, "default-dispatcher", tpt, timeout)
// @Benchmark
// @OperationsPerInvocation(totalMessagesSameAsCores)
// def pingPongSameNumberOfActorsAsCores(): Unit = benchmarkPingPongActors(messages, sameAsCoresActors, "fjp-dispatcher", tpt, timeout)
@Benchmark
@OperationsPerInvocation(totalMessagesMoreThanCores)
def pingPongMoreActorsThanCores(): Unit = benchmarkPingPongActors(messages, moreThanCoresActors, "default-dispatcher", tpt, timeout)
def pingPongMoreActorsThanCores(): Unit = benchmarkPingPongActors(messages, moreThanCoresActors, "fjp-dispatcher", tpt, timeout)
// @Benchmark
// @Measurement(timeUnit = TimeUnit.MILLISECONDS)

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import akka.dispatch.MailboxType
import akka.dispatch.ProducesMessageQueue
import akka.dispatch.BoundedNodeMessageQueue
import com.typesafe.config.Config
import akka.dispatch.MessageQueue
import akka.dispatch.BoundedMessageQueueSemantics
import scala.concurrent.duration.Duration
import akka.dispatch.Envelope
import org.jctools.queues.MpscGrowableArrayQueue
import scala.annotation.tailrec
case class JCToolsMailbox(val capacity: Int) extends MailboxType with ProducesMessageQueue[BoundedNodeMessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"))
if (capacity < 0) throw new IllegalArgumentException("The capacity for JCToolsMailbox can not be negative")
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new JCToolsMessageQueue(capacity)
}
class JCToolsMessageQueue(capacity: Int) extends MpscGrowableArrayQueue[Envelope](capacity) with MessageQueue with BoundedMessageQueueSemantics {
final def pushTimeOut: Duration = Duration.Undefined
final def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (!offer(handle))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(handle.message, handle.sender, receiver), handle.sender
)
final def dequeue(): Envelope = poll()
final def numberOfMessages: Int = size()
final def hasMessages: Boolean = !isEmpty()
@tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
val envelope = dequeue()
if (envelope ne null) {
deadLetters.enqueue(owner, envelope)
cleanUp(owner, deadLetters)
}
}
}

View file

@ -58,6 +58,8 @@ object Dependencies {
val sigar = "org.fusesource" % "sigar" % "1.6.4" // ApacheV2
val jctools = "org.jctools" % "jctools-core" % "2.1.1" // ApacheV2
// reactive streams
val reactiveStreams = "org.reactivestreams" % "reactive-streams" % "1.0.2" // CC0
@ -172,7 +174,7 @@ object Dependencies {
val contrib = l ++= Seq(Test.commonsIo)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Compile.jctools)
// akka stream