Allow different types of mailboxes on the same dispatcher. See #2687

This commit is contained in:
Björn Antonsson 2013-04-18 13:35:36 +02:00
parent c86cc0b8f7
commit c3eed374f1
32 changed files with 735 additions and 74 deletions

View file

@ -0,0 +1,171 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import com.typesafe.config.ConfigFactory
import akka.testkit._
import akka.dispatch._
object ActorMailboxSpec {
val mailboxConf = ConfigFactory.parseString("""
unbounded-dispatcher {
mailbox-type = "akka.dispatch.UnboundedMailbox"
}
bounded-dispatcher {
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
mailbox-type = "akka.dispatch.BoundedMailbox"
}
unbounded-mailbox {
mailbox-type = "akka.dispatch.UnboundedMailbox"
}
bounded-mailbox {
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
mailbox-type = "akka.dispatch.BoundedMailbox"
}
akka.actor.deployment {
/default-default {
}
/default-override-from-props {
}
/default-override-from-trait {
}
/default-override-from-stash {
}
/default-bounded {
mailbox = bounded-mailbox
}
/default-unbounded-deque {
mailbox = akka.actor.mailbox.unbounded-deque-based
}
/default-unbounded-deque-override-trait {
mailbox = akka.actor.mailbox.unbounded-deque-based
}
/unbounded-default {
dispatcher = unbounded-dispatcher
}
/unbounded-default-override-trait {
dispatcher = unbounded-dispatcher
}
/unbounded-bounded {
dispatcher= unbounded-dispatcher
mailbox = bounded-mailbox
}
/bounded-default {
dispatcher = bounded-dispatcher
}
/bounded-unbounded {
dispatcher = bounded-dispatcher
mailbox = unbounded-mailbox
}
/bounded-unbounded-override-props {
dispatcher = bounded-dispatcher
mailbox = unbounded-mailbox
}
}
akka.actor.mailbox.requirements {
"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
""")
class QueueReportingActor extends Actor {
def receive = {
case _ sender ! context.asInstanceOf[ActorCell].mailbox.messageQueue
}
}
class BoundedQueueReportingActor extends QueueReportingActor with RequiresMessageQueue[BoundedMessageQueueSemantics]
class StashQueueReportingActor extends QueueReportingActor with Stash
val UnboundedMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[UnboundedMessageQueueSemantics])
val BoundedMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[BoundedMessageQueueSemantics])
val UnboundedDeqMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[DequeBasedMessageQueue],
classOf[UnboundedDequeBasedMessageQueueSemantics])
}
class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with DefaultTimeout with ImplicitSender {
import ActorMailboxSpec._
def checkMailboxQueue(props: Props, name: String, types: Seq[Class[_]]): Unit = {
val actor = system.actorOf(props, name)
actor ! "ping"
val q = expectMsgType[MessageQueue]
types foreach (t assert(t isInstance q, s"Type [${q.getClass}] is not assignable to [${t}]"))
}
"An Actor" must {
"get an unbounded message queue by default" in {
checkMailboxQueue(Props[QueueReportingActor], "default-default", UnboundedMailboxTypes)
}
"get an unbounded dequeu message queue when it is only configured on the props" in {
checkMailboxQueue(Props[QueueReportingActor].withMailbox("akka.actor.mailbox.unbounded-deque-based"),
"default-override-from-props", UnboundedDeqMailboxTypes)
}
"get an bounded message queue when it's only configured with RequiresMailbox" in {
checkMailboxQueue(Props[BoundedQueueReportingActor],
"default-override-from-trait", BoundedMailboxTypes)
}
"get an unbounded dequeu message queue when it's only mixed with Stash" in {
checkMailboxQueue(Props[StashQueueReportingActor],
"default-override-from-stash", UnboundedDeqMailboxTypes)
}
"get a bounded message queue when it's configured as mailbox" in {
checkMailboxQueue(Props[QueueReportingActor], "default-bounded", BoundedMailboxTypes)
}
"get an unbounded dequeu message queue when it's configured as mailbox" in {
checkMailboxQueue(Props[QueueReportingActor], "default-unbounded-deque", UnboundedDeqMailboxTypes)
}
"get an unbounded dequeu message queue when it's configured as mailbox overriding RequestMailbox" in {
filterEvents(EventFilter[IllegalArgumentException]()) {
checkMailboxQueue(Props[BoundedQueueReportingActor], "default-unbounded-deque-override-trait",
UnboundedDeqMailboxTypes)
}
}
"get an unbounded message queue when defined in dispatcher" in {
checkMailboxQueue(Props[QueueReportingActor], "unbounded-default", UnboundedMailboxTypes)
}
"get an unbounded message queue when defined in dispatcher overriding RequestMailbox" in {
filterEvents(EventFilter[IllegalArgumentException]()) {
checkMailboxQueue(Props[BoundedQueueReportingActor], "unbounded-default-override-trait", UnboundedMailboxTypes)
}
}
"get a bounded message queue when it's configured as mailbox overriding unbounded in dispatcher" in {
checkMailboxQueue(Props[QueueReportingActor], "unbounded-bounded", BoundedMailboxTypes)
}
"get a bounded message queue by when defined in dispatcher" in {
checkMailboxQueue(Props[QueueReportingActor], "bounded-default", BoundedMailboxTypes)
}
"get an unbounded message queue when it's configured as mailbox overriding bounded in dispatcher" in {
checkMailboxQueue(Props[QueueReportingActor], "bounded-unbounded", UnboundedMailboxTypes)
}
"get an unbounded message queue overriding configuration on the props" in {
checkMailboxQueue(Props[QueueReportingActor].withMailbox("akka.actor.mailbox.unbounded-deque-based"),
"bounded-unbounded-override-props", UnboundedMailboxTypes)
}
}
}

View file

