Separate latency and throughput measurement in performance tests. Fixes #1333

* TradingThroughputPerformanceSpec and microbench.TellThroughputPerformanceSpec for throughput with high load
* TradingLatencyPerformanceSpec and microbench.TellLatencyPerformanceSpec for latency with moderate load
* Removed usage of JUnit
* Removed all usage of latches for flow control, instead replies with ordinary tell. This means that trading sample generates 4 msg for each transaction.
* Removed req-rsp test from trading, since it adds complexity and isn't realistic
This commit is contained in:
Patrik Nordwall 2011-10-31 16:15:53 +01:00
parent 1e3ab2645f
commit 3681d0fe4e
25 changed files with 765 additions and 724 deletions

View file

@ -1,90 +0,0 @@
package akka.performance.microbench
import scala.collection.immutable.TreeMap
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics
import org.scalatest.BeforeAndAfterEach
import akka.actor.simpleName
import akka.performance.workbench.BenchResultRepository
import akka.performance.workbench.Report
import akka.performance.workbench.Stats
import akka.testkit.AkkaSpec
import akka.AkkaApplication
trait PerformanceSpec extends AkkaSpec with BeforeAndAfterEach {
def app: AkkaApplication
def isBenchmark() = System.getProperty("benchmark") == "true"
def minClients() = System.getProperty("benchmark.minClients", "1").toInt;
def maxClients() = System.getProperty("benchmark.maxClients", "40").toInt;
def repeatFactor() = {
val defaultRepeatFactor = if (isBenchmark) "150" else "2"
System.getProperty("benchmark.repeatFactor", defaultRepeatFactor).toInt
}
def sampling = {
System.getProperty("benchmark.sampling", "200").toInt
}
var stat: DescriptiveStatistics = _
override def beforeEach() {
stat = new SynchronizedDescriptiveStatistics
}
val resultRepository = BenchResultRepository()
lazy val report = new Report(app, resultRepository, compareResultWith)
/**
* To compare two tests with each other you can override this method, in
* the test. For example Some("OneWayPerformanceTest")
*/
def compareResultWith: Option[String] = None
def logMeasurement(scenario: String, numberOfClients: Int, durationNs: Long) {
try {
val name = simpleName(this)
val durationS = durationNs.toDouble / 1000000000.0
val percentiles = TreeMap[Int, Long](
5 -> (stat.getPercentile(5.0) / 1000).toLong,
25 -> (stat.getPercentile(25.0) / 1000).toLong,
50 -> (stat.getPercentile(50.0) / 1000).toLong,
75 -> (stat.getPercentile(75.0) / 1000).toLong,
95 -> (stat.getPercentile(95.0) / 1000).toLong)
val n = stat.getN * sampling
val stats = Stats(
name,
load = numberOfClients,
timestamp = TestStart.startTime,
durationNanos = durationNs,
n = n,
min = (stat.getMin / 1000).toLong,
max = (stat.getMax / 1000).toLong,
mean = (stat.getMean / 1000).toLong,
tps = (n.toDouble / durationS),
percentiles)
resultRepository.add(stats)
report.html(resultRepository.get(name))
} catch {
// don't fail test due to problems saving bench report
case e: Exception app.eventHandler.error(this, e.getMessage)
}
}
}
object TestStart {
val startTime = System.currentTimeMillis
}

View file

@ -0,0 +1,140 @@
package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import org.junit.runner.RunWith
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import java.util.Random
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics
// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TellLatencyPerformanceSpec extends PerformanceSpec {
import TellLatencyPerformanceSpec._
val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(8)
.build
val repeat = 200L * repeatFactor
def clientDelayMicros = {
System.getProperty("benchmark.clientDelayMicros", "250").toInt
}
var stat: DescriptiveStatistics = _
override def beforeEach() {
stat = new SynchronizedDescriptiveStatistics
}
"Tell" must {
"warmup" in {
runScenario(2, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val clients = (for (i 0 until numberOfClients) yield {
val destination = app.actorOf[Destination]
val w4 = app.actorOf(new Waypoint(destination))
val w3 = app.actorOf(new Waypoint(w4))
val w2 = app.actorOf(new Waypoint(w3))
val w1 = app.actorOf(new Waypoint(w2))
Props(new Client(w1, latch, repeatsPerClient, clientDelayMicros, stat)).withDispatcher(clientDispatcher)
}).toList.map(app.actorOf(_))
val start = System.nanoTime
clients.foreach(_ ! Run)
val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS)
val durationNs = (System.nanoTime - start)
if (!warmup) {
ok must be(true)
logMeasurement(numberOfClients, durationNs, stat)
}
clients.foreach(_ ! PoisonPill)
}
}
}
}
object TellLatencyPerformanceSpec {
val random: Random = new Random(0)
case object Run
case class Msg(nanoTime: Long = System.nanoTime)
class Waypoint(next: ActorRef) extends Actor {
def receive = {
case msg: Msg next forward msg
}
}
class Destination extends Actor {
def receive = {
case msg: Msg sender ! msg
}
}
class Client(
actor: ActorRef,
latch: CountDownLatch,
repeat: Long,
delayMicros: Int,
stat: DescriptiveStatistics) extends Actor {
var sent = 0L
var received = 0L
def receive = {
case Msg(sendTime)
val duration = System.nanoTime - sendTime
stat.addValue(duration)
received += 1
if (sent < repeat) {
PerformanceSpec.shortDelay(delayMicros, received)
actor ! Msg()
sent += 1
} else if (received >= repeat) {
latch.countDown()
}
case Run
// random initial delay to spread requests
val initialDelay = random.nextInt(20)
Thread.sleep(initialDelay)
actor ! Msg()
sent += 1
}
}
}

