diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 40e281dba2..1f51ed6dbc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -243,7 +243,7 @@ class ActorLookupSpec extends AkkaSpec { actors must be === Seq(c1, c2) expectNoMsg(1 second) } - + "drop messages which cannot be delivered" in { implicit val sender = c2 ActorSelection(c21, "../../*/c21") ! GetSender(testActor) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index a0c8c89fa0..07d3cf4d1a 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -51,8 +51,6 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true) case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful -case object ChildTerminated extends AutoReceivedMessage with PossiblyHarmful - case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index b96b2acded..65be64a32b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -269,6 +269,8 @@ private[akka] class ActorCell( val c = children if (c.isEmpty) doTerminate() else { + // do not process normal messages while waiting for all children to terminate + dispatcher suspend this if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping")) for (child ← c) child.stop() stopping = true @@ -290,8 +292,9 @@ private[akka] class ActorCell( try { if (stopping) message match { - case Terminate() ⇒ terminate() // to allow retry - case _ ⇒ + case Terminate() ⇒ terminate() // to allow retry + case ChildTerminated(child) ⇒ handleChildTerminated(child) + case _ ⇒ } else message match { case Create() ⇒ create() @@ -302,10 +305,11 @@ private[akka] class ActorCell( case Unlink(subject) ⇒ system.deathWatch.unsubscribe(self, subject) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped monitoring " + subject)) - case Suspend() ⇒ suspend() - case Resume() ⇒ resume() - case Terminate() ⇒ terminate() - case Supervise(child) ⇒ supervise(child) + case Suspend() ⇒ suspend() + case Resume() ⇒ resume() + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) + case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { case e ⇒ //Should we really catch everything here? @@ -324,9 +328,7 @@ private[akka] class ActorCell( cancelReceiveTimeout() // FIXME: leave this here??? messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg if stopping ⇒ // receiving Terminated in response to stopping children is too common to generate noise - if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle) - case msg ⇒ actor(msg) + case msg ⇒ actor(msg) } currentMessage = null // reset current message after successful invocation } catch { @@ -370,15 +372,10 @@ private[akka] class ActorCell( def autoReceiveMessage(msg: Envelope) { if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.path.toString, "received AutoReceiveMessage " + msg)) - if (stopping) msg.message match { - case ChildTerminated ⇒ handleChildTerminated(sender) - case _ ⇒ system.deadLetterMailbox.enqueue(self, msg) - } - else msg.message match { + msg.message match { case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) case RevertHotSwap ⇒ unbecome() case Failed(cause) ⇒ handleFailure(sender, cause) - case ChildTerminated ⇒ handleChildTerminated(sender) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) @@ -395,7 +392,7 @@ private[akka] class ActorCell( if (a ne null) a.postStop() } finally { try { - parent.tell(ChildTerminated, self) + parent.sendSystemMessage(ChildTerminated(self)) system.deathWatch.publish(Terminated(self)) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped")) } finally { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 23f269e172..e0a4095d3e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -10,8 +10,8 @@ import scala.annotation.tailrec import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer } import akka.actor.Timeout.intToTimeout import akka.config.ConfigurationException -import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise, Dispatcher, Mailbox, Envelope } -import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter, BroadcastRouter } +import akka.dispatch._ +import akka.routing._ import akka.AkkaException import com.eaio.uuid.UUID import akka.util.{ Duration, Switch, Helpers } @@ -369,15 +369,15 @@ class LocalActorRefProvider( override def isTerminated = stopped.isOn override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match { - case Failed(ex) ⇒ causeOfTermination = Some(ex); sender.stop() - case ChildTerminated ⇒ stop() - case _ ⇒ log.error(this + " received unexpected message " + message) + case Failed(ex) ⇒ causeOfTermination = Some(ex); sender.stop() + case _ ⇒ log.error(this + " received unexpected message " + message) }) override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { - case Supervise(child) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead - case _ ⇒ log.error(this + " received unexpected system message " + message) + case Supervise(child) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case ChildTerminated(child) ⇒ stop() + case _ ⇒ log.error(this + " received unexpected system message " + message) } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 99557e33c8..0845978e4a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -61,6 +61,7 @@ case class Suspend() extends SystemMessage // sent to self from ActorCell.suspen case class Resume() extends SystemMessage // sent to self from ActorCell.resume case class Terminate() extends SystemMessage // sent to self from ActorCell.stop case class Supervise(child: ActorRef) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start +case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.startsWatching case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.stopsWatching diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 06de342df3..63b7c74161 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -28,6 +28,9 @@ object Mailbox { // secondary status: Scheduled bit may be added to Open/Suspended final val Scheduled = 4 + // mailbox debugging helper using println (see below) + // FIXME TODO take this out before release (but please leave in until M2!) + final val debug = false } /** @@ -164,6 +167,7 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes var processedMessages = 0 val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0 do { + if (debug) println(actor.self + " processing message " + nextMessage) actor invoke nextMessage processAllSystemMessages() //After we're done, process all system messages @@ -186,6 +190,7 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes var nextMessage = systemDrain() try { while (nextMessage ne null) { + if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! @@ -240,6 +245,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { assert(message.next eq null) + if (Mailbox.debug) println(actor.self + " having enqueued " + message) val head = systemQueueGet /* * this write is safely published by the compareAndSet contained within diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 2a3933c93b..5bc2c8df3b 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -224,6 +224,7 @@ class CallingThreadDispatcher( } if (handle ne null) { try { + if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle) mbox.actor.invoke(handle) true } catch {