diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala index 1466c9e969..a38f98c8f7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala @@ -75,5 +75,23 @@ class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVeri "not fail verification with a ConfigurationException if also configured with a Router" in { system.actorOf(Props[TestActor].withDispatcher("pinned-dispatcher").withRouter(RoundRobinRouter(2))) } + + "fail verification if the dispatcher cannot be found" in { + intercept[ConfigurationException] { + system.actorOf(Props[TestActor].withDispatcher("does not exist")) + } + } + + "fail verification if the dispatcher cannot be found for the head of a router" in { + intercept[ConfigurationException] { + system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(1, routerDispatcher = "does not exist"))) + } + } + + "fail verification if the dispatcher cannot be found for the routees of a router" in { + intercept[ConfigurationException] { + system.actorOf(Props[TestActor].withDispatcher("does not exist").withRouter(RoundRobinRouter(1))) + } + } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index b4ec85b44f..4ced9cd03c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -3,18 +3,15 @@ */ package akka.actor.dispatch -import language.postfixOps -import java.util.concurrent.{ CountDownLatch, TimeUnit } +import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.reflect.ClassTag -import akka.dispatch._ -import akka.testkit.AkkaSpec -import akka.testkit.ImplicitSender -import scala.collection.JavaConverters._ + import com.typesafe.config.ConfigFactory -import akka.actor.Actor -import akka.actor.Props -import scala.concurrent.duration._ -import akka.actor.ActorRef + +import akka.ConfigurationException +import akka.actor.{ Actor, ActorRef, Props } +import akka.dispatch.{ BalancingDispatcher, Dispatcher, Dispatchers, MessageDispatcher, PinnedDispatcher } +import akka.testkit.{ AkkaSpec, ImplicitSender } object DispatchersSpec { val config = """ @@ -101,9 +98,10 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend dispatcher.id must be("myapp.mydispatcher") } - "use default dispatcher for missing config" in { - val dispatcher = lookup("myapp.other-dispatcher") - dispatcher must be === defaultGlobalDispatcher + "complain about missing config" in { + intercept[ConfigurationException] { + lookup("myapp.other-dispatcher") + } } "have only one default dispatcher" in { @@ -112,8 +110,8 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend dispatcher must be === system.dispatcher } - "throw IllegalArgumentException if type does not exist" in { - intercept[IllegalArgumentException] { + "throw ConfigurationException if type does not exist" in { + intercept[ConfigurationException] { from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist", id -> "invalid-dispatcher").asJava). withFallback(defaultDispatcherConfig)) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index b222112717..62a1735893 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -16,6 +16,7 @@ import scala.util.{ Success, Failure } import java.util.concurrent.atomic.AtomicLong import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.annotation.implicitNotFound +import akka.ConfigurationException /** * Interface for all ActorRef providers to implement. @@ -708,6 +709,9 @@ private[akka] class LocalActorRefProvider private[akka] ( } else props + if (!system.dispatchers.hasDispatcher(props2.dispatcher)) + throw new ConfigurationException(s"Dispatcher [${props2.dispatcher}] not configured for path $path") + if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async) else new LocalActorRef(system, props2, supervisor, path) @@ -715,7 +719,12 @@ private[akka] class LocalActorRefProvider private[akka] ( val lookup = if (lookupDeploy) deployer.lookup(path) else None val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router)) val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a) - new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize(async) + val p = props.withRouter(d.routerConfig) + if (!system.dispatchers.hasDispatcher(p.dispatcher)) + throw new ConfigurationException(s"Dispatcher [${p.dispatcher}] not configured for routees of $path") + if (!system.dispatchers.hasDispatcher(d.routerConfig.routerDispatcher)) + throw new ConfigurationException(s"Dispatcher [${p.dispatcher}] not configured for router of $path") + new RoutedActorRef(system, p, supervisor, path).initialize(async) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index bca09ce56f..57b626a119 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -10,6 +10,7 @@ import akka.actor.{ Scheduler, DynamicAccess, ActorSystem } import akka.event.Logging.Warning import akka.event.EventStream import scala.concurrent.duration.Duration +import akka.ConfigurationException /** * DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher @@ -65,12 +66,21 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator] /** - * Returns a dispatcher as specified in configuration, or if not defined it uses - * the default dispatcher. Please note that this method _may_ create and return a NEW dispatcher, - * _every_ call. + * Returns a dispatcher as specified in configuration. Please note that this + * method _may_ create and return a NEW dispatcher, _every_ call. + * + * @throws ConfigurationException if the specified dispatcher cannot be found in the configuration */ def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher() + /** + * Checks that the configuration provides a section for the given dispatcher. + * This does not guarantee that no ConfigurationException will be thrown when + * using this dispatcher, because the details can only be checked by trying + * to instantiate it, which might be undesirable when just checking. + */ + def hasDispatcher(id: String): Boolean = settings.config.hasPath(id) + private def lookupConfigurator(id: String): MessageDispatcherConfigurator = { dispatcherConfigurators.get(id) match { case null ⇒ @@ -78,15 +88,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc // That shouldn't happen often and in case it does the actual ExecutorService isn't // created until used, i.e. cheap. val newConfigurator = - if (settings.config.hasPath(id)) { - configuratorFrom(config(id)) - } else { - // Note that the configurator of the default dispatcher will be registered for this id, - // so this will only be logged once, which is crucial. - prerequisites.eventStream.publish(Warning("Dispatchers", this.getClass, - "Dispatcher [%s] not configured, using default-dispatcher".format(id))) - lookupConfigurator(DefaultDispatcherId) - } + if (settings.config.hasPath(id)) configuratorFrom(config(id)) + else throw new ConfigurationException(s"Dispatcher [$id] not configured") dispatcherConfigurators.putIfAbsent(id, newConfigurator) match { case null ⇒ newConfigurator @@ -140,7 +143,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator */ private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = { - if (!cfg.hasPath("id")) throw new IllegalArgumentException("Missing dispatcher 'id' property in config: " + cfg.root.render) + if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render) cfg.getString("type") match { case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites) @@ -150,7 +153,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({ case exception ⇒ - throw new IllegalArgumentException( + throw new ConfigurationException( ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " + "make sure it has constructor with [com.typesafe.config.Config] and " + "[akka.dispatch.DispatcherPrerequisites] parameters") diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 176cfc5ec9..08c598754b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -8,7 +8,6 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigObject import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.ConfigurationException import akka.actor.Address import akka.actor.AddressFromURIString import akka.dispatch.Dispatchers diff --git a/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTest.java b/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTest.java index 3fb64a1f6b..e3b4632913 100644 --- a/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTest.java +++ b/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTest.java @@ -34,12 +34,15 @@ import akka.routing.RoundRobinRouter; import akka.routing.RouteeProvider; import akka.testkit.AkkaSpec; import akka.util.Timeout; +import com.typesafe.config.ConfigFactory; public class CustomRouterDocTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("CustomRouterDocTest", AkkaSpec.testConf()); + new AkkaJUnitActorSystemResource( + "CustomRouterDocTest", ConfigFactory.load(ConfigFactory.parseString( + "head{}\nworkers{}").withFallback(AkkaSpec.testConf()))); private final ActorSystem system = actorSystemResource.getSystem(); diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index 19497c3838..501e8d44d5 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -50,6 +50,21 @@ ActorContext & ActorRefFactory dispatcher The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``. +Removed fallback to default dispatcher +====================================== + +If deploying an actor with a specific dispatcher, e.g. +``Props(...).withDispatcher("d")``, then it would previously fall back to +``akka.actor.default-dispatcher`` if no configuration section for ``d`` could +be found. + +This was beneficial for preparing later deployment choices during development +by grouping actors on dispatcher IDs but not immediately configuring those. +Akka 2.2 introduces the possibility to add dispatcher configuration to the +``akka.actor.deployment`` section, making this unnecessary. + +The fallback was removed because in many cases its application was neither +intended nor noticed. API changes to FSM and TestFSMRef ================================= diff --git a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala index 0df50cb494..793155b79e 100644 --- a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala @@ -265,7 +265,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { // ActorSystem is a heavy object: create only one per application val system = ActorSystem("mySystem") - val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor2") + val myActor = system.actorOf(Props[MyActor], "myactor2") //#system-actorOf shutdown(system) } diff --git a/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala b/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala index 33b34eb49f..350b56fbd4 100644 --- a/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala @@ -16,7 +16,10 @@ object RouterDocSpec { } } -class RouterDocSpec extends AkkaSpec { +class RouterDocSpec extends AkkaSpec(""" +router {} +workers {} +""") { import RouterDocSpec._ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 01c45e06b3..2873e4720d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -205,10 +205,13 @@ private[akka] class RemoteActorRefProvider( system.systemActorOf(Props[RemoteDeploymentWatcher], "remote-deployment-watcher") def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, - systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = { + systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async) else { + if (!system.dispatchers.hasDispatcher(props.dispatcher)) + throw new ConfigurationException(s"Dispatcher [${props.dispatcher}] not configured for path $path") + /* * This needs to deal with “mangled” paths, which are created by remote * deployment, also in this method. The scheme is the following: @@ -281,7 +284,6 @@ private[akka] class RemoteActorRefProvider( case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) } } - } @deprecated("use actorSelection instead of actorFor", "2.2") def actorFor(path: ActorPath): InternalActorRef = {