View file

@ -1,125 +0,0 @@
package akka.performance.microbench
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import org.junit.runner.RunWith
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TellPerformanceSpec extends PerformanceSpec {
import TellPerformanceSpec._
val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.build
val repeat = repeatFactor * 30000
"Tell" must {
"warmup" in {
runScenario(2, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (numberOfClients <= maxClients) {
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val clients = (for (i 0 until numberOfClients) yield {
val c = app.actorOf[Destination]
val b = app.actorOf(new Waypoint(c))
val a = app.actorOf(new Waypoint(b))
Props(new Client(a, latch, repeatsPerClient, sampling, stat)).withDispatcher(clientDispatcher)
}).toList.map(app.actorOf(_))
val start = System.nanoTime
clients.foreach(_ ! Run)
latch.await(30, TimeUnit.SECONDS) must be(true)
val durationNs = (System.nanoTime - start)
if (!warmup) {
logMeasurement("one-way tell", numberOfClients, durationNs)
}
clients.foreach(_ ! PoisonPill)
}
}
}
}
object TellPerformanceSpec {
case object Run
case class Msg(latch: Option[CountDownLatch])
class Waypoint(next: ActorRef) extends Actor {
def receive = {
case msg: Msg next ! msg
}
}
class Destination extends Actor {
def receive = {
case Msg(latch) latch.foreach(_.countDown())
}
}
class Client(
actor: ActorRef,
latch: CountDownLatch,
repeat: Int,
sampling: Int,
stat: DescriptiveStatistics) extends Actor {
def receive = {
case Run
val msgWithoutLatch = Msg(None)
for (n 1 to repeat) {
if (measureLatency(n)) {
val t0 = System.nanoTime
tellAndAwait()
val duration = System.nanoTime - t0
stat.addValue(duration)
} else if (measureLatency(n + 1) || n == repeat) {
tellAndAwait()
} else {
actor ! msgWithoutLatch
}
}
latch.countDown()
}
def tellAndAwait() {
val msgLatch = new CountDownLatch(1)
actor ! Msg(Some(msgLatch))
val ok = msgLatch.await(10, TimeUnit.SECONDS)
if (!ok) app.eventHandler.error(this, "Too long delay")
}
def measureLatency(n: Int) = (n % sampling == 0)
}
}

View file

@ -0,0 +1,132 @@
package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import org.junit.runner.RunWith
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.dispatch.Dispatchers
import akka.dispatch.Dispatcher
import akka.dispatch.Dispatchers
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TellThroughputPerformanceSpec extends PerformanceSpec {
import TellThroughputPerformanceSpec._
val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.build
val destinationDispatcher = app.dispatcherFactory.newDispatcher("destination-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.build
val repeat = 30000L * repeatFactor
"Tell" must {
"warmup" in {
runScenario(4, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
"perform with load 10" in {
runScenario(10)
}
"perform with load 12" in {
runScenario(12)
}
"perform with load 14" in {
runScenario(14)
}
"perform with load 16" in {
runScenario(16)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i 0 until numberOfClients)
yield app.actorOf(Props(new Destination).withDispatcher(destinationDispatcher))
val clients = for (dest destinations)
yield app.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(clientDispatcher))
val start = System.nanoTime
clients.foreach(_ ! Run)
val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS)
val durationNs = (System.nanoTime - start)
if (!warmup) {
ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat)
}
clients.foreach(_ ! PoisonPill)
destinations.foreach(_ ! PoisonPill)
}
}
}
}
object TellThroughputPerformanceSpec {
case object Run
case object Msg
class Destination extends Actor {
def receive = {
case Msg sender ! Msg
}
}
class Client(
actor: ActorRef,
latch: CountDownLatch,
repeat: Long) extends Actor {
var sent = 0L
var received = 0L
def receive = {
case Msg
received += 1
if (sent < repeat) {
actor ! Msg
sent += 1
} else if (received >= repeat) {
latch.countDown()
}
case Run
for (i 0L until math.min(1000L, repeat)) {
actor ! Msg
sent += 1
}
}
}
}

View file