@ -89,6 +89,7 @@ object ActorSystemSpec {
config.getInt("throughput"), config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, mailboxType,
mailBoxTypeConfigured,
configureExecutor(), configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {
val doneIt = new Switch val doneIt = new Switch

View file

@ -28,6 +28,9 @@ object DeployerSpec {
/service3 { /service3 {
dispatcher = my-dispatcher dispatcher = my-dispatcher
} }
/service4 {
mailbox = my-mailbox
}
/service-round-robin { /service-round-robin {
router = round-robin router = round-robin
} }
@ -80,7 +83,8 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
deployment.get.config, deployment.get.config,
NoRouter, NoRouter,
NoScopeGiven, NoScopeGiven,
Deploy.NoDispatcherGiven))) Deploy.NoDispatcherGiven,
Deploy.NoMailboxGiven)))
} }
"use None deployment for undefined service" in { "use None deployment for undefined service" in {
@ -99,7 +103,22 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
deployment.get.config, deployment.get.config,
NoRouter, NoRouter,
NoScopeGiven, NoScopeGiven,
dispatcher = "my-dispatcher"))) dispatcher = "my-dispatcher",
Deploy.NoMailboxGiven)))
}
"be able to parse 'akka.actor.deployment._' with mailbox config" in {
val service = "/service4"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1))
deployment must be(Some(
Deploy(
service,
deployment.get.config,
NoRouter,
NoScopeGiven,
Deploy.NoDispatcherGiven,
mailbox = "my-mailbox")))
} }
"detect invalid number-of-instances" in { "detect invalid number-of-instances" in {

View file

@ -83,6 +83,7 @@ object SupervisorHierarchySpec {
config.getInt("throughput"), config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, mailboxType,
mailBoxTypeConfigured,
configureExecutor(), configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {

View file

@ -521,6 +521,7 @@ object DispatcherModelSpec {
config.getInt("throughput"), config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, mailboxType,
mailBoxTypeConfigured,
configureExecutor(), configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor
@ -594,6 +595,7 @@ object BalancingDispatcherModelSpec {
config.getInt("throughput"), config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, mailboxType,
mailBoxTypeConfigured,
configureExecutor(), configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor

View file

@ -32,7 +32,7 @@ object CallingThreadDispatcherModelSpec {
extends MessageDispatcherConfigurator(config, prerequisites) { extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher = private val instance: MessageDispatcher =
new CallingThreadDispatcher(prerequisites, UnboundedMailbox()) with MessageDispatcherInterceptor { new CallingThreadDispatcher(prerequisites, UnboundedMailbox(), false) with MessageDispatcherInterceptor {
override def id: String = config.getString("id") override def id: String = config.getString("id")
} }

View file

@ -99,6 +99,14 @@ akka {
# specified at all. # specified at all.
dispatcher = "" dispatcher = ""
# The id of the mailbox to use for this actor.
# If undefined or empty the default mailbox of the configured dispatcher
# is used or if there is no mailbox configuration the mailbox specified
# in code (Props.withMailbox) is used.
# If there is a mailbox defined in the configured dispatcher then that
# overrides this setting.
mailbox = ""
# routing (load-balance) scheme to use # routing (load-balance) scheme to use
# - available: "from-code", "round-robin", "random", "smallest-mailbox", # - available: "from-code", "round-robin", "random", "smallest-mailbox",
# "scatter-gather", "broadcast" # "scatter-gather", "broadcast"
@ -191,11 +199,6 @@ akka {
} }
} }
# Default dispatcher for Actors that extend Stash
default-stash-dispatcher {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
}
default-dispatcher { default-dispatcher {
# Must be one of the following # Must be one of the following
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are # Dispatcher, (BalancingDispatcher, only valid when all actors using it are
@ -308,6 +311,24 @@ akka {
stash-capacity = -1 stash-capacity = -1
} }
mailbox {
# Mapping between message queue semantics and mailbox configurations.
# Used by akka.dispatch.RequiresMessageQueue[T] to enforce different
# mailbox types on actors.
# If your Actor implements RequiresMessageQueue[T], then when you create
# an instance of that actor its mailbox type will be decided by looking
# up a mailbox configuration via T in this mapping
requirements {
"akka.dispatch.DequeBasedMessageQueue" = akka.actor.mailbox.unbounded-deque-based
}
unbounded-deque-based {
# FQCN of the MailboxType, The Class of the FQCN must have a public constructor
# with (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
}
}
debug { debug {
# enable function of Actor.loggable(), which is to log any received message # enable function of Actor.loggable(), which is to log any received message
# at DEBUG level, see the “Testing Actor Systems” section of the Akka # at DEBUG level, see the “Testing Actor Systems” section of the Akka

View file

@ -279,6 +279,10 @@ private[akka] trait Cell {
* which may be a costly operation, 0 otherwise. * which may be a costly operation, 0 otherwise.
*/ */
def numberOfMessages: Int def numberOfMessages: Int
/**
* The props for this actor cell.
*/
def props: Props
} }
/** /**

View file

@ -68,8 +68,7 @@ import java.util.concurrent.TimeUnit
* be circumvented by shadowing the name `system` within `"fred"`). * be circumvented by shadowing the name `system` within `"fred"`).
* *
* <b>Note:</b> If you want to use an `Act with Stash`, you should use the * <b>Note:</b> If you want to use an `Act with Stash`, you should use the
* `ActWithStash` trait in order to have the actor run on a special dispatcher * `ActWithStash` trait in order to have the actor get the necessary deque-based
* (`"akka.actor.default-stash-dispatcher"`) which has the necessary deque-based
* mailbox setting. * mailbox setting.
*/ */
object ActorDSL extends dsl.Inbox with dsl.Creators { object ActorDSL extends dsl.Inbox with dsl.Creators {

View file

@ -693,11 +693,20 @@ private[akka] class LocalActorRefProvider private[akka] (
} }
val props2 = val props2 =
if (lookupDeploy) deployer.lookup(path) match { if (lookupDeploy) {
case Some(d) if d.dispatcher != Deploy.NoDispatcherGiven props.withDispatcher(d.dispatcher) // mailbox and dispatcher defined in deploy should override props
case _ props // dispatcher not specified in deployment config deployer.lookup(path) match {
} case Some(d)
else props (d.dispatcher, d.mailbox) match {
case (Deploy.NoDispatcherGiven, Deploy.NoMailboxGiven) props
case (dsp, Deploy.NoMailboxGiven) props.withDispatcher(dsp)
case (Deploy.NoMailboxGiven, mbx) props.withMailbox(mbx)
case (dsp, mbx) props.withDispatcher(dsp).withMailbox(mbx)
}
case _ props // no deployment config found
}
} else props
if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async) if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async)
else new LocalActorRef(system, props2, supervisor, path) else new LocalActorRef(system, props2, supervisor, path)

View file

@ -319,6 +319,11 @@ abstract class ActorSystem extends ActorRefFactory {
*/ */
implicit def dispatcher: ExecutionContext implicit def dispatcher: ExecutionContext
/**
* Helper object for looking up configured mailbox types.
*/
def mailboxes: Mailboxes
/** /**
* Register a block of code (callback) to run after ActorSystem.shutdown has been issued and * Register a block of code (callback) to run after ActorSystem.shutdown has been issued and
* all actors in this actor system have been stopped. * all actors in this actor system have been stopped.
@ -560,6 +565,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher
val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess)
val internalCallingThreadExecutionContext: ExecutionContext = val internalCallingThreadExecutionContext: ExecutionContext =
dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").getOrElse( dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").getOrElse(
new ExecutionContext with BatchingExecutor { new ExecutionContext with BatchingExecutor {

View file

@ -15,6 +15,7 @@ import scala.annotation.tailrec
object Deploy { object Deploy {
final val NoDispatcherGiven = "" final val NoDispatcherGiven = ""
final val NoMailboxGiven = ""
} }
/** /**
@ -32,13 +33,14 @@ object Deploy {
* val remoteProps = someProps.withDeploy(Deploy(scope = RemoteScope("someOtherNodeName"))) * val remoteProps = someProps.withDeploy(Deploy(scope = RemoteScope("someOtherNodeName")))
* }}} * }}}
*/ */
@SerialVersionUID(1L) @SerialVersionUID(2L)
final case class Deploy( final case class Deploy(
path: String = "", path: String = "",
config: Config = ConfigFactory.empty, config: Config = ConfigFactory.empty,
routerConfig: RouterConfig = NoRouter, routerConfig: RouterConfig = NoRouter,
scope: Scope = NoScopeGiven, scope: Scope = NoScopeGiven,
dispatcher: String = Deploy.NoDispatcherGiven) { dispatcher: String = Deploy.NoDispatcherGiven,
mailbox: String = Deploy.NoMailboxGiven) {
/** /**
* Java API to create a Deploy with the given RouterConfig * Java API to create a Deploy with the given RouterConfig
@ -61,13 +63,13 @@ final case class Deploy(
* other members are merged using ``<X>.withFallback(other.<X>)``. * other members are merged using ``<X>.withFallback(other.<X>)``.
*/ */
def withFallback(other: Deploy): Deploy = { def withFallback(other: Deploy): Deploy = {
val disp = if (dispatcher == Deploy.NoDispatcherGiven) other.dispatcher else dispatcher
Deploy( Deploy(
path, path,
config.withFallback(other.config), config.withFallback(other.config),
routerConfig.withFallback(other.routerConfig), routerConfig.withFallback(other.routerConfig),
scope.withFallback(other.scope), scope.withFallback(other.scope),
disp) if (dispatcher == Deploy.NoDispatcherGiven) other.dispatcher else dispatcher,
if (mailbox == Deploy.NoMailboxGiven) other.mailbox else mailbox)
} }
} }
@ -154,7 +156,8 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
val deployment = config.withFallback(default) val deployment = config.withFallback(default)
val router = createRouterConfig(deployment.getString("router"), key, config, deployment) val router = createRouterConfig(deployment.getString("router"), key, config, deployment)
val dispatcher = deployment.getString("dispatcher") val dispatcher = deployment.getString("dispatcher")
Some(Deploy(key, deployment, router, NoScopeGiven, dispatcher)) val mailbox = deployment.getString("mailbox")
Some(Deploy(key, deployment, router, NoScopeGiven, dispatcher, mailbox))
} }
/** /**

View file

@ -10,7 +10,7 @@ import scala.reflect.ClassTag
import akka.routing._ import akka.routing._
import akka.util.Reflect import akka.util.Reflect
import scala.annotation.varargs import scala.annotation.varargs
import Deploy.NoDispatcherGiven import Deploy.{ NoDispatcherGiven, NoMailboxGiven }
import scala.collection.immutable import scala.collection.immutable
/** /**
@ -177,6 +177,15 @@ case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]) {
case x x case x x
} }
/**
* Convenience method for extracting the mailbox information from the
* contained [[Deploy]] instance.
*/
def mailbox: Option[String] = deploy.mailbox match {
case NoMailboxGiven None
case x Some(x)
}
/** /**
* Convenience method for extracting the router configuration from the * Convenience method for extracting the router configuration from the
* contained [[Deploy]] instance. * contained [[Deploy]] instance.
@ -218,6 +227,11 @@ case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]) {
*/ */
def withDispatcher(d: String): Props = copy(deploy = deploy.copy(dispatcher = d)) def withDispatcher(d: String): Props = copy(deploy = deploy.copy(dispatcher = d))
/**
* Returns a new Props with the specified mailbox set.
*/
def withMailbox(m: String): Props = copy(deploy = deploy.copy(mailbox = m))
/** /**
* Returns a new Props with the specified router config set. * Returns a new Props with the specified router config set.
*/ */
@ -309,3 +323,11 @@ private[akka] class CreatorConsumer(creator: Creator[Actor]) extends IndirectAct
override def actorClass = classOf[Actor] override def actorClass = classOf[Actor]
override def produce() = creator.create() override def produce() = creator.create()
} }
/**
* INTERNAL API
*/
private[akka] class TypedCreatorFunctionConsumer(clz: Class[_ <: Actor], creator: () Actor) extends IndirectActorProducer {
override def actorClass = clz
override def produce() = creator()
}

View file

@ -3,7 +3,7 @@
*/ */
package akka.actor package akka.actor
import akka.dispatch.{ Envelope, DequeBasedMessageQueue } import akka.dispatch.{ RequiresMessageQueue, Envelope, DequeBasedMessageQueue }
import akka.AkkaException import akka.AkkaException
/** /**
@ -47,7 +47,7 @@ import akka.AkkaException
* any trait/class that overrides the `preRestart` callback. This means it's not possible to write * any trait/class that overrides the `preRestart` callback. This means it's not possible to write
* `Actor with MyActor with Stash` if `MyActor` overrides `preRestart`. * `Actor with MyActor with Stash` if `MyActor` overrides `preRestart`.
*/ */
trait Stash extends Actor { trait Stash extends Actor with RequiresMessageQueue[DequeBasedMessageQueue] {
/* The private stash of the actor. It is only accessible using `stash()` and /* The private stash of the actor. It is only accessible using `stash()` and
* `unstashAll()`. * `unstashAll()`.

View file

@ -5,22 +5,16 @@
package akka.actor.dsl package akka.actor.dsl
import scala.concurrent.Await import scala.concurrent.Await
import akka.actor.ActorLogging import akka.actor._
import scala.collection.immutable.TreeSet import scala.collection.immutable.TreeSet
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.Cancellable
import akka.actor.{ Actor, Stash, SupervisorStrategy }
import scala.collection.mutable.Queue import scala.collection.mutable.Queue
import akka.actor.{ ActorSystem, ActorRefFactory }
import akka.actor.ActorRef
import akka.util.Timeout import akka.util.Timeout
import akka.actor.Status
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.pattern.ask import akka.pattern.ask
import akka.actor.ActorDSL
import akka.actor.Props
import scala.reflect.ClassTag import scala.reflect.ClassTag
import akka.dispatch.{ UnboundedDequeBasedMailbox, RequiresMessageQueue }
trait Creators { this: ActorDSL.type trait Creators { this: ActorDSL.type
@ -154,10 +148,7 @@ trait Creators { this: ActorDSL.type ⇒
trait ActWithStash extends Act with Stash trait ActWithStash extends Act with Stash
private def mkProps(classOfActor: Class[_], ctor: () Actor): Props = private def mkProps(classOfActor: Class[_], ctor: () Actor): Props =
if (classOf[Stash].isAssignableFrom(classOfActor)) Props(classOf[TypedCreatorFunctionConsumer], classOfActor, ctor)
Props(creator = ctor, dispatcher = "akka.actor.default-stash-dispatcher")
else
Props(creator = ctor)
/** /**
* Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. * Create an actor from the given thunk which must produce an [[akka.actor.Actor]].

View file

@ -46,7 +46,20 @@ private[akka] trait Dispatch { this: ActorCell ⇒
* Create the mailbox and enqueue the Create() message to ensure that * Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else. * this is processed before anything else.
*/ */
swapMailbox(dispatcher.createMailbox(this)) val mbox = dispatcher.createMailbox(this)
val actorClass = this.props.actorClass
if (this.system.mailboxes.hasRequiredType(actorClass)) {
this.system.mailboxes.getRequiredType(actorClass).foreach {
case c if !c.isAssignableFrom(mbox.messageQueue.getClass)
// FIXME 3237 throw an exception here instead of just logging it,
// and update the comment on the RequiresMessageQueue trait
val e = new IllegalArgumentException(s"Actor [${this.self.path}] requires mailbox type [${c}]" +
s" got [${mbox.messageQueue.getClass}]")
this.systemImpl.eventStream.publish(Error(e, getClass.getName, getClass, e.getMessage))
case _
}
}
swapMailbox(mbox)
mailbox.setActor(this) mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅

View file

@ -126,6 +126,17 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
*/ */
protected[akka] def createMailbox(actor: Cell): Mailbox //FIXME should this really be private[akka]? protected[akka] def createMailbox(actor: Cell): Mailbox //FIXME should this really be private[akka]?
/**
* Finds out the mailbox type for an actor based on configuration, props and requirements.
*/
protected[akka] def getMailboxType(actor: Cell, mailboxType: MailboxType, mailboxTypeConfigured: Boolean): MailboxType =
actor.props.mailbox.flatMap(id actor.system.mailboxes.lookup(id)) match {
case Some(x) x
case None if mailboxTypeConfigured mailboxType
case None actor.system.mailboxes.getRequiredType(actor.props.actorClass).
flatMap(c actor.system.mailboxes.lookupByQueueType(c)).getOrElse(mailboxType)
}
/** /**
* Identifier of this dispatcher, corresponds to the full key * Identifier of this dispatcher, corresponds to the full key
* of the dispatcher configuration. * of the dispatcher configuration.
@ -351,6 +362,11 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
} }
} }
/**
* Was the mailbox type configured or derived?
*/
def mailBoxTypeConfigured: Boolean = config.getString("mailbox-type") != Deploy.NoMailboxGiven
def configureExecutor(): ExecutorServiceConfigurator = { def configureExecutor(): ExecutorServiceConfigurator = {
config.getString("executor") match { config.getString("executor") match {
case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)

View file

@ -34,10 +34,11 @@ class BalancingDispatcher(
throughput: Int, throughput: Int,
throughputDeadlineTime: Duration, throughputDeadlineTime: Duration,
mailboxType: MailboxType, mailboxType: MailboxType,
_mailBoxTypeConfigured: Boolean,
_executorServiceFactoryProvider: ExecutorServiceFactoryProvider, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
_shutdownTimeout: FiniteDuration, _shutdownTimeout: FiniteDuration,
attemptTeamWork: Boolean) attemptTeamWork: Boolean)
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _mailBoxTypeConfigured, _executorServiceFactoryProvider, _shutdownTimeout) {
/** /**
* INTERNAL API * INTERNAL API

View file

@ -14,6 +14,8 @@ import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.concurrent.Awaitable import scala.concurrent.Awaitable
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.annotation.tailrec
import java.lang.reflect.ParameterizedType
/** /**
* The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a * The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
@ -33,6 +35,7 @@ class Dispatcher(
val throughput: Int, val throughput: Int,
val throughputDeadlineTime: Duration, val throughputDeadlineTime: Duration,
val mailboxType: MailboxType, val mailboxType: MailboxType,
val mailboxTypeConfigured: Boolean,
executorServiceFactoryProvider: ExecutorServiceFactoryProvider, executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
val shutdownTimeout: FiniteDuration) val shutdownTimeout: FiniteDuration)
extends MessageDispatcher(_prerequisites) { extends MessageDispatcher(_prerequisites) {
@ -86,8 +89,9 @@ class Dispatcher(
/** /**
* INTERNAL API * INTERNAL API
*/ */
protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox = protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox = {
new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue new Mailbox(getMailboxType(actor, mailboxType, mailboxTypeConfigured).create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue
}
/** /**
* INTERNAL API * INTERNAL API

View file

@ -174,6 +174,7 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi
config.getInt("throughput"), config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, mailboxType,
mailBoxTypeConfigured,
configureExecutor(), configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))
@ -196,7 +197,7 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP
config.getString("id"), config.getString("id"),
config.getInt("throughput"), config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, configureExecutor(), mailboxType, mailBoxTypeConfigured, configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
config.getBoolean("attempt-teamwork")) config.getBoolean("attempt-teamwork"))
@ -229,7 +230,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
*/ */
override def dispatcher(): MessageDispatcher = override def dispatcher(): MessageDispatcher =
new PinnedDispatcher( new PinnedDispatcher(
prerequisites, null, config.getString("id"), mailboxType, prerequisites, null, config.getString("id"), mailboxType, mailBoxTypeConfigured,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig) Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig)
} }

View file

@ -567,7 +567,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Finit
} }
/** /**
* UnboundedPriorityMailbox is an unbounded mailbox that allows for priorization of its contents. * UnboundedPriorityMailbox is an unbounded mailbox that allows for prioritization of its contents.
* Extend this class and provide the Comparator in the constructor. * Extend this class and provide the Comparator in the constructor.
*/ */
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType { class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType {
@ -579,7 +579,7 @@ class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val i
} }
/** /**
* BoundedPriorityMailbox is a bounded mailbox that allows for priorization of its contents. * BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents.
* Extend this class and provide the Comparator in the constructor. * Extend this class and provide the Comparator in the constructor.
*/ */
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
@ -624,3 +624,13 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime
final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut
} }
} }
/**
* Trait to signal that an Actor requires a certain type of message queue semantics.
*
* The mailbox type will be looked up by mapping the type T via akka.actor.mailbox.requirements in the config,
* to a mailbox configuration. If no mailbox is assigned on Props or in deployment config then this one will be used.
*
* The queue type of the created mailbox will be checked against the type T and an error will be logged if it doesn't match.
*/
trait RequiresMessageQueue[T]

