diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/PerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/PerformanceSpec.scala deleted file mode 100644 index a2bbdc9a22..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/PerformanceSpec.scala +++ /dev/null @@ -1,90 +0,0 @@ -package akka.performance.microbench - -import scala.collection.immutable.TreeMap - -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics -import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics -import org.scalatest.BeforeAndAfterEach - -import akka.actor.simpleName -import akka.performance.workbench.BenchResultRepository -import akka.performance.workbench.Report -import akka.performance.workbench.Stats -import akka.testkit.AkkaSpec -import akka.AkkaApplication - -trait PerformanceSpec extends AkkaSpec with BeforeAndAfterEach { - - def app: AkkaApplication - - def isBenchmark() = System.getProperty("benchmark") == "true" - - def minClients() = System.getProperty("benchmark.minClients", "1").toInt; - - def maxClients() = System.getProperty("benchmark.maxClients", "40").toInt; - - def repeatFactor() = { - val defaultRepeatFactor = if (isBenchmark) "150" else "2" - System.getProperty("benchmark.repeatFactor", defaultRepeatFactor).toInt - } - - def sampling = { - System.getProperty("benchmark.sampling", "200").toInt - } - - var stat: DescriptiveStatistics = _ - - override def beforeEach() { - stat = new SynchronizedDescriptiveStatistics - } - - val resultRepository = BenchResultRepository() - lazy val report = new Report(app, resultRepository, compareResultWith) - - /** - * To compare two tests with each other you can override this method, in - * the test. For example Some("OneWayPerformanceTest") - */ - def compareResultWith: Option[String] = None - - def logMeasurement(scenario: String, numberOfClients: Int, durationNs: Long) { - try { - val name = simpleName(this) - val durationS = durationNs.toDouble / 1000000000.0 - - val percentiles = TreeMap[Int, Long]( - 5 -> (stat.getPercentile(5.0) / 1000).toLong, - 25 -> (stat.getPercentile(25.0) / 1000).toLong, - 50 -> (stat.getPercentile(50.0) / 1000).toLong, - 75 -> (stat.getPercentile(75.0) / 1000).toLong, - 95 -> (stat.getPercentile(95.0) / 1000).toLong) - - val n = stat.getN * sampling - - val stats = Stats( - name, - load = numberOfClients, - timestamp = TestStart.startTime, - durationNanos = durationNs, - n = n, - min = (stat.getMin / 1000).toLong, - max = (stat.getMax / 1000).toLong, - mean = (stat.getMean / 1000).toLong, - tps = (n.toDouble / durationS), - percentiles) - - resultRepository.add(stats) - - report.html(resultRepository.get(name)) - } catch { - // don't fail test due to problems saving bench report - case e: Exception ⇒ app.eventHandler.error(this, e.getMessage) - } - } - -} - -object TestStart { - val startTime = System.currentTimeMillis -} - diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala new file mode 100644 index 0000000000..14acef4373 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala @@ -0,0 +1,140 @@ +package akka.performance.microbench + +import akka.performance.workbench.PerformanceSpec +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics +import org.junit.runner.RunWith +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.PoisonPill +import akka.actor.Props +import java.util.Random +import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics + +// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TellLatencyPerformanceSpec extends PerformanceSpec { + import TellLatencyPerformanceSpec._ + + val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") + .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + .setCorePoolSize(8) + .build + + val repeat = 200L * repeatFactor + def clientDelayMicros = { + System.getProperty("benchmark.clientDelayMicros", "250").toInt + } + + var stat: DescriptiveStatistics = _ + + override def beforeEach() { + stat = new SynchronizedDescriptiveStatistics + } + + "Tell" must { + "warmup" in { + runScenario(2, warmup = true) + } + "warmup more" in { + runScenario(4, warmup = true) + } + "perform with load 1" in { + runScenario(1) + } + "perform with load 2" in { + runScenario(2) + } + "perform with load 4" in { + runScenario(4) + } + "perform with load 6" in { + runScenario(6) + } + "perform with load 8" in { + runScenario(8) + } + + def runScenario(numberOfClients: Int, warmup: Boolean = false) { + if (acceptClients(numberOfClients)) { + + val latch = new CountDownLatch(numberOfClients) + val repeatsPerClient = repeat / numberOfClients + val clients = (for (i ← 0 until numberOfClients) yield { + val destination = app.actorOf[Destination] + val w4 = app.actorOf(new Waypoint(destination)) + val w3 = app.actorOf(new Waypoint(w4)) + val w2 = app.actorOf(new Waypoint(w3)) + val w1 = app.actorOf(new Waypoint(w2)) + Props(new Client(w1, latch, repeatsPerClient, clientDelayMicros, stat)).withDispatcher(clientDispatcher) + }).toList.map(app.actorOf(_)) + + val start = System.nanoTime + clients.foreach(_ ! Run) + val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS) + val durationNs = (System.nanoTime - start) + + if (!warmup) { + ok must be(true) + logMeasurement(numberOfClients, durationNs, stat) + } + clients.foreach(_ ! PoisonPill) + + } + } + } +} + +object TellLatencyPerformanceSpec { + + val random: Random = new Random(0) + + case object Run + case class Msg(nanoTime: Long = System.nanoTime) + + class Waypoint(next: ActorRef) extends Actor { + def receive = { + case msg: Msg ⇒ next forward msg + } + } + + class Destination extends Actor { + def receive = { + case msg: Msg ⇒ sender ! msg + } + } + + class Client( + actor: ActorRef, + latch: CountDownLatch, + repeat: Long, + delayMicros: Int, + stat: DescriptiveStatistics) extends Actor { + + var sent = 0L + var received = 0L + + def receive = { + case Msg(sendTime) ⇒ + val duration = System.nanoTime - sendTime + stat.addValue(duration) + received += 1 + if (sent < repeat) { + PerformanceSpec.shortDelay(delayMicros, received) + actor ! Msg() + sent += 1 + } else if (received >= repeat) { + latch.countDown() + } + case Run ⇒ + // random initial delay to spread requests + val initialDelay = random.nextInt(20) + Thread.sleep(initialDelay) + actor ! Msg() + sent += 1 + } + + } + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellPerformanceSpec.scala deleted file mode 100644 index 72c2f016c2..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellPerformanceSpec.scala +++ /dev/null @@ -1,125 +0,0 @@ -package akka.performance.microbench - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics -import org.junit.runner.RunWith - -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.PoisonPill -import akka.actor.Props - -// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TellPerformanceSpec extends PerformanceSpec { - import TellPerformanceSpec._ - - val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") - .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - .setCorePoolSize(maxClients) - .build - - val repeat = repeatFactor * 30000 - - "Tell" must { - "warmup" in { - runScenario(2, warmup = true) - } - "perform with load 1" in { - runScenario(1) - } - "perform with load 2" in { - runScenario(2) - } - "perform with load 4" in { - runScenario(4) - } - "perform with load 6" in { - runScenario(6) - } - "perform with load 8" in { - runScenario(8) - } - - def runScenario(numberOfClients: Int, warmup: Boolean = false) { - if (numberOfClients <= maxClients) { - - val latch = new CountDownLatch(numberOfClients) - val repeatsPerClient = repeat / numberOfClients - val clients = (for (i ← 0 until numberOfClients) yield { - val c = app.actorOf[Destination] - val b = app.actorOf(new Waypoint(c)) - val a = app.actorOf(new Waypoint(b)) - Props(new Client(a, latch, repeatsPerClient, sampling, stat)).withDispatcher(clientDispatcher) - }).toList.map(app.actorOf(_)) - - val start = System.nanoTime - clients.foreach(_ ! Run) - latch.await(30, TimeUnit.SECONDS) must be(true) - val durationNs = (System.nanoTime - start) - - if (!warmup) { - logMeasurement("one-way tell", numberOfClients, durationNs) - } - clients.foreach(_ ! PoisonPill) - - } - } - } -} - -object TellPerformanceSpec { - - case object Run - case class Msg(latch: Option[CountDownLatch]) - - class Waypoint(next: ActorRef) extends Actor { - def receive = { - case msg: Msg ⇒ next ! msg - } - } - - class Destination extends Actor { - def receive = { - case Msg(latch) ⇒ latch.foreach(_.countDown()) - } - } - - class Client( - actor: ActorRef, - latch: CountDownLatch, - repeat: Int, - sampling: Int, - stat: DescriptiveStatistics) extends Actor { - - def receive = { - case Run ⇒ - val msgWithoutLatch = Msg(None) - for (n ← 1 to repeat) { - if (measureLatency(n)) { - val t0 = System.nanoTime - tellAndAwait() - val duration = System.nanoTime - t0 - stat.addValue(duration) - } else if (measureLatency(n + 1) || n == repeat) { - tellAndAwait() - } else { - actor ! msgWithoutLatch - } - } - latch.countDown() - } - - def tellAndAwait() { - val msgLatch = new CountDownLatch(1) - actor ! Msg(Some(msgLatch)) - val ok = msgLatch.await(10, TimeUnit.SECONDS) - if (!ok) app.eventHandler.error(this, "Too long delay") - } - - def measureLatency(n: Int) = (n % sampling == 0) - } - -} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala new file mode 100644 index 0000000000..5f2c3ec74f --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -0,0 +1,132 @@ +package akka.performance.microbench + +import akka.performance.workbench.PerformanceSpec +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics +import org.junit.runner.RunWith +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.PoisonPill +import akka.actor.Props +import akka.dispatch.Dispatchers +import akka.dispatch.Dispatcher +import akka.dispatch.Dispatchers + +// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TellThroughputPerformanceSpec extends PerformanceSpec { + import TellThroughputPerformanceSpec._ + + val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") + .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + .setCorePoolSize(maxClients) + .build + + val destinationDispatcher = app.dispatcherFactory.newDispatcher("destination-dispatcher") + .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + .setCorePoolSize(maxClients) + .build + + val repeat = 30000L * repeatFactor + + "Tell" must { + "warmup" in { + runScenario(4, warmup = true) + } + "warmup more" in { + runScenario(4, warmup = true) + } + "perform with load 1" in { + runScenario(1) + } + "perform with load 2" in { + runScenario(2) + } + "perform with load 4" in { + runScenario(4) + } + "perform with load 6" in { + runScenario(6) + } + "perform with load 8" in { + runScenario(8) + } + "perform with load 10" in { + runScenario(10) + } + "perform with load 12" in { + runScenario(12) + } + "perform with load 14" in { + runScenario(14) + } + "perform with load 16" in { + runScenario(16) + } + + def runScenario(numberOfClients: Int, warmup: Boolean = false) { + if (acceptClients(numberOfClients)) { + + val latch = new CountDownLatch(numberOfClients) + val repeatsPerClient = repeat / numberOfClients + val destinations = for (i ← 0 until numberOfClients) + yield app.actorOf(Props(new Destination).withDispatcher(destinationDispatcher)) + val clients = for (dest ← destinations) + yield app.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(clientDispatcher)) + + val start = System.nanoTime + clients.foreach(_ ! Run) + val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS) + val durationNs = (System.nanoTime - start) + + if (!warmup) { + ok must be(true) + logMeasurement(numberOfClients, durationNs, repeat) + } + clients.foreach(_ ! PoisonPill) + destinations.foreach(_ ! PoisonPill) + + } + } + } +} + +object TellThroughputPerformanceSpec { + + case object Run + case object Msg + + class Destination extends Actor { + def receive = { + case Msg ⇒ sender ! Msg + } + } + + class Client( + actor: ActorRef, + latch: CountDownLatch, + repeat: Long) extends Actor { + + var sent = 0L + var received = 0L + + def receive = { + case Msg ⇒ + received += 1 + if (sent < repeat) { + actor ! Msg + sent += 1 + } else if (received >= repeat) { + latch.countDown() + } + case Run ⇒ + for (i ← 0L until math.min(1000L, repeat)) { + actor ! Msg + sent += 1 + } + } + + } + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala deleted file mode 100644 index 697b8b299f..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala +++ /dev/null @@ -1,86 +0,0 @@ -package akka.performance.trading.common - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import akka.performance.trading.domain._ -import akka.performance.trading.common._ -import akka.actor.{ Props, ActorRef, Actor, PoisonPill } -import akka.AkkaApplication - -abstract class AkkaPerformanceTest(val app: AkkaApplication) extends BenchmarkScenarios { - - type TS = AkkaTradingSystem - - val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") - .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - .setCorePoolSize(maxClients) - .setMaxPoolSize(maxClients) - .build - - override def createTradingSystem: TS = new AkkaTradingSystem(app) - - /** - * Implemented in subclass - */ - def placeOrder(orderReceiver: ActorRef, order: Order, await: Boolean): Rsp - - override def runScenario(scenario: String, orders: List[Order], repeat: Int, numberOfClients: Int, delayMs: Int) = { - val totalNumberOfRequests = orders.size * repeat - val repeatsPerClient = repeat / numberOfClients - val oddRepeats = repeat - (repeatsPerClient * numberOfClients) - val latch = new CountDownLatch(numberOfClients) - val receivers = tradingSystem.orderReceivers.toIndexedSeq - val start = System.nanoTime - val clients = (for (i ← 0 until numberOfClients) yield { - val receiver = receivers(i % receivers.size) - Props(new Client(receiver, orders, latch, repeatsPerClient + (if (i < oddRepeats) 1 else 0), sampling, delayMs)).withDispatcher(clientDispatcher) - }).toList.map(app.actorOf(_)) - - clients.foreach(_ ! "run") - val ok = latch.await((5000 + (2 + delayMs) * totalNumberOfRequests) * timeDilation, TimeUnit.MILLISECONDS) - val durationNs = (System.nanoTime - start) - - assert(ok) - assert((orders.size / 2) * repeat == TotalTradeCounter.counter.get) - logMeasurement(scenario, numberOfClients, durationNs) - clients.foreach(_ ! PoisonPill) - } - - class Client( - orderReceiver: ActorRef, - orders: List[Order], - latch: CountDownLatch, - repeat: Int, - sampling: Int, - delayMs: Int = 0) extends Actor { - - def receive = { - case "run" ⇒ - var n = 0 - for (r ← 1 to repeat; o ← orders) { - n += 1 - - val rsp = - if (measureLatency(n)) { - val t0 = System.nanoTime - val rsp = placeOrder(orderReceiver, o, await = true) - val duration = System.nanoTime - t0 - stat.addValue(duration) - rsp - } else { - val await = measureLatency(n + 1) || (r == repeat) - placeOrder(orderReceiver, o, await) - } - if (!rsp.status) { - app.eventHandler.error(this, "Invalid rsp") - } - delay(delayMs) - } - latch.countDown() - } - - def measureLatency(n: Int) = (n % sampling == 0) - } - -} - diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala deleted file mode 100644 index 54c6f36ce8..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala +++ /dev/null @@ -1,62 +0,0 @@ -package akka.performance.trading.common - -import org.junit._ -import akka.performance.trading.domain._ - -trait BenchmarkScenarios extends PerformanceTest { - - @Test - def complexScenario1 = complexScenario(1) - @Test - def complexScenario2 = complexScenario(2) - @Test - def complexScenario4 = complexScenario(4) - @Test - def complexScenario6 = complexScenario(6) - @Test - def complexScenario8 = complexScenario(8) - @Test - def complexScenario10 = complexScenario(10) - @Test - def complexScenario20 = complexScenario(20) - @Test - def complexScenario30 = complexScenario(30) - @Test - def complexScenario40 = complexScenario(40) - @Test - def complexScenario60 = complexScenario(60) - @Test - def complexScenario80 = complexScenario(80) - @Test - def complexScenario100 = complexScenario(100) - /* - @Test - def complexScenario200 = complexScenario(200) - @Test - def complexScenario300 = complexScenario(300) - @Test - def complexScenario400 = complexScenario(400) - */ - - def complexScenario(numberOfClients: Int) { - Assume.assumeTrue(numberOfClients >= minClients) - Assume.assumeTrue(numberOfClients <= maxClients) - - val repeat = 500 * repeatFactor - - val prefixes = "A" :: "B" :: "C" :: "D" :: "E" :: Nil - val askOrders = for { - s ← prefixes - i ← 1 to 3 - } yield new Ask(s + i, 100 - i, 1000) - val bidOrders = for { - s ← prefixes - i ← 1 to 3 - } yield new Bid(s + i, 100 - i, 1000) - val orders = askOrders ::: bidOrders - - runScenario("benchmark", orders, repeat, numberOfClients, 0) - } - -} - diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala deleted file mode 100644 index dd400dc82e..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala +++ /dev/null @@ -1,160 +0,0 @@ -package akka.performance.trading.common - -import java.util.Random -import scala.collection.immutable.TreeMap -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics -import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics -import org.junit.After -import org.junit.Before -import org.scalatest.junit.JUnitSuite -import akka.performance.trading.domain.Ask -import akka.performance.trading.domain.Bid -import akka.performance.trading.domain.Order -import akka.performance.trading.domain.TotalTradeCounter -import akka.performance.workbench.BenchResultRepository -import akka.performance.workbench.Report -import akka.performance.workbench.Stats -import akka.AkkaApplication -import akka.actor.simpleName - -trait PerformanceTest extends JUnitSuite { - - def app: AkkaApplication - - var isWarm = false - - def isBenchmark() = System.getProperty("benchmark") == "true" - - def minClients() = System.getProperty("benchmark.minClients", "1").toInt; - - def maxClients() = System.getProperty("benchmark.maxClients", "40").toInt; - - def repeatFactor() = { - val defaultRepeatFactor = if (isBenchmark) "150" else "2" - System.getProperty("benchmark.repeatFactor", defaultRepeatFactor).toInt - } - - def warmupRepeatFactor() = { - val defaultRepeatFactor = if (isBenchmark) "200" else "1" - System.getProperty("benchmark.warmupRepeatFactor", defaultRepeatFactor).toInt - } - - def randomSeed() = { - System.getProperty("benchmark.randomSeed", "0").toInt - } - - def timeDilation() = { - System.getProperty("benchmark.timeDilation", "1").toLong - } - - def sampling = { - System.getProperty("benchmark.sampling", "200").toInt - } - - var stat: DescriptiveStatistics = _ - - val resultRepository = BenchResultRepository() - lazy val report = new Report(app, resultRepository, compareResultWith) - - type TS <: TradingSystem - - var tradingSystem: TS = _ - val random: Random = new Random(randomSeed) - - def createTradingSystem(): TS - - def placeOrder(orderReceiver: TS#OR, order: Order, await: Boolean): Rsp - - def runScenario(scenario: String, orders: List[Order], repeat: Int, numberOfClients: Int, delayMs: Int) - - @Before - def setUp() { - stat = new SynchronizedDescriptiveStatistics - tradingSystem = createTradingSystem() - tradingSystem.start() - warmUp() - TotalTradeCounter.reset() - stat = new SynchronizedDescriptiveStatistics - } - - @After - def tearDown() { - tradingSystem.shutdown() - stat = null - } - - def warmUp() { - val bid = new Bid("A1", 100, 1000) - val ask = new Ask("A1", 100, 1000) - - val orderReceiver = tradingSystem.orderReceivers.head - val loopCount = if (isWarm) 1 else 10 * warmupRepeatFactor - - for (i ← 1 to loopCount) { - placeOrder(orderReceiver, bid, true) - placeOrder(orderReceiver, ask, true) - } - isWarm = true - } - - /** - * To compare two tests with each other you can override this method, in - * the test. For example Some("OneWayPerformanceTest") - */ - def compareResultWith: Option[String] = None - - def logMeasurement(scenario: String, numberOfClients: Int, durationNs: Long) { - try { - val name = simpleName(this) - val durationS = durationNs.toDouble / 1000000000.0 - - val percentiles = TreeMap[Int, Long]( - 5 -> (stat.getPercentile(5.0) / 1000).toLong, - 25 -> (stat.getPercentile(25.0) / 1000).toLong, - 50 -> (stat.getPercentile(50.0) / 1000).toLong, - 75 -> (stat.getPercentile(75.0) / 1000).toLong, - 95 -> (stat.getPercentile(95.0) / 1000).toLong) - - val n = stat.getN * sampling - - val stats = Stats( - name, - load = numberOfClients, - timestamp = TestStart.startTime, - durationNanos = durationNs, - n = n, - min = (stat.getMin / 1000).toLong, - max = (stat.getMax / 1000).toLong, - mean = (stat.getMean / 1000).toLong, - tps = (n.toDouble / durationS), - percentiles) - - resultRepository.add(stats) - - report.html(resultRepository.get(name)) - } catch { - // don't fail test due to problems saving bench report - case e: Exception ⇒ app.eventHandler.error(this, e.getMessage) - } - } - - def delay(delayMs: Int) { - val adjustedDelay = - if (delayMs >= 5) { - val dist = 0.2 * delayMs - (delayMs + random.nextGaussian * dist).intValue - } else { - delayMs - } - - if (adjustedDelay > 0) { - Thread.sleep(adjustedDelay) - } - } - -} - -object TestStart { - val startTime = System.currentTimeMillis -} - diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Rsp.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Rsp.scala deleted file mode 100644 index 683ff3d331..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Rsp.scala +++ /dev/null @@ -1,3 +0,0 @@ -package akka.performance.trading.common - -case class Rsp(status: Boolean) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/LatchMessage.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/LatchMessage.scala deleted file mode 100644 index 34675c8b26..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/LatchMessage.scala +++ /dev/null @@ -1,16 +0,0 @@ -package akka.performance.trading.domain - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -trait LatchMessage { - val count: Int - lazy val latch: CountDownLatch = new CountDownLatch(count) -} - -object LatchOrder { - def apply(order: Order) = order match { - case bid: Bid ⇒ new Bid(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 } - case ask: Ask ⇒ new Ask(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 } - } -} diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/Order.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/Order.scala index 9007243863..f8919d05e7 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/Order.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/Order.scala @@ -4,26 +4,34 @@ trait Order { def orderbookSymbol: String def price: Long def volume: Long + def nanoTime: Long + def withNanoTime: Order } case class Bid( orderbookSymbol: String, price: Long, - volume: Long) + volume: Long, + nanoTime: Long = 0L) extends Order { def split(newVolume: Long) = { new Bid(orderbookSymbol, price, newVolume) } + + def withNanoTime: Bid = copy(nanoTime = System.nanoTime) } case class Ask( orderbookSymbol: String, price: Long, - volume: Long) + volume: Long, + nanoTime: Long = 0L) extends Order { def split(newVolume: Long) = { new Ask(orderbookSymbol, price, newVolume) } + + def withNanoTime: Ask = copy(nanoTime = System.nanoTime) } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/TradeObserver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/TradeObserver.scala index 6476c3faf1..123d785e17 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/TradeObserver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/TradeObserver.scala @@ -8,7 +8,9 @@ abstract trait TradeObserver { trait SimpleTradeObserver extends TradeObserver { override def trade(bid: Bid, ask: Ask) { - val c = TotalTradeCounter.counter.incrementAndGet + if (!Orderbook.useDummyOrderbook) { + TotalTradeCounter.counter.incrementAndGet + } } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala deleted file mode 100644 index 44ce92a92d..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala +++ /dev/null @@ -1,24 +0,0 @@ -package akka.performance.trading.oneway - -import akka.actor._ -import akka.dispatch.MessageDispatcher -import akka.performance.trading.domain.Order -import akka.performance.trading.domain.Orderbook -import akka.performance.trading.common.AkkaMatchingEngine - -class OneWayMatchingEngine(meId: String, orderbooks: List[Orderbook]) extends AkkaMatchingEngine(meId, orderbooks) { - - override def handleOrder(order: Order) { - orderbooksMap.get(order.orderbookSymbol) match { - case Some(orderbook) ⇒ - standby.foreach(_ ! order) - - orderbook.addOrder(order) - orderbook.matchOrders() - - case None ⇒ - app.eventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) - } - } - -} diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala deleted file mode 100644 index daeabfb36b..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala +++ /dev/null @@ -1,19 +0,0 @@ -package akka.performance.trading.oneway - -import akka.actor._ -import akka.dispatch.MessageDispatcher -import akka.performance.trading.domain._ -import akka.performance.trading.common.AkkaOrderReceiver - -class OneWayOrderReceiver extends AkkaOrderReceiver { - - override def placeOrder(order: Order) = { - val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol) - matchingEngine match { - case Some(m) ⇒ - m ! order - case None ⇒ - app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) - } - } -} diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayPerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayPerformanceTest.scala deleted file mode 100644 index ef0d97ece2..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayPerformanceTest.scala +++ /dev/null @@ -1,58 +0,0 @@ -package akka.performance.trading.oneway - -import java.util.concurrent.TimeUnit -import org.junit.Test -import akka.performance.trading.common.AkkaPerformanceTest -import akka.performance.trading.common.Rsp -import akka.performance.trading.domain._ -import akka.actor.{ Props, ActorRef } -import akka.AkkaApplication - -// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark.useDummyOrderbook=true -Dbenchmark=true -Dbenchmark.minClients=1 -Dbenchmark.maxClients=40 -Dbenchmark.repeatFactor=500 -class OneWayPerformanceTest extends AkkaPerformanceTest(AkkaApplication()) { - - val Ok = new Rsp(true) - - override def createTradingSystem: TS = new OneWayTradingSystem(app) { - override def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) = meDispatcher match { - case Some(d) ⇒ app.actorOf(Props(new OneWayMatchingEngine(meId, orderbooks) with LatchMessageCountDown).withDispatcher(d)) - case _ ⇒ app.actorOf(new OneWayMatchingEngine(meId, orderbooks) with LatchMessageCountDown) - } - } - - override def placeOrder(orderReceiver: ActorRef, order: Order, await: Boolean): Rsp = { - if (await) { - val newOrder = LatchOrder(order) - orderReceiver ! newOrder - val ok = newOrder.latch.await(10, TimeUnit.SECONDS) - new Rsp(ok) - } else { - orderReceiver ! order - Ok - } - } - - // need this so that junit will detect this as a test case - @Test - def dummy {} - - override def compareResultWith = Some("RspPerformanceTest") - - def createLatchOrder(order: Order) = order match { - case bid: Bid ⇒ new Bid(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 } - case ask: Ask ⇒ new Ask(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 } - } - -} - -trait LatchMessageCountDown extends OneWayMatchingEngine { - - override def handleOrder(order: Order) { - super.handleOrder(order) - order match { - case x: LatchMessage ⇒ x.latch.countDown - case _ ⇒ - } - } -} - diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala deleted file mode 100644 index f841c8288b..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala +++ /dev/null @@ -1,20 +0,0 @@ -package akka.performance.trading.oneway - -import akka.performance.trading.common.AkkaTradingSystem -import akka.performance.trading.domain.Orderbook -import akka.actor.{ Props, ActorRef } -import akka.AkkaApplication - -class OneWayTradingSystem(_app: AkkaApplication) extends AkkaTradingSystem(_app) { - - override def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) = meDispatcher match { - case Some(d) ⇒ app.actorOf(Props(new OneWayMatchingEngine(meId, orderbooks)).withDispatcher(d)) - case _ ⇒ app.actorOf(Props(new OneWayMatchingEngine(meId, orderbooks))) - } - - override def createOrderReceiver() = orDispatcher match { - case Some(d) ⇒ app.actorOf(Props[OneWayOrderReceiver].withDispatcher(d)) - case _ ⇒ app.actorOf(Props[OneWayOrderReceiver]) - } - -} diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/response/RspPerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/response/RspPerformanceTest.scala deleted file mode 100644 index 47d0a69e41..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/response/RspPerformanceTest.scala +++ /dev/null @@ -1,25 +0,0 @@ -package akka.performance.trading.response - -import org.junit.Test -import akka.actor.ActorRef -import akka.performance.trading.common.AkkaPerformanceTest -import akka.performance.trading.domain.Order -import akka.performance.trading.common.Rsp -import akka.AkkaApplication - -class RspPerformanceTest extends AkkaPerformanceTest(AkkaApplication()) { - - implicit def appl = app - - override def placeOrder(orderReceiver: ActorRef, order: Order, await: Boolean): Rsp = { - (orderReceiver ? order).get.asInstanceOf[Rsp] - } - - // need this so that junit will detect this as a test case - @Test - def dummy {} - - override def compareResultWith = Some("OneWayPerformanceTest") - -} - diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/MatchingEngine.scala similarity index 64% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala rename to akka-actor-tests/src/test/scala/akka/performance/trading/system/MatchingEngine.scala index 9c88e67902..fa2f5f2493 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/MatchingEngine.scala @@ -1,4 +1,4 @@ -package akka.performance.trading.common +package akka.performance.trading.system import akka.performance.trading.domain._ import akka.actor._ @@ -11,7 +11,7 @@ trait MatchingEngine { val orderbooks: List[Orderbook] val supportedOrderbookSymbols = orderbooks map (_.symbol) protected val orderbooksMap: Map[String, Orderbook] = - Map() ++ (orderbooks map (o ⇒ (o.symbol, o))) + orderbooks.map(o ⇒ (o.symbol, o)).toMap } @@ -21,10 +21,10 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) var standby: Option[ActorRef] = None def receive = { - case standbyRef: ActorRef ⇒ - standby = Some(standbyRef) case order: Order ⇒ handleOrder(order) + case standbyRef: ActorRef ⇒ + standby = Some(standbyRef) case unknown ⇒ app.eventHandler.warning(this, "Received unknown message: " + unknown) } @@ -32,30 +32,21 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) def handleOrder(order: Order) { orderbooksMap.get(order.orderbookSymbol) match { case Some(orderbook) ⇒ - val pendingStandbyReply: Option[Future[_]] = - for (s ← standby) yield { s ? order } + standby.foreach(_ forward order) orderbook.addOrder(order) orderbook.matchOrders() - // wait for standby reply - pendingStandbyReply.foreach(waitForStandby(_)) - done(true) + + done(true, order) + case None ⇒ app.eventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) - done(false) } } - def done(status: Boolean) { - sender ! new Rsp(status) - } - - def waitForStandby(pendingStandbyFuture: Future[_]) { - try { - pendingStandbyFuture.await - } catch { - case e: FutureTimeoutException ⇒ - app.eventHandler.error(this, "Standby timeout: " + e) + def done(status: Boolean, order: Order) { + if (standby.isEmpty) { + sender ! Rsp(order, status) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/OrderReceiver.scala similarity index 86% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala rename to akka-actor-tests/src/test/scala/akka/performance/trading/system/OrderReceiver.scala index 114fe7e349..b6d15e1185 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/OrderReceiver.scala @@ -1,4 +1,4 @@ -package akka.performance.trading.common +package akka.performance.trading.system import akka.performance.trading.domain._ import akka.actor._ @@ -28,20 +28,20 @@ class AkkaOrderReceiver extends Actor with OrderReceiver { type ME = ActorRef def receive = { + case order: Order ⇒ placeOrder(order) case routing @ MatchingEngineRouting(mapping) ⇒ refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]]) - case order: Order ⇒ placeOrder(order) - case unknown ⇒ app.eventHandler.warning(this, "Received unknown message: " + unknown) + case unknown ⇒ app.eventHandler.warning(this, "Received unknown message: " + unknown) } def placeOrder(order: Order) = { val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol) matchingEngine match { case Some(m) ⇒ - m.forward(order) + m forward order case None ⇒ app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) - sender ! new Rsp(false) + sender ! Rsp(order, false) } } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/Rsp.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/Rsp.scala new file mode 100644 index 0000000000..3151462e33 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/Rsp.scala @@ -0,0 +1,5 @@ +package akka.performance.trading.system + +import akka.performance.trading.domain.Order + +case class Rsp(order: Order, status: Boolean) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala new file mode 100644 index 0000000000..83c75c41a2 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala @@ -0,0 +1,169 @@ +package akka.performance.trading.system + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.Random +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics +import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics +import org.junit.runner.RunWith +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.PoisonPill +import akka.actor.Props +import akka.performance.trading.domain.Ask +import akka.performance.trading.domain.Bid +import akka.performance.trading.domain.Order +import akka.performance.trading.domain.TotalTradeCounter +import akka.performance.workbench.PerformanceSpec +import akka.performance.trading.domain.Orderbook + +// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -Dbenchmark.useDummyOrderbook=true +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TradingLatencyPerformanceSpec extends PerformanceSpec { + + val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") + .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + .setCorePoolSize(maxClients) + .build + + var tradingSystem: AkkaTradingSystem = _ + + var stat: DescriptiveStatistics = _ + val random: Random = new Random(0) + + def clientDelayMicros = { + System.getProperty("benchmark.clientDelayMicros", "250").toInt + } + + override def beforeEach() { + super.beforeEach() + stat = new SynchronizedDescriptiveStatistics + tradingSystem = new AkkaTradingSystem(app) + tradingSystem.start() + TotalTradeCounter.reset() + stat = new SynchronizedDescriptiveStatistics + } + + override def afterEach() { + super.afterEach() + tradingSystem.shutdown() + stat = null + } + + getClass.getSimpleName must { + "warmup" in { + runScenario(4, warmup = true) + } + "warmup more" in { + runScenario(4, warmup = true) + } + "perform with load 1" in { + runScenario(1) + } + "perform with load 2" in { + runScenario(2) + } + "perform with load 4" in { + runScenario(4) + } + "perform with load 6" in { + runScenario(6) + } + "perform with load 8" in { + runScenario(8) + } + + } + + def runScenario(numberOfClients: Int, warmup: Boolean = false) { + if (acceptClients(numberOfClients)) { + + val repeat = 4L * repeatFactor + + val prefixes = "A" :: "B" :: "C" :: "D" :: Nil + val askOrders = for { + s ← prefixes + i ← 1 to 3 + } yield Ask(s + i, 100 - i, 1000) + val bidOrders = for { + s ← prefixes + i ← 1 to 3 + } yield Bid(s + i, 100 - i, 1000) + val orders = askOrders.zip(bidOrders).map(x ⇒ Seq(x._1, x._2)).flatten + + val ordersPerClient = repeat * orders.size / numberOfClients + val totalNumberOfOrders = ordersPerClient * numberOfClients + val latch = new CountDownLatch(numberOfClients) + val receivers = tradingSystem.orderReceivers.toIndexedSeq + val start = System.nanoTime + val clients = (for (i ← 0 until numberOfClients) yield { + val receiver = receivers(i % receivers.size) + val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelayMicros)).withDispatcher(clientDispatcher) + app.actorOf(props) + }) + + clients.foreach(_ ! "run") + val ok = latch.await((5000000L + (clientDelayMicros + 500) * totalNumberOfOrders) * timeDilation, TimeUnit.MICROSECONDS) + val durationNs = (System.nanoTime - start) + + if (!warmup) { + ok must be(true) + if (!Orderbook.useDummyOrderbook) { + TotalTradeCounter.counter.get must be(totalNumberOfOrders / 2) + } + logMeasurement(numberOfClients, durationNs, stat) + } + clients.foreach(_ ! PoisonPill) + } + } + + class Client( + orderReceiver: ActorRef, + orders: List[Order], + latch: CountDownLatch, + repeat: Long, + delayMicros: Int = 0) extends Actor { + + var orderIterator = orders.toIterator + def nextOrder(): Order = { + if (!orderIterator.hasNext) { + orderIterator = orders.toIterator + } + orderIterator.next() + } + + var sent = 0L + var received = 0L + + def receive = { + case Rsp(order, status) ⇒ + if (!status) { + app.eventHandler.error(this, "Invalid rsp") + } + val duration = System.nanoTime - order.nanoTime + stat.addValue(duration) + received += 1 + if (sent < repeat) { + PerformanceSpec.shortDelay(delayMicros, received) + placeOrder() + sent += 1 + } else if (received >= repeat) { + latch.countDown() + } + + case "run" ⇒ + // random initial delay to spread requests + val initialDelay = random.nextInt(20) + Thread.sleep(initialDelay) + placeOrder() + sent += 1 + } + + def placeOrder() { + orderReceiver ! nextOrder().withNanoTime + } + + } + +} + diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingSystem.scala similarity index 98% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala rename to akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingSystem.scala index 076639e4ae..c917e75fcb 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingSystem.scala @@ -1,4 +1,4 @@ -package akka.performance.trading.common +package akka.performance.trading.system import akka.performance.trading.domain.Orderbook import akka.performance.trading.domain.OrderbookRepository diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala new file mode 100644 index 0000000000..a59f164e16 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala @@ -0,0 +1,157 @@ +package akka.performance.trading.system + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.Random +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics +import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics +import org.junit.runner.RunWith +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.PoisonPill +import akka.actor.Props +import akka.performance.trading.domain.Ask +import akka.performance.trading.domain.Bid +import akka.performance.trading.domain.Order +import akka.performance.trading.domain.TotalTradeCounter +import akka.performance.workbench.PerformanceSpec +import akka.performance.trading.domain.Orderbook + +// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -Dbenchmark.useDummyOrderbook=true +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TradingThroughputPerformanceSpec extends PerformanceSpec { + + val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") + .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + .setCorePoolSize(maxClients) + .build + + var tradingSystem: AkkaTradingSystem = _ + + override def beforeEach() { + super.beforeEach() + tradingSystem = new AkkaTradingSystem(app) + tradingSystem.start() + TotalTradeCounter.reset() + } + + override def afterEach() { + super.afterEach() + tradingSystem.shutdown() + } + + getClass.getSimpleName must { + "warmup" in { + runScenario(4, warmup = true) + } + "warmup more" in { + runScenario(4, warmup = true) + } + "perform with load 1" in { + runScenario(1) + } + "perform with load 2" in { + runScenario(2) + } + "perform with load 4" in { + runScenario(4) + } + "perform with load 6" in { + runScenario(6) + } + "perform with load 8" in { + runScenario(8) + } + "perform with load 10" in { + runScenario(10) + } + + } + + def runScenario(numberOfClients: Int, warmup: Boolean = false) { + if (acceptClients(numberOfClients)) { + + val repeat = 400L * repeatFactor + + val prefixes = "A" :: "B" :: "C" :: "D" :: "E" :: "F" :: Nil + val askOrders = for { + s ← prefixes + i ← 1 to 4 + } yield Ask(s + i, 100 - i, 1000) + val bidOrders = for { + s ← prefixes + i ← 1 to 4 + } yield Bid(s + i, 100 - i, 1000) + val orders = askOrders.zip(bidOrders).map(x ⇒ Seq(x._1, x._2)).flatten + + val ordersPerClient = repeat * orders.size / numberOfClients + val totalNumberOfOrders = ordersPerClient * numberOfClients + val latch = new CountDownLatch(numberOfClients) + val receivers = tradingSystem.orderReceivers.toIndexedSeq + val start = System.nanoTime + val clients = (for (i ← 0 until numberOfClients) yield { + val receiver = receivers(i % receivers.size) + val props = Props(new Client(receiver, orders, latch, ordersPerClient)).withDispatcher(clientDispatcher) + app.actorOf(props) + }) + + clients.foreach(_ ! "run") + val ok = latch.await((5000000L + 500 * totalNumberOfOrders) * timeDilation, TimeUnit.MICROSECONDS) + val durationNs = (System.nanoTime - start) + + if (!warmup) { + ok must be(true) + if (!Orderbook.useDummyOrderbook) { + TotalTradeCounter.counter.get must be(totalNumberOfOrders / 2) + } + logMeasurement(numberOfClients, durationNs, totalNumberOfOrders) + } + clients.foreach(_ ! PoisonPill) + } + } + + class Client( + orderReceiver: ActorRef, + orders: List[Order], + latch: CountDownLatch, + repeat: Long) extends Actor { + + var orderIterator = orders.toIterator + def nextOrder(): Order = { + if (!orderIterator.hasNext) { + orderIterator = orders.toIterator + } + orderIterator.next() + } + + var sent = 0L + var received = 0L + + def receive = { + case Rsp(order, status) ⇒ + if (!status) { + app.eventHandler.error(this, "Invalid rsp") + } + received += 1 + if (sent < repeat) { + placeOrder() + sent += 1 + } else if (received >= repeat) { + latch.countDown() + } + + case "run" ⇒ + for (i ← 0L until math.min(1000L, repeat)) { + placeOrder() + sent += 1 + } + } + + def placeOrder() { + orderReceiver ! nextOrder() + } + + } + +} + diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala new file mode 100644 index 0000000000..b1c5612024 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala @@ -0,0 +1,113 @@ +package akka.performance.workbench + +import scala.collection.immutable.TreeMap + +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics +import org.scalatest.BeforeAndAfterEach + +import akka.actor.simpleName +import akka.testkit.AkkaSpec +import akka.AkkaApplication + +trait PerformanceSpec extends AkkaSpec with BeforeAndAfterEach { + + def app: AkkaApplication + + def isBenchmark() = System.getProperty("benchmark") == "true" + + def minClients() = System.getProperty("benchmark.minClients", "1").toInt; + + def maxClients() = System.getProperty("benchmark.maxClients", "40").toInt; + + def repeatFactor() = { + val defaultRepeatFactor = if (isBenchmark) "150" else "2" + System.getProperty("benchmark.repeatFactor", defaultRepeatFactor).toInt + } + + def timeDilation() = { + System.getProperty("benchmark.timeDilation", "1").toLong + } + + val resultRepository = BenchResultRepository() + lazy val report = new Report(app, resultRepository, compareResultWith) + + /** + * To compare two tests with each other you can override this method, in + * the test. For example Some("OneWayPerformanceTest") + */ + def compareResultWith: Option[String] = None + + def acceptClients(numberOfClients: Int): Boolean = { + (minClients <= numberOfClients && numberOfClients <= maxClients) + } + + def logMeasurement(numberOfClients: Int, durationNs: Long, n: Long) { + val name = simpleName(this) + val durationS = durationNs.toDouble / 1000000000.0 + + val stats = Stats( + name, + load = numberOfClients, + timestamp = TestStart.startTime, + durationNanos = durationNs, + n = n, + tps = (n.toDouble / durationS)) + + logMeasurement(stats) + } + + def logMeasurement(numberOfClients: Int, durationNs: Long, stat: DescriptiveStatistics) { + val name = simpleName(this) + val durationS = durationNs.toDouble / 1000000000.0 + + val percentiles = TreeMap[Int, Long]( + 5 -> (stat.getPercentile(5.0) / 1000).toLong, + 25 -> (stat.getPercentile(25.0) / 1000).toLong, + 50 -> (stat.getPercentile(50.0) / 1000).toLong, + 75 -> (stat.getPercentile(75.0) / 1000).toLong, + 95 -> (stat.getPercentile(95.0) / 1000).toLong) + + val n = stat.getN + + val stats = Stats( + name, + load = numberOfClients, + timestamp = TestStart.startTime, + durationNanos = durationNs, + n = n, + min = (stat.getMin / 1000).toLong, + max = (stat.getMax / 1000).toLong, + mean = (stat.getMean / 1000).toLong, + tps = (n.toDouble / durationS), + percentiles) + + logMeasurement(stats) + } + + def logMeasurement(stats: Stats) { + try { + resultRepository.add(stats) + report.html(resultRepository.get(stats.name)) + } catch { + // don't fail test due to problems saving bench report + case e: Exception ⇒ app.eventHandler.error(e, this, e.getMessage) + } + } + +} + +object PerformanceSpec { + def shortDelay(micros: Int, n: Long) { + if (micros > 0) { + val sampling = 1000 / micros + if (n % sampling == 0) { + Thread.sleep(1) + } + } + } +} + +object TestStart { + val startTime = System.currentTimeMillis +} + diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala index 9e13b032c5..232bdef36f 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -33,17 +33,20 @@ class Report( sb.append(resultTable) sb.append("\n\n") - sb.append(img(percentilesAndMeanChart(current))) sb.append(img(latencyAndThroughputChart(current))) compareWithHistoricalTpsChart(statistics).foreach(url ⇒ sb.append(img(url))) - for (stats ← statistics) { - compareWithHistoricalPercentiliesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) - } + if (current.max > 0L) { + sb.append(img(percentilesAndMeanChart(current))) - for (stats ← statistics) { - comparePercentilesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) + for (stats ← statistics) { + compareWithHistoricalPercentiliesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) + } + + for (stats ← statistics) { + comparePercentilesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) + } } sb.append("