@ -1,86 +0,0 @@
package akka.performance.trading.common
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import akka.performance.trading.domain._
import akka.performance.trading.common._
import akka.actor.{ Props, ActorRef, Actor, PoisonPill }
import akka.AkkaApplication
abstract class AkkaPerformanceTest(val app: AkkaApplication) extends BenchmarkScenarios {
type TS = AkkaTradingSystem
val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.setMaxPoolSize(maxClients)
.build
override def createTradingSystem: TS = new AkkaTradingSystem(app)
/**
* Implemented in subclass
*/
def placeOrder(orderReceiver: ActorRef, order: Order, await: Boolean): Rsp
override def runScenario(scenario: String, orders: List[Order], repeat: Int, numberOfClients: Int, delayMs: Int) = {
val totalNumberOfRequests = orders.size * repeat
val repeatsPerClient = repeat / numberOfClients
val oddRepeats = repeat - (repeatsPerClient * numberOfClients)
val latch = new CountDownLatch(numberOfClients)
val receivers = tradingSystem.orderReceivers.toIndexedSeq
val start = System.nanoTime
val clients = (for (i 0 until numberOfClients) yield {
val receiver = receivers(i % receivers.size)
Props(new Client(receiver, orders, latch, repeatsPerClient + (if (i < oddRepeats) 1 else 0), sampling, delayMs)).withDispatcher(clientDispatcher)
}).toList.map(app.actorOf(_))
clients.foreach(_ ! "run")
val ok = latch.await((5000 + (2 + delayMs) * totalNumberOfRequests) * timeDilation, TimeUnit.MILLISECONDS)
val durationNs = (System.nanoTime - start)
assert(ok)
assert((orders.size / 2) * repeat == TotalTradeCounter.counter.get)
logMeasurement(scenario, numberOfClients, durationNs)
clients.foreach(_ ! PoisonPill)
}
class Client(
orderReceiver: ActorRef,
orders: List[Order],
latch: CountDownLatch,
repeat: Int,
sampling: Int,
delayMs: Int = 0) extends Actor {
def receive = {
case "run"
var n = 0
for (r 1 to repeat; o orders) {
n += 1
val rsp =
if (measureLatency(n)) {
val t0 = System.nanoTime
val rsp = placeOrder(orderReceiver, o, await = true)
val duration = System.nanoTime - t0
stat.addValue(duration)
rsp
} else {
val await = measureLatency(n + 1) || (r == repeat)
placeOrder(orderReceiver, o, await)
}
if (!rsp.status) {
app.eventHandler.error(this, "Invalid rsp")
}
delay(delayMs)
}
latch.countDown()
}
def measureLatency(n: Int) = (n % sampling == 0)
}
}

View file

@ -1,62 +0,0 @@
package akka.performance.trading.common
import org.junit._
import akka.performance.trading.domain._
trait BenchmarkScenarios extends PerformanceTest {
@Test
def complexScenario1 = complexScenario(1)
@Test
def complexScenario2 = complexScenario(2)
@Test
def complexScenario4 = complexScenario(4)
@Test
def complexScenario6 = complexScenario(6)
@Test
def complexScenario8 = complexScenario(8)
@Test
def complexScenario10 = complexScenario(10)
@Test
def complexScenario20 = complexScenario(20)
@Test
def complexScenario30 = complexScenario(30)
@Test
def complexScenario40 = complexScenario(40)
@Test
def complexScenario60 = complexScenario(60)
@Test
def complexScenario80 = complexScenario(80)
@Test
def complexScenario100 = complexScenario(100)
/*
@Test
def complexScenario200 = complexScenario(200)
@Test
def complexScenario300 = complexScenario(300)
@Test
def complexScenario400 = complexScenario(400)
*/
def complexScenario(numberOfClients: Int) {
Assume.assumeTrue(numberOfClients >= minClients)
Assume.assumeTrue(numberOfClients <= maxClients)
val repeat = 500 * repeatFactor
val prefixes = "A" :: "B" :: "C" :: "D" :: "E" :: Nil
val askOrders = for {
s prefixes
i 1 to 3
} yield new Ask(s + i, 100 - i, 1000)
val bidOrders = for {
s prefixes
i 1 to 3
} yield new Bid(s + i, 100 - i, 1000)
val orders = askOrders ::: bidOrders
runScenario("benchmark", orders, repeat, numberOfClients, 0)
}
}

View file