View file

@ -0,0 +1,145 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
import com.typesafe.config.{ ConfigFactory, Config }
import akka.actor.{ Actor, DynamicAccess, ActorSystem }
import akka.event.EventStream
import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging.Warning
import akka.ConfigurationException
import scala.annotation.tailrec
import java.lang.reflect.ParameterizedType
private[akka] class Mailboxes(
val settings: ActorSystem.Settings,
val eventStream: EventStream,
dynamicAccess: DynamicAccess) {
private val mailboxTypeConfigurators = new ConcurrentHashMap[String, Option[MailboxTypeConfigurator]]
private val mailboxBindings: Map[Class[_ <: Any], String] = {
import scala.collection.JavaConverters._
settings.config.getConfig("akka.actor.mailbox.requirements").root.unwrapped.asScala
.toMap.foldLeft(Map.empty[Class[_ <: Any], String]) {
case (m, (k, v))
dynamicAccess.getClassFor[Any](k).map {
case x m.updated(x, v.toString)
}.recover {
case e
throw new ConfigurationException(s"Type [${k}] specified as akka.actor.mailbox.requirement " +
s"[${v}] in config can't be loaded due to [${e.getMessage}]", e)
}.get
}
}
/**
* Returns a mailbox type as specified in configuration, based on the id, or if not defined None.
*/
def lookup(id: String): Option[MailboxType] = lookupConfigurator(id).map { _.mailboxType }
/**
* Returns a mailbox type as specified in configuration, based on the type, or if not defined None.
*/
def lookupByQueueType(queueType: Class[_ <: Any]): Option[MailboxType] =
lookupId(queueType).flatMap(x lookup(x))
private final val rmqClass = classOf[RequiresMessageQueue[_]]
/**
* Return the required message queue type for this class if any.
*/
def getRequiredType(actorClass: Class[_ <: Actor]): Option[Class[_]] = {
@tailrec
def innerRequiredType(classes: Iterator[Class[_]]): Option[Class[_]] = {
if (classes.isEmpty) None
else {
val c = classes.next()
if (rmqClass.isAssignableFrom(c)) {
val ifaces = c.getGenericInterfaces
val tpe = ifaces.collectFirst {
case t: ParameterizedType if rmqClass.isAssignableFrom(t.getRawType.asInstanceOf[Class[_]])
t.getActualTypeArguments.head.asInstanceOf[Class[_]]
}
if (tpe.isDefined) tpe
else innerRequiredType(classes ++ ifaces.map {
case c: Class[_] c
case c: ParameterizedType c.getRawType.asInstanceOf[Class[_]]
} ++ Iterator(c.getSuperclass))
} else {
innerRequiredType(classes)
}
}
}
if (rmqClass.isAssignableFrom(actorClass)) innerRequiredType(Iterator(actorClass))
else None
}
/**
* Check if this class can have a required message queue type.
*/
def hasRequiredType(actorClass: Class[_ <: Actor]): Boolean = rmqClass.isAssignableFrom(actorClass)
private def lookupId(queueType: Class[_]): Option[String] = {
mailboxBindings.get(queueType) match {
case None
eventStream.publish(Warning("Mailboxes", this.getClass, s"Mailbox Mapping for [${queueType}] not configured"))
None
case s s
}
}
private def lookupConfigurator(id: String): Option[MailboxTypeConfigurator] = {
mailboxTypeConfigurators.get(id) match {
case null
// It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup.
val newConfigurator =
if (settings.config.hasPath(id)) {
Some(new MailboxTypeConfigurator(settings, config(id), dynamicAccess))
} else {
eventStream.publish(Warning("Mailboxes", this.getClass, s"Mailbox Type [${id}] not configured"))
None
}
if (newConfigurator.isDefined) {
mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match {
case null newConfigurator
case existing existing
}
} else None
case existing existing
}
}
//INTERNAL API
private def config(id: String): Config = {
import scala.collection.JavaConverters._
ConfigFactory.parseMap(Map("id" -> id).asJava)
.withFallback(settings.config.getConfig(id))
}
}
private[akka] class MailboxTypeConfigurator(
val settings: ActorSystem.Settings,
val config: Config,
dynamicAccess: DynamicAccess) {
private val instance: MailboxType = {
val id = config.getString("id")
config.getString("mailbox-type") match {
case "" throw new IllegalArgumentException(s"The setting mailbox-type, defined in [${id}}] is empty")
case fqcn
val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> config)
dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
case exception
throw new IllegalArgumentException(
(s"Cannot instantiate MailboxType [${fqcn}], defined in [${id}], make sure it has a public" +
" constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters"),
exception)
}).get
}
}
def mailboxType: MailboxType = instance
}

