remove fallback to default-dispatcher, see #3394
This commit is contained in:
parent
58756be937
commit
f802c94b9b
10 changed files with 86 additions and 36 deletions
|
|
@ -75,5 +75,23 @@ class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVeri
|
||||||
"not fail verification with a ConfigurationException if also configured with a Router" in {
|
"not fail verification with a ConfigurationException if also configured with a Router" in {
|
||||||
system.actorOf(Props[TestActor].withDispatcher("pinned-dispatcher").withRouter(RoundRobinRouter(2)))
|
system.actorOf(Props[TestActor].withDispatcher("pinned-dispatcher").withRouter(RoundRobinRouter(2)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"fail verification if the dispatcher cannot be found" in {
|
||||||
|
intercept[ConfigurationException] {
|
||||||
|
system.actorOf(Props[TestActor].withDispatcher("does not exist"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail verification if the dispatcher cannot be found for the head of a router" in {
|
||||||
|
intercept[ConfigurationException] {
|
||||||
|
system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(1, routerDispatcher = "does not exist")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail verification if the dispatcher cannot be found for the routees of a router" in {
|
||||||
|
intercept[ConfigurationException] {
|
||||||
|
system.actorOf(Props[TestActor].withDispatcher("does not exist").withRouter(RoundRobinRouter(1)))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,18 +3,15 @@
|
||||||
*/
|
*/
|
||||||
package akka.actor.dispatch
|
package akka.actor.dispatch
|
||||||
|
|
||||||
import language.postfixOps
|
import scala.collection.JavaConverters.mapAsJavaMapConverter
|
||||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import akka.dispatch._
|
|
||||||
import akka.testkit.AkkaSpec
|
|
||||||
import akka.testkit.ImplicitSender
|
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.Actor
|
|
||||||
import akka.actor.Props
|
import akka.ConfigurationException
|
||||||
import scala.concurrent.duration._
|
import akka.actor.{ Actor, ActorRef, Props }
|
||||||
import akka.actor.ActorRef
|
import akka.dispatch.{ BalancingDispatcher, Dispatcher, Dispatchers, MessageDispatcher, PinnedDispatcher }
|
||||||
|
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||||
|
|
||||||
object DispatchersSpec {
|
object DispatchersSpec {
|
||||||
val config = """
|
val config = """
|
||||||
|
|
@ -101,9 +98,10 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend
|
||||||
dispatcher.id must be("myapp.mydispatcher")
|
dispatcher.id must be("myapp.mydispatcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
"use default dispatcher for missing config" in {
|
"complain about missing config" in {
|
||||||
val dispatcher = lookup("myapp.other-dispatcher")
|
intercept[ConfigurationException] {
|
||||||
dispatcher must be === defaultGlobalDispatcher
|
lookup("myapp.other-dispatcher")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"have only one default dispatcher" in {
|
"have only one default dispatcher" in {
|
||||||
|
|
@ -112,8 +110,8 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend
|
||||||
dispatcher must be === system.dispatcher
|
dispatcher must be === system.dispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
"throw IllegalArgumentException if type does not exist" in {
|
"throw ConfigurationException if type does not exist" in {
|
||||||
intercept[IllegalArgumentException] {
|
intercept[ConfigurationException] {
|
||||||
from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist", id -> "invalid-dispatcher").asJava).
|
from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist", id -> "invalid-dispatcher").asJava).
|
||||||
withFallback(defaultDispatcherConfig))
|
withFallback(defaultDispatcherConfig))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ import scala.util.{ Success, Failure }
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
||||||
import scala.annotation.implicitNotFound
|
import scala.annotation.implicitNotFound
|
||||||
|
import akka.ConfigurationException
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for all ActorRef providers to implement.
|
* Interface for all ActorRef providers to implement.
|
||||||
|
|
@ -709,6 +710,9 @@ private[akka] class LocalActorRefProvider private[akka] (
|
||||||
|
|
||||||
} else props
|
} else props
|
||||||
|
|
||||||
|
if (!system.dispatchers.hasDispatcher(props2.dispatcher))
|
||||||
|
throw new ConfigurationException(s"Dispatcher [${props2.dispatcher}] not configured for path $path")
|
||||||
|
|
||||||
if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async)
|
if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async)
|
||||||
else new LocalActorRef(system, props2, supervisor, path)
|
else new LocalActorRef(system, props2, supervisor, path)
|
||||||
|
|
||||||
|
|
@ -716,7 +720,12 @@ private[akka] class LocalActorRefProvider private[akka] (
|
||||||
val lookup = if (lookupDeploy) deployer.lookup(path) else None
|
val lookup = if (lookupDeploy) deployer.lookup(path) else None
|
||||||
val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router))
|
val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router))
|
||||||
val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a)
|
val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a)
|
||||||
new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize(async)
|
val p = props.withRouter(d.routerConfig)
|
||||||
|
if (!system.dispatchers.hasDispatcher(p.dispatcher))
|
||||||
|
throw new ConfigurationException(s"Dispatcher [${p.dispatcher}] not configured for routees of $path")
|
||||||
|
if (!system.dispatchers.hasDispatcher(d.routerConfig.routerDispatcher))
|
||||||
|
throw new ConfigurationException(s"Dispatcher [${p.dispatcher}] not configured for router of $path")
|
||||||
|
new RoutedActorRef(system, p, supervisor, path).initialize(async)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import akka.actor.{ Scheduler, DynamicAccess, ActorSystem }
|
||||||
import akka.event.Logging.Warning
|
import akka.event.Logging.Warning
|
||||||
import akka.event.EventStream
|
import akka.event.EventStream
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
|
import akka.ConfigurationException
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher
|
* DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher
|
||||||
|
|
@ -65,12 +66,21 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator]
|
private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a dispatcher as specified in configuration, or if not defined it uses
|
* Returns a dispatcher as specified in configuration. Please note that this
|
||||||
* the default dispatcher. Please note that this method _may_ create and return a NEW dispatcher,
|
* method _may_ create and return a NEW dispatcher, _every_ call.
|
||||||
* _every_ call.
|
*
|
||||||
|
* @throws ConfigurationException if the specified dispatcher cannot be found in the configuration
|
||||||
*/
|
*/
|
||||||
def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher()
|
def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks that the configuration provides a section for the given dispatcher.
|
||||||
|
* This does not guarantee that no ConfigurationException will be thrown when
|
||||||
|
* using this dispatcher, because the details can only be checked by trying
|
||||||
|
* to instantiate it, which might be undesirable when just checking.
|
||||||
|
*/
|
||||||
|
def hasDispatcher(id: String): Boolean = settings.config.hasPath(id)
|
||||||
|
|
||||||
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
|
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
|
||||||
dispatcherConfigurators.get(id) match {
|
dispatcherConfigurators.get(id) match {
|
||||||
case null ⇒
|
case null ⇒
|
||||||
|
|
@ -78,15 +88,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
// That shouldn't happen often and in case it does the actual ExecutorService isn't
|
// That shouldn't happen often and in case it does the actual ExecutorService isn't
|
||||||
// created until used, i.e. cheap.
|
// created until used, i.e. cheap.
|
||||||
val newConfigurator =
|
val newConfigurator =
|
||||||
if (settings.config.hasPath(id)) {
|
if (settings.config.hasPath(id)) configuratorFrom(config(id))
|
||||||
configuratorFrom(config(id))
|
else throw new ConfigurationException(s"Dispatcher [$id] not configured")
|
||||||
} else {
|
|
||||||
// Note that the configurator of the default dispatcher will be registered for this id,
|
|
||||||
// so this will only be logged once, which is crucial.
|
|
||||||
prerequisites.eventStream.publish(Warning("Dispatchers", this.getClass,
|
|
||||||
"Dispatcher [%s] not configured, using default-dispatcher".format(id)))
|
|
||||||
lookupConfigurator(DefaultDispatcherId)
|
|
||||||
}
|
|
||||||
|
|
||||||
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
|
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
|
||||||
case null ⇒ newConfigurator
|
case null ⇒ newConfigurator
|
||||||
|
|
@ -140,7 +143,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
|
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
|
||||||
*/
|
*/
|
||||||
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
|
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
|
||||||
if (!cfg.hasPath("id")) throw new IllegalArgumentException("Missing dispatcher 'id' property in config: " + cfg.root.render)
|
if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
|
||||||
|
|
||||||
cfg.getString("type") match {
|
cfg.getString("type") match {
|
||||||
case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
|
case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
|
||||||
|
|
@ -150,7 +153,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
|
val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
|
||||||
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
|
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
|
||||||
case exception ⇒
|
case exception ⇒
|
||||||
throw new IllegalArgumentException(
|
throw new ConfigurationException(
|
||||||
("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
|
("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
|
||||||
"make sure it has constructor with [com.typesafe.config.Config] and " +
|
"make sure it has constructor with [com.typesafe.config.Config] and " +
|
||||||
"[akka.dispatch.DispatcherPrerequisites] parameters")
|
"[akka.dispatch.DispatcherPrerequisites] parameters")
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,6 @@ import com.typesafe.config.Config
|
||||||
import com.typesafe.config.ConfigObject
|
import com.typesafe.config.ConfigObject
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import akka.ConfigurationException
|
|
||||||
import akka.actor.Address
|
import akka.actor.Address
|
||||||
import akka.actor.AddressFromURIString
|
import akka.actor.AddressFromURIString
|
||||||
import akka.dispatch.Dispatchers
|
import akka.dispatch.Dispatchers
|
||||||
|
|
|
||||||
|
|
@ -34,12 +34,15 @@ import akka.routing.RoundRobinRouter;
|
||||||
import akka.routing.RouteeProvider;
|
import akka.routing.RouteeProvider;
|
||||||
import akka.testkit.AkkaSpec;
|
import akka.testkit.AkkaSpec;
|
||||||
import akka.util.Timeout;
|
import akka.util.Timeout;
|
||||||
|
import com.typesafe.config.ConfigFactory;
|
||||||
|
|
||||||
public class CustomRouterDocTest {
|
public class CustomRouterDocTest {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static AkkaJUnitActorSystemResource actorSystemResource =
|
public static AkkaJUnitActorSystemResource actorSystemResource =
|
||||||
new AkkaJUnitActorSystemResource("CustomRouterDocTest", AkkaSpec.testConf());
|
new AkkaJUnitActorSystemResource(
|
||||||
|
"CustomRouterDocTest", ConfigFactory.load(ConfigFactory.parseString(
|
||||||
|
"head{}\nworkers{}").withFallback(AkkaSpec.testConf())));
|
||||||
|
|
||||||
private final ActorSystem system = actorSystemResource.getSystem();
|
private final ActorSystem system = actorSystemResource.getSystem();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,21 @@ ActorContext & ActorRefFactory dispatcher
|
||||||
|
|
||||||
The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``.
|
The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``.
|
||||||
|
|
||||||
|
Removed fallback to default dispatcher
|
||||||
|
======================================
|
||||||
|
|
||||||
|
If deploying an actor with a specific dispatcher, e.g.
|
||||||
|
``Props(...).withDispatcher("d")``, then it would previously fall back to
|
||||||
|
``akka.actor.default-dispatcher`` if no configuration section for ``d`` could
|
||||||
|
be found.
|
||||||
|
|
||||||
|
This was beneficial for preparing later deployment choices during development
|
||||||
|
by grouping actors on dispatcher IDs but not immediately configuring those.
|
||||||
|
Akka 2.2 introduces the possibility to add dispatcher configuration to the
|
||||||
|
``akka.actor.deployment`` section, making this unnecessary.
|
||||||
|
|
||||||
|
The fallback was removed because in many cases its application was neither
|
||||||
|
intended nor noticed.
|
||||||
|
|
||||||
API changes to FSM and TestFSMRef
|
API changes to FSM and TestFSMRef
|
||||||
=================================
|
=================================
|
||||||
|
|
|
||||||
|
|
@ -265,7 +265,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
|
|
||||||
// ActorSystem is a heavy object: create only one per application
|
// ActorSystem is a heavy object: create only one per application
|
||||||
val system = ActorSystem("mySystem")
|
val system = ActorSystem("mySystem")
|
||||||
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor2")
|
val myActor = system.actorOf(Props[MyActor], "myactor2")
|
||||||
//#system-actorOf
|
//#system-actorOf
|
||||||
shutdown(system)
|
shutdown(system)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,10 @@ object RouterDocSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class RouterDocSpec extends AkkaSpec {
|
class RouterDocSpec extends AkkaSpec("""
|
||||||
|
router {}
|
||||||
|
workers {}
|
||||||
|
""") {
|
||||||
|
|
||||||
import RouterDocSpec._
|
import RouterDocSpec._
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -205,10 +205,13 @@ private[akka] class RemoteActorRefProvider(
|
||||||
system.systemActorOf(Props[RemoteDeploymentWatcher], "remote-deployment-watcher")
|
system.systemActorOf(Props[RemoteDeploymentWatcher], "remote-deployment-watcher")
|
||||||
|
|
||||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
|
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
|
||||||
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = {
|
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef =
|
||||||
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async)
|
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async)
|
||||||
else {
|
else {
|
||||||
|
|
||||||
|
if (!system.dispatchers.hasDispatcher(props.dispatcher))
|
||||||
|
throw new ConfigurationException(s"Dispatcher [${props.dispatcher}] not configured for path $path")
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This needs to deal with “mangled” paths, which are created by remote
|
* This needs to deal with “mangled” paths, which are created by remote
|
||||||
* deployment, also in this method. The scheme is the following:
|
* deployment, also in this method. The scheme is the following:
|
||||||
|
|
@ -281,7 +284,6 @@ private[akka] class RemoteActorRefProvider(
|
||||||
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
|
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@deprecated("use actorSelection instead of actorFor", "2.2")
|
@deprecated("use actorSelection instead of actorFor", "2.2")
|
||||||
def actorFor(path: ActorPath): InternalActorRef = {
|
def actorFor(path: ActorPath): InternalActorRef = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue