final touch to actor start-up sequence

split systemDispatch(Create()) into systemEnqueue(Create()) directly
after createMailbox and registerForExecution from within
Dispatcher.attach() (resp. CallingThreadDispatcher.register() does its
own thing)
This commit is contained in:
Roland 2012-02-13 15:33:31 +01:00
parent bb40c1ae30
commit 7c57a9d60e
4 changed files with 31 additions and 32 deletions

View file

@ -285,20 +285,19 @@ private[akka] class ActorCell(
final def isTerminated: Boolean = mailbox.isClosed
final def start(): Unit = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
mailbox = dispatcher.createMailbox(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self))
/*
* attach before submitting the mailbox for the first time, because
* otherwise the actor could already be dead before the dispatcher is
* informed of its existence (with reversed attach/detach sequence).
*/
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
dispatcher.systemDispatch(this, Create())
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅

View file

@ -185,9 +185,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
def id: String
/**
* Attaches the specified actor instance to this dispatcher
* Attaches the specified actor instance to this dispatcher, which includes
* scheduling it to run for the first time (Create() is expected to have
* been enqueued by the ActorCell upon mailbox creation).
*/
final def attach(actor: ActorCell): Unit = register(actor)
final def attach(actor: ActorCell): Unit = {
register(actor)
registerForExecution(actor.mailbox, false, true)
}
/**
* Detaches the specified actor instance from this dispatcher
@ -243,7 +248,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
() if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown()
/**
* If you override it, you must call it. But only ever once. See "attach" for only invocation
* If you override it, you must call it. But only ever once. See "attach" for only invocation.
*/
protected[akka] def register(actor: ActorCell) {
inhabitantsUpdater.incrementAndGet(this)

View file

@ -110,40 +110,24 @@ class BalancingDispatcher(
}
}
protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit =
/*
* need to filter out Create() messages here because BalancingDispatcher
* already enqueues this within register(), which is called first by the
* ActorCell.
*/
invocation match {
case Create()
case x super.systemDispatch(receiver, invocation)
}
protected[akka] override def register(actor: ActorCell) = {
val mbox = actor.mailbox
mbox.systemEnqueue(actor.self, Create())
// must make sure that Create() is the first message enqueued in this mailbox
super.register(actor)
buddies.add(actor)
// must make sure that buddy-add is executed before the actor has had a chance to die
registerForExecution(mbox, false, true)
}
protected[akka] override def unregister(actor: ActorCell) = {
buddies.remove(actor)
super.unregister(actor)
if (messageQueue.hasMessages) registerOne()
if (messageQueue.hasMessages) scheduleOne()
}
override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
if (!registerForExecution(receiver.mailbox, false, false) &&
buddyWakeupThreshold >= 0 &&
_pressure.get >= buddyWakeupThreshold) registerOne()
_pressure.get >= buddyWakeupThreshold) scheduleOne()
}
@tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit =
if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) registerOne(i)
@tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit =
if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i)
}

View file

@ -8,7 +8,7 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.LinkedList
import scala.annotation.tailrec
import com.typesafe.config.Config
import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.util.duration.intToDurationInt
import akka.util.{ Switch, Duration }
@ -132,6 +132,17 @@ class CallingThreadDispatcher(
protected[akka] override def shutdownTimeout = 1 second
protected[akka] override def register(actor: ActorCell): Unit = {
super.register(actor)
actor.mailbox match {
case mbox: CallingThreadMailbox
val queue = mbox.queue
queue.enter
runQueue(mbox, queue)
case x throw new ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass)
}
}
override def suspend(actor: ActorCell) {
actor.mailbox match {
case m: CallingThreadMailbox m.suspendSwitch.switchOn