@ -1,160 +0,0 @@
package akka.performance.trading.common
import java.util.Random
import scala.collection.immutable.TreeMap
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics
import org.junit.After
import org.junit.Before
import org.scalatest.junit.JUnitSuite
import akka.performance.trading.domain.Ask
import akka.performance.trading.domain.Bid
import akka.performance.trading.domain.Order
import akka.performance.trading.domain.TotalTradeCounter
import akka.performance.workbench.BenchResultRepository
import akka.performance.workbench.Report
import akka.performance.workbench.Stats
import akka.AkkaApplication
import akka.actor.simpleName
trait PerformanceTest extends JUnitSuite {
def app: AkkaApplication
var isWarm = false
def isBenchmark() = System.getProperty("benchmark") == "true"
def minClients() = System.getProperty("benchmark.minClients", "1").toInt;
def maxClients() = System.getProperty("benchmark.maxClients", "40").toInt;
def repeatFactor() = {
val defaultRepeatFactor = if (isBenchmark) "150" else "2"
System.getProperty("benchmark.repeatFactor", defaultRepeatFactor).toInt
}
def warmupRepeatFactor() = {
val defaultRepeatFactor = if (isBenchmark) "200" else "1"
System.getProperty("benchmark.warmupRepeatFactor", defaultRepeatFactor).toInt
}
def randomSeed() = {
System.getProperty("benchmark.randomSeed", "0").toInt
}
def timeDilation() = {
System.getProperty("benchmark.timeDilation", "1").toLong
}
def sampling = {
System.getProperty("benchmark.sampling", "200").toInt
}
var stat: DescriptiveStatistics = _
val resultRepository = BenchResultRepository()
lazy val report = new Report(app, resultRepository, compareResultWith)
type TS <: TradingSystem
var tradingSystem: TS = _
val random: Random = new Random(randomSeed)
def createTradingSystem(): TS
def placeOrder(orderReceiver: TS#OR, order: Order, await: Boolean): Rsp
def runScenario(scenario: String, orders: List[Order], repeat: Int, numberOfClients: Int, delayMs: Int)
@Before
def setUp() {
stat = new SynchronizedDescriptiveStatistics
tradingSystem = createTradingSystem()
tradingSystem.start()
warmUp()
TotalTradeCounter.reset()
stat = new SynchronizedDescriptiveStatistics
}
@After
def tearDown() {
tradingSystem.shutdown()
stat = null
}
def warmUp() {
val bid = new Bid("A1", 100, 1000)
val ask = new Ask("A1", 100, 1000)
val orderReceiver = tradingSystem.orderReceivers.head
val loopCount = if (isWarm) 1 else 10 * warmupRepeatFactor
for (i 1 to loopCount) {
placeOrder(orderReceiver, bid, true)
placeOrder(orderReceiver, ask, true)
}
isWarm = true
}
/**
* To compare two tests with each other you can override this method, in
* the test. For example Some("OneWayPerformanceTest")
*/
def compareResultWith: Option[String] = None
def logMeasurement(scenario: String, numberOfClients: Int, durationNs: Long) {
try {
val name = simpleName(this)
val durationS = durationNs.toDouble / 1000000000.0
val percentiles = TreeMap[Int, Long](
5 -> (stat.getPercentile(5.0) / 1000).toLong,
25 -> (stat.getPercentile(25.0) / 1000).toLong,
50 -> (stat.getPercentile(50.0) / 1000).toLong,
75 -> (stat.getPercentile(75.0) / 1000).toLong,
95 -> (stat.getPercentile(95.0) / 1000).toLong)
val n = stat.getN * sampling
val stats = Stats(
name,
load = numberOfClients,
timestamp = TestStart.startTime,
durationNanos = durationNs,
n = n,
min = (stat.getMin / 1000).toLong,
max = (stat.getMax / 1000).toLong,
mean = (stat.getMean / 1000).toLong,
tps = (n.toDouble / durationS),
percentiles)
resultRepository.add(stats)
report.html(resultRepository.get(name))
} catch {
// don't fail test due to problems saving bench report
case e: Exception app.eventHandler.error(this, e.getMessage)
}
}
def delay(delayMs: Int) {
val adjustedDelay =
if (delayMs >= 5) {
val dist = 0.2 * delayMs
(delayMs + random.nextGaussian * dist).intValue
} else {
delayMs
}
if (adjustedDelay > 0) {
Thread.sleep(adjustedDelay)
}
}
}
object TestStart {
val startTime = System.currentTimeMillis
}

View file

@ -1,3 +0,0 @@
package akka.performance.trading.common
case class Rsp(status: Boolean)

View file

@ -1,16 +0,0 @@
package akka.performance.trading.domain
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
trait LatchMessage {
val count: Int
lazy val latch: CountDownLatch = new CountDownLatch(count)
}
object LatchOrder {
def apply(order: Order) = order match {
case bid: Bid new Bid(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 }
case ask: Ask new Ask(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 }
}
}

View file

@ -4,26 +4,34 @@ trait Order {
def orderbookSymbol: String
def price: Long
def volume: Long
def nanoTime: Long
def withNanoTime: Order
}
case class Bid(
orderbookSymbol: String,
price: Long,
volume: Long)
volume: Long,
nanoTime: Long = 0L)
extends Order {
def split(newVolume: Long) = {
new Bid(orderbookSymbol, price, newVolume)
}
def withNanoTime: Bid = copy(nanoTime = System.nanoTime)
}
case class Ask(
orderbookSymbol: String,
price: Long,
volume: Long)
volume: Long,
nanoTime: Long = 0L)
extends Order {
def split(newVolume: Long) = {
new Ask(orderbookSymbol, price, newVolume)
}
def withNanoTime: Ask = copy(nanoTime = System.nanoTime)
}

