+pro #16372 bring akka-bench-jmh benchmarks to master

This commit is contained in:
Konrad 'ktoso' Malawski 2014-11-23 22:18:09 +01:00
parent 338f61886e
commit d39427b3d9
6 changed files with 422 additions and 10 deletions

View file

@ -4,7 +4,7 @@
package akka.japi.pf
import FI.{UnitApply, Apply, Predicate}
import FI.{ UnitApply, Apply, Predicate }
private[pf] object CaseStatement {
def empty[F, T](): PartialFunction[F, T] = PartialFunction.empty

7
akka-bench-jmh/build.sbt Normal file
View file

@ -0,0 +1,7 @@
import akka.{ AkkaBuild, Dependencies, Formatting, Unidoc }
import pl.project13.scala.sbt.SbtJmh._
import pl.project13.scala.sbt.SbtJmh.JmhKeys._
jmhSettings

View file

@ -0,0 +1,179 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import org.openjdk.jmh.annotations._
import org.openjdk.jmh._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.testkit.TestProbe
import java.io.File
import org.apache.commons.io.FileUtils
import org.openjdk.jmh.annotations.Scope
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
class PersistentActorThroughputBenchmark {
val config = PersistenceSpec.config("leveldb", "benchmark")
lazy val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
var system: ActorSystem = _
var probe: TestProbe = _
var actor: ActorRef = _
var persistPersistentActor: ActorRef = _
var persistProcessor: ActorRef = _
var persistAsync1PersistentActor: ActorRef = _
var noPersistPersistentActor: ActorRef = _
var persistAsyncQuickReplyPersistentActor: ActorRef = _
val data10k = (1 to 10000).toArray
@Setup
def setup() {
system = ActorSystem("test", config)
probe = TestProbe()(system)
storageLocations.foreach(FileUtils.deleteDirectory)
actor = system.actorOf(Props(classOf[BaselineActor], data10k.last), "a-1")
persistProcessor = system.actorOf(Props(classOf[PersistProcessor], data10k.last), "p-1")
noPersistPersistentActor = system.actorOf(Props(classOf[NoPersistPersistentActor], data10k.last), "nop-1")
persistPersistentActor = system.actorOf(Props(classOf[PersistPersistentActor], data10k.last), "ep-1")
persistAsync1PersistentActor = system.actorOf(Props(classOf[PersistAsyncPersistentActor], data10k.last), "epa-1")
persistAsyncQuickReplyPersistentActor = system.actorOf(Props(classOf[PersistAsyncQuickReplyPersistentActor], data10k.last), "epa-2")
}
@TearDown
def shutdown() {
system.shutdown()
system.awaitTermination()
storageLocations.foreach(FileUtils.deleteDirectory)
}
@Benchmark
@OperationsPerInvocation(10000)
def actor_normalActor_reply_baseline() {
for (i <- data10k) actor.tell(i, probe.ref)
probe.expectMsg(data10k.last)
}
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persist_reply() {
for (i <- data10k) persistPersistentActor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
}
@Benchmark
@OperationsPerInvocation(10000)
def processor_persist_reply() {
for (i <- data10k) persistProcessor.tell(Persistent(i), probe.ref)
probe.expectMsg(Evt(data10k.last))
}
@Benchmark
@OperationsPerInvocation(10000)
def processor_noPersist_reply() {
for (i <- data10k) persistProcessor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
}
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persistAsync_reply() {
for (i <- data10k) persistAsync1PersistentActor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
}
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_noPersist_reply() {
for (i <- data10k) noPersistPersistentActor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
}
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persistAsync_replyRightOnCommandReceive() {
for (i <- data10k) persistAsyncQuickReplyPersistentActor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
}
}
class NoPersistPersistentActor(respondAfter: Int) extends PersistentActor {
override def persistenceId: String = self.path.name
override def receiveCommand = {
case n: Int => if (n == respondAfter) sender() ! Evt(n)
}
override def receiveRecover = {
case _ => // do nothing
}
}
class PersistPersistentActor(respondAfter: Int) extends PersistentActor {
override def persistenceId: String = self.path.name
override def receiveCommand = {
case n: Int => persist(Evt(n)) { e => if (e.i == respondAfter) sender() ! e }
}
override def receiveRecover = {
case _ => // do nothing
}
}
class PersistProcessor(respondAfter: Int) extends Processor {
override def receive = {
case Persistent(n: Int, _) => if (n == respondAfter) sender() ! Evt(n)
case n: Int => if (n == respondAfter) sender() ! Evt(n)
}
}
class PersistAsyncPersistentActor(respondAfter: Int) extends PersistentActor {
override def persistenceId: String = self.path.name
override def receiveCommand = {
case n: Int =>
persistAsync(Evt(n)) { e => if (e.i == respondAfter) sender() ! e }
}
override def receiveRecover = {
case _ => // do nothing
}
}
class PersistAsyncQuickReplyPersistentActor(respondAfter: Int) extends PersistentActor {
override def persistenceId: String = self.path.name
override def receiveCommand = {
case n: Int =>
val e = Evt(n)
if (n == respondAfter) sender() ! e
persistAsync(e)(identity)
}
override def receiveRecover = {
case _ => // do nothing
}
}

View file

@ -0,0 +1,214 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import org.openjdk.jmh.annotations._
import akka.actor._
import akka.testkit.TestProbe
import java.io.File
import org.apache.commons.io.FileUtils
import org.openjdk.jmh.annotations.Scope
import scala.concurrent.duration._
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
class PersistentActorWithAtLeastOnceDeliveryBenchmark {
val config = PersistenceSpec.config("leveldb", "benchmark")
lazy val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
var system: ActorSystem = _
var probe: TestProbe = _
var actor: ActorRef = _
var persistPersistentActorWithAtLeastOnceDelivery: ActorRef = _
var persistAsyncPersistentActorWithAtLeastOnceDelivery: ActorRef = _
var noPersistPersistentActorWithAtLeastOnceDelivery: ActorRef = _
var destinationActor: ActorRef = _
val dataCount = 10000
@Setup
def setup() {
system = ActorSystem("PersistentActorWithAtLeastOnceDeliveryBenchmark", config)
probe = TestProbe()(system)
storageLocations.foreach(FileUtils.deleteDirectory)
destinationActor = system.actorOf(Props[DestinationActor], "destination")
noPersistPersistentActorWithAtLeastOnceDelivery = system.actorOf(Props(classOf[NoPersistPersistentActorWithAtLeastOnceDelivery], dataCount, probe.ref, destinationActor.path), "nop-1")
persistPersistentActorWithAtLeastOnceDelivery = system.actorOf(Props(classOf[PersistPersistentActorWithAtLeastOnceDelivery], dataCount, probe.ref, destinationActor.path), "ep-1")
persistAsyncPersistentActorWithAtLeastOnceDelivery = system.actorOf(Props(classOf[PersistAsyncPersistentActorWithAtLeastOnceDelivery], dataCount, probe.ref, destinationActor.path), "epa-1")
}
@TearDown
def shutdown() {
system.shutdown()
system.awaitTermination()
storageLocations.foreach(FileUtils.deleteDirectory)
}
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persistAsync_with_AtLeastOnceDelivery() {
for (i <- 1 to dataCount)
persistAsyncPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref)
probe.expectMsg(20 seconds, Evt(dataCount))
}
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persist_with_AtLeastOnceDelivery() {
for (i <- 1 to dataCount)
persistPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref)
probe.expectMsg(2 minutes, Evt(dataCount))
}
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_noPersist_with_AtLeastOnceDelivery() {
for (i <- 1 to dataCount)
noPersistPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref)
probe.expectMsg(20 seconds, Evt(dataCount))
}
}
class NoPersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStream: ActorRef, val downStream: ActorPath) extends PersistentActor with AtLeastOnceDelivery {
override def redeliverInterval = 100.milliseconds
override def persistenceId: String = self.path.name
override def receiveCommand = {
case n: Int =>
deliver(downStream, deliveryId =>
Msg(deliveryId, n))
if (n == respondAfter)
//switch to wait all message confirmed
context.become(waitConfirm)
case Confirm(deliveryId) =>
confirmDelivery(deliveryId)
case _ => // do nothing
}
override def receiveRecover = {
case _ => // do nothing
}
val waitConfirm: Actor.Receive = {
case Confirm(deliveryId) =>
confirmDelivery(deliveryId)
if (numberOfUnconfirmed == 0) {
upStream ! Evt(respondAfter)
context.unbecome()
}
case _ => // do nothing
}
}
class PersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStream: ActorRef, val downStream: ActorPath) extends PersistentActor with AtLeastOnceDelivery {
override def redeliverInterval = 100.milliseconds
override def persistenceId: String = self.path.name
override def receiveCommand = {
case n: Int =>
persist(MsgSent(n)) { e =>
deliver(downStream, deliveryId =>
Msg(deliveryId, n))
if (n == respondAfter)
//switch to wait all message confirmed
context.become(waitConfirm)
}
case Confirm(deliveryId) =>
confirmDelivery(deliveryId)
case _ => // do nothing
}
override def receiveRecover = {
case _ => // do nothing
}
val waitConfirm: Actor.Receive = {
case Confirm(deliveryId) =>
confirmDelivery(deliveryId)
if (numberOfUnconfirmed == 0) {
upStream ! Evt(respondAfter)
context.unbecome()
}
case _ => // do nothing
}
}
class PersistAsyncPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStream: ActorRef, val downStream: ActorPath) extends PersistentActor with AtLeastOnceDelivery {
override def redeliverInterval = 100.milliseconds
override def persistenceId: String = self.path.name
override def receiveCommand = {
case n: Int =>
persistAsync(MsgSent(n)) { e =>
deliver(downStream, deliveryId =>
Msg(deliveryId, n))
if (n == respondAfter)
//switch to wait all message confirmed
context.become(waitConfirm)
}
case Confirm(deliveryId) =>
confirmDelivery(deliveryId)
case _ => // do nothing
}
override def receiveRecover = {
case _ => // do nothing
}
val waitConfirm: Actor.Receive = {
case Confirm(deliveryId) =>
confirmDelivery(deliveryId)
if (numberOfUnconfirmed == 0) {
upStream ! Evt(respondAfter)
context.unbecome()
}
case _ => // do nothing
}
}
case class Msg(deliveryId: Long, n: Int)
case class Confirm(deliveryId: Long)
sealed trait Event
case class MsgSent(n: Int) extends Event
case class MsgConfirmed(deliveryId: Long) extends Event
class DestinationActor extends Actor {
var seqNr = 0L
override def receive = {
case n: Int =>
sender() ! Confirm(n)
case Msg(deliveryId, _) =>
seqNr += 1
if (seqNr % 11 == 0) {
//drop it
} else {
sender() ! Confirm(deliveryId)
}
case _ => // do nothing
}
}

