move deadLetterMailbox into Mailboxes and fix review comments, see #3342

This commit is contained in:
Roland 2013-06-03 11:41:11 +02:00
parent 141656a054
commit 20eb28a03c
13 changed files with 72 additions and 46 deletions

View file

@ -293,7 +293,8 @@ akka {
} }
default-mailbox { default-mailbox {
# FQCN of the MailboxType. The Class of the FQCN must have a public constructor with # FQCN of the MailboxType. The Class of the FQCN must have a public
# constructor with
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
mailbox-type = "akka.dispatch.UnboundedMailbox" mailbox-type = "akka.dispatch.UnboundedMailbox"
@ -301,7 +302,7 @@ akka {
# capacity. The provided value must be positive. # capacity. The provided value must be positive.
# NOTICE: # NOTICE:
# Up to version 2.1 the mailbox type was determined based on this setting; # Up to version 2.1 the mailbox type was determined based on this setting;
# this is no longer the case, the type must explicitly name a bounded mailbox. # this is no longer the case, the type must explicitly be a bounded mailbox.
mailbox-capacity = 1000 mailbox-capacity = 1000
# If the mailbox is bounded then this is the timeout for enqueueing # If the mailbox is bounded then this is the timeout for enqueueing

View file

@ -760,7 +760,7 @@ private[akka] class LocalActorRefProvider private[akka] (
} catch { } catch {
case NonFatal(e) throw new ConfigurationException( case NonFatal(e) throw new ConfigurationException(
s"configuration problem while creating [$path] with router dispatcher [${routerProps.dispatcher}] and mailbox [${routerProps.mailbox}] " + s"configuration problem while creating [$path] with router dispatcher [${routerProps.dispatcher}] and mailbox [${routerProps.mailbox}] " +
s" and routee dispatcher [${routeeProps.dispatcher}] and mailbox [${routeeProps.mailbox}]", e) s"and routee dispatcher [${routeeProps.dispatcher}] and mailbox [${routeeProps.mailbox}]", e)
} }
} }
} }

View file

@ -544,25 +544,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def deadLetters: ActorRef = provider.deadLetters def deadLetters: ActorRef = provider.deadLetters
val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue { val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters)
def enqueue(receiver: ActorRef, envelope: Envelope): Unit =
deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender)
def dequeue() = null
def hasMessages = false
def numberOfMessages = 0
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = ()
}) {
becomeClosed()
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit =
deadLetters ! DeadLetter(handle, receiver, receiver)
def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = SystemMessageList.ENil
def hasSystemMessages = false
}
val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess)
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings, mailboxes)) threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes))
val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher

View file

@ -59,7 +59,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
val req = system.mailboxes.getRequiredType(actorClass) val req = system.mailboxes.getRequiredType(actorClass)
if (req isInstance mbox.messageQueue) Create(None) if (req isInstance mbox.messageQueue) Create(None)
else Create(Some(ActorInitializationException(self, else Create(Some(ActorInitializationException(self,
s"Actor [$self] requires mailbox type [$req] got [${mbox.messageQueue.getClass}]"))) s"Actor [$self] requires mailbox type [$req] got [${mbox.messageQueue.getClass.getName}]")))
case _ Create(None) case _ Create(None)
} }

View file

@ -87,7 +87,10 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
import MessageDispatcher._ import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset } import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
import configurator.prerequisites._ import configurator.prerequisites
val mailboxes = prerequisites.mailboxes
val eventStream = prerequisites.eventStream
@volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH! @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH!
@volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH! @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH!
@ -168,7 +171,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
private def scheduleShutdownAction(): Unit = { private def scheduleShutdownAction(): Unit = {
// IllegalStateException is thrown if scheduler has been shutdown // IllegalStateException is thrown if scheduler has been shutdown
try scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext { try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext {
override def execute(runnable: Runnable): Unit = runnable.run() override def execute(runnable: Runnable): Unit = runnable.run()
override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t) override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t)
}) catch { }) catch {
@ -196,7 +199,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
protected[akka] def unregister(actor: ActorCell) { protected[akka] def unregister(actor: ActorCell) {
if (debug) actors.remove(this, actor.self) if (debug) actors.remove(this, actor.self)
addInhabitants(-1) addInhabitants(-1)
val mailBox = actor.swapMailbox(deadLetterMailbox) val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox)
mailBox.becomeClosed() mailBox.becomeClosed()
mailBox.cleanUp() mailBox.cleanUp()
} }

