Merge branch 'master' into wip-fix-futures-√

This commit is contained in:
Viktor Klang 2012-01-31 16:10:14 +01:00
commit 8cd033bb17
40 changed files with 5042 additions and 645 deletions

View file

@ -3,53 +3,30 @@
*/
package akka.actor
import org.scalatest.BeforeAndAfterAll
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import java.util.concurrent.TimeoutException
import akka.testkit._
import akka.dispatch.Await
import akka.util.Timeout
import akka.pattern.{ ask, AskTimeoutException }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {
class ActorTimeoutSpec extends AkkaSpec {
val defaultTimeout = system.settings.ActorTimeout.duration
val testTimeout = if (system.settings.ActorTimeout.duration < 400.millis) 500 millis else 100 millis
val testTimeout = 200.millis.dilated
"An Actor-based Future" must {
"use the global default timeout if no implicit in scope" in {
within(defaultTimeout - 100.millis, defaultTimeout + 400.millis) {
val echo = system.actorOf(Props.empty)
try {
val d = system.settings.ActorTimeout.duration
val f = echo ? "hallo"
intercept[AskTimeoutException] { Await.result(f, d + d) }
} finally { system.stop(echo) }
}
}
"use implicitly supplied timeout" in {
implicit val timeout = Timeout(testTimeout)
within(testTimeout - 100.millis, testTimeout + 300.millis) {
val echo = system.actorOf(Props.empty)
try {
val f = (echo ? "hallo").mapTo[String]
intercept[AskTimeoutException] { Await.result(f, testTimeout + testTimeout) }
} finally { system.stop(echo) }
}
val f = (echo ? "hallo")
intercept[AskTimeoutException] { Await.result(f, testTimeout * 2) }
}
"use explicitly supplied timeout" in {
within(testTimeout - 100.millis, testTimeout + 300.millis) {
val echo = system.actorOf(Props.empty)
val f = echo.?("hallo")(testTimeout)
try {
intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) }
} finally { system.stop(echo) }
}
intercept[AskTimeoutException] { Await.result(f, testTimeout * 2) }
}
}
}

View file

@ -9,6 +9,8 @@ object ConsistencySpec {
consistency-dispatcher {
throughput = 1
keep-alive-time = 1 ms
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 10
core-pool-size-max = 10
max-pool-size-min = 10
@ -16,6 +18,7 @@ object ConsistencySpec {
task-queue-type = array
task-queue-size = 7
}
}
"""
class CacheMisaligned(var value: Long, var padding1: Long, var padding2: Long, var padding3: Int) //Vars, no final fences

View file

@ -14,11 +14,14 @@ object LocalActorRefProviderSpec {
akka {
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 16
core-pool-size-max = 16
}
}
}
}
"""
}

View file