View file

@ -4,18 +4,23 @@
package akka
import java.io.{FileInputStream, InputStreamReader}
import java.io.FileInputStream
import java.io.InputStreamReader
import java.util.Properties
import akka.Formatting.docFormatSettings
import akka.MultiNode.multiJvmSettings
import akka.TestExtras.{GraphiteBuildEvents, JUnitFileReporting, StatsDMetrics}
import akka.Unidoc.{scaladocSettings, unidocSettings}
import com.typesafe.sbt.S3Plugin.{S3, s3Settings}
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.{MultiJvm, extraOptions}
import akka.TestExtras.GraphiteBuildEvents
import akka.TestExtras.JUnitFileReporting
import akka.TestExtras.StatsDMetrics
import akka.Unidoc.scaladocSettings
import akka.Unidoc.unidocSettings
import com.typesafe.sbt.S3Plugin.S3
import com.typesafe.sbt.S3Plugin.s3Settings
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
import com.typesafe.sbt.site.SphinxSupport
import com.typesafe.sbt.site.SphinxSupport.Sphinx
import com.typesafe.tools.mima.plugin.MimaKeys.{binaryIssueFilters, previousArtifact, reportBinaryIssues}
import com.typesafe.tools.mima.plugin.MimaKeys.binaryIssueFilters
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
import com.typesafe.tools.mima.plugin.MimaKeys.reportBinaryIssues
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import sbt.Keys._
import sbt._
@ -87,6 +92,12 @@ object AkkaBuild extends Build {
dependencies = Seq(testkit % "compile;test->test")
)
lazy val benchJmh = Project(
id = "akka-bench-jmh",
base = file("akka-bench-jmh"),
dependencies = Seq(actor, persistence, testkit).map(_ % "compile;compile->test")
)
lazy val remote = Project(
id = "akka-remote",
base = file("akka-remote"),
@ -340,7 +351,6 @@ object AkkaBuild extends Build {
val validatePullRequestTask = validatePullRequest := ()
lazy val mimaIgnoredProblems = {
import com.typesafe.tools.mima.core._
Seq(
// add filters here, see release-2.2 branch
)

View file

@ -23,5 +23,7 @@ addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.1.6")
// stats reporting
libraryDependencies += "com.timgroup" % "java-statsd-client" % "2.0.0"