View file

@ -19,6 +19,7 @@ class PinnedDispatcher(
_actor: ActorCell, _actor: ActorCell,
_id: String, _id: String,
_mailboxType: MailboxType, _mailboxType: MailboxType,
_mailboxTypeConfigured: Boolean,
_shutdownTimeout: FiniteDuration, _shutdownTimeout: FiniteDuration,
_threadPoolConfig: ThreadPoolConfig) _threadPoolConfig: ThreadPoolConfig)
extends Dispatcher(_prerequisites, extends Dispatcher(_prerequisites,
@ -26,6 +27,7 @@ class PinnedDispatcher(
Int.MaxValue, Int.MaxValue,
Duration.Zero, Duration.Zero,
_mailboxType, _mailboxType,
_mailboxTypeConfigured,
_threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1), _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1),
_shutdownTimeout) { _shutdownTimeout) {

View file

@ -23,6 +23,7 @@ object ClusterDeployerSpec {
} }
/user/service2 { /user/service2 {
dispatcher = mydispatcher dispatcher = mydispatcher
mailbox = mymailbox
router = round-robin router = round-robin
nr-of-instances = 20 nr-of-instances = 20
cluster.enabled = on cluster.enabled = on
@ -56,7 +57,8 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings( ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings(
totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None)), totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None)),
ClusterScope, ClusterScope,
Deploy.NoDispatcherGiven))) Deploy.NoDispatcherGiven,
Deploy.NoMailboxGiven)))
} }
"be able to parse 'akka.actor.deployment._' with specified cluster deploy routee settings" in { "be able to parse 'akka.actor.deployment._' with specified cluster deploy routee settings" in {
@ -71,7 +73,8 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings( ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings(
totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false, useRole = None)), totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false, useRole = None)),
ClusterScope, ClusterScope,
"mydispatcher"))) "mydispatcher",
"mymailbox")))
} }
} }

