Merge branch 'wip-1728-fjpool-√' of github.com:jboner/akka into wip-1728-fjpool-√

Viktor Klang 2012-01-31 10:14:39 +01:00
commit 30c45c4a8d
10 changed files with 97 additions and 392 deletions


@@ -1,169 +0,0 @@
package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import akka.actor._
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
import akka.dispatch._
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import akka.util.Duration
import akka.util.duration._
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TellThroughput10000PerformanceSpec extends PerformanceSpec {
import TellThroughput10000PerformanceSpec._
val repeat = 30000L * repeatFactor
"Tell" must {
"warmup" in {
runScenario(4, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
"perform with load 10" in {
runScenario(10)
}
"perform with load 12" in {
runScenario(12)
}
"perform with load 14" in {
runScenario(14)
}
"perform with load 16" in {
runScenario(16)
}
"perform with load 18" in {
runScenario(18)
}
"perform with load 20" in {
runScenario(20)
}
"perform with load 22" in {
runScenario(22)
}
"perform with load 24" in {
runScenario(24)
}
"perform with load 26" in {
runScenario(26)
}
"perform with load 28" in {
runScenario(28)
}
"perform with load 30" in {
runScenario(30)
}
"perform with load 32" in {
runScenario(32)
}
"perform with load 34" in {
runScenario(34)
}
"perform with load 36" in {
runScenario(36)
}
"perform with load 38" in {
runScenario(38)
}
"perform with load 40" in {
runScenario(40)
}
"perform with load 42" in {
runScenario(42)
}
"perform with load 44" in {
runScenario(44)
}
"perform with load 46" in {
runScenario(46)
}
"perform with load 48" in {
runScenario(48)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val dispatcherKey = "benchmark.high-throughput-dispatcher"
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i ← 0 until numberOfClients)
yield system.actorOf(Props(new Destination).withDispatcher(dispatcherKey))
val clients = for ((dest, j) ← destinations.zipWithIndex)
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(dispatcherKey))
val start = System.nanoTime
clients.foreach(_ ! Run)
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
val durationNs = (System.nanoTime - start)
if (!warmup) {
ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat)
}
clients.foreach(system.stop(_))
destinations.foreach(system.stop(_))
}
}
}
}
object TellThroughput10000PerformanceSpec {
case object Run
case object Msg
class Destination extends Actor {
def receive = {
case Msg ⇒ sender ! Msg
}
}
class Client(
actor: ActorRef,
latch: CountDownLatch,
repeat: Long) extends Actor {
var sent = 0L
var received = 0L
def receive = {
case Msg ⇒
received += 1
if (sent < repeat) {
actor ! Msg
sent += 1
} else if (received >= repeat) {
latch.countDown()
}
case Run ⇒
for (i ← 0L until math.min(20000L, repeat)) {
actor ! Msg
sent += 1
}
}
}
}


@@ -100,15 +100,14 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec {
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val clientDispatcher = "benchmark.client-dispatcher"
val destinationDispatcher = "benchmark.destination-dispatcher"
val throughputDispatcher = "benchmark.throughput-dispatcher"
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i ← 0 until numberOfClients)
yield system.actorOf(Props(new Destination).withDispatcher(destinationDispatcher))
yield system.actorOf(Props(new Destination).withDispatcher(throughputDispatcher))
val clients = for (dest ← destinations)
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(clientDispatcher))
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(throughputDispatcher))
val start = System.nanoTime
clients.foreach(_ ! Run)


@@ -16,10 +16,10 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
"Tell" must {
"warmup" in {
runScenario(4, warmup = true)
runScenario(8, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
runScenario(8, warmup = true)
}
"perform with load 1" in {
runScenario(1)
@@ -48,19 +48,66 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
"perform with load 16" in {
runScenario(16)
}
"perform with load 18" in {
runScenario(18)
}
"perform with load 20" in {
runScenario(20)
}
"perform with load 22" in {
runScenario(22)
}
"perform with load 24" in {
runScenario(24)
}
"perform with load 26" in {
runScenario(26)
}
"perform with load 28" in {
runScenario(28)
}
"perform with load 30" in {
runScenario(30)
}
"perform with load 32" in {
runScenario(32)
}
"perform with load 34" in {
runScenario(34)
}
"perform with load 36" in {
runScenario(36)
}
"perform with load 38" in {
runScenario(38)
}
"perform with load 40" in {
runScenario(40)
}
"perform with load 42" in {
runScenario(42)
}
"perform with load 44" in {
runScenario(44)
}
"perform with load 46" in {
runScenario(46)
}
"perform with load 48" in {
runScenario(48)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val clientDispatcher = "benchmark.client-dispatcher"
val destinationDispatcher = "benchmark.destination-dispatcher"
val throughputDispatcher = "benchmark.throughput-dispatcher"
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i ← 0 until numberOfClients)
yield system.actorOf(Props(new Destination).withDispatcher(destinationDispatcher))
yield system.actorOf(Props(new Destination).withDispatcher(throughputDispatcher))
val clients = for (dest ← destinations)
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(clientDispatcher))
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(throughputDispatcher))
val start = System.nanoTime
clients.foreach(_ ! Run)


@@ -1,171 +0,0 @@
package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import akka.actor._
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
import akka.dispatch._
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import akka.util.Duration
import akka.util.duration._
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TellThroughputPinnedDispatchersPerformanceSpec extends PerformanceSpec {
import TellThroughputPinnedDispatchersPerformanceSpec._
val repeat = 30000L * repeatFactor
"Tell" must {
"warmup" in {
runScenario(4, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
"perform with load 10" in {
runScenario(10)
}
"perform with load 12" in {
runScenario(12)
}
"perform with load 14" in {
runScenario(14)
}
"perform with load 16" in {
runScenario(16)
}
"perform with load 18" in {
runScenario(18)
}
"perform with load 20" in {
runScenario(20)
}
"perform with load 22" in {
runScenario(22)
}
"perform with load 24" in {
runScenario(24)
}
"perform with load 26" in {
runScenario(26)
}
"perform with load 28" in {
runScenario(28)
}
"perform with load 30" in {
runScenario(30)
}
"perform with load 32" in {
runScenario(32)
}
"perform with load 34" in {
runScenario(34)
}
"perform with load 36" in {
runScenario(36)
}
"perform with load 38" in {
runScenario(38)
}
"perform with load 40" in {
runScenario(40)
}
"perform with load 42" in {
runScenario(42)
}
"perform with load 44" in {
runScenario(44)
}
"perform with load 46" in {
runScenario(46)
}
"perform with load 48" in {
runScenario(48)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val pinnedDispatcher = "benchmark.pinned-dispatcher"
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i ← 0 until numberOfClients)
yield system.actorOf(Props(new Destination).withDispatcher(pinnedDispatcher))
val clients = for ((dest, j) ← destinations.zipWithIndex)
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(pinnedDispatcher))
val start = System.nanoTime
clients.foreach(_ ! Run)
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
val durationNs = (System.nanoTime - start)
if (!warmup) {
ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat)
}
clients.foreach(system.stop(_))
destinations.foreach(system.stop(_))
}
}
}
}
object TellThroughputPinnedDispatchersPerformanceSpec {
case object Run
case object Msg
class Destination extends Actor {
def receive = {
case Msg ⇒ sender ! Msg
}
}
class Client(
actor: ActorRef,
latch: CountDownLatch,
repeat: Long) extends Actor {
var sent = 0L
var received = 0L
def receive = {
case Msg ⇒
received += 1
if (sent < repeat) {
actor ! Msg
sent += 1
} else if (received >= repeat) {
latch.countDown()
}
case Run ⇒
for (i ← 0L until math.min(1000L, repeat)) {
actor ! Msg
sent += 1
}
}
}
}


@@ -84,7 +84,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
} yield Bid(s + i, 100 - i, 1000)
val orders = askOrders.zip(bidOrders).map(x ⇒ Seq(x._1, x._2)).flatten
val clientDispatcher = "benchmark.client-dispatcher"
val latencyDispatcher = "benchmark.trading-dispatcher"
val ordersPerClient = repeat * orders.size / numberOfClients
val totalNumberOfOrders = ordersPerClient * numberOfClients
@@ -93,7 +93,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
val start = System.nanoTime
val clients = (for (i ← 0 until numberOfClients) yield {
val receiver = receivers(i % receivers.size)
val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelay.toMicros.toInt)).withDispatcher(clientDispatcher)
val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelay.toMicros.toInt)).withDispatcher(latencyDispatcher)
system.actorOf(props)
})


@@ -39,11 +39,9 @@ class AkkaTradingSystem(val system: ActorSystem) extends TradingSystem {
val orDispatcher = orderReceiverDispatcher
val meDispatcher = matchingEngineDispatcher
// by default we use default-dispatcher
def orderReceiverDispatcher: Option[String] = None
def orderReceiverDispatcher: Option[String] = Some("benchmark.trading-dispatcher")
// by default we use default-dispatcher
def matchingEngineDispatcher: Option[String] = None
def matchingEngineDispatcher: Option[String] = Some("benchmark.trading-dispatcher")
override val orderbooksGroupedByMatchingEngine: List[List[Orderbook]] =
for (groupOfSymbols: List[String] ← OrderbookRepository.orderbookSymbolsGroupedByMatchingEngine)


@@ -81,7 +81,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
} yield Bid(s + i, 100 - i, 1000)
val orders = askOrders.zip(bidOrders).map(x ⇒ Seq(x._1, x._2)).flatten
val clientDispatcher = "benchmark.client-dispatcher"
val throughputDispatcher = "benchmark.trading-dispatcher"
val ordersPerClient = repeat * orders.size / numberOfClients
val totalNumberOfOrders = ordersPerClient * numberOfClients
@@ -90,7 +90,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
val start = System.nanoTime
val clients = (for (i ← 0 until numberOfClients) yield {
val receiver = receivers(i % receivers.size)
val props = Props(new Client(receiver, orders, latch, ordersPerClient)).withDispatcher(clientDispatcher)
val props = Props(new Client(receiver, orders, latch, ordersPerClient)).withDispatcher(throughputDispatcher)
system.actorOf(props)
})


@@ -20,41 +20,30 @@ object BenchmarkConfig {
resultDir = "target/benchmark"
useDummyOrderbook = false
client-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
throughput-dispatcher {
throughput = 5
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = ${benchmark.maxClients}
parallelism-max = ${benchmark.maxClients}
}
}
destination-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
}
}
high-throughput-dispatcher {
throughput = 10000
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
}
}
pinned-dispatcher {
type = PinnedDispatcher
}
latency-dispatcher {
throughput = 1
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = ${benchmark.maxClients}
parallelism-max = ${benchmark.maxClients}
}
}
trading-dispatcher {
throughput = 5
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = ${benchmark.maxClients}
parallelism-max = ${benchmark.maxClients}
}
}
}
@@ -62,8 +51,9 @@ object BenchmarkConfig {
private val longRunningBenchmarkConfig = ConfigFactory.parseString("""
benchmark {
longRunning = true
minClients = 4
maxClients = 48
repeatFactor = 150
repeatFactor = 2000
maxRunDuration = 120 seconds
useDummyOrderbook = true
}
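For reference, the fork-join-executor blocks introduced above are ordinary Akka dispatcher configuration: the benchmark parses them with ConfigFactory.parseString and assigns them per actor through Props.withDispatcher. The following self-contained sketch is illustrative only and not part of this commit; the system name, the Echo actor, and the fixed parallelism values stand in for the ${benchmark.maxClients} substitutions used here.

import akka.actor.{ Actor, ActorSystem, Props }
import com.typesafe.config.ConfigFactory

object ThroughputDispatcherSketch extends App {
  // A dispatcher shaped like benchmark.throughput-dispatcher above, with
  // literal parallelism bounds instead of ${benchmark.maxClients}.
  val config = ConfigFactory.parseString("""
    benchmark.throughput-dispatcher {
      throughput = 5
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 4
        parallelism-max = 48
      }
    }
  """)

  val system = ActorSystem("DispatcherSketch", config)

  class Echo extends Actor {
    def receive = { case msg ⇒ sender ! msg }
  }

  // The dispatcher id is resolved against the ActorSystem configuration,
  // so this actor runs on the fork-join pool defined above.
  val echo = system.actorOf(Props(new Echo).withDispatcher("benchmark.throughput-dispatcher"))

  echo ! "ping"
  system.shutdown()
}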


@@ -31,7 +31,8 @@ abstract class PerformanceSpec(cfg: Config = BenchmarkConfig.config) extends Akk
def compareResultWith: Option[String] = None
def acceptClients(numberOfClients: Int): Boolean = {
(minClients <= numberOfClients && numberOfClients <= maxClients)
(minClients <= numberOfClients && numberOfClients <= maxClients &&
(maxClients <= 16 || numberOfClients % 4 == 0))
}
def logMeasurement(numberOfClients: Int, durationNs: Long, n: Long) {


@@ -5,10 +5,20 @@
package akka.dispatch
import java.util.Collection
import java.util.concurrent.atomic.AtomicLong
import akka.util.Duration
import java.util.concurrent._
import akka.jsr166y._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
object ThreadPoolConfig {
type QueueFactory = () ⇒ BlockingQueue[Runnable]
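The QueueFactory alias above is simply a nullary function that manufactures the work queue handed to a ThreadPoolExecutor, so each pool built from a config gets its own queue instance. A minimal sketch of such factories follows; it is illustrative only, and the helper names are not taken from this file.

import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue, SynchronousQueue }

object QueueFactorySketch {
  // Same shape as ThreadPoolConfig.QueueFactory: every call yields a fresh queue.
  type QueueFactory = () ⇒ BlockingQueue[Runnable]

  // Unbounded FIFO queue: submitted tasks accumulate instead of being rejected.
  val unbounded: QueueFactory = () ⇒ new LinkedBlockingQueue[Runnable]()

  // Hand-off queue: a submission succeeds only when a worker is ready to take it.
  def synchronous(fair: Boolean): QueueFactory = () ⇒ new SynchronousQueue[Runnable](fair)
}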