@ -25,11 +25,14 @@ object TypedActorSpec {
val config = """
pooled-dispatcher {
type = BalancingDispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 60
core-pool-size-max = 60
max-pool-size-min = 60
max-pool-size-max = 60
}
}
"""
class CyclicIterator[T](val items: Seq[T]) extends Iterator[T] {

View file

@ -448,16 +448,14 @@ object DispatcherModelSpec {
class MessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher = {
configureThreadPool(config,
threadPoolConfig new Dispatcher(prerequisites,
private val instance: MessageDispatcher =
new Dispatcher(prerequisites,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
threadPoolConfig,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor).build
}
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor
override def dispatcher(): MessageDispatcher = instance
}
@ -522,16 +520,14 @@ object BalancingDispatcherModelSpec {
class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher = {
configureThreadPool(config,
threadPoolConfig new BalancingDispatcher(prerequisites,
private val instance: MessageDispatcher =
new BalancingDispatcher(prerequisites,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
threadPoolConfig,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor).build
}
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor
override def dispatcher(): MessageDispatcher = instance
}

View file

@ -16,15 +16,21 @@ object DispatcherActorSpec {
}
test-throughput-dispatcher {
throughput = 101
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 1
core-pool-size-max = 1
}
}
test-throughput-deadline-dispatcher {
throughput = 2
throughput-deadline-time = 100 milliseconds
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 1
core-pool-size-max = 1
}
}
"""
class TestActor extends Actor {

View file

@ -18,27 +18,14 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
val settings = system.settings
val config = settings.config
{
import config._
getString("akka.version") must equal("2.0-SNAPSHOT")
settings.ConfigVersion must equal("2.0-SNAPSHOT")
getBoolean("akka.daemonic") must equal(false)
getString("akka.actor.default-dispatcher.type") must equal("Dispatcher")
getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000)
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(3.0)
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(3.0)
getInt("akka.actor.default-dispatcher.task-queue-size") must equal(-1)
getString("akka.actor.default-dispatcher.task-queue-type") must equal("linked")
getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true)
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1)
getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000)
getString("akka.actor.default-dispatcher.mailboxType") must be("")
getMilliseconds("akka.actor.default-dispatcher.shutdown-timeout") must equal(1 * 1000)
getInt("akka.actor.default-dispatcher.throughput") must equal(5)
getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0)
getBoolean("akka.actor.serialize-messages") must equal(false)
settings.SerializeAllMessages must equal(false)
@ -48,5 +35,45 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
getMilliseconds("akka.scheduler.tickDuration") must equal(100)
settings.SchedulerTickDuration must equal(100 millis)
}
{
val c = config.getConfig("akka.actor.default-dispatcher")
//General dispatcher config
{
c.getString("type") must equal("Dispatcher")
c.getString("executor") must equal("fork-join-executor")
c.getInt("mailbox-capacity") must equal(-1)
c.getMilliseconds("mailbox-push-timeout-time") must equal(10 * 1000)
c.getString("mailboxType") must be("")
c.getMilliseconds("shutdown-timeout") must equal(1 * 1000)
c.getInt("throughput") must equal(5)
c.getMilliseconds("throughput-deadline-time") must equal(0)
}
//Fork join executor config
{
val pool = c.getConfig("fork-join-executor")
pool.getInt("parallelism-min") must equal(8)
pool.getDouble("parallelism-factor") must equal(3.0)
pool.getInt("parallelism-max") must equal(64)
}
//Thread pool executor config
{
val pool = c.getConfig("thread-pool-executor")
import pool._
getMilliseconds("keep-alive-time") must equal(60 * 1000)
getDouble("core-pool-size-factor") must equal(3.0)
getDouble("max-pool-size-factor") must equal(3.0)
getInt("task-queue-size") must equal(-1)
getString("task-queue-type") must equal("linked")
getBoolean("allow-core-timeout") must equal(true)
}
}
}
}
}

View file

@ -1,169 +0,0 @@
package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import akka.actor._
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
import akka.dispatch._
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import akka.util.Duration
import akka.util.duration._
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TellThroughput10000PerformanceSpec extends PerformanceSpec {
import TellThroughput10000PerformanceSpec._
val repeat = 30000L * repeatFactor
"Tell" must {
"warmup" in {
runScenario(4, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
"perform with load 10" in {
runScenario(10)
}
"perform with load 12" in {
runScenario(12)
}
"perform with load 14" in {
runScenario(14)
}
"perform with load 16" in {
runScenario(16)
}
"perform with load 18" in {
runScenario(18)
}
"perform with load 20" in {
runScenario(20)
}
"perform with load 22" in {
runScenario(22)
}
"perform with load 24" in {
runScenario(24)
}
"perform with load 26" in {
runScenario(26)
}
"perform with load 28" in {
runScenario(28)
}
"perform with load 30" in {
runScenario(30)
}
"perform with load 32" in {
runScenario(32)
}
"perform with load 34" in {
runScenario(34)
}
"perform with load 36" in {
runScenario(36)
}
"perform with load 38" in {
runScenario(38)
}
"perform with load 40" in {
runScenario(40)
}
"perform with load 42" in {
runScenario(42)
}
"perform with load 44" in {
runScenario(44)
}
"perform with load 46" in {
runScenario(46)
}
"perform with load 48" in {
runScenario(48)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val dispatcherKey = "benchmark.high-throughput-dispatcher"
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i 0 until numberOfClients)
yield system.actorOf(Props(new Destination).withDispatcher(dispatcherKey))
val clients = for ((dest, j) destinations.zipWithIndex)
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(dispatcherKey))
val start = System.nanoTime
clients.foreach(_ ! Run)
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
val durationNs = (System.nanoTime - start)
if (!warmup) {
ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat)
}
clients.foreach(system.stop(_))
destinations.foreach(system.stop(_))
}
}
}
}
object TellThroughput10000PerformanceSpec {
case object Run
case object Msg
class Destination extends Actor {
def receive = {
case Msg sender ! Msg
}
}
class Client(
actor: ActorRef,
latch: CountDownLatch,
repeat: Long) extends Actor {
var sent = 0L
var received = 0L
def receive = {
case Msg
received += 1
if (sent < repeat) {
actor ! Msg
sent += 1
} else if (received >= repeat) {
latch.countDown()
}
case Run
for (i 0L until math.min(20000L, repeat)) {
actor ! Msg
sent += 1
}
}
}
}

View file

@ -100,15 +100,14 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec {
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val clientDispatcher = "benchmark.client-dispatcher"
val destinationDispatcher = "benchmark.destination-dispatcher"
val throughputDispatcher = "benchmark.throughput-dispatcher"
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i 0 until numberOfClients)
yield system.actorOf(Props(new Destination).withDispatcher(destinationDispatcher))
yield system.actorOf(Props(new Destination).withDispatcher(throughputDispatcher))
val clients = for (dest destinations)
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(clientDispatcher))
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(throughputDispatcher))
val start = System.nanoTime
clients.foreach(_ ! Run)

View file

@ -16,10 +16,10 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
"Tell" must {
"warmup" in {
runScenario(4, warmup = true)
runScenario(8, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
runScenario(8, warmup = true)
}
"perform with load 1" in {
runScenario(1)
@ -48,19 +48,66 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
"perform with load 16" in {
runScenario(16)
}
"perform with load 18" in {
runScenario(18)
}
"perform with load 20" in {
runScenario(20)
}
"perform with load 22" in {
runScenario(22)
}
"perform with load 24" in {
runScenario(24)
}
"perform with load 26" in {
runScenario(26)
}
"perform with load 28" in {
runScenario(28)
}
"perform with load 30" in {
runScenario(30)
}
"perform with load 32" in {
runScenario(32)
}
"perform with load 34" in {
runScenario(34)
}
"perform with load 36" in {
runScenario(36)
}
"perform with load 38" in {
runScenario(38)
}
"perform with load 40" in {
runScenario(40)
}
"perform with load 42" in {
runScenario(42)
}
"perform with load 44" in {
runScenario(44)
}
"perform with load 46" in {
runScenario(46)
}
"perform with load 48" in {
runScenario(48)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val clientDispatcher = "benchmark.client-dispatcher"
val destinationDispatcher = "benchmark.destination-dispatcher"
val throughputDispatcher = "benchmark.throughput-dispatcher"
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i 0 until numberOfClients)
yield system.actorOf(Props(new Destination).withDispatcher(destinationDispatcher))
yield system.actorOf(Props(new Destination).withDispatcher(throughputDispatcher))
val clients = for (dest destinations)
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(clientDispatcher))
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(throughputDispatcher))
val start = System.nanoTime
clients.foreach(_ ! Run)

View file

@ -1,171 +0,0 @@
package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import akka.actor._
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
import akka.dispatch._
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import akka.util.Duration
import akka.util.duration._
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TellThroughputPinnedDispatchersPerformanceSpec extends PerformanceSpec {
import TellThroughputPinnedDispatchersPerformanceSpec._
val repeat = 30000L * repeatFactor
"Tell" must {
"warmup" in {
runScenario(4, warmup = true)
}
"warmup more" in {
runScenario(4, warmup = true)
}
"perform with load 1" in {
runScenario(1)
}
"perform with load 2" in {
runScenario(2)
}
"perform with load 4" in {
runScenario(4)
}
"perform with load 6" in {
runScenario(6)
}
"perform with load 8" in {
runScenario(8)
}
"perform with load 10" in {
runScenario(10)
}
"perform with load 12" in {
runScenario(12)
}
"perform with load 14" in {
runScenario(14)
}
"perform with load 16" in {
runScenario(16)
}
"perform with load 18" in {
runScenario(18)
}
"perform with load 20" in {
runScenario(20)
}
"perform with load 22" in {
runScenario(22)
}
"perform with load 24" in {
runScenario(24)
}
"perform with load 26" in {
runScenario(26)
}
"perform with load 28" in {
runScenario(28)
}
"perform with load 30" in {
runScenario(30)
}
"perform with load 32" in {
runScenario(32)
}
"perform with load 34" in {
runScenario(34)
}
"perform with load 36" in {
runScenario(36)
}
"perform with load 38" in {
runScenario(38)
}
"perform with load 40" in {
runScenario(40)
}
"perform with load 42" in {
runScenario(42)
}
"perform with load 44" in {
runScenario(44)
}
"perform with load 46" in {
runScenario(46)
}
"perform with load 48" in {
runScenario(48)
}
def runScenario(numberOfClients: Int, warmup: Boolean = false) {
if (acceptClients(numberOfClients)) {
val pinnedDispatcher = "benchmark.pinned-dispatcher"
val latch = new CountDownLatch(numberOfClients)
val repeatsPerClient = repeat / numberOfClients
val destinations = for (i 0 until numberOfClients)
yield system.actorOf(Props(new Destination).withDispatcher(pinnedDispatcher))
val clients = for ((dest, j) destinations.zipWithIndex)
yield system.actorOf(Props(new Client(dest, latch, repeatsPerClient)).withDispatcher(pinnedDispatcher))
val start = System.nanoTime
clients.foreach(_ ! Run)
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
val durationNs = (System.nanoTime - start)
if (!warmup) {
ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat)
}
clients.foreach(system.stop(_))
destinations.foreach(system.stop(_))
}
}
}
}
object TellThroughputPinnedDispatchersPerformanceSpec {
case object Run
case object Msg
class Destination extends Actor {
def receive = {
case Msg sender ! Msg
}
}
class Client(
actor: ActorRef,
latch: CountDownLatch,
repeat: Long) extends Actor {
var sent = 0L
var received = 0L
def receive = {
case Msg
received += 1
if (sent < repeat) {
actor ! Msg
sent += 1
} else if (received >= repeat) {
latch.countDown()
}
case Run
for (i 0L until math.min(1000L, repeat)) {
actor ! Msg
sent += 1
}
}
}
}

View file

@ -84,7 +84,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
} yield Bid(s + i, 100 - i, 1000)
val orders = askOrders.zip(bidOrders).map(x Seq(x._1, x._2)).flatten
val clientDispatcher = "benchmark.client-dispatcher"
val latencyDispatcher = "benchmark.trading-dispatcher"
val ordersPerClient = repeat * orders.size / numberOfClients
val totalNumberOfOrders = ordersPerClient * numberOfClients
@ -93,7 +93,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
val start = System.nanoTime
val clients = (for (i 0 until numberOfClients) yield {
val receiver = receivers(i % receivers.size)
val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelay.toMicros.toInt)).withDispatcher(clientDispatcher)
val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelay.toMicros.toInt)).withDispatcher(latencyDispatcher)
system.actorOf(props)
})

View file

@ -39,11 +39,9 @@ class AkkaTradingSystem(val system: ActorSystem) extends TradingSystem {
val orDispatcher = orderReceiverDispatcher
val meDispatcher = matchingEngineDispatcher
// by default we use default-dispatcher
def orderReceiverDispatcher: Option[String] = None
def orderReceiverDispatcher: Option[String] = Some("benchmark.trading-dispatcher")
// by default we use default-dispatcher
def matchingEngineDispatcher: Option[String] = None
def matchingEngineDispatcher: Option[String] = Some("benchmark.trading-dispatcher")
override val orderbooksGroupedByMatchingEngine: List[List[Orderbook]] =
for (groupOfSymbols: List[String] OrderbookRepository.orderbookSymbolsGroupedByMatchingEngine)

View file

@ -81,7 +81,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
} yield Bid(s + i, 100 - i, 1000)
val orders = askOrders.zip(bidOrders).map(x Seq(x._1, x._2)).flatten
val clientDispatcher = "benchmark.client-dispatcher"
val throughputDispatcher = "benchmark.trading-dispatcher"
val ordersPerClient = repeat * orders.size / numberOfClients
val totalNumberOfOrders = ordersPerClient * numberOfClients
@ -90,7 +90,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
val start = System.nanoTime
val clients = (for (i 0 until numberOfClients) yield {
val receiver = receivers(i % receivers.size)
val props = Props(new Client(receiver, orders, latch, ordersPerClient)).withDispatcher(clientDispatcher)
val props = Props(new Client(receiver, orders, latch, ordersPerClient)).withDispatcher(throughputDispatcher)
system.actorOf(props)
})

View file

@ -20,38 +20,40 @@ object BenchmarkConfig {
resultDir = "target/benchmark"
useDummyOrderbook = false
client-dispatcher {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
throughput-dispatcher {
throughput = 5
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = ${benchmark.maxClients}
parallelism-max = ${benchmark.maxClients}
}
destination-dispatcher {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
}
high-throughput-dispatcher {
throughput = 10000
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
}
pinned-dispatcher {
type = PinnedDispatcher
}
latency-dispatcher {
throughput = 1
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = ${benchmark.maxClients}
parallelism-max = ${benchmark.maxClients}
}
}
trading-dispatcher {
throughput = 5
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = ${benchmark.maxClients}
parallelism-max = ${benchmark.maxClients}
}
}
}
""")
private val longRunningBenchmarkConfig = ConfigFactory.parseString("""
benchmark {
longRunning = true
minClients = 4
maxClients = 48
repeatFactor = 150
repeatFactor = 2000
maxRunDuration = 120 seconds
useDummyOrderbook = true
}

View file

@ -31,7 +31,8 @@ abstract class PerformanceSpec(cfg: Config = BenchmarkConfig.config) extends Akk
def compareResultWith: Option[String] = None
def acceptClients(numberOfClients: Int): Boolean = {
(minClients <= numberOfClients && numberOfClients <= maxClients)
(minClients <= numberOfClients && numberOfClients <= maxClients &&
(maxClients <= 16 || numberOfClients % 4 == 0))
}
def logMeasurement(numberOfClients: Int, durationNs: Long, n: Long) {

View file

@ -13,11 +13,14 @@ object ConfiguredLocalRoutingSpec {
akka {
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 8
core-pool-size-max = 16
}
}
}
}
"""
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,119 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.jsr166y;
/**
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
* This class is subclassable solely for the sake of adding
* functionality -- there are no overridable methods dealing with
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
* in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*/
final ForkJoinPool.WorkQueue workQueue; // Work-stealing mechanics
final ForkJoinPool pool; // the pool this thread works in
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
super(pool.nextWorkerName());
setDaemon(true);
Thread.UncaughtExceptionHandler ueh = pool.ueh;
if (ueh != null)
setUncaughtExceptionHandler(ueh);
this.pool = pool;
this.workQueue = new ForkJoinPool.WorkQueue(this, pool.localMode);
pool.registerWorker(this);
}
/**
* Returns the pool hosting this thread.
*
* @return the pool
*/
public ForkJoinPool getPool() {
return pool;
}
/**
* Returns the index number of this thread in its pool. The
* returned value ranges from zero to the maximum number of
* threads (minus one) that have ever been created in the pool.
* This method may be useful for applications that track status or
* collect results per-worker rather than per-task.
*
* @return the index number
*/
public int getPoolIndex() {
return workQueue.poolIndex;
}
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
* invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
}
/**
* Performs cleanup associated with termination of this worker
* thread. If you override this method, you must invoke
* {@code super.onTermination} at the end of the overridden method.
*
* @param exception the exception causing this thread to abort due
* to an unrecoverable error, or {@code null} if completed normally
*/
protected void onTermination(Throwable exception) {
}
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
try {
onStart();
pool.runWorker(this);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}

View file

@ -0,0 +1,164 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.jsr166y;
/**
* A recursive resultless {@link ForkJoinTask}. This class
* establishes conventions to parameterize resultless actions as
* {@code Void} {@code ForkJoinTask}s. Because {@code null} is the
* only valid value of type {@code Void}, methods such as {@code join}
* always return {@code null} upon completion.
*
* <p><b>Sample Usages.</b> Here is a simple but complete ForkJoin
* sort that sorts a given {@code long[]} array:
*
* <pre> {@code
* static class SortTask extends RecursiveAction {
* final long[] array; final int lo, hi;
* SortTask(long[] array, int lo, int hi) {
* this.array = array; this.lo = lo; this.hi = hi;
* }
* SortTask(long[] array) { this(array, 0, array.length); }
* protected void compute() {
* if (hi - lo < THRESHOLD)
* sortSequentially(lo, hi);
* else {
* int mid = (lo + hi) >>> 1;
* invokeAll(new SortTask(array, lo, mid),
* new SortTask(array, mid, hi));
* merge(lo, mid, hi);
* }
* }
* // implementation details follow:
* final static int THRESHOLD = 1000;
* void sortSequentially(int lo, int hi) {
* Arrays.sort(array, lo, hi);
* }
* void merge(int lo, int mid, int hi) {
* long[] buf = Arrays.copyOfRange(array, lo, mid);
* for (int i = 0, j = lo, k = mid; i < buf.length; j++)
* array[j] = (k == hi || buf[i] < array[k]) ?
* buf[i++] : array[k++];
* }
* }}</pre>
*
* You could then sort {@code anArray} by creating {@code new
* SortTask(anArray)} and invoking it in a ForkJoinPool. As a more
* concrete simple example, the following task increments each element
* of an array:
* <pre> {@code
* class IncrementTask extends RecursiveAction {
* final long[] array; final int lo, hi;
* IncrementTask(long[] array, int lo, int hi) {
* this.array = array; this.lo = lo; this.hi = hi;
* }
* protected void compute() {
* if (hi - lo < THRESHOLD) {
* for (int i = lo; i < hi; ++i)
* array[i]++;
* }
* else {
* int mid = (lo + hi) >>> 1;
* invokeAll(new IncrementTask(array, lo, mid),
* new IncrementTask(array, mid, hi));
* }
* }
* }}</pre>
*
* <p>The following example illustrates some refinements and idioms
* that may lead to better performance: RecursiveActions need not be
* fully recursive, so long as they maintain the basic
* divide-and-conquer approach. Here is a class that sums the squares
* of each element of a double array, by subdividing out only the
* right-hand-sides of repeated divisions by two, and keeping track of
* them with a chain of {@code next} references. It uses a dynamic
* threshold based on method {@code getSurplusQueuedTaskCount}, but
* counterbalances potential excess partitioning by directly
* performing leaf actions on unstolen tasks rather than further
* subdividing.
*
* <pre> {@code
* double sumOfSquares(ForkJoinPool pool, double[] array) {
* int n = array.length;
* Applyer a = new Applyer(array, 0, n, null);
* pool.invoke(a);
* return a.result;
* }
*
* class Applyer extends RecursiveAction {
* final double[] array;
* final int lo, hi;
* double result;
* Applyer next; // keeps track of right-hand-side tasks
* Applyer(double[] array, int lo, int hi, Applyer next) {
* this.array = array; this.lo = lo; this.hi = hi;
* this.next = next;
* }
*
* double atLeaf(int l, int h) {
* double sum = 0;
* for (int i = l; i < h; ++i) // perform leftmost base step
* sum += array[i] * array[i];
* return sum;
* }
*
* protected void compute() {
* int l = lo;
* int h = hi;
* Applyer right = null;
* while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
* int mid = (l + h) >>> 1;
* right = new Applyer(array, mid, h, right);
* right.fork();
* h = mid;
* }
* double sum = atLeaf(l, h);
* while (right != null) {
* if (right.tryUnfork()) // directly calculate if not stolen
* sum += right.atLeaf(right.lo, right.hi);
* else {
* right.join();
* sum += right.result;
* }
* right = right.next;
* }
* result = sum;
* }
* }}</pre>
*
* @since 1.7
* @author Doug Lea
*/
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
/**
* The main computation performed by this task.
*/
protected abstract void compute();
/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() { return null; }
/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) { }
/**
* Implements execution conventions for RecursiveActions.
*/
protected final boolean exec() {
compute();
return true;
}
}