View file

@ -8,7 +8,9 @@ abstract trait TradeObserver {
trait SimpleTradeObserver extends TradeObserver {
override def trade(bid: Bid, ask: Ask) {
val c = TotalTradeCounter.counter.incrementAndGet
if (!Orderbook.useDummyOrderbook) {
TotalTradeCounter.counter.incrementAndGet
}
}
}

View file

@ -1,24 +0,0 @@
package akka.performance.trading.oneway
import akka.actor._
import akka.dispatch.MessageDispatcher
import akka.performance.trading.domain.Order
import akka.performance.trading.domain.Orderbook
import akka.performance.trading.common.AkkaMatchingEngine
class OneWayMatchingEngine(meId: String, orderbooks: List[Orderbook]) extends AkkaMatchingEngine(meId, orderbooks) {
override def handleOrder(order: Order) {
orderbooksMap.get(order.orderbookSymbol) match {
case Some(orderbook)
standby.foreach(_ ! order)
orderbook.addOrder(order)
orderbook.matchOrders()
case None
app.eventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol)
}
}
}

View file

@ -1,19 +0,0 @@
package akka.performance.trading.oneway
import akka.actor._
import akka.dispatch.MessageDispatcher
import akka.performance.trading.domain._
import akka.performance.trading.common.AkkaOrderReceiver
class OneWayOrderReceiver extends AkkaOrderReceiver {
override def placeOrder(order: Order) = {
val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol)
matchingEngine match {
case Some(m)
m ! order
case None
app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol)
}
}
}

View file

@ -1,58 +0,0 @@
package akka.performance.trading.oneway
import java.util.concurrent.TimeUnit
import org.junit.Test
import akka.performance.trading.common.AkkaPerformanceTest
import akka.performance.trading.common.Rsp
import akka.performance.trading.domain._
import akka.actor.{ Props, ActorRef }
import akka.AkkaApplication
// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark.useDummyOrderbook=true -Dbenchmark=true -Dbenchmark.minClients=1 -Dbenchmark.maxClients=40 -Dbenchmark.repeatFactor=500
class OneWayPerformanceTest extends AkkaPerformanceTest(AkkaApplication()) {
val Ok = new Rsp(true)
override def createTradingSystem: TS = new OneWayTradingSystem(app) {
override def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) = meDispatcher match {
case Some(d) app.actorOf(Props(new OneWayMatchingEngine(meId, orderbooks) with LatchMessageCountDown).withDispatcher(d))
case _ app.actorOf(new OneWayMatchingEngine(meId, orderbooks) with LatchMessageCountDown)
}
}
override def placeOrder(orderReceiver: ActorRef, order: Order, await: Boolean): Rsp = {
if (await) {
val newOrder = LatchOrder(order)
orderReceiver ! newOrder
val ok = newOrder.latch.await(10, TimeUnit.SECONDS)
new Rsp(ok)
} else {
orderReceiver ! order
Ok
}
}
// need this so that junit will detect this as a test case
@Test
def dummy {}
override def compareResultWith = Some("RspPerformanceTest")
def createLatchOrder(order: Order) = order match {
case bid: Bid new Bid(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 }
case ask: Ask new Ask(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 }
}
}
trait LatchMessageCountDown extends OneWayMatchingEngine {
override def handleOrder(order: Order) {
super.handleOrder(order)
order match {
case x: LatchMessage x.latch.countDown
case _
}
}
}

View file

@ -1,20 +0,0 @@
package akka.performance.trading.oneway
import akka.performance.trading.common.AkkaTradingSystem
import akka.performance.trading.domain.Orderbook
import akka.actor.{ Props, ActorRef }
import akka.AkkaApplication
class OneWayTradingSystem(_app: AkkaApplication) extends AkkaTradingSystem(_app) {
override def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) = meDispatcher match {
case Some(d) app.actorOf(Props(new OneWayMatchingEngine(meId, orderbooks)).withDispatcher(d))
case _ app.actorOf(Props(new OneWayMatchingEngine(meId, orderbooks)))
}
override def createOrderReceiver() = orDispatcher match {
case Some(d) app.actorOf(Props[OneWayOrderReceiver].withDispatcher(d))
case _ app.actorOf(Props[OneWayOrderReceiver])
}
}

View file

@ -1,25 +0,0 @@
package akka.performance.trading.response
import org.junit.Test
import akka.actor.ActorRef
import akka.performance.trading.common.AkkaPerformanceTest
import akka.performance.trading.domain.Order
import akka.performance.trading.common.Rsp
import akka.AkkaApplication
class RspPerformanceTest extends AkkaPerformanceTest(AkkaApplication()) {
implicit def appl = app
override def placeOrder(orderReceiver: ActorRef, order: Order, await: Boolean): Rsp = {
(orderReceiver ? order).get.asInstanceOf[Rsp]
}
// need this so that junit will detect this as a test case
@Test
def dummy {}
override def compareResultWith = Some("OneWayPerformanceTest")
}

View file

@ -1,4 +1,4 @@
package akka.performance.trading.common
package akka.performance.trading.system
import akka.performance.trading.domain._
import akka.actor._
@ -11,7 +11,7 @@ trait MatchingEngine {
val orderbooks: List[Orderbook]
val supportedOrderbookSymbols = orderbooks map (_.symbol)
protected val orderbooksMap: Map[String, Orderbook] =
Map() ++ (orderbooks map (o (o.symbol, o)))
orderbooks.map(o (o.symbol, o)).toMap
}
@ -21,10 +21,10 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook])
var standby: Option[ActorRef] = None
def receive = {
case standbyRef: ActorRef
standby = Some(standbyRef)
case order: Order
handleOrder(order)
case standbyRef: ActorRef
standby = Some(standbyRef)
case unknown
app.eventHandler.warning(this, "Received unknown message: " + unknown)
}
@ -32,30 +32,21 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook])
def handleOrder(order: Order) {
orderbooksMap.get(order.orderbookSymbol) match {
case Some(orderbook)
val pendingStandbyReply: Option[Future[_]] =
for (s standby) yield { s ? order }
standby.foreach(_ forward order)
orderbook.addOrder(order)
orderbook.matchOrders()
// wait for standby reply
pendingStandbyReply.foreach(waitForStandby(_))
done(true)
done(true, order)
case None
app.eventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol)
done(false)
}
}
def done(status: Boolean) {
sender ! new Rsp(status)
}
def waitForStandby(pendingStandbyFuture: Future[_]) {
try {
pendingStandbyFuture.await
} catch {
case e: FutureTimeoutException
app.eventHandler.error(this, "Standby timeout: " + e)
def done(status: Boolean, order: Order) {
if (standby.isEmpty) {
sender ! Rsp(order, status)
}
}

View file

@ -1,4 +1,4 @@
package akka.performance.trading.common
package akka.performance.trading.system
import akka.performance.trading.domain._
import akka.actor._
@ -28,20 +28,20 @@ class AkkaOrderReceiver extends Actor with OrderReceiver {
type ME = ActorRef
def receive = {
case order: Order placeOrder(order)
case routing @ MatchingEngineRouting(mapping)
refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]])
case order: Order placeOrder(order)
case unknown app.eventHandler.warning(this, "Received unknown message: " + unknown)
case unknown app.eventHandler.warning(this, "Received unknown message: " + unknown)
}
def placeOrder(order: Order) = {
val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol)
matchingEngine match {
case Some(m)
m.forward(order)
m forward order
case None
app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol)
sender ! new Rsp(false)
sender ! Rsp(order, false)
}
}
}

View file

@ -0,0 +1,5 @@
package akka.performance.trading.system
import akka.performance.trading.domain.Order
case class Rsp(order: Order, status: Boolean)

View file

@ -0,0 +1,169 @@
package akka.performance.trading.system
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.Random
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics
import org.junit.runner.RunWith
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.performance.trading.domain.Ask
import akka.performance.trading.domain.Bid
import akka.performance.trading.domain.Order
import akka.performance.trading.domain.TotalTradeCounter
import akka.performance.workbench.PerformanceSpec
import akka.performance.trading.domain.Orderbook
// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -Dbenchmark.useDummyOrderbook=true
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TradingLatencyPerformanceSpec extends PerformanceSpec {
val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.build
var tradingSystem: AkkaTradingSystem = _
var stat: DescriptiveStatistics = _
val random: Random = new Random(0)
def clientDelayMicros = {
System.getProperty("benchmark.clientDelayMicros", "250").toInt
}
override def beforeEach() {
super.beforeEach()
stat = new SynchronizedDescriptiveStatistics
tradingSystem = new AkkaTradingSystem(app)
tradingSystem.start()
TotalTradeCounter.reset()
stat = new SynchronizedDescriptiveStatistics
}
override def afterEach() {
super.afterEach()
tradingSystem.shutdown()
stat = null
}
getClass.getSimpleName must {
"warmup" in {
runScenario(4, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val repeat = 4L * repeatFactor
val prefixes = "A" :: "B" :: "C" :: "D" :: Nil
val askOrders = for {
s prefixes
i 1 to 3
} yield Ask(s + i, 100 - i, 1000)
val bidOrders = for {
s prefixes
i 1 to 3
} yield Bid(s + i, 100 - i, 1000)
val orders = askOrders.zip(bidOrders).map(x Seq(x._1, x._2)).flatten
val ordersPerClient = repeat * orders.size / numberOfClients
val totalNumberOfOrders = ordersPerClient * numberOfClients
val latch = new CountDownLatch(numberOfClients)
val receivers = tradingSystem.orderReceivers.toIndexedSeq
val start = System.nanoTime
val clients = (for (i 0 until numberOfClients) yield {
val receiver = receivers(i % receivers.size)
val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelayMicros)).withDispatcher(clientDispatcher)
app.actorOf(props)
})
clients.foreach(_ ! "run")
val ok = latch.await((5000000L + (clientDelayMicros + 500) * totalNumberOfOrders) * timeDilation, TimeUnit.MICROSECONDS)
val durationNs = (System.nanoTime - start)
if (!warmup) {
ok must be(true)
if (!Orderbook.useDummyOrderbook) {
TotalTradeCounter.counter.get must be(totalNumberOfOrders / 2)
}
logMeasurement(numberOfClients, durationNs, stat)
}
clients.foreach(_ ! PoisonPill)
}
}
class Client(
orderReceiver: ActorRef,
orders: List[Order],
latch: CountDownLatch,
repeat: Long,
delayMicros: Int = 0) extends Actor {
var orderIterator = orders.toIterator
def nextOrder(): Order = {
if (!orderIterator.hasNext) {
orderIterator = orders.toIterator
}
orderIterator.next()
}
var sent = 0L
var received = 0L
def receive = {
case Rsp(order, status)
if (!status) {
app.eventHandler.error(this, "Invalid rsp")
}
val duration = System.nanoTime - order.nanoTime
stat.addValue(duration)
received += 1
if (sent < repeat) {
PerformanceSpec.shortDelay(delayMicros, received)
placeOrder()
sent += 1
} else if (received >= repeat) {
latch.countDown()
}
case "run"
// random initial delay to spread requests
val initialDelay = random.nextInt(20)
Thread.sleep(initialDelay)
placeOrder()
sent += 1
}
def placeOrder() {
orderReceiver ! nextOrder().withNanoTime
}
}
}

