diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index 8cc54f8635..4975532c9a 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -11,14 +11,22 @@ import scala.concurrent.duration._ class TellThroughputPerformanceSpec extends PerformanceSpec { import TellThroughputPerformanceSpec._ - val repeat = 30000L * repeatFactor + val defaultRepeat = 30000L * repeatFactor + + def getClient(num: Int, actor: ActorRef, latch: CountDownLatch, repeat: Long): Props = + Props(if (num % 2 == 0) classOf[Client1] else classOf[Client2], actor, latch, repeat) + + def getDestination(num: Int): Props = + Props(if (num % 3 == 0) classOf[Destination1] else classOf[Destination2]) + + override def expectedTestDuration: FiniteDuration = 3 minutes "Tell" must { "warmup" in { - runScenario(8, warmup = true) + runScenario(8, warmup = true, repeat = defaultRepeat / 4) } "warmup more" in { - runScenario(8, warmup = true) + runScenario(1, warmup = true, repeat = defaultRepeat / 8) } "perform with load 1" in { runScenario(1) @@ -96,17 +104,18 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { runScenario(48) } - def runScenario(numberOfClients: Int, warmup: Boolean = false) { - if (acceptClients(numberOfClients)) { + def runScenario(numberOfClients: Int, warmup: Boolean = false, repeat: Long = defaultRepeat) { + if (acceptClients(numberOfClients) || warmup) { val throughputDispatcher = "benchmark.throughput-dispatcher" val latch = new CountDownLatch(numberOfClients) val repeatsPerClient = repeat / numberOfClients - val destinations = for (i ← 0 until numberOfClients) - yield system.actorOf(Props(new Destination).withDispatcher(throughputDispatcher)) - val clients = for (dest ← destinations) - yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(throughputDispatcher)) + val (destinations, clients) = (for { + i ← 0 until numberOfClients + dest = system.actorOf(getDestination(i).withDispatcher(throughputDispatcher)) + client = system.actorOf(getClient(i, dest, latch, repeatsPerClient).withDispatcher(throughputDispatcher)) + } yield (dest, client)).unzip val start = System.nanoTime clients.foreach(_ ! Run) @@ -130,13 +139,19 @@ object TellThroughputPerformanceSpec { case object Run case object Msg - class Destination extends Actor { + class Destination1 extends Actor { def receive = { case Msg ⇒ sender ! Msg } } - class Client( + class Destination2 extends Actor { + def receive = { + case Msg ⇒ sender ! Msg + } + } + + class Client1( actor: ActorRef, latch: CountDownLatch, repeat: Long) extends Actor { @@ -159,7 +174,31 @@ object TellThroughputPerformanceSpec { sent += 1 } } + } + class Client2( + actor: ActorRef, + latch: CountDownLatch, + repeat: Long) extends Actor { + + var sent = 0L + var received = 0L + + def receive = { + case Msg ⇒ + received += 1 + if (sent < repeat) { + actor ! Msg + sent += 1 + } else if (received >= repeat) { + latch.countDown() + } + case Run ⇒ + for (i ← 0L until math.min(1000L, repeat)) { + actor ! Msg + sent += 1 + } + } } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala index 244f965bd4..0bec1d7610 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala @@ -4,8 +4,11 @@ import com.typesafe.config.ConfigFactory object BenchmarkConfig { private val benchmarkConfig = ConfigFactory.parseString(""" akka { - loggers = ["akka.testkit.TestEventListener"] - loglevel = "WARNING" + loggers = ["akka.testkit.TestEventListener"] + loglevel = "WARNING" + actor { + serialize-messages = off + } } benchmark {