View file

@ -55,7 +55,7 @@ class BalancingDispatcher(
private class SharingMailbox(val system: ActorSystemImpl, _messageQueue: MessageQueue) private class SharingMailbox(val system: ActorSystemImpl, _messageQueue: MessageQueue)
extends Mailbox(_messageQueue) with DefaultSystemMessageQueue { extends Mailbox(_messageQueue) with DefaultSystemMessageQueue {
override def cleanUp(): Unit = { override def cleanUp(): Unit = {
val dlq = system.deadLetterMailbox val dlq = mailboxes.deadLetterMailbox
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that //Don't call the original implementation of this since it scraps all messages, and we don't want to do that
var messages = systemDrain(new LatestFirstSystemMessageList(NoMessage)) var messages = systemDrain(new LatestFirstSystemMessageList(NoMessage))
while (messages.nonEmpty) { while (messages.nonEmpty) {

View file

@ -19,7 +19,6 @@ import akka.actor.Deploy
trait DispatcherPrerequisites { trait DispatcherPrerequisites {
def threadFactory: ThreadFactory def threadFactory: ThreadFactory
def eventStream: EventStream def eventStream: EventStream
def deadLetterMailbox: Mailbox
def scheduler: Scheduler def scheduler: Scheduler
def dynamicAccess: DynamicAccess def dynamicAccess: DynamicAccess
def settings: ActorSystem.Settings def settings: ActorSystem.Settings
@ -32,7 +31,6 @@ trait DispatcherPrerequisites {
private[akka] case class DefaultDispatcherPrerequisites( private[akka] case class DefaultDispatcherPrerequisites(
val threadFactory: ThreadFactory, val threadFactory: ThreadFactory,
val eventStream: EventStream, val eventStream: EventStream,
val deadLetterMailbox: Mailbox,
val scheduler: Scheduler, val scheduler: Scheduler,
val dynamicAccess: DynamicAccess, val dynamicAccess: DynamicAccess,
val settings: ActorSystem.Settings, val settings: ActorSystem.Settings,
@ -188,11 +186,14 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi
override def dispatcher(): MessageDispatcher = instance override def dispatcher(): MessageDispatcher = instance
} }
object BalancingDispatcherConfigurator { /**
* INTERNAL API
*/
private[akka] object BalancingDispatcherConfigurator {
private val defaultRequirement = private val defaultRequirement =
ConfigFactory.parseString("mailbox-requirement = akka.dispatch.MultipleConsumerSemantics") ConfigFactory.parseString("mailbox-requirement = akka.dispatch.MultipleConsumerSemantics")
def amendConfig(config: Config): Config = def amendConfig(config: Config): Config =
if (config.getString("mailbox-requirement") != "") config if (config.getString("mailbox-requirement") != Mailboxes.NoMailboxRequirement) config
else defaultRequirement.withFallback(config) else defaultRequirement.withFallback(config)
} }

View file

@ -269,7 +269,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
* if we closed the mailbox, we must dump the remaining system messages * if we closed the mailbox, we must dump the remaining system messages
* to deadLetters (this is essential for DeathWatch) * to deadLetters (this is essential for DeathWatch)
*/ */
val dlm = actor.systemImpl.deadLetterMailbox val dlm = actor.dispatcher.mailboxes.deadLetterMailbox
while (messageList.nonEmpty) { while (messageList.nonEmpty) {
val msg = messageList.head val msg = messageList.head
messageList = messageList.tail messageList = messageList.tail
@ -295,7 +295,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/ */
protected[dispatch] def cleanUp(): Unit = protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox if (actor ne null) { // actor is null for the deadLetterMailbox
val dlm = actor.systemImpl.deadLetterMailbox val dlm = actor.dispatcher.mailboxes.deadLetterMailbox
var messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage)) var messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage))
while (messageList.nonEmpty) { while (messageList.nonEmpty) {
// message must be virgin before being able to systemEnqueue again // message must be virgin before being able to systemEnqueue again
@ -306,7 +306,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
} }
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
messageQueue.cleanUp(actor.self, actor.systemImpl.deadLetterMailbox.messageQueue) messageQueue.cleanUp(actor.self, actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue)
} }
} }
@ -394,7 +394,7 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
if (Mailbox.debug) println(receiver + " having enqueued " + message) if (Mailbox.debug) println(receiver + " having enqueued " + message)
val currentList = systemQueueGet val currentList = systemQueueGet
if (currentList.head == NoMessage) { if (currentList.head == NoMessage) {
if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message) if (actor ne null) actor.dispatcher.mailboxes.deadLetterMailbox.systemEnqueue(receiver, message)
} else { } else {
if (!systemQueuePut(currentList, message :: currentList)) { if (!systemQueuePut(currentList, message :: currentList)) {
message.unlink() message.unlink()

View file

@ -18,16 +18,40 @@ import akka.actor.Deploy
import scala.util.Try import scala.util.Try
import scala.util.Failure import scala.util.Failure
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.dispatch.DispatcherPrerequisites import akka.actor.ActorRef
import akka.actor.DeadLetter
import akka.dispatch.sysmsg.SystemMessage
import akka.dispatch.sysmsg.LatestFirstSystemMessageList
import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
import akka.dispatch.sysmsg.SystemMessageList
object Mailboxes { object Mailboxes {
final val DefaultMailboxId = "akka.actor.default-mailbox" final val DefaultMailboxId = "akka.actor.default-mailbox"
final val NoMailboxRequirement = ""
} }
private[akka] class Mailboxes( private[akka] class Mailboxes(
val settings: ActorSystem.Settings, val settings: ActorSystem.Settings,
val eventStream: EventStream, val eventStream: EventStream,
dynamicAccess: DynamicAccess) { dynamicAccess: DynamicAccess,
deadLetters: ActorRef) {
import Mailboxes._
val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue {
def enqueue(receiver: ActorRef, envelope: Envelope): Unit =
deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender)
def dequeue() = null
def hasMessages = false
def numberOfMessages = 0
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = ()
}) {
becomeClosed()
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit =
deadLetters ! DeadLetter(handle, receiver, receiver)
def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = SystemMessageList.ENil
def hasSystemMessages = false
}
private val mailboxTypeConfigurators = new ConcurrentHashMap[String, MailboxType] private val mailboxTypeConfigurators = new ConcurrentHashMap[String, MailboxType]
@ -72,8 +96,8 @@ private[akka] class Mailboxes(
private var mailboxSizeWarningIssued = false private var mailboxSizeWarningIssued = false
def getMailboxRequirement(config: Config) = config.getString("mailbox-requirement") match { def getMailboxRequirement(config: Config) = config.getString("mailbox-requirement") match {
case "" classOf[MessageQueue] case NoMailboxRequirement classOf[MessageQueue]
case x dynamicAccess.getClassFor[AnyRef](x).get case x dynamicAccess.getClassFor[AnyRef](x).get
} }
def getProducedMessageQueueType(mailboxType: MailboxType): Class[_] = { def getProducedMessageQueueType(mailboxType: MailboxType): Class[_] = {
@ -136,7 +160,7 @@ private[akka] class Mailboxes(
} else if (hasMailboxRequirement) { } else if (hasMailboxRequirement) {
verifyRequirements(lookupByQueueType(mailboxRequirement)) verifyRequirements(lookupByQueueType(mailboxRequirement))
} else { } else {
verifyRequirements(lookup(Mailboxes.DefaultMailboxId)) verifyRequirements(lookup(DefaultMailboxId))
} }
} }
@ -156,6 +180,7 @@ private[akka] class Mailboxes(
case null case null
// It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup. // It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup.
val newConfigurator = id match { val newConfigurator = id match {
// TODO RK remove these two for Akka 2.3
case "unbounded" UnboundedMailbox() case "unbounded" UnboundedMailbox()
case "bounded" new BoundedMailbox(settings, config(id)) case "bounded" new BoundedMailbox(settings, config(id))
case _ case _
@ -184,7 +209,7 @@ private[akka] class Mailboxes(
} }
} }
private val defaultMailboxConfig = settings.config.getConfig(Mailboxes.DefaultMailboxId) private val defaultMailboxConfig = settings.config.getConfig(DefaultMailboxId)
//INTERNAL API //INTERNAL API
private def config(id: String): Config = { private def config(id: String): Config = {

View file

@ -79,8 +79,8 @@ dispatcher which will execute it. Then the mailbox is determined as follows:
6. The default mailbox ``akka.actor.default-mailbox`` will be used. 6. The default mailbox ``akka.actor.default-mailbox`` will be used.
Which Configuration is pass to the Mailbox Type Which Configuration is passed to the Mailbox Type
----------------------------------------------- -------------------------------------------------
Each mailbox type is implemented by a class which extends :class:`MailboxType` Each mailbox type is implemented by a class which extends :class:`MailboxType`
and takes two constructor arguments: a :class:`ActorSystem.Settings` object and and takes two constructor arguments: a :class:`ActorSystem.Settings` object and

View file

@ -85,12 +85,12 @@ Search Replace with
If you need to convert from Java to ``scala.collection.immutable.Seq`` or ``scala.collection.immutable.Iterable`` you should use ``akka.japi.Util.immutableSeq(…)``, If you need to convert from Java to ``scala.collection.immutable.Seq`` or ``scala.collection.immutable.Iterable`` you should use ``akka.japi.Util.immutableSeq(…)``,
and if you need to convert from Scala you can simply switch to using immutable collections yourself or use the ``to[immutable.<collection-type>]`` method. and if you need to convert from Scala you can simply switch to using immutable collections yourself or use the ``to[immutable.<collection-type>]`` method.
ActorContext & ActorRefFactory dispatcher ActorContext & ActorRefFactory Dispatcher
========================================= =========================================
The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``. The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``.
Removed fallback to default dispatcher Removed Fallback to Default Dispatcher
====================================== ======================================
If deploying an actor with a specific dispatcher, e.g. If deploying an actor with a specific dispatcher, e.g.
@ -106,6 +106,17 @@ Akka 2.2 introduces the possibility to add dispatcher configuration to the
The fallback was removed because in many cases its application was neither The fallback was removed because in many cases its application was neither
intended nor noticed. intended nor noticed.
Changed Configuration Section for Dispatcher & Mailbox
======================================================
The mailbox configuration defaults moved from ``akka.actor.default-dispatcher``
to ``akka.actor.default-mailbox``. You will not have to change anything unless
your configuration overrides a setting in the default dispatcher section.
The ``mailbox-type`` now requires a fully-qualified class name for the mailbox
to use. The special words ``bounded`` and ``unbounded`` are retained for a
migration period throughout the 2.2 series.
API changes to FSM and TestFSMRef API changes to FSM and TestFSMRef
================================= =================================

View file

@ -79,8 +79,8 @@ dispatcher which will execute it. Then the mailbox is determined as follows:
6. The default mailbox ``akka.actor.default-mailbox`` will be used. 6. The default mailbox ``akka.actor.default-mailbox`` will be used.
Which Configuration is pass to the Mailbox Type Which Configuration is passed to the Mailbox Type
----------------------------------------------- -------------------------------------------------
Each mailbox type is implemented by a class which extends :class:`MailboxType` Each mailbox type is implemented by a class which extends :class:`MailboxType`
and takes two constructor arguments: a :class:`ActorSystem.Settings` object and and takes two constructor arguments: a :class:`ActorSystem.Settings` object and

View file

@ -340,7 +340,7 @@ class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxT
val qq = queue val qq = queue
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq) CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq)
super.cleanUp() super.cleanUp()
qq.cleanUp(actor.self, actor.systemImpl.deadLetterMailbox.messageQueue) qq.cleanUp(actor.self, actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue)
q.remove() q.remove()
} }
} }