diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ActorBenchmark.scala index f3543d0f68..b528163eea 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/ActorBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/ActorBenchmark.scala @@ -35,7 +35,8 @@ class ActorBenchmark { @Param(Array("50")) var batchSize = 0 - @Param(Array("akka.actor.ManyToOneArrayMailbox")) // @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox")) + //@Param(Array("akka.actor.ManyToOneArrayMailbox")) + @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox", "akka.actor.JCToolsMailbox")) var mailbox = "" @Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher")) diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala index c076a56b8b..202d310f67 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala @@ -22,13 +22,13 @@ import scala.concurrent.duration._ class ForkJoinActorBenchmark { import ForkJoinActorBenchmark._ - @Param(Array("5", "25", "50")) + @Param(Array("50")) var tpt = 0 @Param(Array(coresStr)) // coresStr, cores2xStr, cores4xStr var threads = "" - @Param(Array("SingleConsumerOnlyUnboundedMailbox")) //"default" + @Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.actor.ManyToOneArrayMailbox", "akka.actor.JCToolsMailbox")) var mailbox = "" implicit var system: ActorSystem = _ @@ -38,27 +38,22 @@ class ForkJoinActorBenchmark { requireRightNumberOfCores(cores) - val mailboxConf = mailbox match { - case "default" ⇒ "" - case "SingleConsumerOnlyUnboundedMailbox" ⇒ - s"""default-mailbox.mailbox-type = "${classOf[akka.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}"""" - } - system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString( s""" akka { - log-dead-letters = off + log-dead-letters = off + default-mailbox.mailbox-capacity = 512 actor { - default-dispatcher { + fjp-dispatcher { executor = "fork-join-executor" fork-join-executor { parallelism-min = $threads - parallelism-factor = 1 + parallelism-factor = 1.0 parallelism-max = $threads } throughput = $tpt + mailbox-type = "$mailbox" } - $mailboxConf } } """ @@ -71,21 +66,21 @@ class ForkJoinActorBenchmark { Await.ready(system.whenTerminated, 15.seconds) } - @Benchmark - @OperationsPerInvocation(totalMessagesTwoActors) - def pingPong(): Unit = benchmarkPingPongActors(messages, twoActors, "default-dispatcher", tpt, timeout) + // @Benchmark + // @OperationsPerInvocation(totalMessagesTwoActors) + // def pingPong(): Unit = benchmarkPingPongActors(messages, twoActors, "fjp-dispatcher", tpt, timeout) @Benchmark @OperationsPerInvocation(totalMessagesLessThanCores) - def pingPongLessActorsThanCores(): Unit = benchmarkPingPongActors(messages, lessThanCoresActors, "default-dispatcher", tpt, timeout) + def pingPongLessActorsThanCores(): Unit = benchmarkPingPongActors(messages, lessThanCoresActors, "fjp-dispatcher", tpt, timeout) - @Benchmark - @OperationsPerInvocation(totalMessagesSameAsCores) - def pingPongSameNumberOfActorsAsCores(): Unit = benchmarkPingPongActors(messages, sameAsCoresActors, "default-dispatcher", tpt, timeout) + // @Benchmark + // @OperationsPerInvocation(totalMessagesSameAsCores) + // def pingPongSameNumberOfActorsAsCores(): Unit = benchmarkPingPongActors(messages, sameAsCoresActors, "fjp-dispatcher", tpt, timeout) @Benchmark @OperationsPerInvocation(totalMessagesMoreThanCores) - def pingPongMoreActorsThanCores(): Unit = benchmarkPingPongActors(messages, moreThanCoresActors, "default-dispatcher", tpt, timeout) + def pingPongMoreActorsThanCores(): Unit = benchmarkPingPongActors(messages, moreThanCoresActors, "fjp-dispatcher", tpt, timeout) // @Benchmark // @Measurement(timeUnit = TimeUnit.MILLISECONDS) diff --git a/akka-bench-jmh/src/main/scala/akka/actor/JCToolsMailbox.scala b/akka-bench-jmh/src/main/scala/akka/actor/JCToolsMailbox.scala new file mode 100644 index 0000000000..462059e2a1 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/JCToolsMailbox.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.actor + +import akka.dispatch.MailboxType +import akka.dispatch.ProducesMessageQueue +import akka.dispatch.BoundedNodeMessageQueue +import com.typesafe.config.Config +import akka.dispatch.MessageQueue +import akka.dispatch.BoundedMessageQueueSemantics +import scala.concurrent.duration.Duration +import akka.dispatch.Envelope +import org.jctools.queues.MpscGrowableArrayQueue +import scala.annotation.tailrec + +case class JCToolsMailbox(val capacity: Int) extends MailboxType with ProducesMessageQueue[BoundedNodeMessageQueue] { + + def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity")) + + if (capacity < 0) throw new IllegalArgumentException("The capacity for JCToolsMailbox can not be negative") + + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = + new JCToolsMessageQueue(capacity) +} + +class JCToolsMessageQueue(capacity: Int) extends MpscGrowableArrayQueue[Envelope](capacity) with MessageQueue with BoundedMessageQueueSemantics { + final def pushTimeOut: Duration = Duration.Undefined + + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = + if (!offer(handle)) + receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( + DeadLetter(handle.message, handle.sender, receiver), handle.sender + ) + + final def dequeue(): Envelope = poll() + + final def numberOfMessages: Int = size() + + final def hasMessages: Boolean = !isEmpty() + + @tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { + val envelope = dequeue() + if (envelope ne null) { + deadLetters.enqueue(owner, envelope) + cleanUp(owner, deadLetters) + } + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 345bfb3f5f..d64c55eee7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -58,6 +58,8 @@ object Dependencies { val sigar = "org.fusesource" % "sigar" % "1.6.4" // ApacheV2 + val jctools = "org.jctools" % "jctools-core" % "2.1.1" // ApacheV2 + // reactive streams val reactiveStreams = "org.reactivestreams" % "reactive-streams" % "1.0.2" // CC0 @@ -172,7 +174,7 @@ object Dependencies { val contrib = l ++= Seq(Test.commonsIo) - val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative) + val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Compile.jctools) // akka stream