View file

@ -0,0 +1,68 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.jsr166y;
/**
* A recursive result-bearing {@link ForkJoinTask}.
*
* <p>For a classic example, here is a task computing Fibonacci numbers:
*
* <pre> {@code
* class Fibonacci extends RecursiveTask<Integer> {
* final int n;
* Fibonacci(int n) { this.n = n; }
* Integer compute() {
* if (n <= 1)
* return n;
* Fibonacci f1 = new Fibonacci(n - 1);
* f1.fork();
* Fibonacci f2 = new Fibonacci(n - 2);
* return f2.compute() + f1.join();
* }
* }}</pre>
*
* However, besides being a dumb way to compute Fibonacci functions
* (there is a simple fast linear algorithm that you'd use in
* practice), this is likely to perform poorly because the smallest
* subtasks are too small to be worthwhile splitting up. Instead, as
* is the case for nearly all fork/join applications, you'd pick some
* minimum granularity size (for example 10 here) for which you always
* sequentially solve rather than subdividing.
*
* @since 1.7
* @author Doug Lea
*/
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
/**
* The result of the computation.
*/
V result;
/**
* The main computation performed by this task.
*/
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}
}

View file

@ -158,16 +158,38 @@ akka {
# parameters
type = "Dispatcher"
# Which kind of ExecutorService to use for this dispatcher
# Valid options:
# "fork-join-executor" requires a "fork-join-executor" section
# "thread-pool-executor" requires a "thread-pool-executor" section
# or
# A FQCN of a class extending ExecutorServiceConfigurator
executor = "fork-join-executor"
# This will be used if you have set "executor = "fork-join-executor""
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 8
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 3.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 64
}
# This will be used if you have set "executor = "thread-pool-executor""
thread-pool-executor {
# Keep alive time for threads
keep-alive-time = 60s
# minimum number of threads to cap factor-based core number to
# Min number of threads to cap factor-based core number to
core-pool-size-min = 8
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 3.0
# maximum number of threads to cap factor-based number to
# Max number of threads to cap factor-based number to
core-pool-size-max = 64
# Hint: max-pool-size is only used for bounded task queues
@ -177,7 +199,7 @@ akka {
# Max no of threads ... ceil(available processors * factor)
max-pool-size-factor = 3.0
# maximum number of threads to cap factor-based max number to
# Max number of threads to cap factor-based max number to
max-pool-size-max = 64
# Specifies the bounded capacity of the task queue (< 1 == unbounded)
@ -189,6 +211,7 @@ akka {
# Allow core threads to time out
allow-core-timeout = on
}
# How long time the dispatcher will wait for new actors until it shuts down
shutdown-timeout = 1s

View file

@ -7,6 +7,7 @@ package akka.actor
import akka.AkkaException
import scala.reflect.BeanProperty
import scala.util.control.NoStackTrace
import scala.collection.immutable.Stack
import java.util.regex.Pattern
/**
@ -112,6 +113,7 @@ object Actor {
def isDefinedAt(x: Any) = false
def apply(x: Any) = throw new UnsupportedOperationException("Empty behavior apply()")
}
}
/**
@ -172,7 +174,7 @@ trait Actor {
type Receive = Actor.Receive
/**
* Stores the context for this actor, including self, sender, and hotswap.
* Stores the context for this actor, including self, and sender.
* It is implicit to support operations such as `forward`.
*
* [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
@ -281,15 +283,37 @@ trait Actor {
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
/**
* For Akka internal use only.
*/
private[akka] final def apply(msg: Any) = {
val behaviorStack = context.asInstanceOf[ActorCell].hotswap
msg match {
case msg if behaviorStack.nonEmpty && behaviorStack.head.isDefinedAt(msg) behaviorStack.head.apply(msg)
case msg if behaviorStack.isEmpty && processingBehavior.isDefinedAt(msg) processingBehavior.apply(msg)
case unknown unhandled(unknown)
}
// TODO would it be more efficient to assume that most messages are matched and catch MatchError instead of using isDefinedAt?
val head = behaviorStack.head
if (head.isDefinedAt(msg)) head.apply(msg) else unhandled(msg)
}
private[this] val processingBehavior = receive //ProcessingBehavior is the original behavior
/**
* For Akka internal use only.
*/
private[akka] def pushBehavior(behavior: Receive): Unit = {
behaviorStack = behaviorStack.push(behavior)
}
/**
* For Akka internal use only.
*/
private[akka] def popBehavior(): Unit = {
val original = behaviorStack
val popped = original.pop
behaviorStack = if (popped.isEmpty) original else popped
}
/**
* For Akka internal use only.
*/
private[akka] def clearBehaviorStack(): Unit =
behaviorStack = Stack.empty[Receive].push(behaviorStack.last)
private var behaviorStack: Stack[Receive] = Stack.empty[Receive].push(receive)
}

View file

@ -174,8 +174,7 @@ private[akka] class ActorCell(
val self: InternalActorRef,
val props: Props,
@volatile var parent: InternalActorRef,
/*no member*/ _receiveTimeout: Option[Duration],
var hotswap: Stack[PartialFunction[Any, Unit]]) extends UntypedActorContext {
/*no member*/ _receiveTimeout: Option[Duration]) extends UntypedActorContext {
import ActorCell._
@ -209,10 +208,10 @@ private[akka] class ActorCell(
/**
* In milliseconds
*/
final var receiveTimeoutData: (Long, Cancellable) =
var receiveTimeoutData: (Long, Cancellable) =
if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData
final var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
private def _actorOf(props: Props, name: String): ActorRef = {
if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
@ -255,16 +254,16 @@ private[akka] class ActorCell(
a.stop()
}
final var currentMessage: Envelope = null
var currentMessage: Envelope = null
final var actor: Actor = _
var actor: Actor = _
final var stopping = false
var stopping = false
@volatile //This must be volatile since it isn't protected by the mailbox status
var mailbox: Mailbox = _
final var nextNameSequence: Long = 0
var nextNameSequence: Long = 0
//Not thread safe, so should only be used inside the actor that inhabits this ActorCell
final protected def randomName(): String = {
@ -389,7 +388,6 @@ private[akka] class ActorCell(
}
}
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
hotswap = Props.noHotSwap // Reset the behavior
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
@ -509,9 +507,9 @@ private[akka] class ActorCell(
}
}
def become(behavior: Actor.Receive, discardOld: Boolean = true) {
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = {
if (discardOld) unbecome()
hotswap = hotswap.push(behavior)
actor.pushBehavior(behavior)
}
/**
@ -527,10 +525,7 @@ private[akka] class ActorCell(
become(newReceive, discardOld)
}
def unbecome() {
val h = hotswap
if (h.nonEmpty) hotswap = h.pop
}
def unbecome(): Unit = actor.popBehavior()
def autoReceiveMessage(msg: Envelope) {
if (system.settings.DebugAutoReceive)
@ -547,9 +542,9 @@ private[akka] class ActorCell(
}
private def doTerminate() {
try {
try {
val a = actor
try {
try {
if (a ne null) a.postStop()
} finally {
dispatcher.detach(this)
@ -563,7 +558,7 @@ private[akka] class ActorCell(
} finally {
currentMessage = null
clearActorFields()
hotswap = Props.noHotSwap
if (a ne null) a.clearBehaviorStack()
}
}
}

View file

@ -224,8 +224,7 @@ private[akka] class LocalActorRef private[akka] (
_supervisor: InternalActorRef,
val path: ActorPath,
val systemService: Boolean = false,
_receiveTimeout: Option[Duration] = None,
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
_receiveTimeout: Option[Duration] = None)
extends InternalActorRef with LocalRef {
/*
@ -238,7 +237,7 @@ private[akka] class LocalActorRef private[akka] (
* us to use purely factory methods for creating LocalActorRefs.
*/
@volatile
private var actorCell = newActorCell(_system, this, _props, _supervisor, _receiveTimeout, _hotswap)
private var actorCell = newActorCell(_system, this, _props, _supervisor, _receiveTimeout)
actorCell.start()
protected def newActorCell(
@ -246,9 +245,8 @@ private[akka] class LocalActorRef private[akka] (
ref: InternalActorRef,
props: Props,
supervisor: InternalActorRef,
receiveTimeout: Option[Duration],
hotswap: Stack[PartialFunction[Any, Unit]]): ActorCell =
new ActorCell(system, ref, props, supervisor, receiveTimeout, hotswap)
receiveTimeout: Option[Duration]): ActorCell =
new ActorCell(system, ref, props, supervisor, receiveTimeout)
protected def actorContext: ActorContext = actorCell

View file

@ -22,7 +22,6 @@ object Props {
final val defaultRoutedProps: RouterConfig = NoRouter
final val noHotSwap: Stack[Actor.Receive] = Stack.empty
final val empty = new Props(() new Actor { def receive = Actor.emptyBehavior })
/**

View file

@ -14,6 +14,7 @@ import akka.event.EventStream
import com.typesafe.config.Config
import akka.util.ReflectiveAccess
import akka.serialization.SerializationExtension
import akka.jsr166y.ForkJoinPool
final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
if (message.isInstanceOf[AnyRef]) {
@ -292,6 +293,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
protected[akka] def shutdown(): Unit
}
abstract class ExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceFactoryProvider
/**
* Base class to be used for hooking in new dispatchers into Dispatchers.
*/
@ -333,14 +336,30 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
}
}
def configureThreadPool(
config: Config,
createDispatcher: (ThreadPoolConfig) MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
def configureExecutor(): ExecutorServiceConfigurator = {
config.getString("executor") match {
case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case fqcn
val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites])
ReflectiveAccess.createInstance[ExecutorServiceConfigurator](fqcn, constructorSignature, Array[AnyRef](config, prerequisites), prerequisites.classloader) match {
case Right(instance) instance
case Left(exception) throw new IllegalArgumentException(
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature""")
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
}
}
}
}
//Apply the following options to the config if they are present in the config
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
import ThreadPoolConfigBuilder.conf_?
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig())
val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
ThreadPoolConfigBuilder(ThreadPoolConfig())
.setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS))
.setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
.setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
@ -356,4 +375,27 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
case _ None
})(queueFactory _.setQueueFactory(queueFactory)))
}
def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
threadPoolConfig.createExecutorServiceFactory(name, threadFactory)
}
class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory correct
case x throw new IllegalStateException("The prerequisites for the ForkJoinExecutorConfigurator is a ForkJoinPool.ForkJoinWorkerThreadFactory!")
}
class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = new ForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, true)
}
final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
new ForkJoinExecutorServiceFactory(
validate(threadFactory),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")))
}

View file

@ -31,9 +31,9 @@ class BalancingDispatcher(
throughput: Int,
throughputDeadlineTime: Duration,
mailboxType: MailboxType,
config: ThreadPoolConfig,
_executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
_shutdownTimeout: Duration)
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, config, _shutdownTimeout) {
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) {
val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
val rebalance = new AtomicBoolean(false)

View file

@ -158,15 +158,14 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance =
configureThreadPool(config,
threadPoolConfig new Dispatcher(prerequisites,
private val instance = new Dispatcher(
prerequisites,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
threadPoolConfig,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))).build
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))
/**
* Returns the same dispatcher instance for each invocation
@ -182,14 +181,13 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi
class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance =
configureThreadPool(config,
threadPoolConfig new BalancingDispatcher(prerequisites,
private val instance = new BalancingDispatcher(
prerequisites,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, threadPoolConfig,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))).build
mailboxType, configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))
/**
* Returns the same dispatcher instance for each invocation
@ -204,13 +202,23 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP
*/
class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
val threadPoolConfig: ThreadPoolConfig = configureExecutor() match {
case e: ThreadPoolExecutorConfigurator e.threadPoolConfig
case other
prerequisites.eventStream.publish(
Warning("PinnedDispatcherConfigurator",
this.getClass,
"PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format(
config.getString("id"))))
ThreadPoolConfig()
}
/**
* Creates new dispatcher for each invocation.
*/
override def dispatcher(): MessageDispatcher = configureThreadPool(config,
threadPoolConfig
new PinnedDispatcher(prerequisites, null, config.getString("id"), mailboxType,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
threadPoolConfig)).build
override def dispatcher(): MessageDispatcher =
new PinnedDispatcher(
prerequisites, null, config.getString("id"), mailboxType,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig)
}

View file

@ -5,9 +5,20 @@
package akka.dispatch
import java.util.Collection
import java.util.concurrent.atomic.AtomicLong
import akka.util.Duration
import java.util.concurrent._
import akka.jsr166y._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
object ThreadPoolConfig {
type QueueFactory = () BlockingQueue[Runnable]
@ -86,70 +97,65 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def
new ThreadPoolExecutorServiceFactory(threadFactory)
}
trait DispatcherBuilder {
def build: MessageDispatcher
}
object ThreadPoolConfigDispatcherBuilder {
def conf_?[T](opt: Option[T])(fun: (T) ThreadPoolConfigDispatcherBuilder ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) ThreadPoolConfigDispatcherBuilder] = opt map fun
object ThreadPoolConfigBuilder {
def conf_?[T](opt: Option[T])(fun: (T) ThreadPoolConfigBuilder ThreadPoolConfigBuilder): Option[(ThreadPoolConfigBuilder) ThreadPoolConfigBuilder] = opt map fun
}
/**
* A DSL to configure and create a MessageDispatcher with a ThreadPoolExecutor
*/
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
import ThreadPoolConfig._
def build: MessageDispatcher = dispatcherFactory(config)
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory))
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigBuilder =
withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = linkedBlockingQueue()))
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity)))
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = synchronousQueue(fair)))
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))
def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
def setCorePoolSize(size: Int): ThreadPoolConfigBuilder =
if (config.maxPoolSize < size)
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
else
this.copy(config = config.copy(corePoolSize = size))
def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
def setMaxPoolSize(size: Int): ThreadPoolConfigBuilder =
if (config.corePoolSize > size)
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
else
this.copy(config = config.copy(maxPoolSize = size))
def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder =
def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder =
setCorePoolSize(scaledPoolSize(min, multiplier, max))
def setMaxPoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder =
def setMaxPoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder =
setMaxPoolSize(scaledPoolSize(min, multiplier, max))
def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigBuilder =
setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))
def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
def setKeepAliveTime(time: Duration): ThreadPoolConfigBuilder =
this.copy(config = config.copy(threadTimeout = time))
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(allowCorePoolTimeout = allow))
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory))
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c))
def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c))
}
object MonitorableThreadFactory {
@ -161,11 +167,14 @@ case class MonitorableThreadFactory(name: String,
daemonic: Boolean,
contextClassLoader: Option[ClassLoader],
exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing)
extends ThreadFactory {
extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
protected val counter = new AtomicLong
def newThread(runnable: Runnable) = {
val t = new Thread(runnable, name + counter.incrementAndGet())
def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + counter.incrementAndGet()))
protected def wire[T <: Thread](t: T): T = {
t.setUncaughtExceptionHandler(exceptionHandler)
t.setDaemon(daemonic)
contextClassLoader foreach (t.setContextClassLoader(_))

View file

@ -1,34 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import akka.event.Logging.Error
import java.lang.management.ManagementFactory
import javax.management.{ ObjectInstance, ObjectName, InstanceAlreadyExistsException, InstanceNotFoundException }
import akka.actor.ActorSystem
object JMX {
private val mbeanServer = ManagementFactory.getPlatformMBeanServer
def nameFor(hostname: String, service: String, bean: String): ObjectName =
new ObjectName("akka.%s:type=%s,name=%s".format(hostname, service, bean.replace(":", "_")))
def register(name: ObjectName, mbean: AnyRef)(implicit system: ActorSystem): Option[ObjectInstance] = try {
Some(mbeanServer.registerMBean(mbean, name))
} catch {
case e: InstanceAlreadyExistsException
Some(mbeanServer.getObjectInstance(name))
case e: Exception
system.eventStream.publish(Error(e, "JMX", this.getClass, "Error when registering mbean [%s]".format(mbean)))
None
}
def unregister(mbean: ObjectName)(implicit system: ActorSystem) = try {
mbeanServer.unregisterMBean(mbean)
} catch {
case e: InstanceNotFoundException {}
case e: Exception system.eventStream.publish(Error(e, "JMX", this.getClass, "Error while unregistering mbean [%s]".format(mbean)))
}
}

View file

@ -54,6 +54,12 @@ Loading from Configuration
To be able to load extensions from your Akka configuration you must add FQCNs of implementations of either ``ExtensionId`` or ``ExtensionIdProvider``
in the "akka.extensions" section of the config you provide to your ``ActorSystem``.
::
akka {
extensions = ["akka.docs.extension.ExtensionDocTestBase.CountExtension"]
}
Applicability
=============

View file

@ -22,12 +22,17 @@ object DispatcherDocSpec {
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "thread-pool-executor"
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
}
# Throughput defines the number of messages that are processed in a batch before the
# thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 100
@ -37,8 +42,11 @@ object DispatcherDocSpec {
//#my-bounded-config
my-dispatcher-bounded-queue {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-factor = 8.0
max-pool-size-factor = 16.0
}
# Specifies the bounded capacity of the mailbox queue
mailbox-capacity = 100
throughput = 3
@ -48,6 +56,11 @@ object DispatcherDocSpec {
//#my-balancing-config
my-balancing-dispatcher {
type = BalancingDispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-factor = 8.0
max-pool-size-factor = 16.0
}
}
//#my-balancing-config

View file

@ -41,6 +41,15 @@ object CountExtension
//#extensionid
object ExtensionDocSpec {
val config = """
//#config
akka {
extensions = ["akka.docs.extension.CountExtension$"]
}
//#config
"""
//#extension-usage-actor
class MyActor extends Actor {
@ -64,7 +73,7 @@ object ExtensionDocSpec {
//#extension-usage-actor-trait
}
class ExtensionDocSpec extends AkkaSpec {
class ExtensionDocSpec extends AkkaSpec(ExtensionDocSpec.config) {
import ExtensionDocSpec._
"demonstrate how to create an extension in Scala" in {
@ -73,4 +82,10 @@ class ExtensionDocSpec extends AkkaSpec {
//#extension-usage
}
"demonstrate how to lookup a configured extension in Scala" in {
//#extension-lookup
system.extension(CountExtension)
//#extension-lookup
}
}

View file

@ -48,6 +48,11 @@ Loading from Configuration
To be able to load extensions from your Akka configuration you must add FQCNs of implementations of either ``ExtensionId`` or ``ExtensionIdProvider``
in the ``akka.extensions`` section of the config you provide to your ``ActorSystem``.
.. includecode:: code/akka/docs/extension/ExtensionDocSpec.scala
:include: config
Note that in this case ``CountExtension`` is an object and therefore the class name ends with ``$``.
Applicability
=============

View file

@ -41,9 +41,8 @@ class TestActorRef[T <: Actor](
ref: InternalActorRef,
props: Props,
supervisor: InternalActorRef,
receiveTimeout: Option[Duration],
hotswap: Stack[PartialFunction[Any, Unit]]): ActorCell =
new ActorCell(system, ref, props, supervisor, receiveTimeout, hotswap) {
receiveTimeout: Option[Duration]): ActorCell =
new ActorCell(system, ref, props, supervisor, receiveTimeout) {
override def autoReceiveMessage(msg: Envelope) {
msg.message match {
case InternalGetActor sender ! actor

View file

@ -29,6 +29,8 @@ object AkkaSpec {
stdout-loglevel = "WARNING"
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-factor = 2
core-pool-size-min = 8
core-pool-size-max = 8
@ -38,6 +40,7 @@ object AkkaSpec {
}
}
}
}
""")
def mapToConfig(map: Map[String, Any]): Config = {

View file

@ -20,11 +20,14 @@ object CoordinatedIncrement {
akka {
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 5
core-pool-size-max = 16
}
}
}
}
"""
case class Increment(friends: Seq[ActorRef])

View file

@ -9,11 +9,7 @@ import akka.util.duration._
import akka.actor.{ Cancellable, Actor, Props, ActorRef }
object ConcurrentSocketActorSpec {
val config = """
akka {
extensions = []
}
"""
val config = ""
}
class ConcurrentSocketActorSpec
@ -23,7 +19,7 @@ class ConcurrentSocketActorSpec
val endpoint = "tcp://127.0.0.1:%s" format { val s = new java.net.ServerSocket(0); try s.getLocalPort finally s.close() }
def zmq = system.extension(ZeroMQExtension)
def zmq = ZeroMQExtension(system)
"ConcurrentSocketActor" should {
"support pub-sub connections" in {