diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala new file mode 100644 index 0000000000..cdaa421a59 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor + +import akka.testkit._ +import akka.testkit.DefaultTimeout +import akka.testkit.TestEvent._ +import akka.dispatch.Await +import akka.util.duration._ +import akka.routing._ +import akka.config.ConfigurationException +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.BeforeAndAfterEach +import org.scalatest.junit.JUnitSuite + +object ActorConfigurationVerificationSpec { + + class TestActor extends Actor { + def receive: Receive = { + case _ ⇒ + } + } + + val config = """ + balancing-dispatcher { + type = BalancingDispatcher + throughput = 1 + } + pinned-dispatcher { + executor = "thread-pool-executor" + type = PinnedDispatcher + } + """ +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVerificationSpec.config) with DefaultTimeout with BeforeAndAfterEach { + import ActorConfigurationVerificationSpec._ + + override def atStartup { + system.eventStream.publish(Mute(EventFilter[ConfigurationException](""))) + } + + "An Actor configured with a BalancingDispatcher" must { + "fail verification with a ConfigurationException if also configured with a RoundRobinRouter" in { + intercept[ConfigurationException] { + system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(RoundRobinRouter(2))) + } + } + "fail verification with a ConfigurationException if also configured with a BroadcastRouter" in { + intercept[ConfigurationException] { + system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(BroadcastRouter(2))) + } + } + "fail verification with a ConfigurationException if also configured with a RandomRouter" in { + intercept[ConfigurationException] { + system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(RandomRouter(2))) + } + } + "fail verification with a ConfigurationException if also configured with a SmallestMailboxRouter" in { + intercept[ConfigurationException] { + system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(SmallestMailboxRouter(2))) + } + } + "fail verification with a ConfigurationException if also configured with a ScatterGatherFirstCompletedRouter" in { + intercept[ConfigurationException] { + system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(ScatterGatherFirstCompletedRouter(nrOfInstances = 2, within = 2 seconds))) + } + } + "not fail verification with a ConfigurationException also not configured with a Router" in { + system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher")) + } + } + "An Actor configured with a non-balancing dispatcher" must { + "not fail verification with a ConfigurationException if also configured with a Router" in { + system.actorOf(Props[TestActor].withDispatcher("pinned-dispatcher").withRouter(RoundRobinRouter(2))) + } + } +} diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 457c4ab411..ede4a69d7c 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -128,35 +128,6 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with current.routees.size must be(2) } - "resize when busy" in { - - val busy = new TestLatch(1) - - val resizer = DefaultResizer( - lowerBound = 1, - upperBound = 3, - pressureThreshold = 0, - messagesPerResize = 1) - - val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))).withDispatcher("bal-disp")) - - val latch1 = new TestLatch(1) - router ! (latch1, busy) - Await.ready(latch1, 2 seconds) - - val latch2 = new TestLatch(1) - router ! (latch2, busy) - Await.ready(latch2, 2 seconds) - - val latch3 = new TestLatch(1) - router ! (latch3, busy) - Await.ready(latch3, 2 seconds) - - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) - - busy.countDown() - } - "grow as needed under pressure" in { // make sure the pool starts at the expected lower limit and grows to the upper as needed // as influenced by the backlog of blocking pooled actors diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 93d44e007d..a81a8e6c2b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -120,9 +120,9 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * Throws: IllegalArgumentException if the value of "type" is not valid * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator */ - private[akka] def from(cfg: Config): MessageDispatcher = { - configuratorFrom(cfg).dispatcher() - } + private[akka] def from(cfg: Config): MessageDispatcher = configuratorFrom(cfg).dispatcher() + + private[akka] def isBalancingDispatcher(id: String): Boolean = settings.config.hasPath(id) && config(id).getString("type") == "BalancingDispatcher" /* * Creates a MessageDispatcherConfigurator from a Config. diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index fdf14a5b96..58ecbfcdc5 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -31,11 +31,17 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _supervisor, _path) { + // verify that a BalancingDispatcher is not used with a Router + if (_system.dispatchers.isBalancingDispatcher(_props.dispatcher) && _props.routerConfig != NoRouter) + throw new ConfigurationException( + "Configuration for actor [" + _path.toString + + "] is invalid - you can not use a 'BalancingDispatcher' together with any type of 'Router'") + /* * CAUTION: RoutedActorRef is PROBLEMATIC * ====================================== - * - * We are constructing/assembling the children outside of the scope of the + * + * We are constructing/assembling the children outside of the scope of the * Router actor, inserting them in its childrenRef list, which is not at all * synchronized. This is done exactly once at start-up, all other accesses * are done from the Router actor. This means that the only thing which is diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index 6ef0d44d7e..90a0e9cb6a 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -70,7 +70,7 @@ There are 4 different types of message dispatchers: * BalancingDispatcher - - This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. + - This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. - It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. @@ -85,9 +85,11 @@ There are 4 different types of message dispatchers: "thread-pool-executor" or the FQCN of an ``akka.dispatcher.ExecutorServiceConfigurator`` + - Note that you can **not** use a ``BalancingDispatcher`` together with any kind of ``Router``, trying to do so will make your actor fail verification. + * CallingThreadDispatcher - - This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, + - This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, but it can be used from different threads concurrently for the same actor. See :ref:`TestCallingThreadDispatcherRef` for details and restrictions. diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 4d01642a72..e006c7db63 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -375,7 +375,8 @@ The dispatcher for created children of the router will be taken from makes sense to configure the :class:`BalancingDispatcher` if the precise routing is not so important (i.e. no consistent hashing or round-robin is required); this enables newly created routees to pick up work immediately by -stealing it from their siblings. +stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher`` +together with any kind of ``Router``, trying to do so will make your actor fail verification. The “head” router, of course, cannot run on the same balancing dispatcher, because it does not process the same messages, hence this special actor does diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 7d6a1f6334..a1cc431643 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -71,7 +71,7 @@ There are 4 different types of message dispatchers: * BalancingDispatcher - - This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. + - This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. - It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. @@ -86,9 +86,11 @@ There are 4 different types of message dispatchers: "thread-pool-executor" or the FQCN of an ``akka.dispatcher.ExecutorServiceConfigurator`` + - Note that you can **not** use a ``BalancingDispatcher`` together with any kind of ``Router``, trying to do so will make your actor fail verification. + * CallingThreadDispatcher - - This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, + - This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, but it can be used from different threads concurrently for the same actor. See :ref:`TestCallingThreadDispatcherRef` for details and restrictions. @@ -112,8 +114,8 @@ And then using it: .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher -Note that ``thread-pool-executor`` configuration as per the above ``my-thread-pool-dispatcher`` exmaple is -NOT applicable. This is because every actor will have its own thread pool when using ``PinnedDispatcher``, +Note that ``thread-pool-executor`` configuration as per the above ``my-thread-pool-dispatcher`` exmaple is +NOT applicable. This is because every actor will have its own thread pool when using ``PinnedDispatcher``, and that pool will have only one thread. Mailboxes diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 737c9e31e7..0d0625be36 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -375,7 +375,9 @@ The dispatcher for created children of the router will be taken from makes sense to configure the :class:`BalancingDispatcher` if the precise routing is not so important (i.e. no consistent hashing or round-robin is required); this enables newly created routees to pick up work immediately by -stealing it from their siblings. +stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher`` +together with any kind of ``Router``, trying to do so will make your actor fail verification. + .. note::