Ticket 981: Refactoring, renamed and consolidated

This commit is contained in:
Patrik Nordwall 2011-07-04 20:03:25 +02:00
parent 8d724c1b5a
commit 2c3b6ba8b3
20 changed files with 267 additions and 344 deletions

View file

@ -1,59 +0,0 @@
package akka.performance.trading.akka
import akka.actor._
import akka.dispatch.Future
import akka.dispatch.FutureTimeoutException
import akka.dispatch.MessageDispatcher
import akka.performance.trading.common.MatchingEngine
import akka.performance.trading.domain._
import akka.performance.trading.domain.SupportedOrderbooksReq
import akka.dispatch.MessageDispatcher
import akka.actor.ActorRef
class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook], disp: Option[MessageDispatcher]) extends Actor with MatchingEngine {
for (d disp) {
self.dispatcher = d
}
var standby: Option[ActorRef] = None
def receive = {
case standbyRef: ActorRef
standby = Some(standbyRef)
case SupportedOrderbooksReq
self.channel ! orderbooks
case order: Order
handleOrder(order)
case unknown
println("Received unknown message: " + unknown)
}
def handleOrder(order: Order) {
orderbooksMap.get(order.orderbookSymbol) match {
case Some(orderbook)
// println(meId + " " + order)
val pendingStandbyReply: Option[Future[_]] =
for (s standby) yield { s ? order }
orderbook.addOrder(order)
orderbook.matchOrders()
// wait for standby reply
pendingStandbyReply.foreach(waitForStandby(_))
self.channel ! new Rsp(true)
case None
println("Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol)
self.channel ! new Rsp(false)
}
}
def waitForStandby(pendingStandbyFuture: Future[_]) {
try {
pendingStandbyFuture.await
} catch {
case e: FutureTimeoutException println("### standby timeout: " + e)
}
}
}

View file

@ -1,38 +0,0 @@
package akka.performance.trading.akka
import akka.performance.trading.common.OrderReceiver
import akka.actor._
import akka.dispatch.MessageDispatcher
import akka.performance.trading.domain._
class AkkaOrderReceiver(val matchingEngines: List[ActorRef], disp: Option[MessageDispatcher])
extends Actor with OrderReceiver {
type ME = ActorRef
for (d disp) {
self.dispatcher = d
}
def receive = {
case order: Order placeOrder(order)
case unknown println("Received unknown message: " + unknown)
}
override def supportedOrderbooks(me: ActorRef): List[Orderbook] = {
(me ? SupportedOrderbooksReq).get.asInstanceOf[List[Orderbook]]
}
def placeOrder(order: Order) = {
if (matchingEnginePartitionsIsStale) refreshMatchingEnginePartitions()
val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol)
matchingEngine match {
case Some(m)
// println("receiver " + order)
m.forward(order)
case None
println("Unknown orderbook: " + order.orderbookSymbol)
self.channel ! new Rsp(false)
}
}
}

View file

@ -1,75 +0,0 @@
package akka.performance.trading.akka
import akka.performance.trading.common._
import akka.performance.trading.domain.Orderbook
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.dispatch.MessageDispatcher
import akka.actor.PoisonPill
class AkkaTradingSystem extends TradingSystem {
type ME = ActorRef
type OR = ActorRef
val orDispatcher = createOrderReceiverDispatcher
val meDispatcher = createMatchingEngineDispatcher
// by default we use default-dispatcher that is defined in akka.conf
def createOrderReceiverDispatcher: Option[MessageDispatcher] = None
// by default we use default-dispatcher that is defined in akka.conf
def createMatchingEngineDispatcher: Option[MessageDispatcher] = None
var matchingEngineForOrderbook: Map[String, ActorRef] = Map()
override def createMatchingEngines = {
var i = 0
val pairs =
for (orderbooks: List[Orderbook] orderbooksGroupedByMatchingEngine) yield {
i = i + 1
val me = createMatchingEngine("ME" + i, orderbooks)
val orderbooksCopy = orderbooks map (o Orderbook(o.symbol, true))
val standbyOption =
if (useStandByEngines) {
val meStandby = createMatchingEngine("ME" + i + "s", orderbooksCopy)
Some(meStandby)
} else {
None
}
(me, standbyOption)
}
Map() ++ pairs;
}
def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) =
actorOf(new AkkaMatchingEngine(meId, orderbooks, meDispatcher))
override def createOrderReceivers: List[ActorRef] = {
val primaryMatchingEngines = matchingEngines.map(pair pair._1).toList
(1 to 10).toList map (i createOrderReceiver(primaryMatchingEngines))
}
def createOrderReceiver(matchingEngines: List[ActorRef]) =
actorOf(new AkkaOrderReceiver(matchingEngines, orDispatcher))
override def start() {
for ((p, s) matchingEngines) {
p.start()
// standby is optional
s.foreach(_.start())
s.foreach(p ! _)
}
orderReceivers.foreach(_.start())
}
override def shutdown() {
orderReceivers.foreach(_ ! PoisonPill)
for ((p, s) matchingEngines) {
p ! PoisonPill
// standby is optional
s.foreach(_ ! PoisonPill)
}
}
}