View file

@ -1,4 +1,4 @@
package akka.performance.trading.common
package akka.performance.trading.system
import akka.performance.trading.domain.Orderbook
import akka.performance.trading.domain.OrderbookRepository

View file

@ -0,0 +1,157 @@
package akka.performance.trading.system
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.Random
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics
import org.junit.runner.RunWith
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.performance.trading.domain.Ask
import akka.performance.trading.domain.Bid
import akka.performance.trading.domain.Order
import akka.performance.trading.domain.TotalTradeCounter
import akka.performance.workbench.PerformanceSpec
import akka.performance.trading.domain.Orderbook
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -Dbenchmark.useDummyOrderbook=true
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TradingThroughputPerformanceSpec extends PerformanceSpec {
val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.build
var tradingSystem: AkkaTradingSystem = _
override def beforeEach() {
super.beforeEach()
tradingSystem = new AkkaTradingSystem(app)
tradingSystem.start()
TotalTradeCounter.reset()
}
override def afterEach() {
super.afterEach()
tradingSystem.shutdown()
}
getClass.getSimpleName must {
"warmup" in {
runScenario(4, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
"perform with load 10" in {
runScenario(10)
}
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val repeat = 400L * repeatFactor
val prefixes = "A" :: "B" :: "C" :: "D" :: "E" :: "F" :: Nil
val askOrders = for {
s prefixes
i 1 to 4
} yield Ask(s + i, 100 - i, 1000)
val bidOrders = for {
s prefixes
i 1 to 4
} yield Bid(s + i, 100 - i, 1000)
val orders = askOrders.zip(bidOrders).map(x Seq(x._1, x._2)).flatten
val ordersPerClient = repeat * orders.size / numberOfClients
val totalNumberOfOrders = ordersPerClient * numberOfClients
val latch = new CountDownLatch(numberOfClients)
val receivers = tradingSystem.orderReceivers.toIndexedSeq
val start = System.nanoTime
val clients = (for (i 0 until numberOfClients) yield {
val receiver = receivers(i % receivers.size)
val props = Props(new Client(receiver, orders, latch, ordersPerClient)).withDispatcher(clientDispatcher)
app.actorOf(props)
})
clients.foreach(_ ! "run")
val ok = latch.await((5000000L + 500 * totalNumberOfOrders) * timeDilation, TimeUnit.MICROSECONDS)
val durationNs = (System.nanoTime - start)
if (!warmup) {
ok must be(true)
if (!Orderbook.useDummyOrderbook) {
TotalTradeCounter.counter.get must be(totalNumberOfOrders / 2)
}
logMeasurement(numberOfClients, durationNs, totalNumberOfOrders)
}
clients.foreach(_ ! PoisonPill)
}
}
class Client(
orderReceiver: ActorRef,
orders: List[Order],
latch: CountDownLatch,
repeat: Long) extends Actor {
var orderIterator = orders.toIterator
def nextOrder(): Order = {
if (!orderIterator.hasNext) {
orderIterator = orders.toIterator
}
orderIterator.next()
}
var sent = 0L
var received = 0L
def receive = {
case Rsp(order, status)
if (!status) {
app.eventHandler.error(this, "Invalid rsp")
}
received += 1
if (sent < repeat) {
placeOrder()
sent += 1
} else if (received >= repeat) {
latch.countDown()
}
case "run"
for (i 0L until math.min(1000L, repeat)) {
placeOrder()
sent += 1
}
}
def placeOrder() {
orderReceiver ! nextOrder()
}
}
}

View file

@ -0,0 +1,113 @@
package akka.performance.workbench
import scala.collection.immutable.TreeMap
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import org.scalatest.BeforeAndAfterEach
import akka.actor.simpleName
import akka.testkit.AkkaSpec
import akka.AkkaApplication
trait PerformanceSpec extends AkkaSpec with BeforeAndAfterEach {
def app: AkkaApplication
def isBenchmark() = System.getProperty("benchmark") == "true"
def minClients() = System.getProperty("benchmark.minClients", "1").toInt;
def maxClients() = System.getProperty("benchmark.maxClients", "40").toInt;
def repeatFactor() = {
val defaultRepeatFactor = if (isBenchmark) "150" else "2"
System.getProperty("benchmark.repeatFactor", defaultRepeatFactor).toInt
}
def timeDilation() = {
System.getProperty("benchmark.timeDilation", "1").toLong
}
val resultRepository = BenchResultRepository()
lazy val report = new Report(app, resultRepository, compareResultWith)
/**
* To compare two tests with each other you can override this method, in
* the test. For example Some("OneWayPerformanceTest")
*/
def compareResultWith: Option[String] = None
def acceptClients(numberOfClients: Int): Boolean = {
(minClients <= numberOfClients && numberOfClients <= maxClients)
}
def logMeasurement(numberOfClients: Int, durationNs: Long, n: Long) {
val name = simpleName(this)
val durationS = durationNs.toDouble / 1000000000.0
val stats = Stats(
name,
load = numberOfClients,
timestamp = TestStart.startTime,
durationNanos = durationNs,
n = n,
tps = (n.toDouble / durationS))
logMeasurement(stats)
}
def logMeasurement(numberOfClients: Int, durationNs: Long, stat: DescriptiveStatistics) {
val name = simpleName(this)
val durationS = durationNs.toDouble / 1000000000.0
val percentiles = TreeMap[Int, Long](
5 -> (stat.getPercentile(5.0) / 1000).toLong,
25 -> (stat.getPercentile(25.0) / 1000).toLong,
50 -> (stat.getPercentile(50.0) / 1000).toLong,
75 -> (stat.getPercentile(75.0) / 1000).toLong,
95 -> (stat.getPercentile(95.0) / 1000).toLong)
val n = stat.getN
val stats = Stats(
name,
load = numberOfClients,
timestamp = TestStart.startTime,
durationNanos = durationNs,
n = n,
min = (stat.getMin / 1000).toLong,
max = (stat.getMax / 1000).toLong,
mean = (stat.getMean / 1000).toLong,
tps = (n.toDouble / durationS),
percentiles)
logMeasurement(stats)
}
def logMeasurement(stats: Stats) {
try {
resultRepository.add(stats)
report.html(resultRepository.get(stats.name))
} catch {
// don't fail test due to problems saving bench report
case e: Exception app.eventHandler.error(e, this, e.getMessage)
}
}
}
object PerformanceSpec {
def shortDelay(micros: Int, n: Long) {
if (micros > 0) {
val sampling = 1000 / micros
if (n % sampling == 0) {
Thread.sleep(1)
}
}
}
}
object TestStart {
val startTime = System.currentTimeMillis
}

View file

@ -33,17 +33,20 @@ class Report(
sb.append(resultTable)
sb.append("\n</pre>\n")
sb.append(img(percentilesAndMeanChart(current)))
sb.append(img(latencyAndThroughputChart(current)))
compareWithHistoricalTpsChart(statistics).foreach(url sb.append(img(url)))
for (stats statistics) {
compareWithHistoricalPercentiliesAndMeanChart(stats).foreach(url sb.append(img(url)))
}
if (current.max > 0L) {
sb.append(img(percentilesAndMeanChart(current)))
for (stats statistics) {
comparePercentilesAndMeanChart(stats).foreach(url sb.append(img(url)))
for (stats statistics) {
compareWithHistoricalPercentiliesAndMeanChart(stats).foreach(url sb.append(img(url)))
}
for (stats statistics) {
comparePercentilesAndMeanChart(stats).foreach(url sb.append(img(url)))
}
}
sb.append("<hr/>\n")

View file

@ -13,8 +13,17 @@ case class Stats(
max: Long = 0L,
mean: Double = 0.0,
tps: Double = 0.0,
percentiles: TreeMap[Int, Long] = TreeMap.empty) {
percentiles: TreeMap[Int, Long] = Stats.emptyPercentiles) {
def median: Long = percentiles(50)
}
object Stats {
val emptyPercentiles = TreeMap[Int, Long](
5 -> 0L,
25 -> 0L,
50 -> 0L,
75 -> 0L,
95 -> 0L)
}