Merge pull request #464 from akka/wip-verify-balancing-dispatcher-is-not-used-with-any-kind-of-router-jboner

Added verification that a BalancingDispatcher can not be used with any kind of Router.
This commit is contained in:
Jonas Bonér 2012-05-23 04:30:28 -07:00
commit 63e4b6e96e
8 changed files with 106 additions and 42 deletions

View file

@ -0,0 +1,80 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.Await
import akka.util.duration._
import akka.routing._
import akka.config.ConfigurationException
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitSuite
object ActorConfigurationVerificationSpec {
class TestActor extends Actor {
def receive: Receive = {
case _
}
}
val config = """
balancing-dispatcher {
type = BalancingDispatcher
throughput = 1
}
pinned-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVerificationSpec.config) with DefaultTimeout with BeforeAndAfterEach {
import ActorConfigurationVerificationSpec._
override def atStartup {
system.eventStream.publish(Mute(EventFilter[ConfigurationException]("")))
}
"An Actor configured with a BalancingDispatcher" must {
"fail verification with a ConfigurationException if also configured with a RoundRobinRouter" in {
intercept[ConfigurationException] {
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(RoundRobinRouter(2)))
}
}
"fail verification with a ConfigurationException if also configured with a BroadcastRouter" in {
intercept[ConfigurationException] {
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(BroadcastRouter(2)))
}
}
"fail verification with a ConfigurationException if also configured with a RandomRouter" in {
intercept[ConfigurationException] {
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(RandomRouter(2)))
}
}
"fail verification with a ConfigurationException if also configured with a SmallestMailboxRouter" in {
intercept[ConfigurationException] {
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(SmallestMailboxRouter(2)))
}
}
"fail verification with a ConfigurationException if also configured with a ScatterGatherFirstCompletedRouter" in {
intercept[ConfigurationException] {
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(ScatterGatherFirstCompletedRouter(nrOfInstances = 2, within = 2 seconds)))
}
}
"not fail verification with a ConfigurationException also not configured with a Router" in {
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher"))
}
}
"An Actor configured with a non-balancing dispatcher" must {
"not fail verification with a ConfigurationException if also configured with a Router" in {
system.actorOf(Props[TestActor].withDispatcher("pinned-dispatcher").withRouter(RoundRobinRouter(2)))
}
}
}

View file

@ -128,35 +128,6 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
current.routees.size must be(2)
}
"resize when busy" in {
val busy = new TestLatch(1)
val resizer = DefaultResizer(
lowerBound = 1,
upperBound = 3,
pressureThreshold = 0,
messagesPerResize = 1)
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))).withDispatcher("bal-disp"))
val latch1 = new TestLatch(1)
router ! (latch1, busy)
Await.ready(latch1, 2 seconds)
val latch2 = new TestLatch(1)
router ! (latch2, busy)
Await.ready(latch2, 2 seconds)
val latch3 = new TestLatch(1)
router ! (latch3, busy)
Await.ready(latch3, 2 seconds)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
busy.countDown()
}
"grow as needed under pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors

View file

@ -120,9 +120,9 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
* Throws: IllegalArgumentException if the value of "type" is not valid
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
*/
private[akka] def from(cfg: Config): MessageDispatcher = {
configuratorFrom(cfg).dispatcher()
}
private[akka] def from(cfg: Config): MessageDispatcher = configuratorFrom(cfg).dispatcher()
private[akka] def isBalancingDispatcher(id: String): Boolean = settings.config.hasPath(id) && config(id).getString("type") == "BalancingDispatcher"
/*
* Creates a MessageDispatcherConfigurator from a Config.

View file

@ -31,6 +31,12 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
_supervisor,
_path) {
// verify that a BalancingDispatcher is not used with a Router
if (_system.dispatchers.isBalancingDispatcher(_props.dispatcher) && _props.routerConfig != NoRouter)
throw new ConfigurationException(
"Configuration for actor [" + _path.toString +
"] is invalid - you can not use a 'BalancingDispatcher' together with any type of 'Router'")
/*
* CAUTION: RoutedActorRef is PROBLEMATIC
* ======================================

View file

@ -85,6 +85,8 @@ There are 4 different types of message dispatchers:
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
- Note that you can **not** use a ``BalancingDispatcher`` together with any kind of ``Router``, trying to do so will make your actor fail verification.
* CallingThreadDispatcher
- This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads,

View file

@ -375,7 +375,8 @@ The dispatcher for created children of the router will be taken from
makes sense to configure the :class:`BalancingDispatcher` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher``
together with any kind of ``Router``, trying to do so will make your actor fail verification.
The “head” router, of course, cannot run on the same balancing dispatcher,
because it does not process the same messages, hence this special actor does

View file

@ -86,6 +86,8 @@ There are 4 different types of message dispatchers:
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
- Note that you can **not** use a ``BalancingDispatcher`` together with any kind of ``Router``, trying to do so will make your actor fail verification.
* CallingThreadDispatcher
- This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads,

View file

@ -375,7 +375,9 @@ The dispatcher for created children of the router will be taken from
makes sense to configure the :class:`BalancingDispatcher` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher``
together with any kind of ``Router``, trying to do so will make your actor fail verification.
.. note::