View file

@ -0,0 +1,14 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#my-bounded-untyped-actor
import akka.dispatch.BoundedMessageQueueSemantics;
import akka.dispatch.RequiresMessageQueue;
public class MyBoundedUntypedActor extends MyUntypedActor
implements RequiresMessageQueue<BoundedMessageQueueSemantics> {
}
//#my-bounded-untyped-actor

View file

@ -30,6 +30,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
//#imports-custom //#imports-custom
//#imports-required-mailbox
//#imports-required-mailbox
import docs.actor.MyBoundedUntypedActor;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -48,8 +53,8 @@ public class DispatcherDocTestBase {
@BeforeClass @BeforeClass
public static void beforeAll() { public static void beforeAll() {
system = ActorSystem.create("MySystem", system = ActorSystem.create("MySystem",
ConfigFactory.parseString( ConfigFactory.parseString(DispatcherDocSpec.javaConfig()).withFallback(
DispatcherDocSpec.config()).withFallback(AkkaSpec.testConf())); ConfigFactory.parseString(DispatcherDocSpec.config())).withFallback(AkkaSpec.testConf()));
} }
@AfterClass @AfterClass
@ -96,6 +101,33 @@ public class DispatcherDocTestBase {
//#lookup //#lookup
} }
@SuppressWarnings("unused")
@Test
public void defineMailboxInConfig() {
//#defining-mailbox-in-config
ActorRef myActor =
system.actorOf(Props.create(MyUntypedActor.class),
"priomailboxactor");
//#defining-mailbox-in-config
}
@SuppressWarnings("unused")
@Test
public void defineMailboxInCode() {
//#defining-mailbox-in-code
ActorRef myActor =
system.actorOf(Props.create(MyUntypedActor.class)
.withMailbox("prio-mailbox"));
//#defining-mailbox-in-code
}
@SuppressWarnings("unused")
@Test
public void usingARequiredMailbox() {
ActorRef myActor =
system.actorOf(Props.create(MyBoundedUntypedActor.class));
}
@Test @Test
public void priorityDispatcher() throws Exception { public void priorityDispatcher() throws Exception {
JavaTestKit probe = new JavaTestKit(system); JavaTestKit probe = new JavaTestKit(system);

View file

@ -202,14 +202,55 @@ And then an example on how you would use it:
.. includecode:: ../java/code/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher .. includecode:: ../java/code/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher
It is also possible to configure a mailbox type directly like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala
:include: prio-mailbox-config-java,mailbox-deployment-config
And then use it either from deployment like this:
.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#defining-mailbox-in-config
Or code like this:
.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#defining-mailbox-in-code
Requiring a message queue type for an Actor
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
It is possible to require a certain type of message queue for a certain type of actor
by having that actor implement the parameterized interface :class:`RequiresMessageQueue`. Here is
an example:
.. includecode:: code/docs/actor/MyBoundedUntypedActor.java#my-bounded-untyped-actor
The type parameter to the :class:`RequiresMessageQueue` interface needs to be mapped to a mailbox in
configuration like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala
:include: bounded-mailbox-config,required-mailbox-config
Now every time you create an actor of type :class:`MyBoundedUntypedActor` it will try to get a bounded
mailbox. If the actor has a different mailbox configured in deployment, either directly or via
a dispatcher with a specified mailbox type, then that will override this mapping.
.. note:: .. note::
Make sure to include a constructor which takes The type of the queue in the mailbox created for an actor will be checked against the required type in the
``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` interface and if the queue doesn't implement the required type an error will be logged.
arguments, as this constructor is invoked reflectively to construct your
mailbox type. The config passed in as second argument is that section from
the configuration which describes the dispatcher using this mailbox type; the Mailbox configuration precedence
mailbox type will be instantiated once for each dispatcher using it. ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The order of precedence for the mailbox type of an actor, where lower numbers override higher, is:
1. Mailbox type configured in the deployment of the actor
2. Mailbox type configured on the dispatcher of the actor
3. Mailbox type configured on the Props of the actor
4. Mailbox type configured via message queue requirement
Creating your own Mailbox type Creating your own Mailbox type
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -220,7 +261,8 @@ An example is worth a thousand quacks:
.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#mailbox-implementation-example .. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#mailbox-implementation-example
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher
configuration, or the mailbox configuration.
.. note:: .. note::
@ -228,8 +270,9 @@ And then you just specify the FQCN of your MailboxType as the value of the "mail
``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config``
arguments, as this constructor is invoked reflectively to construct your arguments, as this constructor is invoked reflectively to construct your
mailbox type. The config passed in as second argument is that section from mailbox type. The config passed in as second argument is that section from
the configuration which describes the dispatcher using this mailbox type; the the configuration which describes the dispatcher or mailbox setting using
mailbox type will be instantiated once for each dispatcher using it. this mailbox type; the mailbox type will be instantiated once for each
dispatcher or mailbox setting using it.
Special Semantics of ``system.actorOf`` Special Semantics of ``system.actorOf``

View file

@ -245,12 +245,13 @@ directives are part of the :class:`Act` trait):
.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#supervise-with .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#supervise-with
Last but not least there is a little bit of convenience magic built-in, which Last but not least there is a little bit of convenience magic built-in, which
detects if the runtime class of the statically given actor subtype extends the detects if the runtime class of the statically given actor subtype extends the
:class:`Stash` trait (this is a complicated way of saying that ``new Act with :class:`RequiresMessageQueue` trait via the :class:`Stash` trait (this is a
Stash`` would not work because its runtime erased type is just an anonymous complicated way of saying that ``new Act with Stash`` would not work because its
subtype of ``Act``). The purpose is to automatically use a dispatcher with the runtime erased type is just an anonymous subtype of ``Act``). The purpose is to
appropriate deque-based mailbox, ``akka.actor.default-stash-dispatcher``. automatically use the appropriate deque-based mailbox type required by :class:`Stash`.
If you want to use this magic, simply extend :class:`ActWithStash`: If you want to use this magic, simply extend :class:`ActWithStash`:
.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#act-with-stash .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#act-with-stash

View file

@ -12,8 +12,25 @@ import akka.event.Logging
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor._ import akka.actor._
import docs.dispatcher.DispatcherDocSpec.MyBoundedActor
object DispatcherDocSpec { object DispatcherDocSpec {
val javaConfig = """
//#prio-dispatcher-config-java
prio-dispatcher {
mailbox-type = "docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox"
//Other dispatcher configuration goes here
}
//#prio-dispatcher-config-java
//#prio-mailbox-config-java
prio-mailbox {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
//Other mailbox configuration goes here
}
//#prio-mailbox-config-java
"""
val config = """ val config = """
//#my-dispatcher-config //#my-dispatcher-config
my-dispatcher { my-dispatcher {
@ -74,7 +91,7 @@ object DispatcherDocSpec {
core-pool-size-factor = 8.0 core-pool-size-factor = 8.0
max-pool-size-factor = 16.0 max-pool-size-factor = 16.0
} }
# Specifies the bounded capacity of the mailbox queue # Specifies the bounded capacity of the message queue
mailbox-capacity = 100 mailbox-capacity = 100
throughput = 3 throughput = 3
} }
@ -94,15 +111,9 @@ object DispatcherDocSpec {
//#prio-dispatcher-config //#prio-dispatcher-config
prio-dispatcher { prio-dispatcher {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox" mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
}
//#prio-dispatcher-config
//#prio-dispatcher-config-java
prio-dispatcher-java {
mailbox-type = "docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox"
//Other dispatcher configuration goes here //Other dispatcher configuration goes here
} }
//#prio-dispatcher-config-java //#prio-dispatcher-config
//#dispatcher-deployment-config //#dispatcher-deployment-config
akka.actor.deployment { akka.actor.deployment {
@ -111,6 +122,38 @@ object DispatcherDocSpec {
} }
} }
//#dispatcher-deployment-config //#dispatcher-deployment-config
//#prio-mailbox-config
prio-mailbox {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
//Other mailbox configuration goes here
}
//#prio-mailbox-config
//#mailbox-deployment-config
akka.actor.deployment {
/priomailboxactor {
mailbox = prio-mailbox
}
}
//#mailbox-deployment-config
//#bounded-mailbox-config
bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
}
//#bounded-mailbox-config
//#required-mailbox-config
akka.actor.mailbox.requirements {
"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
//#required-mailbox-config
""" """
//#prio-mailbox //#prio-mailbox
@ -144,6 +187,13 @@ object DispatcherDocSpec {
} }
} }
//#required-mailbox-class
import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics
class MyBoundedActor extends MyActor with RequiresMessageQueue[BoundedMessageQueueSemantics]
//#required-mailbox-class
//#mailbox-implementation-example //#mailbox-implementation-example
class MyUnboundedMailbox extends akka.dispatch.MailboxType { class MyUnboundedMailbox extends akka.dispatch.MailboxType {
import akka.actor.{ ActorRef, ActorSystem } import akka.actor.{ ActorRef, ActorSystem }
@ -209,6 +259,27 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
//#lookup //#lookup
} }
"defining mailbox in config" in {
val context = system
//#defining-mailbox-in-config
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "priomailboxactor")
//#defining-mailbox-in-config
}
"defining mailbox in code" in {
val context = system
//#defining-mailbox-in-code
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))
//#defining-mailbox-in-code
}
"using a required mailbox" in {
val context = system
val myActor = context.actorOf(Props[MyBoundedActor])
}
"defining priority dispatcher" in { "defining priority dispatcher" in {
new AnyRef { new AnyRef {
//#prio-dispatcher //#prio-dispatcher

View file

@ -204,6 +204,56 @@ And then an example on how you would use it:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher
It is also possible to configure a mailbox type directly like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala
:include: prio-mailbox-config,mailbox-deployment-config
And then use it either from deployment like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-mailbox-in-config
Or code like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-mailbox-in-code
Requiring a message queue type for an Actor
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
It is possible to require a certain type of message queue for a certain type of actor
by having that actor extend the parameterized trait :class:`RequiresMessageQueue`. Here is
an example:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#required-mailbox-class
The type parameter to the :class:`RequiresMessageQueue` trait needs to be mapped to a mailbox in
configuration like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala
:include: bounded-mailbox-config,required-mailbox-config
Now every time you create an actor of type :class:`MyBoundedActor` it will try to get a bounded
mailbox. If the actor has a different mailbox configured in deployment, either directly or via
a dispatcher with a specified mailbox type, then that will override this mapping.
.. note::
The type of the queue in the mailbox created for an actor will be checked against the required type in the
trait and if the queue doesn't implement the required type an error will be logged.
Mailbox configuration precedence
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The order of precedence for the mailbox type of an actor, where lower numbers override higher, is:
1. Mailbox type configured in the deployment of the actor
2. Mailbox type configured on the dispatcher of the actor
3. Mailbox type configured on the Props of the actor
4. Mailbox type configured via message queue requirement
Creating your own Mailbox type Creating your own Mailbox type
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -211,7 +261,8 @@ An example is worth a thousand quacks:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher
configuration, or the mailbox configuration.
.. note:: .. note::
@ -219,8 +270,10 @@ And then you just specify the FQCN of your MailboxType as the value of the "mail
``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config``
arguments, as this constructor is invoked reflectively to construct your arguments, as this constructor is invoked reflectively to construct your
mailbox type. The config passed in as second argument is that section from mailbox type. The config passed in as second argument is that section from
the configuration which describes the dispatcher using this mailbox type; the the configuration which describes the dispatcher or mailbox setting using
mailbox type will be instantiated once for each dispatcher using it. this mailbox type; the mailbox type will be instantiated once for each
dispatcher or mailbox setting using it.
Special Semantics of ``system.actorOf`` Special Semantics of ``system.actorOf``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

View file

@ -267,7 +267,7 @@ private[akka] class RemoteActorRefProvider(
} }
Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) b withFallback a) match { Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) b withFallback a) match {
case d @ Deploy(_, _, _, RemoteScope(addr), _) case d @ Deploy(_, _, _, RemoteScope(addr), _, _)
if (hasAddress(addr)) { if (hasAddress(addr)) {
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
} else { } else {

View file

@ -129,6 +129,7 @@ object CallingThreadDispatcher {
class CallingThreadDispatcher( class CallingThreadDispatcher(
_prerequisites: DispatcherPrerequisites, _prerequisites: DispatcherPrerequisites,
val mailboxType: MailboxType, val mailboxType: MailboxType,
val mailboxTypeConfigured: Boolean,
val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) { val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) {
import CallingThreadDispatcher._ import CallingThreadDispatcher._
@ -136,7 +137,8 @@ class CallingThreadDispatcher(
override def id: String = Id override def id: String = Id
protected[akka] override def createMailbox(actor: akka.actor.Cell) = new CallingThreadMailbox(actor, mailboxType) protected[akka] override def createMailbox(actor: akka.actor.Cell) =
new CallingThreadMailbox(actor, getMailboxType(actor, mailboxType, mailboxTypeConfigured))
protected[akka] override def shutdown() {} protected[akka] override def shutdown() {}
@ -302,7 +304,7 @@ class CallingThreadDispatcher(
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) { extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance = new CallingThreadDispatcher(prerequisites, mailboxType()) private val instance = new CallingThreadDispatcher(prerequisites, mailboxType(), mailBoxTypeConfigured)
override def dispatcher(): MessageDispatcher = instance override def dispatcher(): MessageDispatcher = instance
} }