View file

@ -1,16 +0,0 @@
package akka.performance.trading.akkabang
import akka.performance.trading.akka._
import akka.performance.trading.domain.Orderbook
import akka.actor.Actor._
import akka.actor.ActorRef
class AkkaBangTradingSystem extends AkkaTradingSystem {
override def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) =
actorOf(new AkkaBangMatchingEngine(meId, orderbooks, meDispatcher))
override def createOrderReceiver(matchingEngines: List[ActorRef]) =
actorOf(new AkkaBangOrderReceiver(matchingEngines, orDispatcher))
}

View file

@ -1,4 +1,4 @@
package akka.performance.trading.akka
package akka.performance.trading.common
import org.junit._
import Assert._
@ -12,8 +12,8 @@ import akka.actor.Actor.actorOf
import akka.dispatch.Dispatchers
import akka.actor.PoisonPill
class AkkaPerformanceTest extends BenchmarkScenarios // with OtherPerformanceScenarios
{
abstract class AkkaPerformanceTest extends BenchmarkScenarios {
type TS = AkkaTradingSystem
val clientDispatcher = Dispatchers.newDispatcher("client-dispatcher")
@ -24,13 +24,10 @@ class AkkaPerformanceTest extends BenchmarkScenarios // with OtherPerformanceSce
override def createTradingSystem: TS = new AkkaTradingSystem
override def placeOrder(orderReceiver: ActorRef, order: Order): Rsp = {
(orderReceiver ? order).get.asInstanceOf[Rsp]
}
// need this so that junit will detect this as a test case
@Test
def dummy {}
/**
* Implemented in subclass
*/
def placeOrder(orderReceiver: ActorRef, order: Order): Rsp
override def runScenario(scenario: String, orders: List[Order], repeat: Int, numberOfClients: Int, delayMs: Int) = {
val totalNumberOfRequests = orders.size * repeat

View file

@ -1,6 +1,10 @@
package akka.performance.trading.common
import akka.performance.trading.domain.Orderbook
import akka.performance.trading.domain._
import akka.actor._
import akka.dispatch.Future
import akka.dispatch.FutureTimeoutException
import akka.dispatch.MessageDispatcher
trait MatchingEngine {
val meId: String
@ -10,3 +14,56 @@ trait MatchingEngine {
Map() ++ (orderbooks map (o (o.symbol, o)))
}
class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook], disp: Option[MessageDispatcher])
extends Actor with MatchingEngine {
for (d disp) {
self.dispatcher = d
}
var standby: Option[ActorRef] = None
def receive = {
case standbyRef: ActorRef
standby = Some(standbyRef)
case SupportedOrderbooksReq
self.channel ! orderbooks
case order: Order
handleOrder(order)
case unknown
println("Received unknown message: " + unknown)
}
def handleOrder(order: Order) {
orderbooksMap.get(order.orderbookSymbol) match {
case Some(orderbook)
// println(meId + " " + order)
val pendingStandbyReply: Option[Future[_]] =
for (s standby) yield { s ? order }
orderbook.addOrder(order)
orderbook.matchOrders()
// wait for standby reply
pendingStandbyReply.foreach(waitForStandby(_))
done(true)
case None
println("Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol)
done(false)
}
}
def done(status: Boolean) {
self.channel ! new Rsp(status)
}
def waitForStandby(pendingStandbyFuture: Future[_]) {
try {
pendingStandbyFuture.await
} catch {
case e: FutureTimeoutException println("### standby timeout: " + e)
}
}
}

View file

@ -1,6 +1,8 @@
package akka.performance.trading.common
import akka.performance.trading.domain.Orderbook
import akka.performance.trading.domain._
import akka.actor._
import akka.dispatch.MessageDispatcher
trait OrderReceiver {
type ME
@ -22,3 +24,34 @@ trait OrderReceiver {
def supportedOrderbooks(me: ME): List[Orderbook]
}
class AkkaOrderReceiver(val matchingEngines: List[ActorRef], disp: Option[MessageDispatcher])
extends Actor with OrderReceiver {
type ME = ActorRef
for (d disp) {
self.dispatcher = d
}
def receive = {
case order: Order placeOrder(order)
case unknown println("Received unknown message: " + unknown)
}
override def supportedOrderbooks(me: ActorRef): List[Orderbook] = {
(me ? SupportedOrderbooksReq).get.asInstanceOf[List[Orderbook]]
}
def placeOrder(order: Order) = {
if (matchingEnginePartitionsIsStale) refreshMatchingEnginePartitions()
val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol)
matchingEngine match {
case Some(m)
// println("receiver " + order)
m.forward(order)
case None
println("Unknown orderbook: " + order.orderbookSymbol)
self.channel ! new Rsp(false)
}
}
}

View file

@ -1,70 +0,0 @@
package akka.performance.trading.common
import org.junit._
import akka.performance.trading.domain._
trait OtherPerformanceScenarios extends PerformanceTest {
@Test
def simpleScenario {
val repeat = 300 * repeatFactor
val numberOfClients = tradingSystem.orderReceivers.size
val bid = new Bid("A1", 100, 1000)
val ask = new Ask("A1", 100, 1000)
val orders = bid :: ask :: Nil
runScenario("simpleScenario", orders, repeat, numberOfClients, 0)
}
@Test
def manyOrderbooks {
val repeat = 2 * repeatFactor
val numberOfClients = tradingSystem.orderReceivers.size
val orderbooks = tradingSystem.allOrderbookSymbols
val askOrders = for (o orderbooks) yield new Ask(o, 100, 1000)
val bidOrders = for (o orderbooks) yield new Bid(o, 100, 1000)
val orders = askOrders ::: bidOrders
runScenario("manyOrderbooks", orders, repeat, numberOfClients, 5)
}
@Test
def manyClients {
val repeat = 1 * repeatFactor
val numberOfClients = tradingSystem.orderReceivers.size * 10
val orderbooks = tradingSystem.allOrderbookSymbols
val askOrders = for (o orderbooks) yield new Ask(o, 100, 1000)
val bidOrders = for (o orderbooks) yield new Bid(o, 100, 1000)
val orders = askOrders ::: bidOrders
runScenario("manyClients", orders, repeat, numberOfClients, 5)
}
@Test
def oneClient {
val repeat = 10000 * repeatFactor
val numberOfClients = 1
val bid = new Bid("A1", 100, 1000)
val ask = new Ask("A1", 100, 1000)
val orders = bid :: ask :: Nil
runScenario("oneClient", orders, repeat, numberOfClients, 0)
}
@Test
def oneSlowClient {
val repeat = 300 * repeatFactor
val numberOfClients = 1
val bid = new Bid("A1", 100, 1000)
val ask = new Ask("A1", 100, 1000)
val orders = bid :: ask :: Nil
runScenario("oneSlowClient", orders, repeat, numberOfClients, 5)
}
}

View file

@ -2,6 +2,10 @@ package akka.performance.trading.common
import akka.performance.trading.domain.Orderbook
import akka.performance.trading.domain.OrderbookRepository
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.dispatch.MessageDispatcher
trait TradingSystem {
type ME
@ -29,3 +33,70 @@ trait TradingSystem {
def shutdown()
}
class AkkaTradingSystem extends TradingSystem {
type ME = ActorRef
type OR = ActorRef
val orDispatcher = createOrderReceiverDispatcher
val meDispatcher = createMatchingEngineDispatcher
// by default we use default-dispatcher that is defined in akka.conf
def createOrderReceiverDispatcher: Option[MessageDispatcher] = None
// by default we use default-dispatcher that is defined in akka.conf
def createMatchingEngineDispatcher: Option[MessageDispatcher] = None
var matchingEngineForOrderbook: Map[String, ActorRef] = Map()
override def createMatchingEngines = {
var i = 0
val pairs =
for (orderbooks: List[Orderbook] orderbooksGroupedByMatchingEngine) yield {
i = i + 1
val me = createMatchingEngine("ME" + i, orderbooks)
val orderbooksCopy = orderbooks map (o Orderbook(o.symbol, true))
val standbyOption =
if (useStandByEngines) {
val meStandby = createMatchingEngine("ME" + i + "s", orderbooksCopy)
Some(meStandby)
} else {
None
}
(me, standbyOption)
}
Map() ++ pairs;
}
def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) =
actorOf(new AkkaMatchingEngine(meId, orderbooks, meDispatcher))
override def createOrderReceivers: List[ActorRef] = {
val primaryMatchingEngines = matchingEngines.map(pair pair._1).toList
(1 to 10).toList map (i createOrderReceiver(primaryMatchingEngines))
}
def createOrderReceiver(matchingEngines: List[ActorRef]) =
actorOf(new AkkaOrderReceiver(matchingEngines, orDispatcher))
override def start() {
for ((p, s) matchingEngines) {
p.start()
// standby is optional
s.foreach(_.start())
s.foreach(p ! _)
}
orderReceivers.foreach(_.start())
}
override def shutdown() {
orderReceivers.foreach(_ ! PoisonPill)
for ((p, s) matchingEngines) {
p ! PoisonPill
// standby is optional
s.foreach(_ ! PoisonPill)
}
}
}

View file

@ -1,23 +0,0 @@
package akka.performance.trading.domain
abstract class DummyOrderbook(symbol: String) extends Orderbook(symbol) {
var count = 0
var bid: Bid = _
var ask: Ask = _
override def addOrder(order: Order) {
count += 1
order match {
case b: Bid bid = b
case a: Ask ask = a
}
}
override def matchOrders() {
if (count % 2 == 0)
trade(bid, ask)
}
def trade(bid: Bid, ask: Ask)
}

View file

@ -14,6 +14,7 @@ abstract class Orderbook(val symbol: String) {
}
}
// this is by intention not tuned for performance to simulate some work
def matchOrders() {
if (!bidSide.isEmpty && !askSide.isEmpty) {
val topOfBook = (bidSide.head, askSide.head)
@ -57,3 +58,25 @@ object Orderbook {
case true if useDummyOrderbook new DummyOrderbook(symbol) with StandbyTradeObserver
}
}
abstract class DummyOrderbook(symbol: String) extends Orderbook(symbol) {
var count = 0
var bid: Bid = _
var ask: Ask = _
override def addOrder(order: Order) {
count += 1
order match {
case b: Bid bid = b
case a: Ask ask = a
}
}
override def matchOrders() {
if (count % 2 == 0)
trade(bid, ask)
}
def trade(bid: Bid, ask: Ask)
}

View file

@ -1,9 +0,0 @@
package akka.performance.trading.domain
trait SimpleTradeObserver extends TradeObserver {
override def trade(bid: Bid, ask: Ask) {
val c = TotalTradeCounter.counter.incrementAndGet
// println("trade " + c + " " + bid + " -- " + ask)
}
}

View file

@ -1,7 +0,0 @@
package akka.performance.trading.domain
trait StandbyTradeObserver extends TradeObserver {
override def trade(bid: Bid, ask: Ask) {
}
}

View file

@ -1,11 +0,0 @@
package akka.performance.trading.domain
import java.util.concurrent.atomic.AtomicInteger
object TotalTradeCounter {
val counter = new AtomicInteger
def reset() {
counter.set(0)
}
}

View file

@ -1,7 +1,27 @@
package akka.performance.trading.domain
import java.util.concurrent.atomic.AtomicInteger
abstract trait TradeObserver {
def trade(bid: Bid, ask: Ask)
}
trait SimpleTradeObserver extends TradeObserver {
override def trade(bid: Bid, ask: Ask) {
val c = TotalTradeCounter.counter.incrementAndGet
// println("trade " + c + " " + bid + " -- " + ask)
}
}
trait StandbyTradeObserver extends TradeObserver {
override def trade(bid: Bid, ask: Ask) {
}
}
object TotalTradeCounter {
val counter = new AtomicInteger
def reset() {
counter.set(0)
}
}

View file

@ -1,13 +1,12 @@
package akka.performance.trading.akkabang
package akka.performance.trading.oneway
import akka.actor._
import akka.dispatch.MessageDispatcher
import akka.performance.trading.akka._
import akka.performance.trading.domain.Order
import akka.performance.trading.domain.Orderbook
import akka.performance.trading.common.AkkaMatchingEngine
class AkkaBangMatchingEngine(meId: String, orderbooks: List[Orderbook], disp: Option[MessageDispatcher])
class OneWayMatchingEngine(meId: String, orderbooks: List[Orderbook], disp: Option[MessageDispatcher])
extends AkkaMatchingEngine(meId, orderbooks, disp) {
override def handleOrder(order: Order) {

View file

@ -1,12 +1,11 @@
package akka.performance.trading.akkabang
package akka.performance.trading.oneway
import akka.actor._
import akka.dispatch.MessageDispatcher
import akka.performance.trading.akka._
import akka.performance.trading.domain._
import akka.performance.trading.common.AkkaOrderReceiver
class AkkaBangOrderReceiver(matchingEngines: List[ActorRef], disp: Option[MessageDispatcher])
class OneWayOrderReceiver(matchingEngines: List[ActorRef], disp: Option[MessageDispatcher])
extends AkkaOrderReceiver(matchingEngines, disp) {
override def placeOrder(order: Order) = {

View file

@ -1,23 +1,19 @@
package akka.performance.trading.akkabang
package akka.performance.trading.oneway
import org.junit._
import Assert._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import akka.performance.trading.akka._
import akka.performance.trading.domain._
import akka.performance.trading.common._
import org.junit.Test
import akka.actor.ActorRef
import akka.actor.Actor.actorOf
import akka.actor.ActorRef
import akka.performance.trading.common.AkkaPerformanceTest
import akka.performance.trading.domain._
class AkkaBangPerformanceTest extends AkkaPerformanceTest {
class OneWayPerformanceTest extends AkkaPerformanceTest {
override def createTradingSystem: TS = new AkkaBangTradingSystem {
override def createTradingSystem: TS = new OneWayTradingSystem {
override def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) =
actorOf(new AkkaBangMatchingEngine(meId, orderbooks, meDispatcher) with LatchMessageCountDown)
actorOf(new OneWayMatchingEngine(meId, orderbooks, meDispatcher) with LatchMessageCountDown)
}
override def placeOrder(orderReceiver: ActorRef, order: Order): Rsp = {
@ -29,7 +25,7 @@ class AkkaBangPerformanceTest extends AkkaPerformanceTest {
// need this so that junit will detect this as a test case
@Test
override def dummy {}
def dummy {}
def createLatchOrder(order: Order) = order match {
case bid: Bid new Bid(order.orderbookSymbol, order.price, order.volume) with LatchMessage { val count = 2 }
@ -38,7 +34,7 @@ class AkkaBangPerformanceTest extends AkkaPerformanceTest {
}
trait LatchMessageCountDown extends AkkaBangMatchingEngine {
trait LatchMessageCountDown extends OneWayMatchingEngine {
override def handleOrder(order: Order) {
super.handleOrder(order)

View file

@ -0,0 +1,16 @@
package akka.performance.trading.oneway
import akka.actor.Actor.actorOf
import akka.actor.ActorRef
import akka.performance.trading.common.AkkaTradingSystem
import akka.performance.trading.domain.Orderbook
class OneWayTradingSystem extends AkkaTradingSystem {
override def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) =
actorOf(new OneWayMatchingEngine(meId, orderbooks, meDispatcher))
override def createOrderReceiver(matchingEngines: List[ActorRef]) =
actorOf(new OneWayOrderReceiver(matchingEngines, orDispatcher))
}

View file

@ -0,0 +1,20 @@
package akka.performance.trading.response
import org.junit.Test
import akka.actor.ActorRef
import akka.performance.trading.common.AkkaPerformanceTest
import akka.performance.trading.domain.Order
import akka.performance.trading.domain.Rsp
class RspPerformanceTest extends AkkaPerformanceTest {
override def placeOrder(orderReceiver: ActorRef, order: Order): Rsp = {
(orderReceiver ? order).get.asInstanceOf[Rsp]
}
// need this so that junit will detect this as a test case
@Test
def dummy {}
}