diff --git a/akka-actor-tests/src/test/java/akka/actor/ActorCreationTest.java b/akka-actor-tests/src/test/java/akka/actor/ActorCreationTest.java index b16523af5b..2b54d51b0d 100644 --- a/akka-actor-tests/src/test/java/akka/actor/ActorCreationTest.java +++ b/akka-actor-tests/src/test/java/akka/actor/ActorCreationTest.java @@ -47,6 +47,14 @@ public class ActorCreationTest { } } + static interface I extends Creator {} + static class F implements I { + @Override + public UntypedActor create() { + return null; + } + } + @Test public void testRightCreator() { final Props p = Props.create(new C()); @@ -65,4 +73,10 @@ public class ActorCreationTest { assertEquals(UntypedActor.class, p.actorClass()); } + @Test + public void testSuperinterface() { + final Props p = Props.create(new F()); + assertEquals(UntypedActor.class, p.actorClass()); + } + } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala index 9e256f2643..a8a02e5bad 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala @@ -9,6 +9,7 @@ import akka.testkit._ import akka.dispatch._ import akka.TestUtils.verifyActorTermination import scala.concurrent.duration.Duration +import akka.ConfigurationException object ActorMailboxSpec { val mailboxConf = ConfigFactory.parseString(""" @@ -22,6 +23,13 @@ object ActorMailboxSpec { mailbox-type = "akka.dispatch.BoundedMailbox" } + requiring-bounded-dispatcher { + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10s + mailbox-type = "akka.dispatch.BoundedMailbox" + mailbox-requirement = "akka.dispatch.BoundedMessageQueueSemantics" + } + unbounded-mailbox { mailbox-type = "akka.dispatch.UnboundedMailbox" } @@ -80,10 +88,32 @@ object ActorMailboxSpec { dispatcher = bounded-dispatcher mailbox = unbounded-mailbox } - } - - akka.actor.mailbox.requirements { - "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox + /bounded-deque-requirements-configured { + dispatcher = requiring-bounded-dispatcher + mailbox = akka.actor.mailbox.bounded-deque-based + } + /bounded-deque-require-unbounded-configured { + dispatcher = requiring-bounded-dispatcher + mailbox = akka.actor.mailbox.unbounded-deque-based + } + /bounded-deque-require-unbounded-unconfigured { + dispatcher = requiring-bounded-dispatcher + } + /bounded-deque-requirements-configured-props-disp { + mailbox = akka.actor.mailbox.bounded-deque-based + } + /bounded-deque-require-unbounded-configured-props-disp { + mailbox = akka.actor.mailbox.unbounded-deque-based + } + /bounded-deque-requirements-configured-props-mail { + dispatcher = requiring-bounded-dispatcher + } + /bounded-deque-require-unbounded-configured-props-mail { + dispatcher = requiring-bounded-dispatcher + } + /bounded-deque-require-unbounded-unconfigured-props-mail { + dispatcher = requiring-bounded-dispatcher + } } """) @@ -97,10 +127,16 @@ object ActorMailboxSpec { class StashQueueReportingActor extends QueueReportingActor with Stash - val UnboundedMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[UnboundedMessageQueueSemantics]) - val BoundedMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[BoundedMessageQueueSemantics]) - val UnboundedDeqMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[DequeBasedMessageQueue], + val UnboundedMailboxTypes = Seq(classOf[UnboundedMessageQueueSemantics]) + val BoundedMailboxTypes = Seq(classOf[BoundedMessageQueueSemantics]) + val UnboundedDeqMailboxTypes = Seq( + classOf[DequeBasedMessageQueueSemantics], + classOf[UnboundedMessageQueueSemantics], classOf[UnboundedDequeBasedMessageQueueSemantics]) + val BoundedDeqMailboxTypes = Seq( + classOf[DequeBasedMessageQueueSemantics], + classOf[BoundedMessageQueueSemantics], + classOf[BoundedDequeBasedMessageQueueSemantics]) } class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with DefaultTimeout with ImplicitSender { @@ -122,7 +158,7 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau checkMailboxQueue(Props[QueueReportingActor], "default-default", UnboundedMailboxTypes) } - "get an unbounded dequeu message queue when it is only configured on the props" in { + "get an unbounded deque message queue when it is only configured on the props" in { checkMailboxQueue(Props[QueueReportingActor].withMailbox("akka.actor.mailbox.unbounded-deque-based"), "default-override-from-props", UnboundedDeqMailboxTypes) } @@ -132,7 +168,7 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau "default-override-from-trait", BoundedMailboxTypes) } - "get an unbounded dequeu message queue when it's only mixed with Stash" in { + "get an unbounded deque message queue when it's only mixed with Stash" in { checkMailboxQueue(Props[StashQueueReportingActor], "default-override-from-stash", UnboundedDeqMailboxTypes) } @@ -141,14 +177,12 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau checkMailboxQueue(Props[QueueReportingActor], "default-bounded", BoundedMailboxTypes) } - "get an unbounded dequeu message queue when it's configured as mailbox" in { + "get an unbounded deque message queue when it's configured as mailbox" in { checkMailboxQueue(Props[QueueReportingActor], "default-unbounded-deque", UnboundedDeqMailboxTypes) } "fail to create actor when an unbounded dequeu message queue is configured as mailbox overriding RequestMailbox" in { - filterEvents(EventFilter[ActorInitializationException]()) { - verifyActorTermination(system.actorOf(Props[BoundedQueueReportingActor], "default-unbounded-deque-override-trait")) - } + intercept[ConfigurationException](system.actorOf(Props[BoundedQueueReportingActor], "default-unbounded-deque-override-trait")) } "get an unbounded message queue when defined in dispatcher" in { @@ -156,9 +190,7 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau } "fail to create actor when an unbounded message queue is defined in dispatcher overriding RequestMailbox" in { - filterEvents(EventFilter[ActorInitializationException]()) { - verifyActorTermination(system.actorOf(Props[BoundedQueueReportingActor], "unbounded-default-override-trait")) - } + intercept[ConfigurationException](system.actorOf(Props[BoundedQueueReportingActor], "unbounded-default-override-trait")) } "get a bounded message queue when it's configured as mailbox overriding unbounded in dispatcher" in { @@ -183,5 +215,84 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau "bounded-unbounded-override-props", UnboundedMailboxTypes) } + "get a bounded deque-based message queue if configured and required" in { + checkMailboxQueue(Props[StashQueueReportingActor], "bounded-deque-requirements-configured", BoundedDeqMailboxTypes) + } + + "fail with a unbounded deque-based message queue if configured and required" in { + intercept[ConfigurationException](system.actorOf(Props[StashQueueReportingActor], "bounded-deque-require-unbounded-configured")) + } + + "fail with a bounded deque-based message queue if not configured" in { + intercept[ConfigurationException](system.actorOf(Props[StashQueueReportingActor], "bounded-deque-require-unbounded-unconfigured")) + } + + "get a bounded deque-based message queue if configured and required with Props" in { + checkMailboxQueue( + Props[StashQueueReportingActor] + .withDispatcher("requiring-bounded-dispatcher") + .withMailbox("akka.actor.mailbox.bounded-deque-based"), + "bounded-deque-requirements-configured-props", + BoundedDeqMailboxTypes) + } + + "fail with a unbounded deque-based message queue if configured and required with Props" in { + intercept[ConfigurationException](system.actorOf( + Props[StashQueueReportingActor] + .withDispatcher("requiring-bounded-dispatcher") + .withMailbox("akka.actor.mailbox.unbounded-deque-based"), + "bounded-deque-require-unbounded-configured-props")) + } + + "fail with a bounded deque-based message queue if not configured with Props" in { + intercept[ConfigurationException](system.actorOf( + Props[StashQueueReportingActor] + .withDispatcher("requiring-bounded-dispatcher"), + "bounded-deque-require-unbounded-unconfigured-props")) + } + + "get a bounded deque-based message queue if configured and required with Props (dispatcher)" in { + checkMailboxQueue( + Props[StashQueueReportingActor] + .withDispatcher("requiring-bounded-dispatcher"), + "bounded-deque-requirements-configured-props-disp", + BoundedDeqMailboxTypes) + } + + "fail with a unbounded deque-based message queue if configured and required with Props (dispatcher)" in { + intercept[ConfigurationException](system.actorOf( + Props[StashQueueReportingActor] + .withDispatcher("requiring-bounded-dispatcher"), + "bounded-deque-require-unbounded-configured-props-disp")) + } + + "fail with a bounded deque-based message queue if not configured with Props (dispatcher)" in { + intercept[ConfigurationException](system.actorOf( + Props[StashQueueReportingActor] + .withDispatcher("requiring-bounded-dispatcher"), + "bounded-deque-require-unbounded-unconfigured-props-disp")) + } + + "get a bounded deque-based message queue if configured and required with Props (mailbox)" in { + checkMailboxQueue( + Props[StashQueueReportingActor] + .withMailbox("akka.actor.mailbox.bounded-deque-based"), + "bounded-deque-requirements-configured-props-mail", + BoundedDeqMailboxTypes) + } + + "fail with a unbounded deque-based message queue if configured and required with Props (mailbox)" in { + intercept[ConfigurationException](system.actorOf( + Props[StashQueueReportingActor] + .withMailbox("akka.actor.mailbox.unbounded-deque-based"), + "bounded-deque-require-unbounded-configured-props-mail")) + } + + "fail with a bounded deque-based message queue if not configured with Props (mailbox)" in { + intercept[ConfigurationException](system.actorOf( + Props[StashQueueReportingActor], + "bounded-deque-require-unbounded-unconfigured-props-mail")) + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index f83e599063..f47dc9d6bc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -84,12 +84,10 @@ object ActorSystemSpec { class SlowDispatcher(_config: Config, _prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(_config, _prerequisites) { private val instance = new Dispatcher( - prerequisites, + this, config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, - mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { val doneIt = new Switch diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 23f1e013e4..e619198be2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -269,7 +269,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { case Failure(e) if check(n, e) ⇒ if (delay.isDefined) { executor match { - case m: MessageDispatcher ⇒ m.prerequisites.scheduler.scheduleOnce(delay.get)(run(n + 1)) + case m: MessageDispatcher ⇒ m.configurator.prerequisites.scheduler.scheduleOnce(delay.get)(run(n + 1)) case _ ⇒ // Thread.sleep, ignore, or other? } } else run(n + 1) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 3200a0bbd7..93d7891711 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -78,12 +78,10 @@ object SupervisorHierarchySpec { extends DispatcherConfigurator(config, prerequisites) { private val instance: MessageDispatcher = - new Dispatcher(prerequisites, + new Dispatcher(this, config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, - mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 4bdceed888..14f03799dc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -529,12 +529,10 @@ object DispatcherModelSpec { extends MessageDispatcherConfigurator(config, prerequisites) { private val instance: MessageDispatcher = - new Dispatcher(prerequisites, + new Dispatcher(this, config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, - mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor @@ -600,20 +598,17 @@ object BalancingDispatcherModelSpec { } class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) - extends MessageDispatcherConfigurator(config, prerequisites) { + extends BalancingDispatcherConfigurator(config, prerequisites) { - private val instance: MessageDispatcher = - new BalancingDispatcher(prerequisites, + override protected def create(mailboxType: MailboxType): BalancingDispatcher = + new BalancingDispatcher(this, config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, - mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor - - override def dispatcher(): MessageDispatcher = instance } } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index c3c38594ac..d1f04ba303 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -63,9 +63,6 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin { c.getString("type") must equal("Dispatcher") c.getString("executor") must equal("fork-join-executor") - c.getInt("mailbox-capacity") must equal(-1) - c.getMilliseconds("mailbox-push-timeout-time") must equal(10 * 1000) - c.getString("mailbox-type") must be("") c.getMilliseconds("shutdown-timeout") must equal(1 * 1000) c.getInt("throughput") must equal(5) c.getMilliseconds("throughput-deadline-time") must equal(0) @@ -134,6 +131,18 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin io.getInt("default-backlog") must be(ioExtSettings.defaultBacklog) } } + + { + val c = config.getConfig("akka.actor.default-mailbox") + + // general mailbox config + + { + c.getInt("mailbox-capacity") must equal(1000) + c.getMilliseconds("mailbox-push-timeout-time") must equal(10 * 1000) + c.getString("mailbox-type") must be("akka.dispatch.UnboundedMailbox") + } + } } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 136f9dd8b5..ce90666293 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -209,7 +209,7 @@ object CustomMailboxSpec { } } - class MyMailbox(owner: ActorRef) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + class MyMailbox(owner: ActorRef) extends UnboundedQueueBasedMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } } diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index ff04077ed0..641440ad36 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -32,7 +32,7 @@ object CallingThreadDispatcherModelSpec { extends MessageDispatcherConfigurator(config, prerequisites) { private val instance: MessageDispatcher = - new CallingThreadDispatcher(prerequisites, UnboundedMailbox(), false) with MessageDispatcherInterceptor { + new CallingThreadDispatcher(this) with MessageDispatcherInterceptor { override def id: String = config.getString("id") } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 6782b7999a..b2545eabc3 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -281,29 +281,34 @@ akka { # Throughput deadline for Dispatcher, set to 0 or negative for no deadline throughput-deadline-time = 0ms - # If negative (or zero) then an unbounded mailbox is used (default) - # If positive then a bounded mailbox is used and the capacity is set using - # the property - # NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead - # to deadlock, use with care - # The following mailbox-push-timeout-time is only used for type=Dispatcher - # and only if mailbox-capacity > 0 - mailbox-capacity = -1 - - # Specifies the timeout to add a new message to a mailbox that is full - - # negative number means infinite timeout. It is only used for type=Dispatcher - # and only if mailbox-capacity > 0 - mailbox-push-timeout-time = 10s - - # FQCN of the MailboxType, if not specified the default bounded or unbounded - # mailbox is used. The Class of the FQCN must have a public constructor with - # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. - mailbox-type = "" - # For BalancingDispatcher: If the balancing dispatcher should attempt to # schedule idle actors using the same dispatcher when a message comes in, # and the dispatchers ExecutorService is not fully busy already. attempt-teamwork = on + + # If this dispatcher requires a specific type of mailbox, specify the + # fully-qualified class name here; the actually created mailbox will + # be a subtype of this type. The empty string signifies no requirement. + mailbox-requirement = "" + } + + default-mailbox { + # FQCN of the MailboxType. The Class of the FQCN must have a public + # constructor with + # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.UnboundedMailbox" + + # If the mailbox is bounded then it uses this setting to determine its + # capacity. The provided value must be positive. + # NOTICE: + # Up to version 2.1 the mailbox type was determined based on this setting; + # this is no longer the case, the type must explicitly be a bounded mailbox. + mailbox-capacity = 1000 + + # If the mailbox is bounded then this is the timeout for enqueueing + # in case the mailbox is full. Negative values signify infinite + # timeout, which should be avoided as it bears the risk of dead-lock. + mailbox-push-timeout-time = 10s # For Actor with Stash: The default capacity of the stash. # If negative (or zero) then an unbounded stash is used (default) @@ -322,10 +327,16 @@ akka { requirements { "akka.dispatch.UnboundedMessageQueueSemantics" = akka.actor.mailbox.unbounded-queue-based - "akka.dispatch.DequeBasedMessageQueue" = + "akka.dispatch.BoundedMessageQueueSemantics" = + akka.actor.mailbox.bounded-queue-based + "akka.dispatch.DequeBasedMessageQueueSemantics" = akka.actor.mailbox.unbounded-deque-based "akka.dispatch.UnboundedDequeBasedMessageQueueSemantics" = akka.actor.mailbox.unbounded-deque-based + "akka.dispatch.BoundedDequeBasedMessageQueueSemantics" = + akka.actor.mailbox.bounded-deque-based + "akka.dispatch.MultipleConsumerSemantics" = + akka.actor.mailbox.unbounded-queue-based } unbounded-queue-based { @@ -335,12 +346,26 @@ akka { mailbox-type = "akka.dispatch.UnboundedMailbox" } + bounded-queue-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.BoundedMailbox" + } + unbounded-deque-based { # FQCN of the MailboxType, The Class of the FQCN must have a public # constructor with (akka.actor.ActorSystem.Settings, # com.typesafe.config.Config) parameters. mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" } + + bounded-deque-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox" + } } debug { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index b36a7389ec..b26d8df60b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -16,6 +16,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal +import akka.dispatch.MessageDispatcher /** * The actor context - the view of the actor cell from the actor. @@ -338,6 +339,7 @@ private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, + val dispatcher: MessageDispatcher, val parent: InternalActorRef) extends UntypedActorContext with Cell with dungeon.ReceiveTimeout diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bd2ee513fd..b06c7a0b5d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -271,6 +271,8 @@ private[akka] case object Nobody extends MinimalActorRef { private[akka] class LocalActorRef private[akka] ( _system: ActorSystemImpl, _props: Props, + _dispatcher: MessageDispatcher, + _mailboxType: MailboxType, _supervisor: InternalActorRef, override val path: ActorPath) extends ActorRefWithCell with LocalRef { @@ -285,11 +287,11 @@ private[akka] class LocalActorRef private[akka] ( * actorCell before we call init and start, since we can start using "this" * object from another thread as soon as we run init. */ - private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor) - actorCell.init(sendSupervise = true) + private val actorCell: ActorCell = newActorCell(_system, this, _props, _dispatcher, _supervisor) + actorCell.init(sendSupervise = true, _mailboxType) - protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell = - new ActorCell(system, ref, props, supervisor) + protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, dispatcher: MessageDispatcher, supervisor: InternalActorRef): ActorCell = + new ActorCell(system, ref, props, dispatcher, supervisor) protected def actorContext: ActorContext = actorCell diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 62a1735893..ceacb110f8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -13,10 +13,12 @@ import akka.util.{ Switch, Helpers } import akka.japi.Util.immutableSeq import akka.util.Collections.EmptyImmutableSeq import scala.util.{ Success, Failure } +import scala.util.control.NonFatal import java.util.concurrent.atomic.AtomicLong import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.annotation.implicitNotFound import akka.ConfigurationException +import akka.dispatch.Mailboxes /** * Interface for all ActorRef providers to implement. @@ -559,10 +561,16 @@ private[akka] class LocalActorRefProvider private[akka] ( */ protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy + private lazy val defaultDispatcher = system.dispatchers.defaultGlobalDispatcher + + private lazy val defaultMailbox = system.mailboxes.lookup(Mailboxes.DefaultMailboxId) + override lazy val rootGuardian: LocalActorRef = new LocalActorRef( system, Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy), + defaultDispatcher, + defaultMailbox, theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this @@ -581,7 +589,7 @@ private[akka] class LocalActorRefProvider private[akka] ( val cell = rootGuardian.underlying cell.reserveChild("user") val ref = new LocalActorRef(system, Props(classOf[LocalActorRefProvider.Guardian], guardianStrategy), - rootGuardian, rootPath / "user") + defaultDispatcher, defaultMailbox, rootGuardian, rootPath / "user") cell.initChild(ref) ref.start() ref @@ -592,7 +600,7 @@ private[akka] class LocalActorRefProvider private[akka] ( cell.reserveChild("system") val ref = new LocalActorRef( system, Props(classOf[LocalActorRefProvider.SystemGuardian], systemGuardianStrategy, guardian), - rootGuardian, rootPath / "system") + defaultDispatcher, defaultMailbox, rootGuardian, rootPath / "system") cell.initChild(ref) ref.start() ref @@ -712,19 +720,48 @@ private[akka] class LocalActorRefProvider private[akka] ( if (!system.dispatchers.hasDispatcher(props2.dispatcher)) throw new ConfigurationException(s"Dispatcher [${props2.dispatcher}] not configured for path $path") - if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async) - else new LocalActorRef(system, props2, supervisor, path) + try { + val dispatcher = system.dispatchers.lookup(props2.dispatcher) + val mailboxType = system.mailboxes.getMailboxType(props2, dispatcher.configurator.config) + + if (async) new RepointableActorRef(system, props2, dispatcher, mailboxType, supervisor, path).initialize(async) + else new LocalActorRef(system, props2, dispatcher, mailboxType, supervisor, path) + } catch { + case NonFatal(e) ⇒ throw new ConfigurationException( + s"configuration problem while creating [$path] with dispatcher [${props2.dispatcher}] and mailbox [${props2.mailbox}]", e) + } case router ⇒ val lookup = if (lookupDeploy) deployer.lookup(path) else None val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router)) val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a) val p = props.withRouter(d.routerConfig) + if (!system.dispatchers.hasDispatcher(p.dispatcher)) throw new ConfigurationException(s"Dispatcher [${p.dispatcher}] not configured for routees of $path") if (!system.dispatchers.hasDispatcher(d.routerConfig.routerDispatcher)) throw new ConfigurationException(s"Dispatcher [${p.dispatcher}] not configured for router of $path") - new RoutedActorRef(system, p, supervisor, path).initialize(async) + + val routerProps = + Props(p.deploy.copy(dispatcher = p.routerConfig.routerDispatcher), + classOf[RoutedActorCell.RouterCreator], Vector(p.routerConfig)) + val routeeProps = p.withRouter(NoRouter) + + try { + val routerDispatcher = system.dispatchers.lookup(p.routerConfig.routerDispatcher) + val routerMailbox = system.mailboxes.getMailboxType(routerProps, routerDispatcher.configurator.config) + + // the RouteeProvider uses context.actorOf() to create the routees, which does not allow us to pass + // these through, but obtain them here for early verification + val routeeDispatcher = system.dispatchers.lookup(p.dispatcher) + val routeeMailbox = system.mailboxes.getMailboxType(routeeProps, routeeDispatcher.configurator.config) + + new RoutedActorRef(system, routerProps, routerDispatcher, routerMailbox, routeeProps, supervisor, path).initialize(async) + } catch { + case NonFatal(e) ⇒ throw new ConfigurationException( + s"configuration problem while creating [$path] with router dispatcher [${routerProps.dispatcher}] and mailbox [${routerProps.mailbox}] " + + s"and routee dispatcher [${routeeProps.dispatcher}] and mailbox [${routeeProps.mailbox}]", e) + } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index b6e4bbbbd8..cb926d0d1c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -544,28 +544,13 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def deadLetters: ActorRef = provider.deadLetters - val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue { - def enqueue(receiver: ActorRef, envelope: Envelope): Unit = - deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender) - def dequeue() = null - def hasMessages = false - def numberOfMessages = 0 - def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () - }) { - becomeClosed() - def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = - deadLetters ! DeadLetter(handle, receiver, receiver) - def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = SystemMessageList.ENil - def hasSystemMessages = false - } + val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters) val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( - threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings)) + threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes)) val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher - val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess) - val internalCallingThreadExecutionContext: ExecutionContext = dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").getOrElse( new ExecutionContext with BatchingExecutor { diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 46b74bcd26..47af4d32d9 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -91,11 +91,11 @@ object Props { @deprecated("use Props.withDispatcher and friends", "2.2") def apply( creator: () ⇒ Actor = Props.defaultCreator, - dispatcher: String = Dispatchers.DefaultDispatcherId, + dispatcher: String = Deploy.NoDispatcherGiven, routerConfig: RouterConfig = Props.defaultRoutedProps, deploy: Deploy = Props.defaultDeploy): Props = { - val d1 = if (dispatcher != Dispatchers.DefaultDispatcherId) deploy.copy(dispatcher = dispatcher) else deploy + val d1 = if (dispatcher != Deploy.NoDispatcherGiven) deploy.copy(dispatcher = dispatcher) else deploy val d2 = if (routerConfig != Props.defaultRoutedProps) d1.copy(routerConfig = routerConfig) else d1 val p = Props(classOf[CreatorFunctionConsumer], creator) if (d2 != Props.defaultDeploy) p.withDeploy(d2) else p @@ -125,22 +125,17 @@ object Props { def create[T <: Actor](creator: Creator[T]): Props = { if ((creator.getClass.getModifiers & Modifier.STATIC) == 0) throw new IllegalArgumentException("cannot use non-static local Creator to create actors; make it static or top-level") - val cc = classOf[Creator[_]] val ac = classOf[Actor] - @tailrec def findType(c: Class[_]): Class[_] = { - c.getGenericInterfaces collectFirst { - case t: ParameterizedType if cc.isAssignableFrom(t.getRawType.asInstanceOf[Class[_]]) ⇒ - t.getActualTypeArguments.head match { - case c: Class[_] ⇒ c // since T <: Actor - case v: TypeVariable[_] ⇒ - v.getBounds collectFirst { case c: Class[_] if ac.isAssignableFrom(c) && c != ac ⇒ c } getOrElse ac - } - } match { - case Some(x) ⇒ x - case None ⇒ findType(c.getSuperclass) - } + val actorClass = Reflect.findMarker(creator.getClass, classOf[Creator[_]]) match { + case t: ParameterizedType ⇒ + t.getActualTypeArguments.head match { + case c: Class[_] ⇒ c // since T <: Actor + case v: TypeVariable[_] ⇒ + v.getBounds collectFirst { case c: Class[_] if ac.isAssignableFrom(c) && c != ac ⇒ c } getOrElse ac + case x ⇒ throw new IllegalArgumentException(s"unsupported type found in Creator argument [$x]") + } } - apply(defaultDeploy, classOf[CreatorConsumer], findType(creator.getClass) :: creator :: Nil) + apply(defaultDeploy, classOf[CreatorConsumer], actorClass :: creator :: Nil) } } @@ -245,9 +240,9 @@ final case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any] * Convenience method for extracting the mailbox information from the * contained [[Deploy]] instance. */ - def mailbox: Option[String] = deploy.mailbox match { - case NoMailboxGiven ⇒ None - case x ⇒ Some(x) + def mailbox: String = deploy.mailbox match { + case NoMailboxGiven ⇒ Mailboxes.DefaultMailboxId + case x ⇒ x } /** diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 617658bfbb..ea5b54eedc 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -29,6 +29,8 @@ import util.Try private[akka] class RepointableActorRef( val system: ActorSystemImpl, val props: Props, + val dispatcher: MessageDispatcher, + val mailboxType: MailboxType, val supervisor: InternalActorRef, val path: ActorPath) extends ActorRefWithCell with RepointableRef { @@ -111,7 +113,7 @@ private[akka] class RepointableActorRef( * unstarted cell. The cell must be fully functional. */ def newCell(old: UnstartedCell): Cell = - new ActorCell(system, this, props, supervisor).init(sendSupervise = false) + new ActorCell(system, this, props, dispatcher, supervisor).init(sendSupervise = false, mailboxType) def start(): Unit = () diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index beb20573de..ca5bf11a5e 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -3,7 +3,7 @@ */ package akka.actor -import akka.dispatch.{ UnboundedDequeBasedMessageQueueSemantics, RequiresMessageQueue, Envelope, DequeBasedMessageQueue } +import akka.dispatch.{ UnboundedDequeBasedMessageQueueSemantics, RequiresMessageQueue, Envelope, DequeBasedMessageQueueSemantics } import akka.AkkaException /** @@ -47,7 +47,7 @@ import akka.AkkaException * any trait/class that overrides the `preRestart` callback. This means it's not possible to write * `Actor with MyActor with Stash` if `MyActor` overrides `preRestart`. */ -trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueue] +trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics] /** * The `UnboundedStash` trait is a version of `Stash` that enforces an unbounded stash for you actor. @@ -64,16 +64,16 @@ trait UnrestrictedStash extends Actor { */ private val capacity: Int = { val dispatcher = context.system.settings.config.getConfig(context.props.dispatcher) - val config = dispatcher.withFallback(context.system.settings.config.getConfig("akka.actor.default-dispatcher")) + val config = dispatcher.withFallback(context.system.settings.config.getConfig("akka.actor.default-mailbox")) config.getInt("stash-capacity") } /* The actor's deque-based message queue. * `mailbox.queue` is the underlying `Deque`. */ - private val mailbox: DequeBasedMessageQueue = { + private val mailbox: DequeBasedMessageQueueSemantics = { context.asInstanceOf[ActorCell].mailbox.messageQueue match { - case queue: DequeBasedMessageQueue ⇒ queue + case queue: DequeBasedMessageQueueSemantics ⇒ queue case other ⇒ throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" + """An (unbounded) deque-based mailbox can be configured as follows: | my-custom-mailbox { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 6ae063d9f6..6c0b9971e1 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -13,6 +13,8 @@ import akka.actor._ import akka.serialization.SerializationExtension import scala.util.control.NonFatal import scala.util.control.Exception.Catcher +import akka.dispatch.MailboxType +import akka.dispatch.ProducesMessageQueue private[akka] trait Dispatch { this: ActorCell ⇒ @@ -30,8 +32,6 @@ private[akka] trait Dispatch { this: ActorCell ⇒ final def numberOfMessages: Int = mailbox.numberOfMessages - val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) - final def isTerminated: Boolean = mailbox.isClosed /** @@ -39,24 +39,29 @@ private[akka] trait Dispatch { this: ActorCell ⇒ * reasonably different from the previous UID of a possible actor with the same path, * which can be achieved by using ThreadLocalRandom.current.nextInt(). */ - final def init(sendSupervise: Boolean): this.type = { + final def init(sendSupervise: Boolean, mailboxType: MailboxType): this.type = { /* * Create the mailbox and enqueue the Create() message to ensure that * this is processed before anything else. */ - val mbox = dispatcher.createMailbox(this) + val mbox = dispatcher.createMailbox(this, mailboxType) + /* + * The mailboxType was calculated taking into account what the MailboxType + * has promised to produce. If that was more than the default, then we need + * to reverify here because the dispatcher may well have screwed it up. + */ // we need to delay the failure to the point of actor creation so we can handle // it properly in the normal way val actorClass = props.actorClass - val createMessage = if (system.mailboxes.hasRequiredType(actorClass)) { - Create(system.mailboxes.getRequiredType(actorClass).flatMap { - case c if !c.isAssignableFrom(mbox.messageQueue.getClass) ⇒ - Some(ActorInitializationException(self, s"Actor [${self}] requires mailbox type [${c}]" + - s" got [${mbox.messageQueue.getClass}]")) - case _ ⇒ None - }) - } else Create(None) + val createMessage = mailboxType match { + case _: ProducesMessageQueue[_] if system.mailboxes.hasRequiredType(actorClass) ⇒ + val req = system.mailboxes.getRequiredType(actorClass) + if (req isInstance mbox.messageQueue) Create(None) + else Create(Some(ActorInitializationException(self, + s"Actor [$self] requires mailbox type [$req] got [${mbox.messageQueue.getClass.getName}]"))) + case _ ⇒ Create(None) + } swapMailbox(mbox) mailbox.setActor(this) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index cfefb7153f..11d4fc075b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -17,6 +17,10 @@ import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal +import scala.util.Try +import scala.util.Failure +import akka.util.Reflect +import java.lang.reflect.ParameterizedType final case class Envelope private (val message: Any, val sender: ActorRef) @@ -79,11 +83,14 @@ private[akka] object MessageDispatcher { } } -abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext { +abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext { import MessageDispatcher._ import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset } - import prerequisites._ + import configurator.prerequisites + + val mailboxes = prerequisites.mailboxes + val eventStream = prerequisites.eventStream @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH! @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH! @@ -109,18 +116,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext /** * Creates and returns a mailbox for the given actor. */ - protected[akka] def createMailbox(actor: Cell): Mailbox - - /** - * Finds out the mailbox type for an actor based on configuration, props and requirements. - */ - protected[akka] def getMailboxType(actor: Cell, mailboxType: MailboxType, mailboxTypeConfigured: Boolean): MailboxType = - actor.props.mailbox.flatMap(id ⇒ actor.system.mailboxes.lookup(id)) match { - case Some(x) ⇒ x - case None if mailboxTypeConfigured ⇒ mailboxType - case None ⇒ actor.system.mailboxes.getRequiredType(actor.props.actorClass). - flatMap(c ⇒ actor.system.mailboxes.lookupByQueueType(c)).getOrElse(mailboxType) - } + protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox /** * Identifier of this dispatcher, corresponds to the full key @@ -156,8 +152,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } override def reportFailure(t: Throwable): Unit = t match { - case e: LogEventException ⇒ prerequisites.eventStream.publish(e.event) - case _ ⇒ prerequisites.eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage)) + case e: LogEventException ⇒ eventStream.publish(e.event) + case _ ⇒ eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage)) } @tailrec @@ -175,7 +171,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext private def scheduleShutdownAction(): Unit = { // IllegalStateException is thrown if scheduler has been shutdown - try scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext { + try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext { override def execute(runnable: Runnable): Unit = runnable.run() override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t) }) catch { @@ -203,7 +199,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext protected[akka] def unregister(actor: ActorCell) { if (debug) actors.remove(this, actor.self) addInhabitants(-1) - val mailBox = actor.swapMailbox(deadLetterMailbox) + val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox) mailBox.becomeClosed() mailBox.cleanUp() } @@ -321,37 +317,6 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit */ def dispatcher(): MessageDispatcher - /** - * Returns a factory for the [[akka.dispatch.Mailbox]] given the configuration. - * Default implementation instantiate the [[akka.dispatch.MailboxType]] specified - * as FQCN in mailbox-type config property. If mailbox-type is unspecified (empty) - * then [[akka.dispatch.UnboundedMailbox]] is used when capacity is < 1, - * otherwise [[akka.dispatch.BoundedMailbox]]. - */ - def mailboxType(): MailboxType = { - config.getString("mailbox-type") match { - case "" ⇒ - if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox() - else new BoundedMailbox(prerequisites.settings, config) - case "unbounded" ⇒ UnboundedMailbox() - case "bounded" ⇒ new BoundedMailbox(prerequisites.settings, config) - case fqcn ⇒ - val args = List(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config) - prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ - case exception ⇒ - throw new IllegalArgumentException( - ("Cannot instantiate MailboxType [%s], defined in [%s], " + - "make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters") - .format(fqcn, config.getString("id")), exception) - }).get - } - } - - /** - * Was the mailbox type configured or derived? - */ - def mailBoxTypeConfigured: Boolean = config.getString("mailbox-type") != Deploy.NoMailboxGiven - def configureExecutor(): ExecutorServiceConfigurator = { config.getString("executor") match { case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 26795f5aba..b51b27c0b0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -29,16 +29,15 @@ import scala.concurrent.duration.FiniteDuration * @see akka.dispatch.Dispatchers */ class BalancingDispatcher( - _prerequisites: DispatcherPrerequisites, + _configurator: MessageDispatcherConfigurator, _id: String, throughput: Int, throughputDeadlineTime: Duration, - mailboxType: MailboxType, - _mailBoxTypeConfigured: Boolean, + _mailboxType: MailboxType, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, _shutdownTimeout: FiniteDuration, attemptTeamWork: Boolean) - extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _mailBoxTypeConfigured, _executorServiceFactoryProvider, _shutdownTimeout) { + extends Dispatcher(_configurator, _id, throughput, throughputDeadlineTime, _executorServiceFactoryProvider, _shutdownTimeout) { /** * INTERNAL API @@ -51,12 +50,12 @@ class BalancingDispatcher( /** * INTERNAL API */ - private[akka] val messageQueue: MessageQueue = mailboxType.create(None, None) + private[akka] val messageQueue: MessageQueue = _mailboxType.create(None, None) private class SharingMailbox(val system: ActorSystemImpl, _messageQueue: MessageQueue) extends Mailbox(_messageQueue) with DefaultSystemMessageQueue { override def cleanUp(): Unit = { - val dlq = system.deadLetterMailbox + val dlq = mailboxes.deadLetterMailbox //Don't call the original implementation of this since it scraps all messages, and we don't want to do that var messages = systemDrain(new LatestFirstSystemMessageList(NoMessage)) while (messages.nonEmpty) { @@ -69,7 +68,8 @@ class BalancingDispatcher( } } - protected[akka] override def createMailbox(actor: akka.actor.Cell): Mailbox = new SharingMailbox(actor.systemImpl, messageQueue) + protected[akka] override def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox = + new SharingMailbox(actor.systemImpl, messageQueue) protected[akka] override def register(actor: ActorCell): Unit = { super.register(actor) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index eb97628dfc..f60580943a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -30,15 +30,15 @@ import java.lang.reflect.ParameterizedType * Larger values (or zero or negative) increase throughput, smaller values increase fairness */ class Dispatcher( - _prerequisites: DispatcherPrerequisites, + _configurator: MessageDispatcherConfigurator, val id: String, val throughput: Int, val throughputDeadlineTime: Duration, - val mailboxType: MailboxType, - val mailboxTypeConfigured: Boolean, executorServiceFactoryProvider: ExecutorServiceFactoryProvider, val shutdownTimeout: FiniteDuration) - extends MessageDispatcher(_prerequisites) { + extends MessageDispatcher(_configurator) { + + import configurator.prerequisites._ private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate { lazy val executor: ExecutorService = factory.createExecutorService @@ -46,7 +46,7 @@ class Dispatcher( } @volatile private var executorServiceDelegate: LazyExecutorServiceDelegate = - new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory)) + new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, threadFactory)) protected final def executorService: ExecutorServiceDelegate = executorServiceDelegate @@ -80,7 +80,7 @@ class Dispatcher( executorService execute invocation } catch { case e2: RejectedExecutionException ⇒ - prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!")) + eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!")) throw e2 } } @@ -89,8 +89,8 @@ class Dispatcher( /** * INTERNAL API */ - protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox = { - new Mailbox(getMailboxType(actor, mailboxType, mailboxTypeConfigured).create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue + protected[akka] def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox = { + new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue } /** @@ -125,7 +125,7 @@ class Dispatcher( } catch { //Retry once case e: RejectedExecutionException ⇒ mbox.setAsIdle() - prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!")) + eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!")) throw e } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 57b626a119..99fc8acc62 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -11,6 +11,7 @@ import akka.event.Logging.Warning import akka.event.EventStream import scala.concurrent.duration.Duration import akka.ConfigurationException +import akka.actor.Deploy /** * DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher @@ -18,10 +19,10 @@ import akka.ConfigurationException trait DispatcherPrerequisites { def threadFactory: ThreadFactory def eventStream: EventStream - def deadLetterMailbox: Mailbox def scheduler: Scheduler def dynamicAccess: DynamicAccess def settings: ActorSystem.Settings + def mailboxes: Mailboxes } /** @@ -30,10 +31,10 @@ trait DispatcherPrerequisites { private[akka] case class DefaultDispatcherPrerequisites( val threadFactory: ThreadFactory, val eventStream: EventStream, - val deadLetterMailbox: Mailbox, val scheduler: Scheduler, val dynamicAccess: DynamicAccess, - val settings: ActorSystem.Settings) extends DispatcherPrerequisites + val settings: ActorSystem.Settings, + val mailboxes: Mailboxes) extends DispatcherPrerequisites object Dispatchers { /** @@ -101,7 +102,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc } //INTERNAL API - private def config(id: String): Config = { + private[akka] def config(id: String): Config = { import scala.collection.JavaConverters._ def simpleName = id.substring(id.lastIndexOf('.') + 1) idConfig(id) @@ -172,12 +173,10 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi extends MessageDispatcherConfigurator(config, prerequisites) { private val instance = new Dispatcher( - prerequisites, + this, config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, - mailBoxTypeConfigured, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) @@ -187,22 +186,56 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi override def dispatcher(): MessageDispatcher = instance } +/** + * INTERNAL API + */ +private[akka] object BalancingDispatcherConfigurator { + private val defaultRequirement = + ConfigFactory.parseString("mailbox-requirement = akka.dispatch.MultipleConsumerSemantics") + def amendConfig(config: Config): Config = + if (config.getString("mailbox-requirement") != Mailboxes.NoMailboxRequirement) config + else defaultRequirement.withFallback(config) +} + /** * Configurator for creating [[akka.dispatch.BalancingDispatcher]]. * Returns the same dispatcher instance for for each invocation * of the `dispatcher()` method. */ -class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) - extends MessageDispatcherConfigurator(config, prerequisites) { +class BalancingDispatcherConfigurator(_config: Config, _prerequisites: DispatcherPrerequisites) + extends MessageDispatcherConfigurator(BalancingDispatcherConfigurator.amendConfig(_config), _prerequisites) { - private val instance = new BalancingDispatcher( - prerequisites, - config.getString("id"), - config.getInt("throughput"), - Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, mailBoxTypeConfigured, configureExecutor(), - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), - config.getBoolean("attempt-teamwork")) + private val instance = { + val mailboxes = prerequisites.mailboxes + val id = config.getString("id") + val requirement = mailboxes.getMailboxRequirement(config) + if (!classOf[MultipleConsumerSemantics].isAssignableFrom(requirement)) + throw new IllegalArgumentException( + "BalancingDispatcher must have 'mailbox-requirement' which implements akka.dispatch.MultipleConsumerSemantics; " + + s"dispatcher [$id] has [$requirement]") + val conf = config.withFallback(prerequisites.settings.config.getConfig(Mailboxes.DefaultMailboxId)) + val mailboxType = + if (conf.getString("mailbox-type") != Deploy.NoMailboxGiven) mailboxes.lookupByQueueType(requirement) + else { + val mt = mailboxes.lookup(conf.getString("mailbox-type")) + if (!requirement.isAssignableFrom(mailboxes.getProducedMessageQueueType(mt))) + throw new IllegalArgumentException( + s"BalancingDispatcher [$id] has 'mailbox-type' [${mt.getClass}] which is incompatible with 'mailbox-requirement' [$requirement]") + mt + } + create(mailboxType) + } + + protected def create(mailboxType: MailboxType): BalancingDispatcher = + new BalancingDispatcher( + this, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), + config.getBoolean("attempt-teamwork")) /** * Returns the same dispatcher instance for each invocation @@ -233,7 +266,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer */ override def dispatcher(): MessageDispatcher = new PinnedDispatcher( - prerequisites, null, config.getString("id"), mailboxType, mailBoxTypeConfigured, + this, null, config.getString("id"), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 13cf8b6315..ebbb525830 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -269,7 +269,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * if we closed the mailbox, we must dump the remaining system messages * to deadLetters (this is essential for DeathWatch) */ - val dlm = actor.systemImpl.deadLetterMailbox + val dlm = actor.dispatcher.mailboxes.deadLetterMailbox while (messageList.nonEmpty) { val msg = messageList.head messageList = messageList.tail @@ -295,7 +295,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ protected[dispatch] def cleanUp(): Unit = if (actor ne null) { // actor is null for the deadLetterMailbox - val dlm = actor.systemImpl.deadLetterMailbox + val dlm = actor.dispatcher.mailboxes.deadLetterMailbox var messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage)) while (messageList.nonEmpty) { // message must be “virgin” before being able to systemEnqueue again @@ -306,7 +306,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() - messageQueue.cleanUp(actor.self, actor.systemImpl.deadLetterMailbox.messageQueue) + messageQueue.cleanUp(actor.self, actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue) } } @@ -347,7 +347,7 @@ trait MessageQueue { def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit } -class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue { +class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue with UnboundedMessageQueueSemantics { final def enqueue(receiver: ActorRef, handle: Envelope): Unit = add(handle) @@ -394,7 +394,7 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ if (Mailbox.debug) println(receiver + " having enqueued " + message) val currentList = systemQueueGet if (currentList.head == NoMessage) { - if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message) + if (actor ne null) actor.dispatcher.mailboxes.deadLetterMailbox.systemEnqueue(receiver, message) } else { if (!systemQueuePut(currentList, message :: currentList)) { message.unlink() @@ -418,10 +418,16 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ } +/** + * This is a marker trait for message queues which support multiple consumers, + * as is required by the BalancingDispatcher. + */ +trait MultipleConsumerSemantics + /** * A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue. */ -trait QueueBasedMessageQueue extends MessageQueue { +trait QueueBasedMessageQueue extends MessageQueue with MultipleConsumerSemantics { def queue: Queue[Envelope] def numberOfMessages = queue.size def hasMessages = !queue.isEmpty @@ -440,7 +446,9 @@ trait QueueBasedMessageQueue extends MessageQueue { * UnboundedMessageQueueSemantics adds unbounded semantics to a QueueBasedMessageQueue, * i.e. a non-blocking enqueue and dequeue. */ -trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { +trait UnboundedMessageQueueSemantics + +trait UnboundedQueueBasedMessageQueue extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle def dequeue(): Envelope = queue.poll() } @@ -449,8 +457,11 @@ trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { * BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue, * i.e. blocking enqueue with timeout. */ -trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { +trait BoundedMessageQueueSemantics { def pushTimeOut: Duration +} + +trait BoundedQueueBasedMessageQueue extends QueueBasedMessageQueue with BoundedMessageQueueSemantics { override def queue: BlockingQueue[Envelope] def enqueue(receiver: ActorRef, handle: Envelope): Unit = @@ -466,16 +477,23 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { /** * DequeBasedMessageQueue refines QueueBasedMessageQueue to be backed by a java.util.Deque. */ -trait DequeBasedMessageQueue extends QueueBasedMessageQueue { - def queue: Deque[Envelope] +trait DequeBasedMessageQueueSemantics { def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit } +trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueueSemantics with UnboundedMessageQueueSemantics + +trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueueSemantics with BoundedMessageQueueSemantics + +trait DequeBasedMessageQueue extends QueueBasedMessageQueue with DequeBasedMessageQueueSemantics { + def queue: Deque[Envelope] +} + /** * UnboundedDequeBasedMessageQueueSemantics adds unbounded semantics to a DequeBasedMessageQueue, * i.e. a non-blocking enqueue and dequeue. */ -trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { +trait UnboundedDequeBasedMessageQueue extends DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics { def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle def dequeue(): Envelope = queue.poll() @@ -485,7 +503,7 @@ trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { * BoundedMessageQueueSemantics adds bounded semantics to a DequeBasedMessageQueue, * i.e. blocking enqueue with timeout. */ -trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { +trait BoundedDequeBasedMessageQueue extends DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics { def pushTimeOut: Duration override def queue: BlockingDeque[Envelope] @@ -523,17 +541,23 @@ trait MailboxType { def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue } +trait ProducesMessageQueue[T <: MessageQueue] + /** * UnboundedMailbox is the default unbounded MailboxType used by Akka Actors. */ -case class UnboundedMailbox() extends MailboxType { +case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] { def this(settings: ActorSystem.Settings, config: Config) = this() final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = - new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final def queue: Queue[Envelope] = this - } + new UnboundedMailbox.MessageQueue +} + +object UnboundedMailbox { + class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue { + final def queue: Queue[Envelope] = this + } } /** @@ -541,17 +565,18 @@ case class UnboundedMailbox() extends MailboxType { * the only drawback is that you can't have multiple consumers, * which rules out using it with BalancingDispatcher for instance. */ -case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType { +case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with ProducesMessageQueue[NodeMessageQueue] { def this(settings: ActorSystem.Settings, config: Config) = this() - final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue() + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue } /** * BoundedMailbox is the default bounded MailboxType used by Akka Actors. */ -case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration) extends MailboxType { +case class BoundedMailbox(val capacity: Int, val pushTimeOut: FiniteDuration) + extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)) @@ -560,57 +585,78 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Finit if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = - new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final def queue: BlockingQueue[Envelope] = this - final val pushTimeOut = BoundedMailbox.this.pushTimeOut - } + new BoundedMailbox.MessageQueue(capacity, pushTimeOut) +} + +object BoundedMailbox { + class MessageQueue(capacity: Int, final val pushTimeOut: FiniteDuration) + extends LinkedBlockingQueue[Envelope](capacity) with BoundedQueueBasedMessageQueue { + final def queue: BlockingQueue[Envelope] = this + } } /** * UnboundedPriorityMailbox is an unbounded mailbox that allows for prioritization of its contents. * Extend this class and provide the Comparator in the constructor. */ -class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType { +class UnboundedPriorityMailbox(val cmp: Comparator[Envelope], val initialCapacity: Int) + extends MailboxType with ProducesMessageQueue[UnboundedPriorityMailbox.MessageQueue] { def this(cmp: Comparator[Envelope]) = this(cmp, 11) final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = - new PriorityBlockingQueue[Envelope](initialCapacity, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final def queue: Queue[Envelope] = this - } + new UnboundedPriorityMailbox.MessageQueue(initialCapacity, cmp) +} + +object UnboundedPriorityMailbox { + class MessageQueue(initialCapacity: Int, cmp: Comparator[Envelope]) + extends PriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue { + final def queue: Queue[Envelope] = this + } } /** * BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents. * Extend this class and provide the Comparator in the constructor. */ -class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { +class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) + extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = - new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final def queue: BlockingQueue[Envelope] = this - final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut - } + new BoundedPriorityMailbox.MessageQueue(capacity, cmp, pushTimeOut) +} + +object BoundedPriorityMailbox { + class MessageQueue(capacity: Int, cmp: Comparator[Envelope], val pushTimeOut: Duration) + extends BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) + with BoundedQueueBasedMessageQueue { + final def queue: BlockingQueue[Envelope] = this + } } /** * UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque. */ -case class UnboundedDequeBasedMailbox() extends MailboxType { +case class UnboundedDequeBasedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedDequeBasedMailbox.MessageQueue] { def this(settings: ActorSystem.Settings, config: Config) = this() final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = - new LinkedBlockingDeque[Envelope]() with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics { - final val queue = this - } + new UnboundedDequeBasedMailbox.MessageQueue +} + +object UnboundedDequeBasedMailbox { + class MessageQueue extends LinkedBlockingDeque[Envelope] with UnboundedDequeBasedMessageQueue { + final val queue = this + } } /** * BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque. */ -case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration) extends MailboxType { +case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration) + extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)) @@ -619,10 +665,14 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null") final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = - new LinkedBlockingDeque[Envelope](capacity) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics { - final val queue = this - final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut - } + new BoundedDequeBasedMailbox.MessageQueue(capacity, pushTimeOut) +} + +object BoundedDequeBasedMailbox { + class MessageQueue(capacity: Int, val pushTimeOut: FiniteDuration) + extends LinkedBlockingDeque[Envelope](capacity) with BoundedDequeBasedMessageQueue { + final val queue = this + } } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala index 2633eeeb8c..0a3a3e9fd9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala @@ -12,13 +12,48 @@ import akka.event.Logging.Warning import akka.ConfigurationException import scala.annotation.tailrec import java.lang.reflect.ParameterizedType +import akka.util.Reflect +import akka.actor.Props +import akka.actor.Deploy +import scala.util.Try +import scala.util.Failure +import scala.util.control.NonFatal +import akka.actor.ActorRef +import akka.actor.DeadLetter +import akka.dispatch.sysmsg.SystemMessage +import akka.dispatch.sysmsg.LatestFirstSystemMessageList +import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import akka.dispatch.sysmsg.SystemMessageList + +object Mailboxes { + final val DefaultMailboxId = "akka.actor.default-mailbox" + final val NoMailboxRequirement = "" +} private[akka] class Mailboxes( val settings: ActorSystem.Settings, val eventStream: EventStream, - dynamicAccess: DynamicAccess) { + dynamicAccess: DynamicAccess, + deadLetters: ActorRef) { - private val mailboxTypeConfigurators = new ConcurrentHashMap[String, Option[MailboxTypeConfigurator]] + import Mailboxes._ + + val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue { + def enqueue(receiver: ActorRef, envelope: Envelope): Unit = + deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender) + def dequeue() = null + def hasMessages = false + def numberOfMessages = 0 + def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () + }) { + becomeClosed() + def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = + deadLetters ! DeadLetter(handle, receiver, receiver) + def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = SystemMessageList.ENil + def hasSystemMessages = false + } + + private val mailboxTypeConfigurators = new ConcurrentHashMap[String, MailboxType] private val mailboxBindings: Map[Class[_ <: Any], String] = { import scala.collection.JavaConverters._ @@ -38,42 +73,95 @@ private[akka] class Mailboxes( /** * Returns a mailbox type as specified in configuration, based on the id, or if not defined None. */ - def lookup(id: String): Option[MailboxType] = lookupConfigurator(id).map { _.mailboxType } + def lookup(id: String): MailboxType = lookupConfigurator(id) /** * Returns a mailbox type as specified in configuration, based on the type, or if not defined None. */ - def lookupByQueueType(queueType: Class[_ <: Any]): Option[MailboxType] = - lookupId(queueType).flatMap(x ⇒ lookup(x)) + def lookupByQueueType(queueType: Class[_ <: Any]): MailboxType = lookup(lookupId(queueType)) private final val rmqClass = classOf[RequiresMessageQueue[_]] /** * Return the required message queue type for this class if any. */ - def getRequiredType(actorClass: Class[_ <: Actor]): Option[Class[_]] = { - @tailrec - def innerRequiredType(classes: Iterator[Class[_]]): Option[Class[_]] = { - if (classes.isEmpty) None - else { - val c = classes.next() - if (rmqClass.isAssignableFrom(c)) { - val ifaces = c.getGenericInterfaces - val tpe = ifaces.collectFirst { - case t: ParameterizedType if rmqClass.isAssignableFrom(t.getRawType.asInstanceOf[Class[_]]) ⇒ - t.getActualTypeArguments.head.asInstanceOf[Class[_]] - } - if (tpe.isDefined) tpe - else innerRequiredType(classes ++ ifaces.map { - case c: Class[_] ⇒ c - case c: ParameterizedType ⇒ c.getRawType.asInstanceOf[Class[_]] - } ++ Iterator(c.getSuperclass)) - } else { - innerRequiredType(classes) - } + def getRequiredType(actorClass: Class[_ <: Actor]): Class[_] = + Reflect.findMarker(actorClass, rmqClass) match { + case t: ParameterizedType ⇒ t.getActualTypeArguments.head match { + case c: Class[_] ⇒ c + case x ⇒ throw new IllegalArgumentException(s"no wildcard type allowed in RequireMessageQueue argument (was [$x])") } } - if (rmqClass.isAssignableFrom(actorClass)) innerRequiredType(Iterator(actorClass)) - else None + + // don’t care if this happens twice + private var mailboxSizeWarningIssued = false + + def getMailboxRequirement(config: Config) = config.getString("mailbox-requirement") match { + case NoMailboxRequirement ⇒ classOf[MessageQueue] + case x ⇒ dynamicAccess.getClassFor[AnyRef](x).get + } + + def getProducedMessageQueueType(mailboxType: MailboxType): Class[_] = { + val pmqClass = classOf[ProducesMessageQueue[_]] + if (!pmqClass.isAssignableFrom(mailboxType.getClass)) classOf[MessageQueue] + else Reflect.findMarker(mailboxType.getClass, pmqClass) match { + case t: ParameterizedType ⇒ + t.getActualTypeArguments.head match { + case c: Class[_] ⇒ c + case x ⇒ throw new IllegalArgumentException( + s"no wildcard type allowed in ProducesMessageQueue argument (was [$x])") + } + } + } + + /** + * Finds out the mailbox type for an actor based on configuration, props and requirements. + */ + protected[akka] def getMailboxType(props: Props, dispatcherConfig: Config): MailboxType = { + val id = dispatcherConfig.getString("id") + val deploy = props.deploy + val actorClass = props.actorClass + lazy val actorRequirement = getRequiredType(actorClass) + + val mailboxRequirement: Class[_] = getMailboxRequirement(dispatcherConfig) + + val hasMailboxRequirement: Boolean = mailboxRequirement != classOf[MessageQueue] + + val hasMailboxType = + dispatcherConfig.hasPath("mailbox-type") && + dispatcherConfig.getString("mailbox-type") != Deploy.NoMailboxGiven + + // TODO remove in 2.3 + if (!hasMailboxType && !mailboxSizeWarningIssued && dispatcherConfig.hasPath("mailbox-size")) { + eventStream.publish(Warning("mailboxes", getClass, + "ignoring setting 'mailbox-size' for dispatcher [$id], you need to specify 'mailbox-type=bounded'")) + mailboxSizeWarningIssued = true + } + + def verifyRequirements(mailboxType: MailboxType): MailboxType = { + lazy val mqType: Class[_] = getProducedMessageQueueType(mailboxType) + if (hasMailboxRequirement && !mailboxRequirement.isAssignableFrom(mqType)) + throw new IllegalArgumentException( + s"produced message queue type [$mqType] does not fulfill requirement for dispatcher [${id}]") + if (hasRequiredType(actorClass) && !actorRequirement.isAssignableFrom(mqType)) + throw new IllegalArgumentException( + s"produced message queue type [$mqType] does not fulfill requirement for actor class [$actorClass]") + mailboxType + } + + if (deploy.mailbox != Deploy.NoMailboxGiven) { + verifyRequirements(lookup(deploy.mailbox)) + } else if (deploy.dispatcher != Deploy.NoDispatcherGiven && hasMailboxType) { + verifyRequirements(lookup(dispatcherConfig.getString("id"))) + } else if (hasRequiredType(actorClass)) { + try verifyRequirements(lookupByQueueType(getRequiredType(actorClass))) + catch { + case NonFatal(thr) if (hasMailboxRequirement) ⇒ verifyRequirements(lookupByQueueType(mailboxRequirement)) + } + } else if (hasMailboxRequirement) { + verifyRequirements(lookupByQueueType(mailboxRequirement)) + } else { + verifyRequirements(lookup(DefaultMailboxId)) + } } /** @@ -81,65 +169,53 @@ private[akka] class Mailboxes( */ def hasRequiredType(actorClass: Class[_ <: Actor]): Boolean = rmqClass.isAssignableFrom(actorClass) - private def lookupId(queueType: Class[_]): Option[String] = { + private def lookupId(queueType: Class[_]): String = mailboxBindings.get(queueType) match { - case None ⇒ - eventStream.publish(Warning("Mailboxes", this.getClass, s"Mailbox Mapping for [${queueType}] not configured")) - None - case s ⇒ s + case None ⇒ throw new ConfigurationException(s"Mailbox Mapping for [${queueType}] not configured") + case Some(s) ⇒ s } - } - private def lookupConfigurator(id: String): Option[MailboxTypeConfigurator] = { + private def lookupConfigurator(id: String): MailboxType = { mailboxTypeConfigurators.get(id) match { case null ⇒ // It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup. - val newConfigurator = - if (settings.config.hasPath(id)) { - Some(new MailboxTypeConfigurator(settings, config(id), dynamicAccess)) - } else { - eventStream.publish(Warning("Mailboxes", this.getClass, s"Mailbox Type [${id}] not configured")) - None - } + val newConfigurator = id match { + // TODO RK remove these two for Akka 2.3 + case "unbounded" ⇒ UnboundedMailbox() + case "bounded" ⇒ new BoundedMailbox(settings, config(id)) + case _ ⇒ + if (!settings.config.hasPath(id)) throw new ConfigurationException(s"Mailbox Type [${id}] not configured") + val conf = config(id) + conf.getString("mailbox-type") match { + case "" ⇒ throw new ConfigurationException(s"The setting mailbox-type, defined in [$id] is empty") + case fqcn ⇒ + val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> conf) + dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ + case exception ⇒ + throw new IllegalArgumentException( + (s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" + + " constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters"), + exception) + }).get + } + } - if (newConfigurator.isDefined) { - mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match { - case null ⇒ newConfigurator - case existing ⇒ existing - } - } else None + mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match { + case null ⇒ newConfigurator + case existing ⇒ existing + } case existing ⇒ existing } } + private val defaultMailboxConfig = settings.config.getConfig(DefaultMailboxId) + //INTERNAL API private def config(id: String): Config = { import scala.collection.JavaConverters._ ConfigFactory.parseMap(Map("id" -> id).asJava) .withFallback(settings.config.getConfig(id)) + .withFallback(defaultMailboxConfig) } } - -private[akka] class MailboxTypeConfigurator( - val settings: ActorSystem.Settings, - val config: Config, - dynamicAccess: DynamicAccess) { - private val instance: MailboxType = { - val id = config.getString("id") - config.getString("mailbox-type") match { - case "" ⇒ throw new IllegalArgumentException(s"The setting mailbox-type, defined in [${id}}] is empty") - case fqcn ⇒ - val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> config) - dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ - case exception ⇒ - throw new IllegalArgumentException( - (s"Cannot instantiate MailboxType [${fqcn}], defined in [${id}], make sure it has a public" + - " constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters"), - exception) - }).get - } - } - - def mailboxType: MailboxType = instance -} diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index adcf87d118..32f0e2c4ce 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -15,19 +15,15 @@ import scala.concurrent.duration.FiniteDuration * the `lookup` method in [[akka.dispatch.Dispatchers]]. */ class PinnedDispatcher( - _prerequisites: DispatcherPrerequisites, + _configurator: MessageDispatcherConfigurator, _actor: ActorCell, _id: String, - _mailboxType: MailboxType, - _mailboxTypeConfigured: Boolean, _shutdownTimeout: FiniteDuration, _threadPoolConfig: ThreadPoolConfig) - extends Dispatcher(_prerequisites, + extends Dispatcher(_configurator, _id, Int.MaxValue, Duration.Zero, - _mailboxType, - _mailboxTypeConfigured, _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1), _shutdownTimeout) { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 3a06c9955d..20bceb7f10 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -5,7 +5,6 @@ package akka.routing import language.implicitConversions import language.postfixOps - import scala.collection.immutable import scala.concurrent.duration._ import akka.actor._ @@ -20,22 +19,33 @@ import akka.event.Logging.Warning import scala.concurrent.forkjoin.ThreadLocalRandom import scala.annotation.tailrec import akka.event.Logging.Warning +import akka.dispatch.{ MailboxType, MessageDispatcher } +import akka.dispatch.BalancingDispatcher /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to * send a message to one (or more) of these actors. */ -private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath) - extends RepointableActorRef(_system, _props, _supervisor, _path) { +private[akka] class RoutedActorRef( + _system: ActorSystemImpl, + _routerProps: Props, + _routerDispatcher: MessageDispatcher, + _routerMailbox: MailboxType, + _routeeProps: Props, + _supervisor: InternalActorRef, + _path: ActorPath) + extends RepointableActorRef(_system, _routerProps, _routerDispatcher, _routerMailbox, _supervisor, _path) { // verify that a BalancingDispatcher is not used with a Router - if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher)) { + if (_routerProps.routerConfig != NoRouter && _routerDispatcher.isInstanceOf[BalancingDispatcher]) { throw new ConfigurationException( "Configuration for " + this + " is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.") - } else _props.routerConfig.verifyConfig(_path) + } else _routerProps.routerConfig.verifyConfig(_path) - override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor).init(sendSupervise = false) + override def newCell(old: UnstartedCell): Cell = + new RoutedActorCell(system, this, props, dispatcher, _routeeProps, supervisor) + .init(sendSupervise = false, mailboxType) } @@ -46,16 +56,16 @@ private[akka] object RoutedActorCell { } } -private[akka] final class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActorRef, _props: Props, _supervisor: InternalActorRef) - extends ActorCell( - _system, - _ref, - _props.copy( - deploy = _props.deploy.copy(dispatcher = _props.routerConfig.routerDispatcher), - classOf[RoutedActorCell.RouterCreator], Vector(_props.routerConfig)), - _supervisor) { +private[akka] final class RoutedActorCell( + _system: ActorSystemImpl, + _ref: InternalActorRef, + _routerProps: Props, + _routerDispatcher: MessageDispatcher, + val routeeProps: Props, + _supervisor: InternalActorRef) + extends ActorCell(_system, _ref, _routerProps, _routerDispatcher, _supervisor) { - private[akka] val routerConfig = _props.routerConfig + private[akka] val routerConfig = _routerProps.routerConfig private[akka] val resizeInProgress = new AtomicBoolean private val resizeCounter = new AtomicLong @@ -72,7 +82,6 @@ private[akka] final class RoutedActorCell(_system: ActorSystemImpl, _ref: Intern def route = _route private def startRoute() { - val routeeProps = _props.withRouter(NoRouter) _routeeProvider = routerConfig.createRouteeProvider(this, routeeProps) val r = routerConfig.createRoute(routeeProvider) // initial resize, before message send diff --git a/akka-actor/src/main/scala/akka/util/Reflect.scala b/akka-actor/src/main/scala/akka/util/Reflect.scala index 31d4b57bfd..4e67044bb0 100644 --- a/akka-actor/src/main/scala/akka/util/Reflect.scala +++ b/akka-actor/src/main/scala/akka/util/Reflect.scala @@ -5,6 +5,9 @@ package akka.util import scala.util.control.NonFatal import java.lang.reflect.Constructor import scala.collection.immutable +import java.lang.reflect.Type +import scala.annotation.tailrec +import java.lang.reflect.ParameterizedType /** * Collection of internal reflection utilities which may or may not be @@ -95,4 +98,20 @@ private[akka] object Reflect { * @return a function which when applied will create a new instance from the default constructor of the given class */ private[akka] def instantiator[T](clazz: Class[T]): () ⇒ T = () ⇒ instantiate(clazz) + + def findMarker(root: Class[_], marker: Class[_]): Type = { + @tailrec def rec(curr: Class[_]): Type = { + if (curr.getSuperclass != null && marker.isAssignableFrom(curr.getSuperclass)) rec(curr.getSuperclass) + else curr.getGenericInterfaces collectFirst { + case c: Class[_] if marker isAssignableFrom c ⇒ c + case t: ParameterizedType if marker isAssignableFrom t.getRawType.asInstanceOf[Class[_]] ⇒ t + } match { + case None ⇒ throw new IllegalArgumentException("cannot find [$marker] in ancestors of [$root]") + case Some(c: Class[_]) ⇒ if (c == marker) c else rec(c) + case Some(t: ParameterizedType) ⇒ if (t.getRawType == marker) t else rec(t.getRawType.asInstanceOf[Class[_]]) + case _ ⇒ ??? // cannot happen due to collectFirst + } + } + rec(root) + } } diff --git a/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala b/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala index 27ad61e394..1c5875f401 100644 --- a/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala +++ b/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala @@ -8,7 +8,7 @@ import java.util.concurrent.{ ConcurrentHashMap, ConcurrentLinkedQueue } import com.typesafe.config.Config import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } -import akka.dispatch.{ Envelope, MailboxType, MessageQueue, QueueBasedMessageQueue, UnboundedMessageQueueSemantics } +import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedQueueBasedMessageQueue } object PeekMailboxExtension extends ExtensionId[PeekMailboxExtension] with ExtensionIdProvider { def lookup = this @@ -53,7 +53,7 @@ class PeekMailboxType(settings: ActorSystem.Settings, config: Config) extends Ma } class PeekMailbox(owner: ActorRef, system: ActorSystem, maxRetries: Int) - extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + extends UnboundedQueueBasedMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() /* diff --git a/akka-docs/rst/java/mailboxes.rst b/akka-docs/rst/java/mailboxes.rst index c2b8c8bbf2..ea6f4b4eb6 100644 --- a/akka-docs/rst/java/mailboxes.rst +++ b/akka-docs/rst/java/mailboxes.rst @@ -1,14 +1,96 @@ .. _mailboxes-java: Mailboxes -========= +######### An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``. Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance. +Mailbox Selection +================= + +Requiring a Message Queue Type for an Actor +------------------------------------------- + +It is possible to require a certain type of message queue for a certain type of actor +by having that actor implement the parameterized interface :class:`RequiresMessageQueue`. Here is +an example: + +.. includecode:: code/docs/actor/MyBoundedUntypedActor.java#my-bounded-untyped-actor + +The type parameter to the :class:`RequiresMessageQueue` interface needs to be mapped to a mailbox in +configuration like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala + :include: bounded-mailbox-config,required-mailbox-config + +Now every time you create an actor of type :class:`MyBoundedUntypedActor` it will try to get a bounded +mailbox. If the actor has a different mailbox configured in deployment, either directly or via +a dispatcher with a specified mailbox type, then that will override this mapping. + +.. note:: + + The type of the queue in the mailbox created for an actor will be checked against the required type in the + interface and if the queue doesn't implement the required type then actor creation will fail. + +Requiring a Message Queue Type for a Dispatcher +----------------------------------------------- + +A dispatcher may also have a requirement for the mailbox type used by the +actors running on it. An example is the BalancingDispatcher which requires a +message queue that is thread-safe for multiple concurrent consumers. Such a +requirement is formulated within the dispatcher configuration section like +this:: + + my-dispatcher { + mailbox-requirement = org.example.MyInterface + } + +The given requirement names a class or interface which will then be ensured to +be a supertype of the message queue’s implementation. In case of a +conflict—e.g. if the actor requires a mailbox type which does not satisfy this +requirement—then actor creation will fail. + +How the Mailbox Type is Selected +-------------------------------- + +When an actor is created, the :class:`ActorRefProvider` first determines the +dispatcher which will execute it. Then the mailbox is determined as follows: + +1. If the actor’s deployment configuration section contains a ``mailbox`` key + then that names a configuration section describing the mailbox type to be + used. + +2. If the actor’s ``Props`` contains a mailbox selection—i.e. ``withMailbox`` + was called on it—then that names a configuration section describing the + mailbox type to be used. + +3. If the dispatcher’s configuration section contains a ``mailbox-type`` key + the same section will be used to configure the mailbox type. + +4. If the actor requires a mailbox type as described above then the mapping for + that requirement will be used to determine the mailbox type to be used; if + that fails then the dispatcher’s requirement—if any—will be tried instead. + +5. If the dispatcher requires a mailbox type as described above then the + mapping for that requirement will be used to determine the mailbox type to + be used. + +6. The default mailbox ``akka.actor.default-mailbox`` will be used. + +Which Configuration is passed to the Mailbox Type +------------------------------------------------- + +Each mailbox type is implemented by a class which extends :class:`MailboxType` +and takes two constructor arguments: a :class:`ActorSystem.Settings` object and +a :class:`Config` section. The latter is computed by obtaining the named +configuration section from the actor system’s configuration, overriding its +``id`` key with the configuration path of the mailbox type and adding a +fall-back to the default mailbox configuration section. + Builtin Mailbox Implementations -------------------------------- +=============================== Akka comes shipped with a number of default mailbox implementations: @@ -47,7 +129,7 @@ Akka comes shipped with a number of default mailbox implementations: * Durable mailboxes, see :ref:`durable-mailboxes-java`. Mailbox configuration examples ------------------------------- +============================== How to create a PriorityMailbox: @@ -75,44 +157,8 @@ Or code like this: .. includecode:: code/docs/dispatcher/DispatcherDocTest.java#defining-mailbox-in-code -Requiring a message queue type for an Actor -------------------------------------------- - -It is possible to require a certain type of message queue for a certain type of actor -by having that actor implement the parameterized interface :class:`RequiresMessageQueue`. Here is -an example: - -.. includecode:: code/docs/actor/MyBoundedUntypedActor.java#my-bounded-untyped-actor - -The type parameter to the :class:`RequiresMessageQueue` interface needs to be mapped to a mailbox in -configuration like this: - -.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala - :include: bounded-mailbox-config,required-mailbox-config - -Now every time you create an actor of type :class:`MyBoundedUntypedActor` it will try to get a bounded -mailbox. If the actor has a different mailbox configured in deployment, either directly or via -a dispatcher with a specified mailbox type, then that will override this mapping. - -.. note:: - - The type of the queue in the mailbox created for an actor will be checked against the required type in the - interface and if the queue doesn't implement the required type then actor creation will fail. - - -Mailbox configuration precedence --------------------------------- - -The order of precedence for the mailbox type of an actor, where lower numbers override higher, is: - -1. Mailbox type configured in the deployment of the actor -2. Mailbox type configured on the dispatcher of the actor -3. Mailbox type configured on the Props of the actor -4. Mailbox type configured via message queue requirement - - Creating your own Mailbox type ------------------------------- +============================== An example is worth a thousand quacks: @@ -135,7 +181,7 @@ configuration, or the mailbox configuration. Special Semantics of ``system.actorOf`` ---------------------------------------- +======================================= In order to make ``system.actorOf`` both synchronous and non-blocking while keeping the return type :class:`ActorRef` (and the semantics that the returned diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index fe73e89f1d..56a5174fcb 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -85,12 +85,12 @@ Search Replace with If you need to convert from Java to ``scala.collection.immutable.Seq`` or ``scala.collection.immutable.Iterable`` you should use ``akka.japi.Util.immutableSeq(…)``, and if you need to convert from Scala you can simply switch to using immutable collections yourself or use the ``to[immutable.]`` method. -ActorContext & ActorRefFactory dispatcher +ActorContext & ActorRefFactory Dispatcher ========================================= The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``. -Removed fallback to default dispatcher +Removed Fallback to Default Dispatcher ====================================== If deploying an actor with a specific dispatcher, e.g. @@ -106,6 +106,17 @@ Akka 2.2 introduces the possibility to add dispatcher configuration to the The fallback was removed because in many cases its application was neither intended nor noticed. +Changed Configuration Section for Dispatcher & Mailbox +====================================================== + +The mailbox configuration defaults moved from ``akka.actor.default-dispatcher`` +to ``akka.actor.default-mailbox``. You will not have to change anything unless +your configuration overrides a setting in the default dispatcher section. + +The ``mailbox-type`` now requires a fully-qualified class name for the mailbox +to use. The special words ``bounded`` and ``unbounded`` are retained for a +migration period throughout the 2.2 series. + API changes to FSM and TestFSMRef ================================= @@ -348,3 +359,20 @@ message. Therefore the following is now safe:: context.stop(target) context.unwatch(target) +Dispatcher and Mailbox Implementation Changes +============================================= + +This point is only relevant if you have implemented a custom mailbox or +dispatcher and want to migrate that to Akka 2.2. The constructor signature of +:class:`MessageDispatcher` has changed, it now takes a +:class:`MessageDispatcherConfigurator` instead of +:class:`DispatcherPrerequisites`. Its :class:`createMailbox` method now +receives one more argument of type :class:`MailboxType`, which is the mailbox +type determined by the :class:`ActorRefProvider` for the actor based on its +deployment. The :class:`DispatcherPrerequisites` now include a +:class:`Mailboxes` instance which can be used for resolving mailbox references. +The constructor signatures of the built-in dispatcher implementation have been +adapted accordingly. The traits describing mailbox semantics have been +separated from the implementation traits. + + diff --git a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala index 3cbaefcacd..f007722a7f 100644 --- a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala @@ -203,8 +203,7 @@ object DispatcherDocSpec { import akka.dispatch.{ Envelope, MessageQueue, - QueueBasedMessageQueue, - UnboundedMessageQueueSemantics + UnboundedQueueBasedMessageQueue } // This constructor signature must exist, it will be called by Akka @@ -213,7 +212,7 @@ object DispatcherDocSpec { // The create method is called to create the MessageQueue final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = - new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + new UnboundedQueueBasedMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } } diff --git a/akka-docs/rst/scala/code/docs/routing/RouterViaProgramDocSpec.scala b/akka-docs/rst/scala/code/docs/routing/RouterViaProgramDocSpec.scala index c71164d427..96f62a40cb 100644 --- a/akka-docs/rst/scala/code/docs/routing/RouterViaProgramDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/routing/RouterViaProgramDocSpec.scala @@ -40,7 +40,7 @@ class RouterViaProgramDocSpec extends AkkaSpec with ImplicitSender { val actor3 = system.actorOf(Props[ExampleActor1], "actor3") val routees = Vector[String]("/user/actor1", "/user/actor2", "/user/actor3") val router = system.actorOf( - Props().withRouter(RoundRobinRouter(routees = routees))) + Props.empty.withRouter(RoundRobinRouter(routees = routees))) //#programmaticRoutingRouteePaths 1 to 6 foreach { i ⇒ router ! Message1(i) } val received = receiveN(6, 5.seconds.dilated) diff --git a/akka-docs/rst/scala/mailboxes.rst b/akka-docs/rst/scala/mailboxes.rst index ac933cb781..ccd1c6076d 100644 --- a/akka-docs/rst/scala/mailboxes.rst +++ b/akka-docs/rst/scala/mailboxes.rst @@ -1,14 +1,96 @@ .. _mailboxes-scala: Mailboxes -========= +######### An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``. Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance. +Mailbox Selection +================= + +Requiring a Message Queue Type for an Actor +------------------------------------------- + +It is possible to require a certain type of message queue for a certain type of actor +by having that actor extend the parameterized trait :class:`RequiresMessageQueue`. Here is +an example: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#required-mailbox-class + +The type parameter to the :class:`RequiresMessageQueue` trait needs to be mapped to a mailbox in +configuration like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala + :include: bounded-mailbox-config,required-mailbox-config + +Now every time you create an actor of type :class:`MyBoundedActor` it will try to get a bounded +mailbox. If the actor has a different mailbox configured in deployment, either directly or via +a dispatcher with a specified mailbox type, then that will override this mapping. + +.. note:: + + The type of the queue in the mailbox created for an actor will be checked against the required type in the + trait and if the queue doesn't implement the required type then actor creation will fail. + +Requiring a Message Queue Type for a Dispatcher +----------------------------------------------- + +A dispatcher may also have a requirement for the mailbox type used by the +actors running on it. An example is the BalancingDispatcher which requires a +message queue that is thread-safe for multiple concurrent consumers. Such a +requirement is formulated within the dispatcher configuration section like +this:: + + my-dispatcher { + mailbox-requirement = org.example.MyInterface + } + +The given requirement names a class or interface which will then be ensured to +be a supertype of the message queue’s implementation. In case of a +conflict—e.g. if the actor requires a mailbox type which does not satisfy this +requirement—then actor creation will fail. + +How the Mailbox Type is Selected +-------------------------------- + +When an actor is created, the :class:`ActorRefProvider` first determines the +dispatcher which will execute it. Then the mailbox is determined as follows: + +1. If the actor’s deployment configuration section contains a ``mailbox`` key + then that names a configuration section describing the mailbox type to be + used. + +2. If the actor’s ``Props`` contains a mailbox selection—i.e. ``withMailbox`` + was called on it—then that names a configuration section describing the + mailbox type to be used. + +3. If the dispatcher’s configuration section contains a ``mailbox-type`` key + the same section will be used to configure the mailbox type. + +4. If the actor requires a mailbox type as described above then the mapping for + that requirement will be used to determine the mailbox type to be used; if + that fails then the dispatcher’s requirement—if any—will be tried instead. + +5. If the dispatcher requires a mailbox type as described above then the + mapping for that requirement will be used to determine the mailbox type to + be used. + +6. The default mailbox ``akka.actor.default-mailbox`` will be used. + +Which Configuration is passed to the Mailbox Type +------------------------------------------------- + +Each mailbox type is implemented by a class which extends :class:`MailboxType` +and takes two constructor arguments: a :class:`ActorSystem.Settings` object and +a :class:`Config` section. The latter is computed by obtaining the named +configuration section from the actor system’s configuration, overriding its +``id`` key with the configuration path of the mailbox type and adding a +fall-back to the default mailbox configuration section. + Builtin implementations ------------------------ +======================= Akka comes shipped with a number of default mailbox implementations: @@ -47,7 +129,7 @@ Akka comes shipped with a number of default mailbox implementations: * Durable mailboxes, see :ref:`durable-mailboxes-scala`. Mailbox configuration examples ------------------------------- +============================== How to create a PriorityMailbox: @@ -75,44 +157,8 @@ Or code like this: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-mailbox-in-code -Requiring a message queue type for an Actor -------------------------------------------- - -It is possible to require a certain type of message queue for a certain type of actor -by having that actor extend the parameterized trait :class:`RequiresMessageQueue`. Here is -an example: - -.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#required-mailbox-class - -The type parameter to the :class:`RequiresMessageQueue` trait needs to be mapped to a mailbox in -configuration like this: - -.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala - :include: bounded-mailbox-config,required-mailbox-config - -Now every time you create an actor of type :class:`MyBoundedActor` it will try to get a bounded -mailbox. If the actor has a different mailbox configured in deployment, either directly or via -a dispatcher with a specified mailbox type, then that will override this mapping. - -.. note:: - - The type of the queue in the mailbox created for an actor will be checked against the required type in the - trait and if the queue doesn't implement the required type then actor creation will fail. - - -Mailbox configuration precedence --------------------------------- - -The order of precedence for the mailbox type of an actor, where lower numbers override higher, is: - -1. Mailbox type configured in the deployment of the actor -2. Mailbox type configured on the dispatcher of the actor -3. Mailbox type configured on the Props of the actor -4. Mailbox type configured via message queue requirement - - Creating your own Mailbox type ------------------------------- +============================== An example is worth a thousand quacks: @@ -133,7 +179,7 @@ configuration, or the mailbox configuration. Special Semantics of ``system.actorOf`` ---------------------------------------- +======================================= In order to make ``system.actorOf`` both synchronous and non-blocking while keeping the return type :class:`ActorRef` (and the semantics that the returned diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 2245542b4e..aa971b3b43 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -270,8 +270,16 @@ private[akka] class RemoteActorRefProvider( if (hasAddress(addr)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) } else if (props.deploy.scope == LocalScope) { - throw new IllegalArgumentException(s"configuration requested remote deployment for local-only Props at [$path]") + throw new ConfigurationException(s"configuration requested remote deployment for local-only Props at [$path]") } else try { + try { + // for consistency we check configuration of dispatcher and mailbox locally + val dispatcher = system.dispatchers.lookup(props.dispatcher) + system.mailboxes.getMailboxType(props, dispatcher.configurator.config) + } catch { + case NonFatal(e) ⇒ throw new ConfigurationException( + s"configuration problem while creating [$path] with dispatcher [${props.dispatcher}] and mailbox [${props.mailbox}]", e) + } val localAddress = transport.localAddressForRemote(addr) val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements). withUid(path.uid) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 4d758ef06a..6ff29c1923 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -7,6 +7,7 @@ import akka.testkit._ import akka.actor._ import akka.routing._ import com.typesafe.config._ +import akka.ConfigurationException object RemoteDeployerSpec { val deployerConf = ConfigFactory.parseString(""" @@ -47,7 +48,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { } "reject remote deployment when the source requires LocalScope" in { - intercept[IllegalArgumentException] { + intercept[ConfigurationException] { system.actorOf(Props.empty.withDeploy(Deploy.local), "service2") }.getMessage must be === "configuration requested remote deployment for local-only Props at [akka://RemoteDeployerSpec/user/service2]" } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 0b5edad20f..78607fcf16 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -126,18 +126,16 @@ object CallingThreadDispatcher { * * @since 1.1 */ -class CallingThreadDispatcher( - _prerequisites: DispatcherPrerequisites, - val mailboxType: MailboxType, - val mailboxTypeConfigured: Boolean) extends MessageDispatcher(_prerequisites) { +class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher(_configurator) { import CallingThreadDispatcher._ + import configurator.prerequisites._ - val log = akka.event.Logging(prerequisites.eventStream, "CallingThreadDispatcher") + val log = akka.event.Logging(eventStream, "CallingThreadDispatcher") override def id: String = Id - protected[akka] override def createMailbox(actor: akka.actor.Cell) = - new CallingThreadMailbox(actor, getMailboxType(actor, mailboxType, mailboxTypeConfigured)) + protected[akka] override def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType) = + new CallingThreadMailbox(actor, mailboxType) protected[akka] override def shutdown() {} @@ -303,7 +301,7 @@ class CallingThreadDispatcher( class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { - private val instance = new CallingThreadDispatcher(prerequisites, mailboxType(), mailBoxTypeConfigured) + private val instance = new CallingThreadDispatcher(this) override def dispatcher(): MessageDispatcher = instance } @@ -342,7 +340,7 @@ class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxT val qq = queue CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq) super.cleanUp() - qq.cleanUp(actor.self, actor.systemImpl.deadLetterMailbox.messageQueue) + qq.cleanUp(actor.self, actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue) q.remove() } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index cb95ba299c..a84247b901 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -20,11 +20,15 @@ import akka.pattern.ask */ class TestActorRef[T <: Actor]( _system: ActorSystem, - _prerequisites: DispatcherPrerequisites, _props: Props, _supervisor: ActorRef, name: String) extends { + val props = + _props.withDispatcher( + if (_props.deploy.dispatcher == Deploy.NoDispatcherGiven) CallingThreadDispatcher.Id + else _props.dispatcher) + val dispatcher = _system.dispatchers.lookup(props.dispatcher) private val disregard = _supervisor match { case l: LocalActorRef ⇒ l.underlying.reserveChild(name) case r: RepointableActorRef ⇒ r.underlying match { @@ -36,9 +40,9 @@ class TestActorRef[T <: Actor]( } } with LocalActorRef( _system.asInstanceOf[ActorSystemImpl], - _props.withDispatcher( - if (_props.dispatcher == Dispatchers.DefaultDispatcherId) CallingThreadDispatcher.Id - else _props.dispatcher), + props, + dispatcher, + _system.mailboxes.getMailboxType(props, dispatcher.configurator.config), _supervisor.asInstanceOf[InternalActorRef], _supervisor.path / name) { @@ -47,8 +51,9 @@ class TestActorRef[T <: Actor]( import TestActorRef.InternalGetActor - protected override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell = - new ActorCell(system, ref, props, supervisor) { + protected override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, + dispatcher: MessageDispatcher, supervisor: InternalActorRef): ActorCell = + new ActorCell(system, ref, props, dispatcher, supervisor) { override def autoReceiveMessage(msg: Envelope) { msg.message match { case InternalGetActor ⇒ sender ! actor @@ -132,7 +137,8 @@ object TestActorRef { apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name) def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = { - new TestActorRef(system.asInstanceOf[ActorSystemImpl], system.dispatchers.prerequisites, props, supervisor.asInstanceOf[InternalActorRef], name) + val sysImpl = system.asInstanceOf[ActorSystemImpl] + new TestActorRef(sysImpl, props, supervisor.asInstanceOf[InternalActorRef], name) } def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index 4c285e5fe4..67b5a2c068 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -8,6 +8,8 @@ import akka.actor._ import scala.concurrent.duration.Duration import akka.dispatch.DispatcherPrerequisites import scala.concurrent.duration.FiniteDuration +import akka.dispatch.MessageDispatcher +import akka.dispatch.MailboxType /** * This is a specialised form of the TestActorRef with support for querying and @@ -35,11 +37,10 @@ import scala.concurrent.duration.FiniteDuration */ class TestFSMRef[S, D, T <: Actor]( system: ActorSystem, - _prerequisites: DispatcherPrerequisites, props: Props, supervisor: ActorRef, name: String)(implicit ev: T <:< FSM[S, D]) - extends TestActorRef(system, _prerequisites, props, supervisor, name) { + extends TestActorRef(system, props, supervisor, name) { private def fsm: T = underlyingActor @@ -93,11 +94,11 @@ object TestFSMRef { def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = { val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - new TestFSMRef(impl, system.dispatchers.prerequisites, Props(creator = () ⇒ factory), impl.guardian.asInstanceOf[InternalActorRef], TestActorRef.randomName) + new TestFSMRef(impl, Props(creator = () ⇒ factory), impl.guardian.asInstanceOf[InternalActorRef], TestActorRef.randomName) } def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = { val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - new TestFSMRef(impl, system.dispatchers.prerequisites, Props(creator = () ⇒ factory), impl.guardian.asInstanceOf[InternalActorRef], name) + new TestFSMRef(impl, Props(creator = () ⇒ factory), impl.guardian.asInstanceOf[InternalActorRef], name) } }