Merge pull request #2127 from dormidon/3964-unable-to-configure-mailbox-for-routees-under-balancingpool-in-akka-2-3-1

Mailbox configuration for routees under BalancingPool - Issue 3964 fix
This commit is contained in:
Roland Kuhn 2014-05-12 10:08:59 +02:00
commit e43e48329c
3 changed files with 53 additions and 7 deletions

View file

@ -5,14 +5,15 @@ package akka.actor.dispatch
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.reflect.ClassTag
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.ConfigurationException
import akka.actor.{ Actor, ActorRef, Props }
import akka.dispatch.{ BalancingDispatcher, Dispatcher, Dispatchers, MessageDispatcher, PinnedDispatcher }
import akka.actor._
import akka.dispatch._
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.routing.FromConfig
import akka.actor.Identify
import akka.actor.ActorIdentity
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
object DispatchersSpec {
val config = """
@ -30,6 +31,9 @@ object DispatchersSpec {
balancing-dispatcher {
type = "akka.dispatch.BalancingDispatcherConfigurator"
}
mymailbox {
mailbox-type = "akka.actor.dispatch.DispatchersSpec$OneShotMailboxType"
}
}
akka.actor.deployment {
/echo1 {
@ -46,6 +50,15 @@ object DispatchersSpec {
fork-join-executor.parallelism-max = 3
}
}
/balanced {
router = balancing-pool
nr-of-instances = 3
pool-dispatcher {
mailbox = myapp.mymailbox
fork-join-executor.parallelism-min = 3
fork-join-executor.parallelism-max = 3
}
}
}
"""
@ -54,6 +67,24 @@ object DispatchersSpec {
case _ sender() ! Thread.currentThread.getName
}
}
class OneShotMailboxType(settings: ActorSystem.Settings, config: Config)
extends MailboxType with ProducesMessageQueue[DoublingMailbox] {
val created = new AtomicBoolean(false)
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) =
if (created.compareAndSet(false, true)) {
new DoublingMailbox(owner)
} else
throw new IllegalStateException("I've already created the mailbox.")
}
class DoublingMailbox(owner: Option[ActorRef]) extends UnboundedQueueBasedMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
queue add handle
queue add handle
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -192,6 +223,15 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend
}
}
"use balancing-pool router with special routees mailbox of deployment config" in {
system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "balanced") ! "what's the name?"
val Expected = """(DispatchersSpec-BalancingPool-/balanced-[1-9][0-9]*)""".r
expectMsgPF() {
case Expected(x)
}
expectMsgPF() {
case Expected(x)
}
}
}
}

View file

@ -79,7 +79,7 @@ class BalancingSpec extends AkkaSpec(
"deliver messages in a balancing fashion when defined in config" in {
val latch = TestLatch(1)
val pool = system.actorOf(BalancingPool(1).props(routeeProps =
val pool = system.actorOf(FromConfig().props(routeeProps =
Props(classOf[Worker], latch)), name = "balancingPool-2")
test(pool, latch)
}

View file

@ -245,7 +245,13 @@ class BalancingDispatcherConfigurator(_config: Config, _prerequisites: Dispatche
"BalancingDispatcher must have 'mailbox-requirement' which implements akka.dispatch.MultipleConsumerSemantics; " +
s"dispatcher [$id] has [$requirement]")
val mailboxType =
if (config.hasPath("mailbox-type")) {
if (config.hasPath("mailbox")) {
val mt = mailboxes.lookup(config.getString("mailbox"))
if (!requirement.isAssignableFrom(mailboxes.getProducedMessageQueueType(mt)))
throw new IllegalArgumentException(
s"BalancingDispatcher [$id] has 'mailbox' [${mt.getClass}] which is incompatible with 'mailbox-requirement' [$requirement]")
mt
} else if (config.hasPath("mailbox-type")) {
val mt = mailboxes.lookup(id)
if (!requirement.isAssignableFrom(mailboxes.getProducedMessageQueueType(mt)))
throw new IllegalArgumentException(