diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 9268406086..c22529b2c8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -285,20 +285,19 @@ private[akka] class ActorCell( final def isTerminated: Boolean = mailbox.isClosed final def start(): Unit = { + /* + * Create the mailbox and enqueue the Create() message to ensure that + * this is processed before anything else. + */ mailbox = dispatcher.createMailbox(this) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + mailbox.systemEnqueue(self, Create()) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) - /* - * attach before submitting the mailbox for the first time, because - * otherwise the actor could already be dead before the dispatcher is - * informed of its existence (with reversed attach/detach sequence). - */ + // This call is expected to start off the actor by scheduling its mailbox. dispatcher.attach(this) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - dispatcher.systemDispatch(this, Create()) } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 9d1575c4ec..22eadb55d5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -185,9 +185,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext def id: String /** - * Attaches the specified actor instance to this dispatcher + * Attaches the specified actor instance to this dispatcher, which includes + * scheduling it to run for the first time (Create() is expected to have + * been enqueued by the ActorCell upon mailbox creation). */ - final def attach(actor: ActorCell): Unit = register(actor) + final def attach(actor: ActorCell): Unit = { + register(actor) + registerForExecution(actor.mailbox, false, true) + } /** * Detaches the specified actor instance from this dispatcher @@ -243,7 +248,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext () ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown() /** - * If you override it, you must call it. But only ever once. See "attach" for only invocation + * If you override it, you must call it. But only ever once. See "attach" for only invocation. */ protected[akka] def register(actor: ActorCell) { inhabitantsUpdater.incrementAndGet(this) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 70101578f0..61ac773aa0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -110,40 +110,24 @@ class BalancingDispatcher( } } - protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = - /* - * need to filter out Create() messages here because BalancingDispatcher - * already enqueues this within register(), which is called first by the - * ActorCell. - */ - invocation match { - case Create() ⇒ - case x ⇒ super.systemDispatch(receiver, invocation) - } - protected[akka] override def register(actor: ActorCell) = { - val mbox = actor.mailbox - mbox.systemEnqueue(actor.self, Create()) - // must make sure that Create() is the first message enqueued in this mailbox super.register(actor) buddies.add(actor) - // must make sure that buddy-add is executed before the actor has had a chance to die - registerForExecution(mbox, false, true) } protected[akka] override def unregister(actor: ActorCell) = { buddies.remove(actor) super.unregister(actor) - if (messageQueue.hasMessages) registerOne() + if (messageQueue.hasMessages) scheduleOne() } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) if (!registerForExecution(receiver.mailbox, false, false) && buddyWakeupThreshold >= 0 && - _pressure.get >= buddyWakeupThreshold) registerOne() + _pressure.get >= buddyWakeupThreshold) scheduleOne() } - @tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit = - if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) registerOne(i) + @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = + if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 8282ee58f5..8b2d15a079 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -8,7 +8,7 @@ import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList import scala.annotation.tailrec import com.typesafe.config.Config -import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } +import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } import akka.util.duration.intToDurationInt import akka.util.{ Switch, Duration } @@ -132,6 +132,17 @@ class CallingThreadDispatcher( protected[akka] override def shutdownTimeout = 1 second + protected[akka] override def register(actor: ActorCell): Unit = { + super.register(actor) + actor.mailbox match { + case mbox: CallingThreadMailbox ⇒ + val queue = mbox.queue + queue.enter + runQueue(mbox, queue) + case x ⇒ throw new ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass) + } + } + override def suspend(actor: ActorCell) { actor.mailbox match { case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn