From 1ea7653a79f3e3e22e05d9b3e968cec20137281b Mon Sep 17 00:00:00 2001 From: dormidon Date: Thu, 10 Apr 2014 14:50:00 +0400 Subject: [PATCH] =act #13964 make BalancingPool mailbox configurable --- .../akka/actor/dispatch/DispatchersSpec.scala | 50 +++++++++++++++++-- .../scala/akka/routing/BalancingSpec.scala | 2 +- .../scala/akka/dispatch/Dispatchers.scala | 8 ++- 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index 8f742d84a3..cbf00f7883 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -5,14 +5,15 @@ package akka.actor.dispatch import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.reflect.ClassTag +import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.ConfigurationException -import akka.actor.{ Actor, ActorRef, Props } -import akka.dispatch.{ BalancingDispatcher, Dispatcher, Dispatchers, MessageDispatcher, PinnedDispatcher } +import akka.actor._ +import akka.dispatch._ import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.routing.FromConfig -import akka.actor.Identify -import akka.actor.ActorIdentity +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicBoolean object DispatchersSpec { val config = """ @@ -30,6 +31,9 @@ object DispatchersSpec { balancing-dispatcher { type = "akka.dispatch.BalancingDispatcherConfigurator" } + mymailbox { + mailbox-type = "akka.actor.dispatch.DispatchersSpec$OneShotMailboxType" + } } akka.actor.deployment { /echo1 { @@ -46,6 +50,15 @@ object DispatchersSpec { fork-join-executor.parallelism-max = 3 } } + /balanced { + router = balancing-pool + nr-of-instances = 3 + pool-dispatcher { + mailbox = myapp.mymailbox + fork-join-executor.parallelism-min = 3 + fork-join-executor.parallelism-max = 3 + } + } } """ @@ -54,6 +67,24 @@ object DispatchersSpec { case _ ⇒ sender() ! Thread.currentThread.getName } } + + class OneShotMailboxType(settings: ActorSystem.Settings, config: Config) + extends MailboxType with ProducesMessageQueue[DoublingMailbox] { + val created = new AtomicBoolean(false) + override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = + if (created.compareAndSet(false, true)) { + new DoublingMailbox(owner) + } else + throw new IllegalStateException("I've already created the mailbox.") + } + + class DoublingMailbox(owner: Option[ActorRef]) extends UnboundedQueueBasedMessageQueue { + final val queue = new ConcurrentLinkedQueue[Envelope]() + override def enqueue(receiver: ActorRef, handle: Envelope): Unit = { + queue add handle + queue add handle + } + } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -192,6 +223,15 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend } } + "use balancing-pool router with special routees mailbox of deployment config" in { + system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "balanced") ! "what's the name?" + val Expected = """(DispatchersSpec-BalancingPool-/balanced-[1-9][0-9]*)""".r + expectMsgPF() { + case Expected(x) ⇒ + } + expectMsgPF() { + case Expected(x) ⇒ + } + } } - } diff --git a/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala index e997720f1f..3f197545bb 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala @@ -79,7 +79,7 @@ class BalancingSpec extends AkkaSpec( "deliver messages in a balancing fashion when defined in config" in { val latch = TestLatch(1) - val pool = system.actorOf(BalancingPool(1).props(routeeProps = + val pool = system.actorOf(FromConfig().props(routeeProps = Props(classOf[Worker], latch)), name = "balancingPool-2") test(pool, latch) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 413658e386..98ce07cf72 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -245,7 +245,13 @@ class BalancingDispatcherConfigurator(_config: Config, _prerequisites: Dispatche "BalancingDispatcher must have 'mailbox-requirement' which implements akka.dispatch.MultipleConsumerSemantics; " + s"dispatcher [$id] has [$requirement]") val mailboxType = - if (config.hasPath("mailbox-type")) { + if (config.hasPath("mailbox")) { + val mt = mailboxes.lookup(config.getString("mailbox")) + if (!requirement.isAssignableFrom(mailboxes.getProducedMessageQueueType(mt))) + throw new IllegalArgumentException( + s"BalancingDispatcher [$id] has 'mailbox' [${mt.getClass}] which is incompatible with 'mailbox-requirement' [$requirement]") + mt + } else if (config.hasPath("mailbox-type")) { val mt = mailboxes.lookup(id) if (!requirement.isAssignableFrom(mailboxes.getProducedMessageQueueType(mt))) throw new IllegalArgumentException(