diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala deleted file mode 100644 index 1ef92549c2..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala +++ /dev/null @@ -1,169 +0,0 @@ -package akka.performance.microbench - -import akka.performance.workbench.PerformanceSpec -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics -import akka.actor._ -import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } -import akka.dispatch._ -import java.util.concurrent.ThreadPoolExecutor.AbortPolicy -import java.util.concurrent.BlockingQueue -import java.util.concurrent.LinkedBlockingQueue -import akka.util.Duration -import akka.util.duration._ - -// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TellThroughput10000PerformanceSpec extends PerformanceSpec { - import TellThroughput10000PerformanceSpec._ - - val repeat = 30000L * repeatFactor - - "Tell" must { - "warmup" in { - runScenario(4, warmup = true) - } - "warmup more" in { - runScenario(4, warmup = true) - } - "perform with load 1" in { - runScenario(1) - } - "perform with load 2" in { - runScenario(2) - } - "perform with load 4" in { - runScenario(4) - } - "perform with load 6" in { - runScenario(6) - } - "perform with load 8" in { - runScenario(8) - } - "perform with load 10" in { - runScenario(10) - } - "perform with load 12" in { - runScenario(12) - } - "perform with load 14" in { - runScenario(14) - } - "perform with load 16" in { - runScenario(16) - } - "perform with load 18" in { - runScenario(18) - } - "perform with load 20" in { - runScenario(20) - } - "perform with load 22" in { - runScenario(22) - } - "perform with load 24" in { - runScenario(24) - } - "perform with load 26" in { - runScenario(26) - } - "perform with load 28" in { - runScenario(28) - } - "perform with load 30" in { - runScenario(30) - } - "perform with load 32" in { - runScenario(32) - } - "perform with load 34" in { - runScenario(34) - } - "perform with load 36" in { - runScenario(36) - } - "perform with load 38" in { - runScenario(38) - } - "perform with load 40" in { - runScenario(40) - } - "perform with load 42" in { - runScenario(42) - } - "perform with load 44" in { - runScenario(44) - } - "perform with load 46" in { - runScenario(46) - } - "perform with load 48" in { - runScenario(48) - } - - def runScenario(numberOfClients: Int, warmup: Boolean = false) { - if (acceptClients(numberOfClients)) { - - val dispatcherKey = "benchmark.high-throughput-dispatcher" - val latch = new CountDownLatch(numberOfClients) - val repeatsPerClient = repeat / numberOfClients - val destinations = for (i ← 0 until numberOfClients) - yield system.actorOf(Props(new Destination).withDispatcher(dispatcherKey)) - val clients = for ((dest, j) ← destinations.zipWithIndex) - yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(dispatcherKey)) - - val start = System.nanoTime - clients.foreach(_ ! Run) - val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS) - val durationNs = (System.nanoTime - start) - - if (!warmup) { - ok must be(true) - logMeasurement(numberOfClients, durationNs, repeat) - } - clients.foreach(system.stop(_)) - destinations.foreach(system.stop(_)) - - } - } - } -} - -object TellThroughput10000PerformanceSpec { - - case object Run - case object Msg - - class Destination extends Actor { - def receive = { - case Msg ⇒ sender ! Msg - } - } - - class Client( - actor: ActorRef, - latch: CountDownLatch, - repeat: Long) extends Actor { - - var sent = 0L - var received = 0L - - def receive = { - case Msg ⇒ - received += 1 - if (sent < repeat) { - actor ! Msg - sent += 1 - } else if (received >= repeat) { - latch.countDown() - } - case Run ⇒ - for (i ← 0L until math.min(20000L, repeat)) { - actor ! Msg - sent += 1 - } - } - - } - -} diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala index 0b47a1f722..4bee0c8655 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala @@ -100,15 +100,14 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec { def runScenario(numberOfClients: Int, warmup: Boolean = false) { if (acceptClients(numberOfClients)) { - val clientDispatcher = "benchmark.client-dispatcher" - val destinationDispatcher = "benchmark.destination-dispatcher" + val throughputDispatcher = "benchmark.throughput-dispatcher" val latch = new CountDownLatch(numberOfClients) val repeatsPerClient = repeat / numberOfClients val destinations = for (i ← 0 until numberOfClients) - yield system.actorOf(Props(new Destination).withDispatcher(destinationDispatcher)) + yield system.actorOf(Props(new Destination).withDispatcher(throughputDispatcher)) val clients = for (dest ← destinations) - yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(clientDispatcher)) + yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(throughputDispatcher)) val start = System.nanoTime clients.foreach(_ ! Run) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index 552dbf62e9..f028fec6b0 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -16,10 +16,10 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { "Tell" must { "warmup" in { - runScenario(4, warmup = true) + runScenario(8, warmup = true) } "warmup more" in { - runScenario(4, warmup = true) + runScenario(8, warmup = true) } "perform with load 1" in { runScenario(1) @@ -48,19 +48,66 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { "perform with load 16" in { runScenario(16) } + "perform with load 18" in { + runScenario(18) + } + "perform with load 20" in { + runScenario(20) + } + "perform with load 22" in { + runScenario(22) + } + "perform with load 24" in { + runScenario(24) + } + "perform with load 26" in { + runScenario(26) + } + "perform with load 28" in { + runScenario(28) + } + "perform with load 30" in { + runScenario(30) + } + "perform with load 32" in { + runScenario(32) + } + "perform with load 34" in { + runScenario(34) + } + "perform with load 36" in { + runScenario(36) + } + "perform with load 38" in { + runScenario(38) + } + "perform with load 40" in { + runScenario(40) + } + "perform with load 42" in { + runScenario(42) + } + "perform with load 44" in { + runScenario(44) + } + "perform with load 46" in { + runScenario(46) + } + "perform with load 48" in { + runScenario(48) + } def runScenario(numberOfClients: Int, warmup: Boolean = false) { if (acceptClients(numberOfClients)) { - val clientDispatcher = "benchmark.client-dispatcher" - val destinationDispatcher = "benchmark.destination-dispatcher" + val throughputDispatcher = "benchmark.throughput-dispatcher" val latch = new CountDownLatch(numberOfClients) val repeatsPerClient = repeat / numberOfClients val destinations = for (i ← 0 until numberOfClients) - yield system.actorOf(Props(new Destination).withDispatcher(destinationDispatcher)) + yield system.actorOf(Props(new Destination).withDispatcher(throughputDispatcher)) val clients = for (dest ← destinations) - yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(clientDispatcher)) + yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(throughputDispatcher)) val start = System.nanoTime clients.foreach(_ ! Run) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPinnedDispatchersPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPinnedDispatchersPerformanceSpec.scala deleted file mode 100644 index 4d9ad3eef1..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPinnedDispatchersPerformanceSpec.scala +++ /dev/null @@ -1,171 +0,0 @@ -package akka.performance.microbench - -import akka.performance.workbench.PerformanceSpec -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics -import akka.actor._ -import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } -import akka.dispatch._ -import java.util.concurrent.ThreadPoolExecutor.AbortPolicy -import java.util.concurrent.BlockingQueue -import java.util.concurrent.LinkedBlockingQueue -import akka.util.Duration -import akka.util.duration._ - -// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TellThroughputPinnedDispatchersPerformanceSpec extends PerformanceSpec { - import TellThroughputPinnedDispatchersPerformanceSpec._ - - val repeat = 30000L * repeatFactor - - "Tell" must { - "warmup" in { - runScenario(4, warmup = true) - } - "warmup more" in { - runScenario(4, warmup = true) - } - "perform with load 1" in { - runScenario(1) - } - "perform with load 2" in { - runScenario(2) - } - "perform with load 4" in { - runScenario(4) - } - "perform with load 6" in { - runScenario(6) - } - "perform with load 8" in { - runScenario(8) - } - "perform with load 10" in { - runScenario(10) - } - "perform with load 12" in { - runScenario(12) - } - "perform with load 14" in { - runScenario(14) - } - "perform with load 16" in { - runScenario(16) - } - "perform with load 18" in { - runScenario(18) - } - "perform with load 20" in { - runScenario(20) - } - "perform with load 22" in { - runScenario(22) - } - "perform with load 24" in { - runScenario(24) - } - "perform with load 26" in { - runScenario(26) - } - "perform with load 28" in { - runScenario(28) - } - "perform with load 30" in { - runScenario(30) - } - "perform with load 32" in { - runScenario(32) - } - "perform with load 34" in { - runScenario(34) - } - "perform with load 36" in { - runScenario(36) - } - "perform with load 38" in { - runScenario(38) - } - "perform with load 40" in { - runScenario(40) - } - "perform with load 42" in { - runScenario(42) - } - "perform with load 44" in { - runScenario(44) - } - "perform with load 46" in { - runScenario(46) - } - "perform with load 48" in { - runScenario(48) - } - - def runScenario(numberOfClients: Int, warmup: Boolean = false) { - if (acceptClients(numberOfClients)) { - - val pinnedDispatcher = "benchmark.pinned-dispatcher" - - val latch = new CountDownLatch(numberOfClients) - val repeatsPerClient = repeat / numberOfClients - - val destinations = for (i ← 0 until numberOfClients) - yield system.actorOf(Props(new Destination).withDispatcher(pinnedDispatcher)) - val clients = for ((dest, j) ← destinations.zipWithIndex) - yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(pinnedDispatcher)) - - val start = System.nanoTime - clients.foreach(_ ! Run) - val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS) - val durationNs = (System.nanoTime - start) - - if (!warmup) { - ok must be(true) - logMeasurement(numberOfClients, durationNs, repeat) - } - clients.foreach(system.stop(_)) - destinations.foreach(system.stop(_)) - - } - } - } -} - -object TellThroughputPinnedDispatchersPerformanceSpec { - - case object Run - case object Msg - - class Destination extends Actor { - def receive = { - case Msg ⇒ sender ! Msg - } - } - - class Client( - actor: ActorRef, - latch: CountDownLatch, - repeat: Long) extends Actor { - - var sent = 0L - var received = 0L - - def receive = { - case Msg ⇒ - received += 1 - if (sent < repeat) { - actor ! Msg - sent += 1 - } else if (received >= repeat) { - latch.countDown() - } - case Run ⇒ - for (i ← 0L until math.min(1000L, repeat)) { - actor ! Msg - sent += 1 - } - } - - } - -} diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala index 9ba77e71e8..58b2e7e315 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala @@ -84,7 +84,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec { } yield Bid(s + i, 100 - i, 1000) val orders = askOrders.zip(bidOrders).map(x ⇒ Seq(x._1, x._2)).flatten - val clientDispatcher = "benchmark.client-dispatcher" + val latencyDispatcher = "benchmark.trading-dispatcher" val ordersPerClient = repeat * orders.size / numberOfClients val totalNumberOfOrders = ordersPerClient * numberOfClients @@ -93,7 +93,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec { val start = System.nanoTime val clients = (for (i ← 0 until numberOfClients) yield { val receiver = receivers(i % receivers.size) - val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelay.toMicros.toInt)).withDispatcher(clientDispatcher) + val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelay.toMicros.toInt)).withDispatcher(latencyDispatcher) system.actorOf(props) }) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingSystem.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingSystem.scala index 7fe2783a9a..1adb2ecbc7 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingSystem.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingSystem.scala @@ -39,11 +39,9 @@ class AkkaTradingSystem(val system: ActorSystem) extends TradingSystem { val orDispatcher = orderReceiverDispatcher val meDispatcher = matchingEngineDispatcher - // by default we use default-dispatcher - def orderReceiverDispatcher: Option[String] = None + def orderReceiverDispatcher: Option[String] = Some("benchmark.trading-dispatcher") - // by default we use default-dispatcher - def matchingEngineDispatcher: Option[String] = None + def matchingEngineDispatcher: Option[String] = Some("benchmark.trading-dispatcher") override val orderbooksGroupedByMatchingEngine: List[List[Orderbook]] = for (groupOfSymbols: List[String] ← OrderbookRepository.orderbookSymbolsGroupedByMatchingEngine) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala index 7092f87666..a1033d7682 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala @@ -81,7 +81,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec { } yield Bid(s + i, 100 - i, 1000) val orders = askOrders.zip(bidOrders).map(x ⇒ Seq(x._1, x._2)).flatten - val clientDispatcher = "benchmark.client-dispatcher" + val throughputDispatcher = "benchmark.trading-dispatcher" val ordersPerClient = repeat * orders.size / numberOfClients val totalNumberOfOrders = ordersPerClient * numberOfClients @@ -90,7 +90,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec { val start = System.nanoTime val clients = (for (i ← 0 until numberOfClients) yield { val receiver = receivers(i % receivers.size) - val props = Props(new Client(receiver, orders, latch, ordersPerClient)).withDispatcher(clientDispatcher) + val props = Props(new Client(receiver, orders, latch, ordersPerClient)).withDispatcher(throughputDispatcher) system.actorOf(props) }) diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala index 65294d014a..e31e667678 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala @@ -20,41 +20,30 @@ object BenchmarkConfig { resultDir = "target/benchmark" useDummyOrderbook = false - client-dispatcher { - executor = "thread-pool-executor" - thread-pool-executor { - core-pool-size-min = ${benchmark.maxClients} - core-pool-size-max = ${benchmark.maxClients} + throughput-dispatcher { + throughput = 5 + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = ${benchmark.maxClients} + parallelism-max = ${benchmark.maxClients} } } - destination-dispatcher { - executor = "thread-pool-executor" - thread-pool-executor { - core-pool-size-min = ${benchmark.maxClients} - core-pool-size-max = ${benchmark.maxClients} - } - } - - high-throughput-dispatcher { - throughput = 10000 - executor = "thread-pool-executor" - thread-pool-executor { - core-pool-size-min = ${benchmark.maxClients} - core-pool-size-max = ${benchmark.maxClients} - } - } - - pinned-dispatcher { - type = PinnedDispatcher - } - latency-dispatcher { throughput = 1 - executor = "thread-pool-executor" - thread-pool-executor { - core-pool-size-min = ${benchmark.maxClients} - core-pool-size-max = ${benchmark.maxClients} + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = ${benchmark.maxClients} + parallelism-max = ${benchmark.maxClients} + } + } + + trading-dispatcher { + throughput = 5 + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = ${benchmark.maxClients} + parallelism-max = ${benchmark.maxClients} } } } @@ -62,8 +51,9 @@ object BenchmarkConfig { private val longRunningBenchmarkConfig = ConfigFactory.parseString(""" benchmark { longRunning = true + minClients = 4 maxClients = 48 - repeatFactor = 150 + repeatFactor = 2000 maxRunDuration = 120 seconds useDummyOrderbook = true } diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala index 3d27f8a303..ca6e42d67f 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala @@ -31,7 +31,8 @@ abstract class PerformanceSpec(cfg: Config = BenchmarkConfig.config) extends Akk def compareResultWith: Option[String] = None def acceptClients(numberOfClients: Int): Boolean = { - (minClients <= numberOfClients && numberOfClients <= maxClients) + (minClients <= numberOfClients && numberOfClients <= maxClients && + (maxClients <= 16 || numberOfClients % 4 == 0)) } def logMeasurement(numberOfClients: Int, durationNs: Long, n: Long) { diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 4612fdca1f..5be5f1b0e1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -5,10 +5,20 @@ package akka.dispatch import java.util.Collection -import java.util.concurrent.atomic.AtomicLong import akka.util.Duration -import java.util.concurrent._ import akka.jsr166y._ +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.RejectedExecutionHandler +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.SynchronousQueue +import java.util.concurrent.TimeUnit +import java.util.concurrent.ThreadFactory +import java.util.concurrent.ThreadPoolExecutor object ThreadPoolConfig { type QueueFactory = () ⇒ BlockingQueue[Runnable]