diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala new file mode 100644 index 0000000000..a36a5264e2 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.actor + +import com.typesafe.config.ConfigFactory +import akka.testkit._ +import akka.dispatch._ + +object ActorMailboxSpec { + val mailboxConf = ConfigFactory.parseString(""" + unbounded-dispatcher { + mailbox-type = "akka.dispatch.UnboundedMailbox" + } + + bounded-dispatcher { + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10s + mailbox-type = "akka.dispatch.BoundedMailbox" + } + + unbounded-mailbox { + mailbox-type = "akka.dispatch.UnboundedMailbox" + } + + bounded-mailbox { + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10s + mailbox-type = "akka.dispatch.BoundedMailbox" + } + + akka.actor.deployment { + /default-default { + } + /default-override-from-props { + } + /default-override-from-trait { + } + /default-override-from-stash { + } + /default-bounded { + mailbox = bounded-mailbox + } + /default-unbounded-deque { + mailbox = akka.actor.mailbox.unbounded-deque-based + } + /default-unbounded-deque-override-trait { + mailbox = akka.actor.mailbox.unbounded-deque-based + } + /unbounded-default { + dispatcher = unbounded-dispatcher + } + /unbounded-default-override-trait { + dispatcher = unbounded-dispatcher + } + /unbounded-bounded { + dispatcher= unbounded-dispatcher + mailbox = bounded-mailbox + } + /bounded-default { + dispatcher = bounded-dispatcher + } + /bounded-unbounded { + dispatcher = bounded-dispatcher + mailbox = unbounded-mailbox + } + /bounded-unbounded-override-props { + dispatcher = bounded-dispatcher + mailbox = unbounded-mailbox + } + } + + akka.actor.mailbox.requirements { + "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox + } + """) + + class QueueReportingActor extends Actor { + def receive = { + case _ ⇒ sender ! context.asInstanceOf[ActorCell].mailbox.messageQueue + } + } + + class BoundedQueueReportingActor extends QueueReportingActor with RequiresMessageQueue[BoundedMessageQueueSemantics] + + class StashQueueReportingActor extends QueueReportingActor with Stash + + val UnboundedMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[UnboundedMessageQueueSemantics]) + val BoundedMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[BoundedMessageQueueSemantics]) + val UnboundedDeqMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[DequeBasedMessageQueue], + classOf[UnboundedDequeBasedMessageQueueSemantics]) +} + +class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with DefaultTimeout with ImplicitSender { + + import ActorMailboxSpec._ + + def checkMailboxQueue(props: Props, name: String, types: Seq[Class[_]]): Unit = { + val actor = system.actorOf(props, name) + + actor ! "ping" + val q = expectMsgType[MessageQueue] + types foreach (t ⇒ assert(t isInstance q, s"Type [${q.getClass}] is not assignable to [${t}]")) + } + + "An Actor" must { + + "get an unbounded message queue by default" in { + checkMailboxQueue(Props[QueueReportingActor], "default-default", UnboundedMailboxTypes) + } + + "get an unbounded dequeu message queue when it is only configured on the props" in { + checkMailboxQueue(Props[QueueReportingActor].withMailbox("akka.actor.mailbox.unbounded-deque-based"), + "default-override-from-props", UnboundedDeqMailboxTypes) + } + + "get an bounded message queue when it's only configured with RequiresMailbox" in { + checkMailboxQueue(Props[BoundedQueueReportingActor], + "default-override-from-trait", BoundedMailboxTypes) + } + + "get an unbounded dequeu message queue when it's only mixed with Stash" in { + checkMailboxQueue(Props[StashQueueReportingActor], + "default-override-from-stash", UnboundedDeqMailboxTypes) + } + + "get a bounded message queue when it's configured as mailbox" in { + checkMailboxQueue(Props[QueueReportingActor], "default-bounded", BoundedMailboxTypes) + } + + "get an unbounded dequeu message queue when it's configured as mailbox" in { + checkMailboxQueue(Props[QueueReportingActor], "default-unbounded-deque", UnboundedDeqMailboxTypes) + } + + "get an unbounded dequeu message queue when it's configured as mailbox overriding RequestMailbox" in { + filterEvents(EventFilter[IllegalArgumentException]()) { + checkMailboxQueue(Props[BoundedQueueReportingActor], "default-unbounded-deque-override-trait", + UnboundedDeqMailboxTypes) + } + } + + "get an unbounded message queue when defined in dispatcher" in { + checkMailboxQueue(Props[QueueReportingActor], "unbounded-default", UnboundedMailboxTypes) + } + + "get an unbounded message queue when defined in dispatcher overriding RequestMailbox" in { + filterEvents(EventFilter[IllegalArgumentException]()) { + checkMailboxQueue(Props[BoundedQueueReportingActor], "unbounded-default-override-trait", UnboundedMailboxTypes) + } + } + + "get a bounded message queue when it's configured as mailbox overriding unbounded in dispatcher" in { + checkMailboxQueue(Props[QueueReportingActor], "unbounded-bounded", BoundedMailboxTypes) + } + + "get a bounded message queue by when defined in dispatcher" in { + checkMailboxQueue(Props[QueueReportingActor], "bounded-default", BoundedMailboxTypes) + } + + "get an unbounded message queue when it's configured as mailbox overriding bounded in dispatcher" in { + checkMailboxQueue(Props[QueueReportingActor], "bounded-unbounded", UnboundedMailboxTypes) + } + + "get an unbounded message queue overriding configuration on the props" in { + checkMailboxQueue(Props[QueueReportingActor].withMailbox("akka.actor.mailbox.unbounded-deque-based"), + "bounded-unbounded-override-props", UnboundedMailboxTypes) + } + + } +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index cd5352d137..bb93c9bb8a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -89,6 +89,7 @@ object ActorSystemSpec { config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, + mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { val doneIt = new Switch diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index d4760b241c..004d5ea921 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -28,6 +28,9 @@ object DeployerSpec { /service3 { dispatcher = my-dispatcher } + /service4 { + mailbox = my-mailbox + } /service-round-robin { router = round-robin } @@ -80,7 +83,8 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { deployment.get.config, NoRouter, NoScopeGiven, - Deploy.NoDispatcherGiven))) + Deploy.NoDispatcherGiven, + Deploy.NoMailboxGiven))) } "use None deployment for undefined service" in { @@ -99,7 +103,22 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { deployment.get.config, NoRouter, NoScopeGiven, - dispatcher = "my-dispatcher"))) + dispatcher = "my-dispatcher", + Deploy.NoMailboxGiven))) + } + + "be able to parse 'akka.actor.deployment._' with mailbox config" in { + val service = "/service4" + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1)) + + deployment must be(Some( + Deploy( + service, + deployment.get.config, + NoRouter, + NoScopeGiven, + Deploy.NoDispatcherGiven, + mailbox = "my-mailbox"))) } "detect invalid number-of-instances" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 75bd0a2e3d..d779587e64 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -83,6 +83,7 @@ object SupervisorHierarchySpec { config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, + mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 5d4a812e52..ee52045257 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -521,6 +521,7 @@ object DispatcherModelSpec { config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, + mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor @@ -594,6 +595,7 @@ object BalancingDispatcherModelSpec { config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, + mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index c23102c35f..ff04077ed0 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -32,7 +32,7 @@ object CallingThreadDispatcherModelSpec { extends MessageDispatcherConfigurator(config, prerequisites) { private val instance: MessageDispatcher = - new CallingThreadDispatcher(prerequisites, UnboundedMailbox()) with MessageDispatcherInterceptor { + new CallingThreadDispatcher(prerequisites, UnboundedMailbox(), false) with MessageDispatcherInterceptor { override def id: String = config.getString("id") } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index e79a312460..516dcd292e 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -99,6 +99,14 @@ akka { # specified at all. dispatcher = "" + # The id of the mailbox to use for this actor. + # If undefined or empty the default mailbox of the configured dispatcher + # is used or if there is no mailbox configuration the mailbox specified + # in code (Props.withMailbox) is used. + # If there is a mailbox defined in the configured dispatcher then that + # overrides this setting. + mailbox = "" + # routing (load-balance) scheme to use # - available: "from-code", "round-robin", "random", "smallest-mailbox", # "scatter-gather", "broadcast" @@ -191,11 +199,6 @@ akka { } } - # Default dispatcher for Actors that extend Stash - default-stash-dispatcher { - mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" - } - default-dispatcher { # Must be one of the following # Dispatcher, (BalancingDispatcher, only valid when all actors using it are @@ -308,6 +311,24 @@ akka { stash-capacity = -1 } + mailbox { + # Mapping between message queue semantics and mailbox configurations. + # Used by akka.dispatch.RequiresMessageQueue[T] to enforce different + # mailbox types on actors. + # If your Actor implements RequiresMessageQueue[T], then when you create + # an instance of that actor its mailbox type will be decided by looking + # up a mailbox configuration via T in this mapping + requirements { + "akka.dispatch.DequeBasedMessageQueue" = akka.actor.mailbox.unbounded-deque-based + } + + unbounded-deque-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public constructor + # with (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" + } + } + debug { # enable function of Actor.loggable(), which is to log any received message # at DEBUG level, see the “Testing Actor Systems” section of the Akka diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 61aa820d2d..665eae3fbd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -279,6 +279,10 @@ private[akka] trait Cell { * which may be a costly operation, 0 otherwise. */ def numberOfMessages: Int + /** + * The props for this actor cell. + */ + def props: Props } /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 216e8302fb..110f1afbb7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -68,8 +68,7 @@ import java.util.concurrent.TimeUnit * be circumvented by shadowing the name `system` within `"fred"`). * * Note: If you want to use an `Act with Stash`, you should use the - * `ActWithStash` trait in order to have the actor run on a special dispatcher - * (`"akka.actor.default-stash-dispatcher"`) which has the necessary deque-based + * `ActWithStash` trait in order to have the actor get the necessary deque-based * mailbox setting. */ object ActorDSL extends dsl.Inbox with dsl.Creators { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index ef8ff80175..ad32898ee5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -693,11 +693,20 @@ private[akka] class LocalActorRefProvider private[akka] ( } val props2 = - if (lookupDeploy) deployer.lookup(path) match { - case Some(d) if d.dispatcher != Deploy.NoDispatcherGiven ⇒ props.withDispatcher(d.dispatcher) - case _ ⇒ props // dispatcher not specified in deployment config - } - else props + if (lookupDeploy) { + // mailbox and dispatcher defined in deploy should override props + deployer.lookup(path) match { + case Some(d) ⇒ + (d.dispatcher, d.mailbox) match { + case (Deploy.NoDispatcherGiven, Deploy.NoMailboxGiven) ⇒ props + case (dsp, Deploy.NoMailboxGiven) ⇒ props.withDispatcher(dsp) + case (Deploy.NoMailboxGiven, mbx) ⇒ props.withMailbox(mbx) + case (dsp, mbx) ⇒ props.withDispatcher(dsp).withMailbox(mbx) + } + case _ ⇒ props // no deployment config found + } + + } else props if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async) else new LocalActorRef(system, props2, supervisor, path) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 8b9e1bb76e..a36e5e8c5f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -319,6 +319,11 @@ abstract class ActorSystem extends ActorRefFactory { */ implicit def dispatcher: ExecutionContext + /** + * Helper object for looking up configured mailbox types. + */ + def mailboxes: Mailboxes + /** * Register a block of code (callback) to run after ActorSystem.shutdown has been issued and * all actors in this actor system have been stopped. @@ -560,6 +565,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher + val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess) + val internalCallingThreadExecutionContext: ExecutionContext = dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").getOrElse( new ExecutionContext with BatchingExecutor { diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index c3d728fc25..ab5c934109 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -15,6 +15,7 @@ import scala.annotation.tailrec object Deploy { final val NoDispatcherGiven = "" + final val NoMailboxGiven = "" } /** @@ -32,13 +33,14 @@ object Deploy { * val remoteProps = someProps.withDeploy(Deploy(scope = RemoteScope("someOtherNodeName"))) * }}} */ -@SerialVersionUID(1L) +@SerialVersionUID(2L) final case class Deploy( path: String = "", config: Config = ConfigFactory.empty, routerConfig: RouterConfig = NoRouter, scope: Scope = NoScopeGiven, - dispatcher: String = Deploy.NoDispatcherGiven) { + dispatcher: String = Deploy.NoDispatcherGiven, + mailbox: String = Deploy.NoMailboxGiven) { /** * Java API to create a Deploy with the given RouterConfig @@ -61,13 +63,13 @@ final case class Deploy( * other members are merged using ``.withFallback(other.)``. */ def withFallback(other: Deploy): Deploy = { - val disp = if (dispatcher == Deploy.NoDispatcherGiven) other.dispatcher else dispatcher Deploy( path, config.withFallback(other.config), routerConfig.withFallback(other.routerConfig), scope.withFallback(other.scope), - disp) + if (dispatcher == Deploy.NoDispatcherGiven) other.dispatcher else dispatcher, + if (mailbox == Deploy.NoMailboxGiven) other.mailbox else mailbox) } } @@ -154,7 +156,8 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce val deployment = config.withFallback(default) val router = createRouterConfig(deployment.getString("router"), key, config, deployment) val dispatcher = deployment.getString("dispatcher") - Some(Deploy(key, deployment, router, NoScopeGiven, dispatcher)) + val mailbox = deployment.getString("mailbox") + Some(Deploy(key, deployment, router, NoScopeGiven, dispatcher, mailbox)) } /** diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 90aacdb7ff..ed57e1b83f 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -10,7 +10,7 @@ import scala.reflect.ClassTag import akka.routing._ import akka.util.Reflect import scala.annotation.varargs -import Deploy.NoDispatcherGiven +import Deploy.{ NoDispatcherGiven, NoMailboxGiven } import scala.collection.immutable /** @@ -177,6 +177,15 @@ case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]) { case x ⇒ x } + /** + * Convenience method for extracting the mailbox information from the + * contained [[Deploy]] instance. + */ + def mailbox: Option[String] = deploy.mailbox match { + case NoMailboxGiven ⇒ None + case x ⇒ Some(x) + } + /** * Convenience method for extracting the router configuration from the * contained [[Deploy]] instance. @@ -218,6 +227,11 @@ case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]) { */ def withDispatcher(d: String): Props = copy(deploy = deploy.copy(dispatcher = d)) + /** + * Returns a new Props with the specified mailbox set. + */ + def withMailbox(m: String): Props = copy(deploy = deploy.copy(mailbox = m)) + /** * Returns a new Props with the specified router config set. */ @@ -309,3 +323,11 @@ private[akka] class CreatorConsumer(creator: Creator[Actor]) extends IndirectAct override def actorClass = classOf[Actor] override def produce() = creator.create() } + +/** + * INTERNAL API + */ +private[akka] class TypedCreatorFunctionConsumer(clz: Class[_ <: Actor], creator: () ⇒ Actor) extends IndirectActorProducer { + override def actorClass = clz + override def produce() = creator() +} diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 1df263e399..abefa9524d 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -3,7 +3,7 @@ */ package akka.actor -import akka.dispatch.{ Envelope, DequeBasedMessageQueue } +import akka.dispatch.{ RequiresMessageQueue, Envelope, DequeBasedMessageQueue } import akka.AkkaException /** @@ -47,7 +47,7 @@ import akka.AkkaException * any trait/class that overrides the `preRestart` callback. This means it's not possible to write * `Actor with MyActor with Stash` if `MyActor` overrides `preRestart`. */ -trait Stash extends Actor { +trait Stash extends Actor with RequiresMessageQueue[DequeBasedMessageQueue] { /* The private stash of the actor. It is only accessible using `stash()` and * `unstashAll()`. diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala index bcbcd52cbd..ffc973e491 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala @@ -5,22 +5,16 @@ package akka.actor.dsl import scala.concurrent.Await -import akka.actor.ActorLogging +import akka.actor._ import scala.collection.immutable.TreeSet import scala.concurrent.duration._ -import akka.actor.Cancellable -import akka.actor.{ Actor, Stash, SupervisorStrategy } import scala.collection.mutable.Queue -import akka.actor.{ ActorSystem, ActorRefFactory } -import akka.actor.ActorRef import akka.util.Timeout -import akka.actor.Status import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger import akka.pattern.ask -import akka.actor.ActorDSL -import akka.actor.Props import scala.reflect.ClassTag +import akka.dispatch.{ UnboundedDequeBasedMailbox, RequiresMessageQueue } trait Creators { this: ActorDSL.type ⇒ @@ -154,10 +148,7 @@ trait Creators { this: ActorDSL.type ⇒ trait ActWithStash extends Act with Stash private def mkProps(classOfActor: Class[_], ctor: () ⇒ Actor): Props = - if (classOf[Stash].isAssignableFrom(classOfActor)) - Props(creator = ctor, dispatcher = "akka.actor.default-stash-dispatcher") - else - Props(creator = ctor) + Props(classOf[TypedCreatorFunctionConsumer], classOfActor, ctor) /** * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index b6b1880bbe..96c776714e 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -46,7 +46,20 @@ private[akka] trait Dispatch { this: ActorCell ⇒ * Create the mailbox and enqueue the Create() message to ensure that * this is processed before anything else. */ - swapMailbox(dispatcher.createMailbox(this)) + val mbox = dispatcher.createMailbox(this) + val actorClass = this.props.actorClass + if (this.system.mailboxes.hasRequiredType(actorClass)) { + this.system.mailboxes.getRequiredType(actorClass).foreach { + case c if !c.isAssignableFrom(mbox.messageQueue.getClass) ⇒ + // FIXME 3237 throw an exception here instead of just logging it, + // and update the comment on the RequiresMessageQueue trait + val e = new IllegalArgumentException(s"Actor [${this.self.path}] requires mailbox type [${c}]" + + s" got [${mbox.messageQueue.getClass}]") + this.systemImpl.eventStream.publish(Error(e, getClass.getName, getClass, e.getMessage)) + case _ ⇒ + } + } + swapMailbox(mbox) mailbox.setActor(this) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 5d7cc9a30a..2de53ae4f1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -126,6 +126,17 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext */ protected[akka] def createMailbox(actor: Cell): Mailbox //FIXME should this really be private[akka]? + /** + * Finds out the mailbox type for an actor based on configuration, props and requirements. + */ + protected[akka] def getMailboxType(actor: Cell, mailboxType: MailboxType, mailboxTypeConfigured: Boolean): MailboxType = + actor.props.mailbox.flatMap(id ⇒ actor.system.mailboxes.lookup(id)) match { + case Some(x) ⇒ x + case None if mailboxTypeConfigured ⇒ mailboxType + case None ⇒ actor.system.mailboxes.getRequiredType(actor.props.actorClass). + flatMap(c ⇒ actor.system.mailboxes.lookupByQueueType(c)).getOrElse(mailboxType) + } + /** * Identifier of this dispatcher, corresponds to the full key * of the dispatcher configuration. @@ -351,6 +362,11 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit } } + /** + * Was the mailbox type configured or derived? + */ + def mailBoxTypeConfigured: Boolean = config.getString("mailbox-type") != Deploy.NoMailboxGiven + def configureExecutor(): ExecutorServiceConfigurator = { config.getString("executor") match { case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 3942762e09..26795f5aba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -34,10 +34,11 @@ class BalancingDispatcher( throughput: Int, throughputDeadlineTime: Duration, mailboxType: MailboxType, + _mailBoxTypeConfigured: Boolean, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, _shutdownTimeout: FiniteDuration, attemptTeamWork: Boolean) - extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { + extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _mailBoxTypeConfigured, _executorServiceFactoryProvider, _shutdownTimeout) { /** * INTERNAL API diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 74c765f401..eb97628dfc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -14,6 +14,8 @@ import scala.concurrent.forkjoin.ForkJoinPool import scala.concurrent.duration.Duration import scala.concurrent.Awaitable import scala.concurrent.duration.FiniteDuration +import scala.annotation.tailrec +import java.lang.reflect.ParameterizedType /** * The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a @@ -33,6 +35,7 @@ class Dispatcher( val throughput: Int, val throughputDeadlineTime: Duration, val mailboxType: MailboxType, + val mailboxTypeConfigured: Boolean, executorServiceFactoryProvider: ExecutorServiceFactoryProvider, val shutdownTimeout: FiniteDuration) extends MessageDispatcher(_prerequisites) { @@ -86,8 +89,9 @@ class Dispatcher( /** * INTERNAL API */ - protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox = - new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue + protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox = { + new Mailbox(getMailboxType(actor, mailboxType, mailboxTypeConfigured).create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue + } /** * INTERNAL API diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index b5a91618be..bca09ce56f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -174,6 +174,7 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, + mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) @@ -196,7 +197,7 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, configureExecutor(), + mailboxType, mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), config.getBoolean("attempt-teamwork")) @@ -229,7 +230,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer */ override def dispatcher(): MessageDispatcher = new PinnedDispatcher( - prerequisites, null, config.getString("id"), mailboxType, + prerequisites, null, config.getString("id"), mailboxType, mailBoxTypeConfigured, Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index fc665dc86b..f3475e0ad3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -567,7 +567,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Finit } /** - * UnboundedPriorityMailbox is an unbounded mailbox that allows for priorization of its contents. + * UnboundedPriorityMailbox is an unbounded mailbox that allows for prioritization of its contents. * Extend this class and provide the Comparator in the constructor. */ class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType { @@ -579,7 +579,7 @@ class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val i } /** - * BoundedPriorityMailbox is a bounded mailbox that allows for priorization of its contents. + * BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents. * Extend this class and provide the Comparator in the constructor. */ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { @@ -624,3 +624,13 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut } } + +/** + * Trait to signal that an Actor requires a certain type of message queue semantics. + * + * The mailbox type will be looked up by mapping the type T via akka.actor.mailbox.requirements in the config, + * to a mailbox configuration. If no mailbox is assigned on Props or in deployment config then this one will be used. + * + * The queue type of the created mailbox will be checked against the type T and an error will be logged if it doesn't match. + */ +trait RequiresMessageQueue[T] diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala new file mode 100644 index 0000000000..2633eeeb8c --- /dev/null +++ b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala @@ -0,0 +1,145 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.dispatch + +import com.typesafe.config.{ ConfigFactory, Config } +import akka.actor.{ Actor, DynamicAccess, ActorSystem } +import akka.event.EventStream +import java.util.concurrent.ConcurrentHashMap +import akka.event.Logging.Warning +import akka.ConfigurationException +import scala.annotation.tailrec +import java.lang.reflect.ParameterizedType + +private[akka] class Mailboxes( + val settings: ActorSystem.Settings, + val eventStream: EventStream, + dynamicAccess: DynamicAccess) { + + private val mailboxTypeConfigurators = new ConcurrentHashMap[String, Option[MailboxTypeConfigurator]] + + private val mailboxBindings: Map[Class[_ <: Any], String] = { + import scala.collection.JavaConverters._ + settings.config.getConfig("akka.actor.mailbox.requirements").root.unwrapped.asScala + .toMap.foldLeft(Map.empty[Class[_ <: Any], String]) { + case (m, (k, v)) ⇒ + dynamicAccess.getClassFor[Any](k).map { + case x ⇒ m.updated(x, v.toString) + }.recover { + case e ⇒ + throw new ConfigurationException(s"Type [${k}] specified as akka.actor.mailbox.requirement " + + s"[${v}] in config can't be loaded due to [${e.getMessage}]", e) + }.get + } + } + + /** + * Returns a mailbox type as specified in configuration, based on the id, or if not defined None. + */ + def lookup(id: String): Option[MailboxType] = lookupConfigurator(id).map { _.mailboxType } + + /** + * Returns a mailbox type as specified in configuration, based on the type, or if not defined None. + */ + def lookupByQueueType(queueType: Class[_ <: Any]): Option[MailboxType] = + lookupId(queueType).flatMap(x ⇒ lookup(x)) + + private final val rmqClass = classOf[RequiresMessageQueue[_]] + /** + * Return the required message queue type for this class if any. + */ + def getRequiredType(actorClass: Class[_ <: Actor]): Option[Class[_]] = { + @tailrec + def innerRequiredType(classes: Iterator[Class[_]]): Option[Class[_]] = { + if (classes.isEmpty) None + else { + val c = classes.next() + if (rmqClass.isAssignableFrom(c)) { + val ifaces = c.getGenericInterfaces + val tpe = ifaces.collectFirst { + case t: ParameterizedType if rmqClass.isAssignableFrom(t.getRawType.asInstanceOf[Class[_]]) ⇒ + t.getActualTypeArguments.head.asInstanceOf[Class[_]] + } + if (tpe.isDefined) tpe + else innerRequiredType(classes ++ ifaces.map { + case c: Class[_] ⇒ c + case c: ParameterizedType ⇒ c.getRawType.asInstanceOf[Class[_]] + } ++ Iterator(c.getSuperclass)) + } else { + innerRequiredType(classes) + } + } + } + if (rmqClass.isAssignableFrom(actorClass)) innerRequiredType(Iterator(actorClass)) + else None + } + + /** + * Check if this class can have a required message queue type. + */ + def hasRequiredType(actorClass: Class[_ <: Actor]): Boolean = rmqClass.isAssignableFrom(actorClass) + + private def lookupId(queueType: Class[_]): Option[String] = { + mailboxBindings.get(queueType) match { + case None ⇒ + eventStream.publish(Warning("Mailboxes", this.getClass, s"Mailbox Mapping for [${queueType}] not configured")) + None + case s ⇒ s + } + } + + private def lookupConfigurator(id: String): Option[MailboxTypeConfigurator] = { + mailboxTypeConfigurators.get(id) match { + case null ⇒ + // It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup. + val newConfigurator = + if (settings.config.hasPath(id)) { + Some(new MailboxTypeConfigurator(settings, config(id), dynamicAccess)) + } else { + eventStream.publish(Warning("Mailboxes", this.getClass, s"Mailbox Type [${id}] not configured")) + None + } + + if (newConfigurator.isDefined) { + mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match { + case null ⇒ newConfigurator + case existing ⇒ existing + } + } else None + + case existing ⇒ existing + } + } + + //INTERNAL API + private def config(id: String): Config = { + import scala.collection.JavaConverters._ + ConfigFactory.parseMap(Map("id" -> id).asJava) + .withFallback(settings.config.getConfig(id)) + } +} + +private[akka] class MailboxTypeConfigurator( + val settings: ActorSystem.Settings, + val config: Config, + dynamicAccess: DynamicAccess) { + private val instance: MailboxType = { + val id = config.getString("id") + config.getString("mailbox-type") match { + case "" ⇒ throw new IllegalArgumentException(s"The setting mailbox-type, defined in [${id}}] is empty") + case fqcn ⇒ + val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> config) + dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ + case exception ⇒ + throw new IllegalArgumentException( + (s"Cannot instantiate MailboxType [${fqcn}], defined in [${id}], make sure it has a public" + + " constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters"), + exception) + }).get + } + } + + def mailboxType: MailboxType = instance +} diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index 1b645a64ea..adcf87d118 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -19,6 +19,7 @@ class PinnedDispatcher( _actor: ActorCell, _id: String, _mailboxType: MailboxType, + _mailboxTypeConfigured: Boolean, _shutdownTimeout: FiniteDuration, _threadPoolConfig: ThreadPoolConfig) extends Dispatcher(_prerequisites, @@ -26,6 +27,7 @@ class PinnedDispatcher( Int.MaxValue, Duration.Zero, _mailboxType, + _mailboxTypeConfigured, _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1), _shutdownTimeout) { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 74c839ae80..4fec9b0630 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -23,6 +23,7 @@ object ClusterDeployerSpec { } /user/service2 { dispatcher = mydispatcher + mailbox = mymailbox router = round-robin nr-of-instances = 20 cluster.enabled = on @@ -56,7 +57,8 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings( totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None)), ClusterScope, - Deploy.NoDispatcherGiven))) + Deploy.NoDispatcherGiven, + Deploy.NoMailboxGiven))) } "be able to parse 'akka.actor.deployment._' with specified cluster deploy routee settings" in { @@ -71,7 +73,8 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings( totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false, useRole = None)), ClusterScope, - "mydispatcher"))) + "mydispatcher", + "mymailbox"))) } } diff --git a/akka-docs/rst/java/code/docs/actor/MyBoundedUntypedActor.java b/akka-docs/rst/java/code/docs/actor/MyBoundedUntypedActor.java new file mode 100644 index 0000000000..af34cbe638 --- /dev/null +++ b/akka-docs/rst/java/code/docs/actor/MyBoundedUntypedActor.java @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.actor; + +//#my-bounded-untyped-actor +import akka.dispatch.BoundedMessageQueueSemantics; +import akka.dispatch.RequiresMessageQueue; + +public class MyBoundedUntypedActor extends MyUntypedActor + implements RequiresMessageQueue { +} +//#my-bounded-untyped-actor diff --git a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTestBase.java index 0741bcaece..6fd18d535a 100644 --- a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTestBase.java @@ -30,6 +30,11 @@ import java.util.concurrent.ConcurrentLinkedQueue; //#imports-custom +//#imports-required-mailbox + +//#imports-required-mailbox + +import docs.actor.MyBoundedUntypedActor; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -48,8 +53,8 @@ public class DispatcherDocTestBase { @BeforeClass public static void beforeAll() { system = ActorSystem.create("MySystem", - ConfigFactory.parseString( - DispatcherDocSpec.config()).withFallback(AkkaSpec.testConf())); + ConfigFactory.parseString(DispatcherDocSpec.javaConfig()).withFallback( + ConfigFactory.parseString(DispatcherDocSpec.config())).withFallback(AkkaSpec.testConf())); } @AfterClass @@ -96,6 +101,33 @@ public class DispatcherDocTestBase { //#lookup } + @SuppressWarnings("unused") + @Test + public void defineMailboxInConfig() { + //#defining-mailbox-in-config + ActorRef myActor = + system.actorOf(Props.create(MyUntypedActor.class), + "priomailboxactor"); + //#defining-mailbox-in-config + } + + @SuppressWarnings("unused") + @Test + public void defineMailboxInCode() { + //#defining-mailbox-in-code + ActorRef myActor = + system.actorOf(Props.create(MyUntypedActor.class) + .withMailbox("prio-mailbox")); + //#defining-mailbox-in-code + } + + @SuppressWarnings("unused") + @Test + public void usingARequiredMailbox() { + ActorRef myActor = + system.actorOf(Props.create(MyBoundedUntypedActor.class)); + } + @Test public void priorityDispatcher() throws Exception { JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-docs/rst/java/dispatchers.rst b/akka-docs/rst/java/dispatchers.rst index 8d1888c5f4..0557ef5f62 100644 --- a/akka-docs/rst/java/dispatchers.rst +++ b/akka-docs/rst/java/dispatchers.rst @@ -202,14 +202,55 @@ And then an example on how you would use it: .. includecode:: ../java/code/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher +It is also possible to configure a mailbox type directly like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala + :include: prio-mailbox-config-java,mailbox-deployment-config + +And then use it either from deployment like this: + +.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#defining-mailbox-in-config + +Or code like this: + +.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#defining-mailbox-in-code + + +Requiring a message queue type for an Actor +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +It is possible to require a certain type of message queue for a certain type of actor +by having that actor implement the parameterized interface :class:`RequiresMessageQueue`. Here is +an example: + +.. includecode:: code/docs/actor/MyBoundedUntypedActor.java#my-bounded-untyped-actor + +The type parameter to the :class:`RequiresMessageQueue` interface needs to be mapped to a mailbox in +configuration like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala + :include: bounded-mailbox-config,required-mailbox-config + +Now every time you create an actor of type :class:`MyBoundedUntypedActor` it will try to get a bounded +mailbox. If the actor has a different mailbox configured in deployment, either directly or via +a dispatcher with a specified mailbox type, then that will override this mapping. + .. note:: - Make sure to include a constructor which takes - ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` - arguments, as this constructor is invoked reflectively to construct your - mailbox type. The config passed in as second argument is that section from - the configuration which describes the dispatcher using this mailbox type; the - mailbox type will be instantiated once for each dispatcher using it. + The type of the queue in the mailbox created for an actor will be checked against the required type in the + interface and if the queue doesn't implement the required type an error will be logged. + + +Mailbox configuration precedence +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The order of precedence for the mailbox type of an actor, where lower numbers override higher, is: + +1. Mailbox type configured in the deployment of the actor +2. Mailbox type configured on the dispatcher of the actor +3. Mailbox type configured on the Props of the actor +4. Mailbox type configured via message queue requirement + Creating your own Mailbox type ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -220,7 +261,8 @@ An example is worth a thousand quacks: .. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#mailbox-implementation-example -And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. +And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher +configuration, or the mailbox configuration. .. note:: @@ -228,8 +270,9 @@ And then you just specify the FQCN of your MailboxType as the value of the "mail ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` arguments, as this constructor is invoked reflectively to construct your mailbox type. The config passed in as second argument is that section from - the configuration which describes the dispatcher using this mailbox type; the - mailbox type will be instantiated once for each dispatcher using it. + the configuration which describes the dispatcher or mailbox setting using + this mailbox type; the mailbox type will be instantiated once for each + dispatcher or mailbox setting using it. Special Semantics of ``system.actorOf`` diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index d83c90fa7d..c322deb6ad 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -245,12 +245,13 @@ directives are part of the :class:`Act` trait): .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#supervise-with + Last but not least there is a little bit of convenience magic built-in, which detects if the runtime class of the statically given actor subtype extends the -:class:`Stash` trait (this is a complicated way of saying that ``new Act with -Stash`` would not work because its runtime erased type is just an anonymous -subtype of ``Act``). The purpose is to automatically use a dispatcher with the -appropriate deque-based mailbox, ``akka.actor.default-stash-dispatcher``. +:class:`RequiresMessageQueue` trait via the :class:`Stash` trait (this is a +complicated way of saying that ``new Act with Stash`` would not work because its +runtime erased type is just an anonymous subtype of ``Act``). The purpose is to +automatically use the appropriate deque-based mailbox type required by :class:`Stash`. If you want to use this magic, simply extend :class:`ActWithStash`: .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#act-with-stash diff --git a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala index 4c803d4a7c..d5d2116bbc 100644 --- a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala @@ -12,8 +12,25 @@ import akka.event.Logging import akka.event.LoggingAdapter import scala.concurrent.duration._ import akka.actor._ +import docs.dispatcher.DispatcherDocSpec.MyBoundedActor object DispatcherDocSpec { + val javaConfig = """ + //#prio-dispatcher-config-java + prio-dispatcher { + mailbox-type = "docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox" + //Other dispatcher configuration goes here + } + //#prio-dispatcher-config-java + + //#prio-mailbox-config-java + prio-mailbox { + mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox" + //Other mailbox configuration goes here + } + //#prio-mailbox-config-java + """ + val config = """ //#my-dispatcher-config my-dispatcher { @@ -74,7 +91,7 @@ object DispatcherDocSpec { core-pool-size-factor = 8.0 max-pool-size-factor = 16.0 } - # Specifies the bounded capacity of the mailbox queue + # Specifies the bounded capacity of the message queue mailbox-capacity = 100 throughput = 3 } @@ -94,15 +111,9 @@ object DispatcherDocSpec { //#prio-dispatcher-config prio-dispatcher { mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox" - } - //#prio-dispatcher-config - - //#prio-dispatcher-config-java - prio-dispatcher-java { - mailbox-type = "docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox" //Other dispatcher configuration goes here } - //#prio-dispatcher-config-java + //#prio-dispatcher-config //#dispatcher-deployment-config akka.actor.deployment { @@ -111,6 +122,38 @@ object DispatcherDocSpec { } } //#dispatcher-deployment-config + + //#prio-mailbox-config + prio-mailbox { + mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox" + //Other mailbox configuration goes here + } + //#prio-mailbox-config + + //#mailbox-deployment-config + + akka.actor.deployment { + /priomailboxactor { + mailbox = prio-mailbox + } + } + //#mailbox-deployment-config + + //#bounded-mailbox-config + bounded-mailbox { + mailbox-type = "akka.dispatch.BoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10s + } + //#bounded-mailbox-config + + //#required-mailbox-config + + akka.actor.mailbox.requirements { + "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox + } + //#required-mailbox-config + """ //#prio-mailbox @@ -144,6 +187,13 @@ object DispatcherDocSpec { } } + //#required-mailbox-class + import akka.dispatch.RequiresMessageQueue + import akka.dispatch.BoundedMessageQueueSemantics + + class MyBoundedActor extends MyActor with RequiresMessageQueue[BoundedMessageQueueSemantics] + //#required-mailbox-class + //#mailbox-implementation-example class MyUnboundedMailbox extends akka.dispatch.MailboxType { import akka.actor.{ ActorRef, ActorSystem } @@ -209,6 +259,27 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { //#lookup } + "defining mailbox in config" in { + val context = system + //#defining-mailbox-in-config + import akka.actor.Props + val myActor = context.actorOf(Props[MyActor], "priomailboxactor") + //#defining-mailbox-in-config + } + + "defining mailbox in code" in { + val context = system + //#defining-mailbox-in-code + import akka.actor.Props + val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox")) + //#defining-mailbox-in-code + } + + "using a required mailbox" in { + val context = system + val myActor = context.actorOf(Props[MyBoundedActor]) + } + "defining priority dispatcher" in { new AnyRef { //#prio-dispatcher diff --git a/akka-docs/rst/scala/dispatchers.rst b/akka-docs/rst/scala/dispatchers.rst index 2dad196a0e..3bee1126fd 100644 --- a/akka-docs/rst/scala/dispatchers.rst +++ b/akka-docs/rst/scala/dispatchers.rst @@ -204,6 +204,56 @@ And then an example on how you would use it: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher +It is also possible to configure a mailbox type directly like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala + :include: prio-mailbox-config,mailbox-deployment-config + +And then use it either from deployment like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-mailbox-in-config + +Or code like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-mailbox-in-code + + +Requiring a message queue type for an Actor +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +It is possible to require a certain type of message queue for a certain type of actor +by having that actor extend the parameterized trait :class:`RequiresMessageQueue`. Here is +an example: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#required-mailbox-class + +The type parameter to the :class:`RequiresMessageQueue` trait needs to be mapped to a mailbox in +configuration like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala + :include: bounded-mailbox-config,required-mailbox-config + +Now every time you create an actor of type :class:`MyBoundedActor` it will try to get a bounded +mailbox. If the actor has a different mailbox configured in deployment, either directly or via +a dispatcher with a specified mailbox type, then that will override this mapping. + +.. note:: + + The type of the queue in the mailbox created for an actor will be checked against the required type in the + trait and if the queue doesn't implement the required type an error will be logged. + + +Mailbox configuration precedence +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The order of precedence for the mailbox type of an actor, where lower numbers override higher, is: + +1. Mailbox type configured in the deployment of the actor +2. Mailbox type configured on the dispatcher of the actor +3. Mailbox type configured on the Props of the actor +4. Mailbox type configured via message queue requirement + + Creating your own Mailbox type ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -211,7 +261,8 @@ An example is worth a thousand quacks: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example -And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. +And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher +configuration, or the mailbox configuration. .. note:: @@ -219,8 +270,10 @@ And then you just specify the FQCN of your MailboxType as the value of the "mail ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` arguments, as this constructor is invoked reflectively to construct your mailbox type. The config passed in as second argument is that section from - the configuration which describes the dispatcher using this mailbox type; the - mailbox type will be instantiated once for each dispatcher using it. + the configuration which describes the dispatcher or mailbox setting using + this mailbox type; the mailbox type will be instantiated once for each + dispatcher or mailbox setting using it. + Special Semantics of ``system.actorOf`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5d80522c90..ee4a6bbe1b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -267,7 +267,7 @@ private[akka] class RemoteActorRefProvider( } Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match { - case d @ Deploy(_, _, _, RemoteScope(addr), _) ⇒ + case d @ Deploy(_, _, _, RemoteScope(addr), _, _) ⇒ if (hasAddress(addr)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) } else { diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 72d1f83805..70b7fe4992 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -129,6 +129,7 @@ object CallingThreadDispatcher { class CallingThreadDispatcher( _prerequisites: DispatcherPrerequisites, val mailboxType: MailboxType, + val mailboxTypeConfigured: Boolean, val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) { import CallingThreadDispatcher._ @@ -136,7 +137,8 @@ class CallingThreadDispatcher( override def id: String = Id - protected[akka] override def createMailbox(actor: akka.actor.Cell) = new CallingThreadMailbox(actor, mailboxType) + protected[akka] override def createMailbox(actor: akka.actor.Cell) = + new CallingThreadMailbox(actor, getMailboxType(actor, mailboxType, mailboxTypeConfigured)) protected[akka] override def shutdown() {} @@ -302,7 +304,7 @@ class CallingThreadDispatcher( class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { - private val instance = new CallingThreadDispatcher(prerequisites, mailboxType()) + private val instance = new CallingThreadDispatcher(prerequisites, mailboxType(), mailBoxTypeConfigured) override def dispatcher(): MessageDispatcher = instance }