Merge remote-tracking branch 'origin/master' into wip-improve-docs-rk

This commit is contained in:
Roland 2011-12-23 21:56:46 +01:00
commit ddefec8daa
14 changed files with 416 additions and 22 deletions

View file

@ -1,5 +1,6 @@
package akka.performance.trading.domain
import akka.performance.workbench.BenchmarkConfig
import akka.actor.ActorSystem
abstract class Orderbook(val symbol: String) {
var bidSide: List[Bid] = Nil
@ -52,10 +53,12 @@ object Orderbook {
val useDummyOrderbook = BenchmarkConfig.config.getBoolean("benchmark.useDummyOrderbook")
def apply(symbol: String, standby: Boolean): Orderbook = (useDummyOrderbook, standby) match {
def apply(symbol: String, standby: Boolean, _system: ActorSystem): Orderbook = (useDummyOrderbook, standby) match {
case (false, false) new Orderbook(symbol) with NopTradeObserver
case (false, true) new Orderbook(symbol) with TotalTradeObserver
case (true, _) new DummyOrderbook(symbol) with NopTradeObserver
case (false, true) new Orderbook(symbol) with TotalTradeObserver {
override def system = _system
}
case (true, _) new DummyOrderbook(symbol) with NopTradeObserver
}
}

View file

@ -1,14 +1,21 @@
package akka.performance.trading.domain
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.ActorSystemImpl
import akka.actor.ActorSystem
abstract trait TradeObserver {
def trade(bid: Bid, ask: Ask)
}
trait TotalTradeObserver extends TradeObserver {
def system: ActorSystem
private lazy val counter: TotalTradeCounter = TotalTradeCounterExtension(system)
override def trade(bid: Bid, ask: Ask) {
TotalTradeCounter.counter.incrementAndGet
counter.increment()
}
}
@ -17,10 +24,19 @@ trait NopTradeObserver extends TradeObserver {
}
}
object TotalTradeCounter {
val counter = new AtomicInteger
class TotalTradeCounter extends Extension {
private val counter = new AtomicInteger
def increment() = counter.incrementAndGet()
def reset() {
counter.set(0)
}
def count: Int = counter.get
}
object TotalTradeCounterExtension
extends ExtensionId[TotalTradeCounter]
with ExtensionIdProvider {
override def lookup = TotalTradeCounterExtension
override def createExtension(system: ActorSystemImpl) = new TotalTradeCounter
}

View file

@ -15,6 +15,7 @@ import akka.performance.trading.domain.Order
import akka.performance.trading.domain.TotalTradeCounter
import akka.performance.workbench.PerformanceSpec
import akka.performance.trading.domain.Orderbook
import akka.performance.trading.domain.TotalTradeCounterExtension
// -server -Xms512M -Xmx1024M -XX:+UseConcMarkSweepGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -Dbenchmark.useDummyOrderbook=true
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -25,12 +26,14 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
var stat: DescriptiveStatistics = _
val random: Random = new Random(0)
def totalTradeCounter: TotalTradeCounter = TotalTradeCounterExtension(system)
override def beforeEach() {
super.beforeEach()
stat = new SynchronizedDescriptiveStatistics
tradingSystem = new AkkaTradingSystem(system)
tradingSystem.start()
TotalTradeCounter.reset()
totalTradeCounter.reset()
stat = new SynchronizedDescriptiveStatistics
}
@ -101,7 +104,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
if (!warmup) {
ok must be(true)
if (!Orderbook.useDummyOrderbook) {
TotalTradeCounter.counter.get must be(totalNumberOfOrders / 2)
totalTradeCounter.count must be(totalNumberOfOrders / 2)
}
logMeasurement(numberOfClients, durationNs, stat)
}

View file

@ -13,9 +13,7 @@ trait TradingSystem {
val allOrderbookSymbols: List[String] = OrderbookRepository.allOrderbookSymbols
val orderbooksGroupedByMatchingEngine: List[List[Orderbook]] =
for (groupOfSymbols: List[String] OrderbookRepository.orderbookSymbolsGroupedByMatchingEngine)
yield groupOfSymbols map (s Orderbook(s, false))
def orderbooksGroupedByMatchingEngine: List[List[Orderbook]]
def useStandByEngines: Boolean = true
@ -47,6 +45,10 @@ class AkkaTradingSystem(val system: ActorSystem) extends TradingSystem {
// by default we use default-dispatcher
def matchingEngineDispatcher: Option[String] = None
override val orderbooksGroupedByMatchingEngine: List[List[Orderbook]] =
for (groupOfSymbols: List[String] OrderbookRepository.orderbookSymbolsGroupedByMatchingEngine)
yield groupOfSymbols map (s Orderbook(s, false, system))
var matchingEngineForOrderbook: Map[String, ActorRef] = Map()
override def createMatchingEngines: List[MatchingEngineInfo] = {
@ -55,7 +57,7 @@ class AkkaTradingSystem(val system: ActorSystem) extends TradingSystem {
n = i + 1
} yield {
val me = createMatchingEngine("ME" + n, orderbooks)
val orderbooksCopy = orderbooks map (o Orderbook(o.symbol, true))
val orderbooksCopy = orderbooks map (o Orderbook(o.symbol, true, system))
val standbyOption =
if (useStandByEngines) {
val meStandby = createMatchingEngine("ME" + n + "s", orderbooksCopy)

View file

@ -15,6 +15,7 @@ import akka.performance.trading.domain.Order
import akka.performance.trading.domain.TotalTradeCounter
import akka.performance.workbench.PerformanceSpec
import akka.performance.trading.domain.Orderbook
import akka.performance.trading.domain.TotalTradeCounterExtension
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -Dbenchmark.useDummyOrderbook=true
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -22,11 +23,13 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
var tradingSystem: AkkaTradingSystem = _
def totalTradeCounter: TotalTradeCounter = TotalTradeCounterExtension(system)
override def beforeEach() {
super.beforeEach()
tradingSystem = new AkkaTradingSystem(system)
tradingSystem.start()
TotalTradeCounter.reset()
totalTradeCounter.reset()
}
override def afterEach() {
@ -98,7 +101,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
if (!warmup) {
ok must be(true)
if (!Orderbook.useDummyOrderbook) {
TotalTradeCounter.counter.get must be(totalNumberOfOrders / 2)
totalTradeCounter.count must be(totalNumberOfOrders / 2)
}
logMeasurement(numberOfClients, durationNs, totalNumberOfOrders)
}

View file

@ -50,25 +50,25 @@ class FileBenchResultRepository extends BenchResultRepository {
case class Key(name: String, load: Int)
def add(stats: Stats) {
def add(stats: Stats): Unit = synchronized {
val values = statsByName.getOrElseUpdate(stats.name, IndexedSeq.empty)
statsByName(stats.name) = values :+ stats
save(stats)
}
def get(name: String): Seq[Stats] = {
def get(name: String): Seq[Stats] = synchronized {
statsByName.getOrElse(name, IndexedSeq.empty)
}
def get(name: String, load: Int): Option[Stats] = {
def get(name: String, load: Int): Option[Stats] = synchronized {
get(name).find(_.load == load)
}
def isBaseline(stats: Stats): Boolean = {
def isBaseline(stats: Stats): Boolean = synchronized {
baselineStats.get(Key(stats.name, stats.load)) == Some(stats)
}
def getWithHistorical(name: String, load: Int): Seq[Stats] = {
def getWithHistorical(name: String, load: Int): Seq[Stats] = synchronized {
val key = Key(name, load)
val historical = historicalStats.getOrElse(key, IndexedSeq.empty)
val baseline = baselineStats.get(key)

View file

@ -0,0 +1,138 @@
REMOTE CALCULATOR
=================
Requirements
------------
To build and run remote calculator you need [Simple Build Tool][sbt] (sbt).
The Sample Explained
--------------------
In order to showcase the remote capabilities of Akka 2.0 we thought a remote calculator could do the trick.
There are three actor systems used in the sample:
* CalculatorApplication : the actor system performing the number crunching
* LookupApplication : illustrates how to look up an actor on a remote node and and how communicate with that actor
* CreationApplication : illustrates how to create an actor on a remote node and how to communicate with that actor
The CalculatorApplication contains an actor, SimpleCalculatorActor, which can handle simple math operations such as
addition and subtraction. This actor is looked up and used from the LookupApplication.
The CreationApplication wants to use more "advanced" mathematical operations, such as multiplication and division,
but as the CalculatorApplication does not have any actor that can perform those type of calculations the
CreationApplication has to remote deploy an actor that can (which in our case is AdvancedCalculatorActor).
So this actor is deployed, over the network, onto the CalculatorApplication actor system and thereafter the
CreationApplication will send messages to it.
It is important to point out that as the actor system run on different ports it is possible to run all three in parallel.
See the next section for more information of how to run the sample application.
Running
-------
In order to run all three actor systems you have to start SBT in three different terminal windows.
We start off by running the CalculatorApplication:
First type 'sbt' to start SBT interactively, the run 'update' and 'run':
> cd $AKKA_HOME
> sbt
> project akka-sample-remote
> run
Select to run "sample.remote.calculator.CalcApp" which in the case below is number 3:
Multiple main classes detected, select one to run:
[1] sample.remote.calculator.LookupApp
[2] sample.remote.calculator.CreationApp
[3] sample.remote.calculator.CalcApp
Enter number: 3
You should see something similar to this::
[info] Running sample.remote.calculator.CalcApp
[INFO] [12/22/2011 14:21:51.631] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://CalculatorApplication@127.0.0.1:2552
[INFO] [12/22/2011 14:21:51.632] [run-main] [Remote] Starting remote server on [akka://CalculatorApplication@127.0.0.1:2552]
Started Calculator Application - waiting for messages
[INFO] [12/22/2011 14:22:39.894] [New I/O server worker #1-1] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2553
Open up a new terminal window and run SBT once more:
> sbt
> project akka-sample-remote
> run
Select to run "sample.remote.calculator.LookupApp" which in the case below is number 1:
Multiple main classes detected, select one to run:
[1] sample.remote.calculator.LookupApp
[2] sample.remote.calculator.CreationApp
[3] sample.remote.calculator.CalcApp
Enter number: 1
Now you should see something like this:
[info] Running sample.remote.calculator.LookupApp
[INFO] [12/22/2011 14:54:38.630] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://LookupApplication@127.0.0.1:2553
[INFO] [12/22/2011 14:54:38.632] [run-main] [Remote] Starting remote server on [akka://LookupApplication@127.0.0.1:2553]
Started Lookup Application
[INFO] [12/22/2011 14:54:38.801] [default-dispatcher-21] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2552
Sub result: 4 - 30 = -26
Add result: 17 + 1 = 18
Add result: 37 + 43 = 80
Add result: 68 + 66 = 134
Congrats! You have now successfully looked up a remote actor and communicated with it.
The next step is to have an actor deployed on a remote note.
Once more you should open a new terminal window and run SBT:
> sbt
> project akka-sample-remote
> run
Select to run "sample.remote.calculator.CreationApp" which in the case below is number 2:
Multiple main classes detected, select one to run:
[1] sample.remote.calculator.LookupApp
[2] sample.remote.calculator.CreationApp
[3] sample.remote.calculator.CalcApp
Enter number: 2
Now you should see something like this:
[info] Running sample.remote.calculator.CreationApp
[INFO] [12/22/2011 14:57:02.150] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://RemoteCreation@127.0.0.1:2554
[INFO] [12/22/2011 14:57:02.151] [run-main] [Remote] Starting remote server on [akka://RemoteCreation@127.0.0.1:2554]
[INFO] [12/22/2011 14:57:02.267] [default-dispatcher-21] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2552
Started Creation Application
Mul result: 14 * 17 = 238
Div result: 3764 / 80 = 47.00
Mul result: 16 * 5 = 80
Mul result: 1 * 18 = 18
Mul result: 8 * 13 = 104
That's it!
Notice
------
The sample application is just that, i.e. a sample. Parts of it are not the way you would do a "real" application.
Some improvements are to remove all hard coded addresses from the code as they reduce the flexibility of how and
where the application can be run. We leave this to the astute reader to refine the sample into a real-world app.
[akka]: http://akka.io
[sbt]: http://code.google.com/p/simple-build-tool/

View file

@ -0,0 +1,35 @@
calculator {
include "common"
akka {
remote.server.port = 2552
cluster.nodename = "n1"
}
}
remotelookup {
include "common"
akka {
remote.server.port = 2553
cluster.nodename = "n2"
}
}
remotecreation {
include "common"
akka {
actor {
deployment { /advancedCalculator {
remote = "akka://CalculatorApplication@127.0.0.1:2552"
}
}
}
remote.server.port = 2554
cluster.nodename = "n3"
}
}

View file

@ -0,0 +1,14 @@
akka {
version = "2.0-SNAPSHOT"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteSupport"
server {
hostname = "127.0.0.1"
}
}
}

View file

@ -0,0 +1,38 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator
import akka.kernel.Bootable
import akka.actor.{ Props, Actor, ActorSystem }
import com.typesafe.config.ConfigFactory
class SimpleCalculatorActor extends Actor {
def receive = {
case Add(n1, n2)
println("Calculating %d + %d".format(n1, n2))
sender ! AddResult(n1, n2, n1 + n2)
case Subtract(n1, n2)
println("Calculating %d - %d".format(n1, n2))
sender ! SubtractResult(n1, n2, n1 - n2)
}
}
class CalculatorApplication extends Bootable {
val system = ActorSystem("CalculatorApplication", ConfigFactory.load.getConfig("calculator"))
val actor = system.actorOf(Props[SimpleCalculatorActor], "simpleCalculator")
def startup() {
}
def shutdown() {
system.shutdown()
}
}
object CalcApp {
def main(args: Array[String]) {
new CalculatorApplication
println("Started Calculator Application - waiting for messages")
}
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator
import akka.kernel.Bootable
import com.typesafe.config.ConfigFactory
import scala.util.Random
import akka.actor._
class CreationApplication extends Bootable {
val system = ActorSystem("RemoteCreation", ConfigFactory.load.getConfig("remotecreation"))
val localActor = system.actorOf(Props[CreationActor], "creationActor")
val remoteActor = system.actorOf(Props[AdvancedCalculatorActor], "advancedCalculator")
def doSomething(op: MathOp) = {
localActor ! (remoteActor, op)
}
def startup() {
}
def shutdown() {
system.shutdown()
}
}
class CreationActor extends Actor {
def receive = {
case (actor: ActorRef, op: MathOp) actor ! op
case result: MathResult result match {
case MultiplicationResult(n1, n2, r) println("Mul result: %d * %d = %d".format(n1, n2, r))
case DivisionResult(n1, n2, r) println("Div result: %.0f / %d = %.2f".format(n1, n2, r))
}
}
}
object CreationApp {
def main(args: Array[String]) {
val app = new CreationApplication
println("Started Creation Application")
while (true) {
if (Random.nextInt(100) % 2 == 0) app.doSomething(Multiply(Random.nextInt(20), Random.nextInt(20)))
else app.doSomething(Divide(Random.nextInt(10000), (Random.nextInt(99) + 1)))
Thread.sleep(200)
}
}
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator
import akka.kernel.Bootable
import com.typesafe.config.ConfigFactory
import scala.util.Random
import akka.actor.{ ActorRef, Props, Actor, ActorSystem }
class LookupApplication extends Bootable {
val system = ActorSystem("LookupApplication", ConfigFactory.load.getConfig("remotelookup"))
val actor = system.actorOf(Props[LookupActor], "lookupActor")
val remoteActor = system.actorFor("akka://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator")
def doSomething(op: MathOp) = {
actor ! (remoteActor, op)
}
def startup() {
}
def shutdown() {
system.shutdown()
}
}
class LookupActor extends Actor {
def receive = {
case (actor: ActorRef, op: MathOp) actor ! op
case result: MathResult result match {
case AddResult(n1, n2, r) println("Add result: %d + %d = %d".format(n1, n2, r))
case SubtractResult(n1, n2, r) println("Sub result: %d - %d = %d".format(n1, n2, r))
}
}
}
object LookupApp {
def main(args: Array[String]) {
val app = new LookupApplication
println("Started Lookup Application")
while (true) {
if (Random.nextInt(100) % 2 == 0) app.doSomething(Add(Random.nextInt(100), Random.nextInt(100)))
else app.doSomething(Subtract(Random.nextInt(100), Random.nextInt(100)))
Thread.sleep(200)
}
}
}

View file

@ -0,0 +1,37 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator
import akka.actor.Actor
trait MathOp
case class Add(nbr1: Int, nbr2: Int) extends MathOp
case class Subtract(nbr1: Int, nbr2: Int) extends MathOp
case class Multiply(nbr1: Int, nbr2: Int) extends MathOp
case class Divide(nbr1: Int, nbr2: Int) extends MathOp
trait MathResult
case class AddResult(nbr: Int, nbr2: Int, result: Int) extends MathResult
case class SubtractResult(nbr1: Int, nbr2: Int, result: Int) extends MathResult
case class MultiplicationResult(nbr1: Int, nbr2: Int, result: Int) extends MathResult
case class DivisionResult(nbr1: Double, nbr2: Int, result: Double) extends MathResult
class AdvancedCalculatorActor extends Actor {
def receive = {
case Multiply(n1, n2)
println("Calculating %d * %d".format(n1, n2))
sender ! MultiplicationResult(n1, n2, n1 * n2)
case Divide(n1, n2)
println("Calculating %d / %d".format(n1, n2))
sender ! DivisionResult(n1, n2, n1 / n2)
}
}

View file

@ -25,7 +25,7 @@ object AkkaBuild extends Build {
id = "akka",
base = file("."),
settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq(
parallelExecution in GlobalScope := true,
parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "true").toBoolean,
Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository",
Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id)
@ -224,7 +224,7 @@ object AkkaBuild extends Build {
id = "akka-samples",
base = file("akka-samples"),
settings = parentSettings,
aggregate = Seq(fsmSample, helloSample, helloKernelSample)
aggregate = Seq(fsmSample, helloSample, helloKernelSample, remoteSample)
)
lazy val fsmSample = Project(
@ -248,6 +248,13 @@ object AkkaBuild extends Build {
settings = defaultSettings
)
lazy val remoteSample = Project(
id = "akka-sample-remote",
base = file("akka-samples/akka-sample-remote"),
dependencies = Seq(actor, remote, kernel),
settings = defaultSettings
)
lazy val tutorials = Project(
id = "akka-tutorials",
base = file("akka-tutorials"),
@ -311,7 +318,7 @@ object AkkaBuild extends Build {
unmanagedClasspath in Runtime <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") },
unmanagedClasspath in Test <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") },
parallelExecution in Test := true,
parallelExecution in Test := System.getProperty("akka.parallelExecution", "true").toBoolean,
// for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec)
excludeTestNames := {