diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 9544ff798c..b2545eabc3 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -293,7 +293,8 @@ akka { } default-mailbox { - # FQCN of the MailboxType. The Class of the FQCN must have a public constructor with + # FQCN of the MailboxType. The Class of the FQCN must have a public + # constructor with # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. mailbox-type = "akka.dispatch.UnboundedMailbox" @@ -301,7 +302,7 @@ akka { # capacity. The provided value must be positive. # NOTICE: # Up to version 2.1 the mailbox type was determined based on this setting; - # this is no longer the case, the type must explicitly name a bounded mailbox. + # this is no longer the case, the type must explicitly be a bounded mailbox. mailbox-capacity = 1000 # If the mailbox is bounded then this is the timeout for enqueueing diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 00ba396ce0..ceacb110f8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -760,7 +760,7 @@ private[akka] class LocalActorRefProvider private[akka] ( } catch { case NonFatal(e) ⇒ throw new ConfigurationException( s"configuration problem while creating [$path] with router dispatcher [${routerProps.dispatcher}] and mailbox [${routerProps.mailbox}] " + - s" and routee dispatcher [${routeeProps.dispatcher}] and mailbox [${routeeProps.mailbox}]", e) + s"and routee dispatcher [${routeeProps.dispatcher}] and mailbox [${routeeProps.mailbox}]", e) } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 0537db7fec..cb926d0d1c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -544,25 +544,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def deadLetters: ActorRef = provider.deadLetters - val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue { - def enqueue(receiver: ActorRef, envelope: Envelope): Unit = - deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender) - def dequeue() = null - def hasMessages = false - def numberOfMessages = 0 - def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () - }) { - becomeClosed() - def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = - deadLetters ! DeadLetter(handle, receiver, receiver) - def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = SystemMessageList.ENil - def hasSystemMessages = false - } - - val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess) + val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters) val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( - threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings, mailboxes)) + threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes)) val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 9fe0ed4cf7..6c0b9971e1 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -59,7 +59,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ val req = system.mailboxes.getRequiredType(actorClass) if (req isInstance mbox.messageQueue) Create(None) else Create(Some(ActorInitializationException(self, - s"Actor [$self] requires mailbox type [$req] got [${mbox.messageQueue.getClass}]"))) + s"Actor [$self] requires mailbox type [$req] got [${mbox.messageQueue.getClass.getName}]"))) case _ ⇒ Create(None) } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 656d1c5f68..11d4fc075b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -87,7 +87,10 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator import MessageDispatcher._ import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset } - import configurator.prerequisites._ + import configurator.prerequisites + + val mailboxes = prerequisites.mailboxes + val eventStream = prerequisites.eventStream @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH! @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH! @@ -168,7 +171,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator private def scheduleShutdownAction(): Unit = { // IllegalStateException is thrown if scheduler has been shutdown - try scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext { + try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext { override def execute(runnable: Runnable): Unit = runnable.run() override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t) }) catch { @@ -196,7 +199,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator protected[akka] def unregister(actor: ActorCell) { if (debug) actors.remove(this, actor.self) addInhabitants(-1) - val mailBox = actor.swapMailbox(deadLetterMailbox) + val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox) mailBox.becomeClosed() mailBox.cleanUp() } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index b5fa8ed203..b51b27c0b0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -55,7 +55,7 @@ class BalancingDispatcher( private class SharingMailbox(val system: ActorSystemImpl, _messageQueue: MessageQueue) extends Mailbox(_messageQueue) with DefaultSystemMessageQueue { override def cleanUp(): Unit = { - val dlq = system.deadLetterMailbox + val dlq = mailboxes.deadLetterMailbox //Don't call the original implementation of this since it scraps all messages, and we don't want to do that var messages = systemDrain(new LatestFirstSystemMessageList(NoMessage)) while (messages.nonEmpty) { diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index a7f657dcc9..99fc8acc62 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -19,7 +19,6 @@ import akka.actor.Deploy trait DispatcherPrerequisites { def threadFactory: ThreadFactory def eventStream: EventStream - def deadLetterMailbox: Mailbox def scheduler: Scheduler def dynamicAccess: DynamicAccess def settings: ActorSystem.Settings @@ -32,7 +31,6 @@ trait DispatcherPrerequisites { private[akka] case class DefaultDispatcherPrerequisites( val threadFactory: ThreadFactory, val eventStream: EventStream, - val deadLetterMailbox: Mailbox, val scheduler: Scheduler, val dynamicAccess: DynamicAccess, val settings: ActorSystem.Settings, @@ -188,11 +186,14 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi override def dispatcher(): MessageDispatcher = instance } -object BalancingDispatcherConfigurator { +/** + * INTERNAL API + */ +private[akka] object BalancingDispatcherConfigurator { private val defaultRequirement = ConfigFactory.parseString("mailbox-requirement = akka.dispatch.MultipleConsumerSemantics") def amendConfig(config: Config): Config = - if (config.getString("mailbox-requirement") != "") config + if (config.getString("mailbox-requirement") != Mailboxes.NoMailboxRequirement) config else defaultRequirement.withFallback(config) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index e191a7a7f7..ebbb525830 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -269,7 +269,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * if we closed the mailbox, we must dump the remaining system messages * to deadLetters (this is essential for DeathWatch) */ - val dlm = actor.systemImpl.deadLetterMailbox + val dlm = actor.dispatcher.mailboxes.deadLetterMailbox while (messageList.nonEmpty) { val msg = messageList.head messageList = messageList.tail @@ -295,7 +295,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ protected[dispatch] def cleanUp(): Unit = if (actor ne null) { // actor is null for the deadLetterMailbox - val dlm = actor.systemImpl.deadLetterMailbox + val dlm = actor.dispatcher.mailboxes.deadLetterMailbox var messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage)) while (messageList.nonEmpty) { // message must be “virgin” before being able to systemEnqueue again @@ -306,7 +306,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() - messageQueue.cleanUp(actor.self, actor.systemImpl.deadLetterMailbox.messageQueue) + messageQueue.cleanUp(actor.self, actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue) } } @@ -394,7 +394,7 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ if (Mailbox.debug) println(receiver + " having enqueued " + message) val currentList = systemQueueGet if (currentList.head == NoMessage) { - if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message) + if (actor ne null) actor.dispatcher.mailboxes.deadLetterMailbox.systemEnqueue(receiver, message) } else { if (!systemQueuePut(currentList, message :: currentList)) { message.unlink() diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala index 1bff1edc05..0a3a3e9fd9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala @@ -18,16 +18,40 @@ import akka.actor.Deploy import scala.util.Try import scala.util.Failure import scala.util.control.NonFatal -import akka.dispatch.DispatcherPrerequisites +import akka.actor.ActorRef +import akka.actor.DeadLetter +import akka.dispatch.sysmsg.SystemMessage +import akka.dispatch.sysmsg.LatestFirstSystemMessageList +import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import akka.dispatch.sysmsg.SystemMessageList object Mailboxes { final val DefaultMailboxId = "akka.actor.default-mailbox" + final val NoMailboxRequirement = "" } private[akka] class Mailboxes( val settings: ActorSystem.Settings, val eventStream: EventStream, - dynamicAccess: DynamicAccess) { + dynamicAccess: DynamicAccess, + deadLetters: ActorRef) { + + import Mailboxes._ + + val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue { + def enqueue(receiver: ActorRef, envelope: Envelope): Unit = + deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender) + def dequeue() = null + def hasMessages = false + def numberOfMessages = 0 + def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () + }) { + becomeClosed() + def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = + deadLetters ! DeadLetter(handle, receiver, receiver) + def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = SystemMessageList.ENil + def hasSystemMessages = false + } private val mailboxTypeConfigurators = new ConcurrentHashMap[String, MailboxType] @@ -72,8 +96,8 @@ private[akka] class Mailboxes( private var mailboxSizeWarningIssued = false def getMailboxRequirement(config: Config) = config.getString("mailbox-requirement") match { - case "" ⇒ classOf[MessageQueue] - case x ⇒ dynamicAccess.getClassFor[AnyRef](x).get + case NoMailboxRequirement ⇒ classOf[MessageQueue] + case x ⇒ dynamicAccess.getClassFor[AnyRef](x).get } def getProducedMessageQueueType(mailboxType: MailboxType): Class[_] = { @@ -136,7 +160,7 @@ private[akka] class Mailboxes( } else if (hasMailboxRequirement) { verifyRequirements(lookupByQueueType(mailboxRequirement)) } else { - verifyRequirements(lookup(Mailboxes.DefaultMailboxId)) + verifyRequirements(lookup(DefaultMailboxId)) } } @@ -156,6 +180,7 @@ private[akka] class Mailboxes( case null ⇒ // It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup. val newConfigurator = id match { + // TODO RK remove these two for Akka 2.3 case "unbounded" ⇒ UnboundedMailbox() case "bounded" ⇒ new BoundedMailbox(settings, config(id)) case _ ⇒ @@ -184,7 +209,7 @@ private[akka] class Mailboxes( } } - private val defaultMailboxConfig = settings.config.getConfig(Mailboxes.DefaultMailboxId) + private val defaultMailboxConfig = settings.config.getConfig(DefaultMailboxId) //INTERNAL API private def config(id: String): Config = { diff --git a/akka-docs/rst/java/mailboxes.rst b/akka-docs/rst/java/mailboxes.rst index b92b6fdbbe..ea6f4b4eb6 100644 --- a/akka-docs/rst/java/mailboxes.rst +++ b/akka-docs/rst/java/mailboxes.rst @@ -79,8 +79,8 @@ dispatcher which will execute it. Then the mailbox is determined as follows: 6. The default mailbox ``akka.actor.default-mailbox`` will be used. -Which Configuration is pass to the Mailbox Type ------------------------------------------------ +Which Configuration is passed to the Mailbox Type +------------------------------------------------- Each mailbox type is implemented by a class which extends :class:`MailboxType` and takes two constructor arguments: a :class:`ActorSystem.Settings` object and diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index 5fe450c20f..56a5174fcb 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -85,12 +85,12 @@ Search Replace with If you need to convert from Java to ``scala.collection.immutable.Seq`` or ``scala.collection.immutable.Iterable`` you should use ``akka.japi.Util.immutableSeq(…)``, and if you need to convert from Scala you can simply switch to using immutable collections yourself or use the ``to[immutable.]`` method. -ActorContext & ActorRefFactory dispatcher +ActorContext & ActorRefFactory Dispatcher ========================================= The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``. -Removed fallback to default dispatcher +Removed Fallback to Default Dispatcher ====================================== If deploying an actor with a specific dispatcher, e.g. @@ -106,6 +106,17 @@ Akka 2.2 introduces the possibility to add dispatcher configuration to the The fallback was removed because in many cases its application was neither intended nor noticed. +Changed Configuration Section for Dispatcher & Mailbox +====================================================== + +The mailbox configuration defaults moved from ``akka.actor.default-dispatcher`` +to ``akka.actor.default-mailbox``. You will not have to change anything unless +your configuration overrides a setting in the default dispatcher section. + +The ``mailbox-type`` now requires a fully-qualified class name for the mailbox +to use. The special words ``bounded`` and ``unbounded`` are retained for a +migration period throughout the 2.2 series. + API changes to FSM and TestFSMRef ================================= diff --git a/akka-docs/rst/scala/mailboxes.rst b/akka-docs/rst/scala/mailboxes.rst index 7e613f3df5..ccd1c6076d 100644 --- a/akka-docs/rst/scala/mailboxes.rst +++ b/akka-docs/rst/scala/mailboxes.rst @@ -79,8 +79,8 @@ dispatcher which will execute it. Then the mailbox is determined as follows: 6. The default mailbox ``akka.actor.default-mailbox`` will be used. -Which Configuration is pass to the Mailbox Type ------------------------------------------------ +Which Configuration is passed to the Mailbox Type +------------------------------------------------- Each mailbox type is implemented by a class which extends :class:`MailboxType` and takes two constructor arguments: a :class:`ActorSystem.Settings` object and diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 03ab9f7b64..78607fcf16 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -340,7 +340,7 @@ class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxT val qq = queue CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq) super.cleanUp() - qq.cleanUp(actor.self, actor.systemImpl.deadLetterMailbox.messageQueue) + qq.cleanUp(actor.self, actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue) q.remove() } }