From df260f8939422385ec2bb7d52d9d399819a44fc6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 21 Dec 2011 19:02:06 +0100 Subject: [PATCH] Improvements based on feedback. See #1458 --- .../akka/actor/dispatch/ActorModelSpec.scala | 47 +++++------ .../akka/actor/dispatch/DispatchersSpec.scala | 19 +++-- .../CallingThreadDispatcherModelSpec.scala | 14 ++-- .../src/main/scala/akka/actor/ActorCell.scala | 4 +- .../main/scala/akka/actor/ActorSystem.scala | 3 +- .../src/main/scala/akka/actor/Props.scala | 10 +-- .../akka/dispatch/AbstractDispatcher.scala | 7 +- .../akka/dispatch/BalancingDispatcher.scala | 4 +- .../main/scala/akka/dispatch/Dispatcher.scala | 2 +- .../scala/akka/dispatch/Dispatchers.scala | 83 ++++++++++--------- .../akka/dispatch/PinnedDispatcher.scala | 4 +- .../src/main/scala/akka/routing/Pool.scala | 2 +- .../akka/docs/testkit/TestkitDocSpec.scala | 2 +- .../actor/mailbox/DurableMailboxSpec.scala | 2 +- .../testkit/CallingThreadDispatcher.scala | 4 +- .../scala/akka/testkit/TestActorRef.scala | 2 +- .../src/main/scala/akka/testkit/TestKit.scala | 2 +- .../test/scala/akka/testkit/AkkaSpec.scala | 5 +- 18 files changed, 113 insertions(+), 103 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index e80c2e29bf..3536dcadcc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -18,6 +18,7 @@ import akka.actor.ActorSystem import akka.util.duration._ import akka.event.Logging.Error import com.typesafe.config.Config +import java.util.concurrent.atomic.AtomicInteger object ActorModelSpec { @@ -239,7 +240,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "must dynamically handle its own life cycle" in { implicit val dispatcher = registerInterceptedDispatcher() assertDispatcher(dispatcher)(stops = 0) - val a = newTestActor(dispatcher.key) + val a = newTestActor(dispatcher.id) assertDispatcher(dispatcher)(stops = 0) system.stop(a) assertDispatcher(dispatcher)(stops = 1) @@ -257,7 +258,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa } assertDispatcher(dispatcher)(stops = 2) - val a2 = newTestActor(dispatcher.key) + val a2 = newTestActor(dispatcher.id) val futures2 = for (i ← 1 to 10) yield Future { i } assertDispatcher(dispatcher)(stops = 2) @@ -269,7 +270,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "process messages one at a time" in { implicit val dispatcher = registerInterceptedDispatcher() val start, oneAtATime = new CountDownLatch(1) - val a = newTestActor(dispatcher.key) + val a = newTestActor(dispatcher.id) a ! CountDown(start) assertCountDown(start, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds") @@ -288,7 +289,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "handle queueing from multiple threads" in { implicit val dispatcher = registerInterceptedDispatcher() val counter = new CountDownLatch(200) - val a = newTestActor(dispatcher.key) + val a = newTestActor(dispatcher.id) for (i ← 1 to 10) { spawn { @@ -318,7 +319,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "not process messages for a suspended actor" in { implicit val dispatcher = registerInterceptedDispatcher() - val a = newTestActor(dispatcher.key).asInstanceOf[LocalActorRef] + val a = newTestActor(dispatcher.id).asInstanceOf[LocalActorRef] val done = new CountDownLatch(1) a.suspend a ! CountDown(done) @@ -337,7 +338,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "handle waves of actors" in { val dispatcher = registerInterceptedDispatcher() - val props = Props[DispatcherActor].withDispatcher(dispatcher.key) + val props = Props[DispatcherActor].withDispatcher(dispatcher.id) def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) @@ -384,7 +385,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa filterEvents(EventFilter[InterruptedException](), EventFilter[akka.event.Logging.EventHandlerException]()) { implicit val dispatcher = registerInterceptedDispatcher() implicit val timeout = Timeout(5 seconds) - val a = newTestActor(dispatcher.key) + val a = newTestActor(dispatcher.id) val f1 = a ? Reply("foo") val f2 = a ? Reply("bar") val f3 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(ActorInterruptedException(ie)) } @@ -404,7 +405,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "continue to process messages when exception is thrown" in { filterEvents(EventFilter[IndexOutOfBoundsException](), EventFilter[RemoteException]()) { implicit val dispatcher = registerInterceptedDispatcher() - val a = newTestActor(dispatcher.key) + val a = newTestActor(dispatcher.id) val f1 = a ? Reply("foo") val f2 = a ? Reply("bar") val f3 = a ? ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException")) @@ -438,24 +439,23 @@ object DispatcherModelSpec { class DispatcherModelSpec extends ActorModelSpec(DispatcherModelSpec.config) { import ActorModelSpec._ - var dispatcherCount = 0 + val dispatcherCount = new AtomicInteger() override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = { - // use new key for each invocation, since the MessageDispatcherInterceptor holds state - dispatcherCount += 1 - val key = "dispatcher-" + dispatcherCount + // use new id for each invocation, since the MessageDispatcherInterceptor holds state + val id = "dispatcher-" + dispatcherCount.incrementAndGet() val dispatcherConfigurator = new MessageDispatcherConfigurator(system.settings.config.getConfig("dispatcher"), system.dispatcherFactory.prerequisites) { val instance = { ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(system.dispatcherFactory.prerequisites, key, key, system.settings.DispatcherThroughput, + new Dispatcher(system.dispatcherFactory.prerequisites, id, id, system.settings.DispatcherThroughput, system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType, config, system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor, ThreadPoolConfig()).build } override def dispatcher(): MessageDispatcher = instance } - system.dispatcherFactory.register(key, dispatcherConfigurator) - system.dispatcherFactory.lookup(key).asInstanceOf[MessageDispatcherInterceptor] + system.dispatcherFactory.register(id, dispatcherConfigurator) + system.dispatcherFactory.lookup(id).asInstanceOf[MessageDispatcherInterceptor] } override def dispatcherType = "Dispatcher" @@ -464,7 +464,7 @@ class DispatcherModelSpec extends ActorModelSpec(DispatcherModelSpec.config) { "process messages in parallel" in { implicit val dispatcher = registerInterceptedDispatcher() val aStart, aStop, bParallel = new CountDownLatch(1) - val a, b = newTestActor(dispatcher.key) + val a, b = newTestActor(dispatcher.id) a ! Meet(aStart, aStop) assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds") @@ -500,16 +500,15 @@ object BalancingDispatcherModelSpec { class BalancingDispatcherModelSpec extends ActorModelSpec(BalancingDispatcherModelSpec.config) { import ActorModelSpec._ - var dispatcherCount = 0 + val dispatcherCount = new AtomicInteger() override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = { - // use new key for each invocation, since the MessageDispatcherInterceptor holds state - dispatcherCount += 1 - val key = "dispatcher-" + dispatcherCount + // use new id for each invocation, since the MessageDispatcherInterceptor holds state + val id = "dispatcher-" + dispatcherCount.incrementAndGet() val dispatcherConfigurator = new MessageDispatcherConfigurator(system.settings.config.getConfig("dispatcher"), system.dispatcherFactory.prerequisites) { val instance = { ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(system.dispatcherFactory.prerequisites, key, key, 1, // TODO check why 1 here? (came from old test) + new BalancingDispatcher(system.dispatcherFactory.prerequisites, id, id, 1, // TODO check why 1 here? (came from old test) system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType, config, system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor, ThreadPoolConfig()).build @@ -517,8 +516,8 @@ class BalancingDispatcherModelSpec extends ActorModelSpec(BalancingDispatcherMod override def dispatcher(): MessageDispatcher = instance } - system.dispatcherFactory.register(key, dispatcherConfigurator) - system.dispatcherFactory.lookup(key).asInstanceOf[MessageDispatcherInterceptor] + system.dispatcherFactory.register(id, dispatcherConfigurator) + system.dispatcherFactory.lookup(id).asInstanceOf[MessageDispatcherInterceptor] } override def dispatcherType = "Balancing Dispatcher" @@ -527,7 +526,7 @@ class BalancingDispatcherModelSpec extends ActorModelSpec(BalancingDispatcherMod "process messages in parallel" in { implicit val dispatcher = registerInterceptedDispatcher() val aStart, aStop, bParallel = new CountDownLatch(1) - val a, b = newTestActor(dispatcher.key) + val a, b = newTestActor(dispatcher.id) a ! Meet(aStart, aStop) assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds") diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index 1670c8a4a9..3cc8c275e3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -32,6 +32,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) { val maxpoolsizefactor = "max-pool-size-factor" val allowcoretimeout = "allow-core-timeout" val throughput = "throughput" + val id = "id" def instance(dispatcher: MessageDispatcher): (MessageDispatcher) ⇒ Boolean = _ == dispatcher def ofType[T <: MessageDispatcher: Manifest]: (MessageDispatcher) ⇒ Boolean = _.getClass == manifest[T].erasure @@ -46,7 +47,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) { val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher") lazy val allDispatchers: Map[String, MessageDispatcher] = { - validTypes.map(t ⇒ (t, from(ConfigFactory.parseMap(Map(tipe -> t, "key" -> t).asJava). + validTypes.map(t ⇒ (t, from(ConfigFactory.parseMap(Map(tipe -> t, id -> t).asJava). withFallback(defaultDispatcherConfig)))).toMap } @@ -62,19 +63,25 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) { dispatcher.name must be("mydispatcher") } - "use specific key" in { + "use specific id" in { val dispatcher = lookup("myapp.mydispatcher") - dispatcher.key must be("myapp.mydispatcher") + dispatcher.id must be("myapp.mydispatcher") } - "use default dispatcher" in { + "use default dispatcher for missing config" in { val dispatcher = lookup("myapp.other-dispatcher") dispatcher must be === defaultGlobalDispatcher } + "have only one default dispatcher" in { + val dispatcher = lookup(Dispatchers.DefaultDispatcherId) + dispatcher must be === defaultGlobalDispatcher + dispatcher must be === system.dispatcher + } + "throw IllegalArgumentException if type does not exist" in { intercept[IllegalArgumentException] { - from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist", "key" -> "invalid-dispatcher").asJava). + from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist", id -> "invalid-dispatcher").asJava). withFallback(defaultDispatcherConfig)) } } @@ -84,7 +91,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) { assert(typesAndValidators.forall(tuple ⇒ tuple._2(allDispatchers(tuple._1)))) } - "provide lookup of dispatchers by key" in { + "provide lookup of dispatchers by id" in { val d1 = lookup("myapp.mydispatcher") val d2 = lookup("myapp.mydispatcher") d1 must be === d2 diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index e1580e1fc6..14c23d432a 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -5,6 +5,7 @@ package akka.testkit import akka.actor.dispatch.ActorModelSpec import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicInteger import org.junit.{ After, Test } import com.typesafe.config.Config import akka.dispatch.DispatcherPrerequisites @@ -23,20 +24,19 @@ object CallingThreadDispatcherModelSpec { class CallingThreadDispatcherModelSpec extends ActorModelSpec(CallingThreadDispatcherModelSpec.config) { import ActorModelSpec._ - var dispatcherCount = 0 + val dispatcherCount = new AtomicInteger() override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = { - // use new key for each invocation, since the MessageDispatcherInterceptor holds state - dispatcherCount += 1 - val confKey = "test-calling-thread" + dispatcherCount + // use new id for each invocation, since the MessageDispatcherInterceptor holds state + val dispatcherId = "test-calling-thread" + dispatcherCount.incrementAndGet() val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatcherFactory.defaultDispatcherConfig, system.dispatcherFactory.prerequisites) { val instance = new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor { - override def key: String = confKey + override def id: String = dispatcherId } override def dispatcher(): MessageDispatcher = instance } - system.dispatcherFactory.register(confKey, dispatcherConfigurator) - system.dispatcherFactory.lookup(confKey).asInstanceOf[MessageDispatcherInterceptor] + system.dispatcherFactory.register(dispatcherId, dispatcherConfigurator) + system.dispatcherFactory.lookup(dispatcherId).asInstanceOf[MessageDispatcherInterceptor] } override def dispatcherType = "Calling Thread Dispatcher" diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index e803df806f..4ce727c420 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -258,9 +258,7 @@ private[akka] class ActorCell( } @inline - final def dispatcher: MessageDispatcher = - if (props.dispatcher == Props.defaultDispatcherKey) system.dispatcher - else system.dispatcherFactory.lookup(props.dispatcher) + final def dispatcher: MessageDispatcher = system.dispatcherFactory.lookup(props.dispatcher) /** * UntypedActorContext impl diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 9472f83f48..cbc3c47f09 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -269,8 +269,7 @@ abstract class ActorSystem extends ActorRefFactory { //#scheduler /** - * Helper object for creating new dispatchers and passing in all required - * information. + * Helper object for looking up configured dispatchers. */ def dispatcherFactory: Dispatchers diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 701919f3cd..e7eba69bdf 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -21,7 +21,7 @@ object Props { import FaultHandlingStrategy._ final val defaultCreator: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actor creator specified!") - final val defaultDispatcherKey: String = null + final val defaultDispatcherId: String = null final val defaultTimeout: Timeout = Timeout(Duration.MinusInf) final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop @@ -125,7 +125,7 @@ object Props { */ case class Props( creator: () ⇒ Actor = Props.defaultCreator, - dispatcher: String = Props.defaultDispatcherKey, + dispatcher: String = Props.defaultDispatcherId, timeout: Timeout = Props.defaultTimeout, faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, routerConfig: RouterConfig = Props.defaultRoutedProps) { @@ -135,7 +135,7 @@ case class Props( */ def this() = this( creator = Props.defaultCreator, - dispatcher = Props.defaultDispatcherKey, + dispatcher = Props.defaultDispatcherId, timeout = Props.defaultTimeout, faultHandler = Props.defaultFaultHandler) @@ -144,7 +144,7 @@ case class Props( */ def this(factory: UntypedActorFactory) = this( creator = () ⇒ factory.create(), - dispatcher = Props.defaultDispatcherKey, + dispatcher = Props.defaultDispatcherId, timeout = Props.defaultTimeout, faultHandler = Props.defaultFaultHandler) @@ -153,7 +153,7 @@ case class Props( */ def this(actorClass: Class[_ <: Actor]) = this( creator = () ⇒ actorClass.newInstance, - dispatcher = Props.defaultDispatcherKey, + dispatcher = Props.defaultDispatcherId, timeout = Props.defaultTimeout, faultHandler = Props.defaultFaultHandler, routerConfig = Props.defaultRoutedProps) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 5ac8b30422..d6addcb144 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -102,9 +102,10 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext def name: String /** - * Configuration key of this dispatcher + * Identfier of this dispatcher, corresponds to the full key + * of the dispatcher configuration. */ - def key: String + def id: String /** * Attaches the specified actor instance to this dispatcher @@ -274,6 +275,8 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit /** * Returns an instance of MessageDispatcher given the configuration. + * Depending on the needs the implementation may return a new instance for + * each invocation or return the same instance every time. */ def dispatcher(): MessageDispatcher diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 6152d627d9..78308d46ba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -32,13 +32,13 @@ import akka.util.Duration class BalancingDispatcher( _prerequisites: DispatcherPrerequisites, _name: String, - _key: String, + _id: String, throughput: Int, throughputDeadlineTime: Duration, mailboxType: MailboxType, config: ThreadPoolConfig, _shutdownTimeout: Duration) - extends Dispatcher(_prerequisites, _name, _key, throughput, throughputDeadlineTime, mailboxType, config, _shutdownTimeout) { + extends Dispatcher(_prerequisites, _name, _id, throughput, throughputDeadlineTime, mailboxType, config, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) val rebalance = new AtomicBoolean(false) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 2c1d9f7dfc..cbe77a0328 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -63,7 +63,7 @@ import java.util.concurrent._ class Dispatcher( _prerequisites: DispatcherPrerequisites, val name: String, - val key: String, + val id: String, val throughput: Int, val throughputDeadlineTime: Duration, val mailboxType: MailboxType, diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 1f7185c923..c5d9831096 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -16,8 +16,8 @@ import akka.actor.ActorSystem.Settings import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.config.ConfigurationException -import akka.event.Logging -import akka.event.Logging.Debug +import akka.event.Logging.Warning +import akka.actor.Props trait DispatcherPrerequisites { def eventStream: EventStream @@ -30,6 +30,10 @@ case class DefaultDispatcherPrerequisites( val deadLetterMailbox: Mailbox, val scheduler: Scheduler) extends DispatcherPrerequisites +object Dispatchers { + final val DefaultDispatcherId = "akka.actor.default-dispatcher" +} + /** * It is recommended to define the dispatcher in configuration to allow for tuning * for different environments. Use the `lookup` method to create @@ -64,19 +68,12 @@ case class DefaultDispatcherPrerequisites( */ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: DispatcherPrerequisites) { - val MailboxType: MailboxType = - if (settings.MailboxCapacity < 1) UnboundedMailbox() - else BoundedMailbox(settings.MailboxCapacity, settings.MailboxPushTimeout) + import Dispatchers._ - val defaultDispatcherConfig = { - val key = "akka.actor.default-dispatcher" - keyConfig(key).withFallback(settings.config.getConfig(key)) - } + val defaultDispatcherConfig: Config = + idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId)) - private lazy val defaultDispatcherConfigurator: MessageDispatcherConfigurator = - configuratorFrom(defaultDispatcherConfig) - - lazy val defaultGlobalDispatcher: MessageDispatcher = defaultDispatcherConfigurator.dispatcher() + def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId) // FIXME: Dispatchers registered here are are not removed, see ticket #1494 private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator] @@ -86,50 +83,52 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * the default dispatcher. The same dispatcher instance is returned for subsequent * lookups. */ - def lookup(key: String): MessageDispatcher = { - val configurator = dispatcherConfigurators.get(key) match { + def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher() + + private def lookupConfigurator(id: String): MessageDispatcherConfigurator = { + val lookupId = if (id == Props.defaultDispatcherId) DefaultDispatcherId else id + dispatcherConfigurators.get(lookupId) match { case null ⇒ // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup. - // That shouldn't happen often and in case it does the actual dispatcher isn't + // That shouldn't happen often and in case it does the actual ExecutorService isn't // created until used, i.e. cheap. val newConfigurator = - if (settings.config.hasPath(key)) { - configuratorFrom(config(key)) + if (settings.config.hasPath(lookupId)) { + configuratorFrom(config(lookupId)) } else { - // FIXME Remove println - println("#### Dispatcher [%s] not configured, using default-dispatcher".format(key)) - prerequisites.eventStream.publish(Debug("Dispatchers", - "Dispatcher [%s] not configured, using default-dispatcher".format(key))) - defaultDispatcherConfigurator + // Note that the configurator of the default dispatcher will be registered for this id, + // so this will only be logged once, which is crucial. + prerequisites.eventStream.publish(Warning("Dispatchers", + "Dispatcher [%s] not configured, using default-dispatcher".format(lookupId))) + lookupConfigurator(DefaultDispatcherId) } - dispatcherConfigurators.putIfAbsent(key, newConfigurator) match { + dispatcherConfigurators.putIfAbsent(lookupId, newConfigurator) match { case null ⇒ newConfigurator case existing ⇒ existing } case existing ⇒ existing } - configurator.dispatcher() } // FIXME #1458: Not sure if we should have this, but needed it temporary for PriorityDispatcherSpec, ActorModelSpec and DispatcherDocSpec - def register(key: String, dispatcherConfigurator: MessageDispatcherConfigurator): Unit = { - dispatcherConfigurators.putIfAbsent(key, dispatcherConfigurator) + def register(id: String, dispatcherConfigurator: MessageDispatcherConfigurator): Unit = { + dispatcherConfigurators.putIfAbsent(id, dispatcherConfigurator) } - private def config(key: String): Config = { + private def config(id: String): Config = { import scala.collection.JavaConverters._ - def simpleName = key.substring(key.lastIndexOf('.') + 1) - keyConfig(key) - .withFallback(settings.config.getConfig(key)) + def simpleName = id.substring(id.lastIndexOf('.') + 1) + idConfig(id) + .withFallback(settings.config.getConfig(id)) .withFallback(ConfigFactory.parseMap(Map("name" -> simpleName).asJava)) .withFallback(defaultDispatcherConfig) } - private def keyConfig(key: String): Config = { + private def idConfig(id: String): Config = { import scala.collection.JavaConverters._ - ConfigFactory.parseMap(Map("key" -> key).asJava) + ConfigFactory.parseMap(Map("id" -> id).asJava) } // FIXME #1458: Remove these newDispatcher methods, but still need them temporary for PriorityDispatcherSpec, ActorModelSpec and DispatcherDocSpec @@ -161,6 +160,10 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(prerequisites, name, name, throughput, throughputDeadline, mailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) + val MailboxType: MailboxType = + if (settings.MailboxCapacity < 1) UnboundedMailbox() + else BoundedMailbox(settings.MailboxCapacity, settings.MailboxPushTimeout) + /* * Creates of obtains a dispatcher from a Config according to the format below. * @@ -175,9 +178,9 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * allow-core-timeout = on # Allow core threads to time out * throughput = 5 # Throughput for Dispatcher * } - * ex: from(config.getConfig(key)) + * ex: from(config.getConfig(id)) * - * The Config must also contain a `key` property, which is the identifying key of the dispatcher. + * The Config must also contain a `id` property, which is the identifier of the dispatcher. * * Throws: IllegalArgumentException if the value of "type" is not valid * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator @@ -187,7 +190,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc } private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = { - if (!cfg.hasPath("key")) throw new IllegalArgumentException("Missing dispatcher 'key' property in config: " + cfg.root.render) + if (!cfg.hasPath("id")) throw new IllegalArgumentException("Missing dispatcher 'id' property in config: " + cfg.root.render) cfg.getString("type") match { case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites) @@ -202,7 +205,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " + "make sure it has constructor with [com.typesafe.config.Config] and " + "[akka.dispatch.DispatcherPrerequisites] parameters") - .format(fqn, cfg.getString("key")), exception) + .format(fqn, cfg.getString("id")), exception) } } } @@ -215,7 +218,7 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher(prerequisites, config.getString("name"), - config.getString("key"), + config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, @@ -235,7 +238,7 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP configureThreadPool(config, threadPoolConfig ⇒ new BalancingDispatcher(prerequisites, config.getString("name"), - config.getString("key"), + config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, @@ -254,7 +257,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer * Creates new dispatcher for each invocation. */ override def dispatcher(): MessageDispatcher = - new PinnedDispatcher(prerequisites, null, config.getString("name"), config.getString("key"), mailboxType, + new PinnedDispatcher(prerequisites, null, config.getString("name"), config.getString("id"), mailboxType, Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) } diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index b3406b3d81..4915afe8a2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -19,12 +19,12 @@ class PinnedDispatcher( _prerequisites: DispatcherPrerequisites, _actor: ActorCell, _name: String, - _key: String, + _id: String, _mailboxType: MailboxType, _shutdownTimeout: Duration) extends Dispatcher(_prerequisites, _name, - _key, + _id, Int.MaxValue, Duration.Zero, _mailboxType, diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 90a2cd6a9a..138be5e902 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -93,7 +93,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ protected[akka] var _delegates = Vector[ActorRef]() - val defaultProps: Props = Props.default.withDispatcher(this.context.dispatcher.key) + val defaultProps: Props = Props.default.withDispatcher(this.context.dispatcher.id) override def preStart() { resizeIfAppropriate() diff --git a/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala b/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala index fcd2b3cdd3..07c785df0c 100644 --- a/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala @@ -227,7 +227,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { "demonstrate " in { //#calling-thread-dispatcher import akka.testkit.CallingThreadDispatcher - val ref = system.actorOf(Props[MyActor].withDispatcher(CallingThreadDispatcher.ConfigKey)) + val ref = system.actorOf(Props[MyActor].withDispatcher(CallingThreadDispatcher.Id)) //#calling-thread-dispatcher } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index 54629e6321..9603f72b39 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -26,7 +26,7 @@ object DurableMailboxSpecActorFactory { /** * Subclass must define dispatcher in the supplied config for the specific backend. - * The key of the dispatcher must be the same as the `-dispatcher`. + * The id of the dispatcher must be the same as the `-dispatcher`. */ abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) with BeforeAndAfterEach { import DurableMailboxSpecActorFactory._ diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index b68a0a4051..784bb6f184 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -94,7 +94,7 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { } object CallingThreadDispatcher { - val ConfigKey = "akka.test.calling-thread-dispatcher" + val Id = "akka.test.calling-thread-dispatcher" } /** @@ -129,7 +129,7 @@ class CallingThreadDispatcher( val log = akka.event.Logging(prerequisites.eventStream, "CallingThreadDispatcher") - def key: String = ConfigKey + override def id: String = Id protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index cb134d4ac2..c0ed2383d6 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -30,7 +30,7 @@ class TestActorRef[T <: Actor]( name: String) extends LocalActorRef( _system, - _props.withDispatcher(CallingThreadDispatcher.ConfigKey), + _props.withDispatcher(CallingThreadDispatcher.Id), _supervisor, _supervisor.path / name, false) { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 4a021ed329..69411f23ea 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -104,7 +104,7 @@ class TestKit(_system: ActorSystem) { lazy val testActor: ActorRef = { val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here? impl.systemActorOf(Props(new TestActor(queue)) - .withDispatcher(CallingThreadDispatcher.ConfigKey), + .withDispatcher(CallingThreadDispatcher.Id), "testActor" + TestKit.testActorId.incrementAndGet) } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index b2713e6577..b98937b126 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -16,6 +16,7 @@ import akka.actor.CreateChild import akka.actor.DeadLetter import java.util.concurrent.TimeoutException import akka.dispatch.{ Await, MessageDispatcher } +import akka.dispatch.Dispatchers object TimingTest extends Tag("timing") @@ -74,8 +75,8 @@ abstract class AkkaSpec(_system: ActorSystem) protected def atTermination() {} - def spawn(dispatcherKey: String = system.dispatcherFactory.defaultGlobalDispatcher.key)(body: ⇒ Unit) { - system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) }).withDispatcher(dispatcherKey)) ! "go" + def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) { + system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) }).withDispatcher(dispatcherId)) ! "go" } }