diff --git a/akka-actor/src/main/scala/akka/japi/pf/CaseStatements.scala b/akka-actor/src/main/scala/akka/japi/pf/CaseStatements.scala index ed040b310f..da89eabb8f 100644 --- a/akka-actor/src/main/scala/akka/japi/pf/CaseStatements.scala +++ b/akka-actor/src/main/scala/akka/japi/pf/CaseStatements.scala @@ -4,7 +4,7 @@ package akka.japi.pf -import FI.{UnitApply, Apply, Predicate} +import FI.{ UnitApply, Apply, Predicate } private[pf] object CaseStatement { def empty[F, T](): PartialFunction[F, T] = PartialFunction.empty diff --git a/akka-bench-jmh/build.sbt b/akka-bench-jmh/build.sbt new file mode 100644 index 0000000000..87c27a5cd8 --- /dev/null +++ b/akka-bench-jmh/build.sbt @@ -0,0 +1,7 @@ +import akka.{ AkkaBuild, Dependencies, Formatting, Unidoc } + +import pl.project13.scala.sbt.SbtJmh._ +import pl.project13.scala.sbt.SbtJmh.JmhKeys._ + +jmhSettings + diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala new file mode 100644 index 0000000000..1e777c7393 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala @@ -0,0 +1,179 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.testkit.TestProbe +import java.io.File +import org.apache.commons.io.FileUtils +import org.openjdk.jmh.annotations.Scope + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +class PersistentActorThroughputBenchmark { + + val config = PersistenceSpec.config("leveldb", "benchmark") + + lazy val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + var system: ActorSystem = _ + + var probe: TestProbe = _ + var actor: ActorRef = _ + var persistPersistentActor: ActorRef = _ + var persistProcessor: ActorRef = _ + var persistAsync1PersistentActor: ActorRef = _ + var noPersistPersistentActor: ActorRef = _ + var persistAsyncQuickReplyPersistentActor: ActorRef = _ + + val data10k = (1 to 10000).toArray + + @Setup + def setup() { + system = ActorSystem("test", config) + + probe = TestProbe()(system) + + storageLocations.foreach(FileUtils.deleteDirectory) + + actor = system.actorOf(Props(classOf[BaselineActor], data10k.last), "a-1") + + persistProcessor = system.actorOf(Props(classOf[PersistProcessor], data10k.last), "p-1") + + noPersistPersistentActor = system.actorOf(Props(classOf[NoPersistPersistentActor], data10k.last), "nop-1") + persistPersistentActor = system.actorOf(Props(classOf[PersistPersistentActor], data10k.last), "ep-1") + persistAsync1PersistentActor = system.actorOf(Props(classOf[PersistAsyncPersistentActor], data10k.last), "epa-1") + + persistAsyncQuickReplyPersistentActor = system.actorOf(Props(classOf[PersistAsyncQuickReplyPersistentActor], data10k.last), "epa-2") + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + + storageLocations.foreach(FileUtils.deleteDirectory) + } + + @Benchmark + @OperationsPerInvocation(10000) + def actor_normalActor_reply_baseline() { + for (i <- data10k) actor.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + + @Benchmark + @OperationsPerInvocation(10000) + def persistentActor_persist_reply() { + for (i <- data10k) persistPersistentActor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @Benchmark + @OperationsPerInvocation(10000) + def processor_persist_reply() { + for (i <- data10k) persistProcessor.tell(Persistent(i), probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @Benchmark + @OperationsPerInvocation(10000) + def processor_noPersist_reply() { + for (i <- data10k) persistProcessor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @Benchmark + @OperationsPerInvocation(10000) + def persistentActor_persistAsync_reply() { + for (i <- data10k) persistAsync1PersistentActor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @Benchmark + @OperationsPerInvocation(10000) + def persistentActor_noPersist_reply() { + for (i <- data10k) noPersistPersistentActor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @Benchmark + @OperationsPerInvocation(10000) + def persistentActor_persistAsync_replyRightOnCommandReceive() { + for (i <- data10k) persistAsyncQuickReplyPersistentActor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + +} + +class NoPersistPersistentActor(respondAfter: Int) extends PersistentActor { + + override def persistenceId: String = self.path.name + + override def receiveCommand = { + case n: Int => if (n == respondAfter) sender() ! Evt(n) + } + override def receiveRecover = { + case _ => // do nothing + } + +} +class PersistPersistentActor(respondAfter: Int) extends PersistentActor { + + override def persistenceId: String = self.path.name + + override def receiveCommand = { + case n: Int => persist(Evt(n)) { e => if (e.i == respondAfter) sender() ! e } + } + override def receiveRecover = { + case _ => // do nothing + } + +} +class PersistProcessor(respondAfter: Int) extends Processor { + override def receive = { + case Persistent(n: Int, _) => if (n == respondAfter) sender() ! Evt(n) + case n: Int => if (n == respondAfter) sender() ! Evt(n) + } +} + +class PersistAsyncPersistentActor(respondAfter: Int) extends PersistentActor { + override def persistenceId: String = self.path.name + + override def receiveCommand = { + case n: Int => + persistAsync(Evt(n)) { e => if (e.i == respondAfter) sender() ! e } + } + override def receiveRecover = { + case _ => // do nothing + } +} + +class PersistAsyncQuickReplyPersistentActor(respondAfter: Int) extends PersistentActor { + + override def persistenceId: String = self.path.name + + override def receiveCommand = { + case n: Int => + val e = Evt(n) + if (n == respondAfter) sender() ! e + persistAsync(e)(identity) + } + override def receiveRecover = { + case _ => // do nothing + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala new file mode 100644 index 0000000000..30b70cd0f2 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala @@ -0,0 +1,214 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + + +import org.openjdk.jmh.annotations._ +import akka.actor._ +import akka.testkit.TestProbe +import java.io.File +import org.apache.commons.io.FileUtils +import org.openjdk.jmh.annotations.Scope + +import scala.concurrent.duration._ + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +class PersistentActorWithAtLeastOnceDeliveryBenchmark { + + val config = PersistenceSpec.config("leveldb", "benchmark") + + lazy val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + var system: ActorSystem = _ + + var probe: TestProbe = _ + var actor: ActorRef = _ + var persistPersistentActorWithAtLeastOnceDelivery: ActorRef = _ + var persistAsyncPersistentActorWithAtLeastOnceDelivery: ActorRef = _ + var noPersistPersistentActorWithAtLeastOnceDelivery: ActorRef = _ + var destinationActor: ActorRef = _ + + val dataCount = 10000 + + @Setup + def setup() { + system = ActorSystem("PersistentActorWithAtLeastOnceDeliveryBenchmark", config) + + probe = TestProbe()(system) + + storageLocations.foreach(FileUtils.deleteDirectory) + + destinationActor = system.actorOf(Props[DestinationActor], "destination") + + noPersistPersistentActorWithAtLeastOnceDelivery = system.actorOf(Props(classOf[NoPersistPersistentActorWithAtLeastOnceDelivery], dataCount, probe.ref, destinationActor.path), "nop-1") + persistPersistentActorWithAtLeastOnceDelivery = system.actorOf(Props(classOf[PersistPersistentActorWithAtLeastOnceDelivery], dataCount, probe.ref, destinationActor.path), "ep-1") + persistAsyncPersistentActorWithAtLeastOnceDelivery = system.actorOf(Props(classOf[PersistAsyncPersistentActorWithAtLeastOnceDelivery], dataCount, probe.ref, destinationActor.path), "epa-1") + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + + storageLocations.foreach(FileUtils.deleteDirectory) + } + + @Benchmark + @OperationsPerInvocation(10000) + def persistentActor_persistAsync_with_AtLeastOnceDelivery() { + for (i <- 1 to dataCount) + persistAsyncPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref) + probe.expectMsg(20 seconds, Evt(dataCount)) + } + + @Benchmark + @OperationsPerInvocation(10000) + def persistentActor_persist_with_AtLeastOnceDelivery() { + for (i <- 1 to dataCount) + persistPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref) + probe.expectMsg(2 minutes, Evt(dataCount)) + } + + @Benchmark + @OperationsPerInvocation(10000) + def persistentActor_noPersist_with_AtLeastOnceDelivery() { + for (i <- 1 to dataCount) + noPersistPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref) + probe.expectMsg(20 seconds, Evt(dataCount)) + } +} + +class NoPersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStream: ActorRef, val downStream: ActorPath) extends PersistentActor with AtLeastOnceDelivery { + + override def redeliverInterval = 100.milliseconds + + override def persistenceId: String = self.path.name + + override def receiveCommand = { + case n: Int => + deliver(downStream, deliveryId => + Msg(deliveryId, n)) + if (n == respondAfter) + //switch to wait all message confirmed + context.become(waitConfirm) + case Confirm(deliveryId) => + confirmDelivery(deliveryId) + case _ => // do nothing + } + + override def receiveRecover = { + case _ => // do nothing + } + + val waitConfirm: Actor.Receive = { + case Confirm(deliveryId) => + confirmDelivery(deliveryId) + if (numberOfUnconfirmed == 0) { + upStream ! Evt(respondAfter) + context.unbecome() + } + case _ => // do nothing + } +} + +class PersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStream: ActorRef, val downStream: ActorPath) extends PersistentActor with AtLeastOnceDelivery { + + override def redeliverInterval = 100.milliseconds + + override def persistenceId: String = self.path.name + + override def receiveCommand = { + case n: Int => + persist(MsgSent(n)) { e => + deliver(downStream, deliveryId => + Msg(deliveryId, n)) + if (n == respondAfter) + //switch to wait all message confirmed + context.become(waitConfirm) + } + case Confirm(deliveryId) => + confirmDelivery(deliveryId) + case _ => // do nothing + } + + override def receiveRecover = { + case _ => // do nothing + } + + val waitConfirm: Actor.Receive = { + case Confirm(deliveryId) => + confirmDelivery(deliveryId) + if (numberOfUnconfirmed == 0) { + upStream ! Evt(respondAfter) + context.unbecome() + } + case _ => // do nothing + } +} + +class PersistAsyncPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStream: ActorRef, val downStream: ActorPath) extends PersistentActor with AtLeastOnceDelivery { + + override def redeliverInterval = 100.milliseconds + + override def persistenceId: String = self.path.name + + override def receiveCommand = { + case n: Int => + persistAsync(MsgSent(n)) { e => + deliver(downStream, deliveryId => + Msg(deliveryId, n)) + if (n == respondAfter) + //switch to wait all message confirmed + context.become(waitConfirm) + } + case Confirm(deliveryId) => + confirmDelivery(deliveryId) + case _ => // do nothing + } + + override def receiveRecover = { + case _ => // do nothing + } + + val waitConfirm: Actor.Receive = { + case Confirm(deliveryId) => + confirmDelivery(deliveryId) + if (numberOfUnconfirmed == 0) { + upStream ! Evt(respondAfter) + context.unbecome() + } + case _ => // do nothing + } +} + +case class Msg(deliveryId: Long, n: Int) + +case class Confirm(deliveryId: Long) + +sealed trait Event + +case class MsgSent(n: Int) extends Event + +case class MsgConfirmed(deliveryId: Long) extends Event + +class DestinationActor extends Actor { + var seqNr = 0L + + override def receive = { + case n: Int => + sender() ! Confirm(n) + case Msg(deliveryId, _) => + seqNr += 1 + if (seqNr % 11 == 0) { + //drop it + } else { + sender() ! Confirm(deliveryId) + } + case _ => // do nothing + } +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index e4438b5b7a..8a06b71258 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -4,18 +4,23 @@ package akka -import java.io.{FileInputStream, InputStreamReader} +import java.io.FileInputStream +import java.io.InputStreamReader import java.util.Properties -import akka.Formatting.docFormatSettings -import akka.MultiNode.multiJvmSettings -import akka.TestExtras.{GraphiteBuildEvents, JUnitFileReporting, StatsDMetrics} -import akka.Unidoc.{scaladocSettings, unidocSettings} -import com.typesafe.sbt.S3Plugin.{S3, s3Settings} -import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.{MultiJvm, extraOptions} +import akka.TestExtras.GraphiteBuildEvents +import akka.TestExtras.JUnitFileReporting +import akka.TestExtras.StatsDMetrics +import akka.Unidoc.scaladocSettings +import akka.Unidoc.unidocSettings +import com.typesafe.sbt.S3Plugin.S3 +import com.typesafe.sbt.S3Plugin.s3Settings +import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm import com.typesafe.sbt.site.SphinxSupport import com.typesafe.sbt.site.SphinxSupport.Sphinx -import com.typesafe.tools.mima.plugin.MimaKeys.{binaryIssueFilters, previousArtifact, reportBinaryIssues} +import com.typesafe.tools.mima.plugin.MimaKeys.binaryIssueFilters +import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact +import com.typesafe.tools.mima.plugin.MimaKeys.reportBinaryIssues import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings import sbt.Keys._ import sbt._ @@ -87,6 +92,12 @@ object AkkaBuild extends Build { dependencies = Seq(testkit % "compile;test->test") ) + lazy val benchJmh = Project( + id = "akka-bench-jmh", + base = file("akka-bench-jmh"), + dependencies = Seq(actor, persistence, testkit).map(_ % "compile;compile->test") + ) + lazy val remote = Project( id = "akka-remote", base = file("akka-remote"), @@ -340,7 +351,6 @@ object AkkaBuild extends Build { val validatePullRequestTask = validatePullRequest := () lazy val mimaIgnoredProblems = { - import com.typesafe.tools.mima.core._ Seq( // add filters here, see release-2.2 branch ) diff --git a/project/plugins.sbt b/project/plugins.sbt index 47b79b6c43..c5959e7171 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -23,5 +23,7 @@ addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.1") addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.1.6") + // stats reporting libraryDependencies += "com.timgroup" % "java-statsd-client" % "2.0.0"