fix bug in ActorRef.stop() implementation

- it was telling all children to stop(), then waited for the
  ChildTerminated messages and finally terminated itself
- this worked fine, except when the stop came from the supervisor, i.e.
  the recipient was suspended and did not process the ChildTerminated
- so, as the mirror of Supervise() that it is, I changed
  ChildTerminated() to be a system message and instead of stopping
  processing normal messages by checking the stopping flag, just suspend
  the actor while awaiting the ChildTerminated's to flow in.
This commit is contained in:
Roland 2011-12-03 18:16:41 +01:00
parent 3d0bb8b415
commit 4c1d722398
7 changed files with 29 additions and 26 deletions

View file

@ -243,7 +243,7 @@ class ActorLookupSpec extends AkkaSpec {
actors must be === Seq(c1, c2) actors must be === Seq(c1, c2)
expectNoMsg(1 second) expectNoMsg(1 second)
} }
"drop messages which cannot be delivered" in { "drop messages which cannot be delivered" in {
implicit val sender = c2 implicit val sender = c2
ActorSelection(c21, "../../*/c21") ! GetSender(testActor) ActorSelection(c21, "../../*/c21") ! GetSender(testActor)

View file

@ -51,8 +51,6 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true)
case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful
case object ChildTerminated extends AutoReceivedMessage with PossiblyHarmful
case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful
case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful

View file

@ -269,6 +269,8 @@ private[akka] class ActorCell(
val c = children val c = children
if (c.isEmpty) doTerminate() if (c.isEmpty) doTerminate()
else { else {
// do not process normal messages while waiting for all children to terminate
dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping")) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping"))
for (child c) child.stop() for (child c) child.stop()
stopping = true stopping = true
@ -290,8 +292,9 @@ private[akka] class ActorCell(
try { try {
if (stopping) message match { if (stopping) message match {
case Terminate() terminate() // to allow retry case Terminate() terminate() // to allow retry
case _ case ChildTerminated(child) handleChildTerminated(child)
case _
} }
else message match { else message match {
case Create() create() case Create() create()
@ -302,10 +305,11 @@ private[akka] class ActorCell(
case Unlink(subject) case Unlink(subject)
system.deathWatch.unsubscribe(self, subject) system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped monitoring " + subject)) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped monitoring " + subject))
case Suspend() suspend() case Suspend() suspend()
case Resume() resume() case Resume() resume()
case Terminate() terminate() case Terminate() terminate()
case Supervise(child) supervise(child) case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
} }
} catch { } catch {
case e //Should we really catch everything here? case e //Should we really catch everything here?
@ -324,9 +328,7 @@ private[akka] class ActorCell(
cancelReceiveTimeout() // FIXME: leave this here??? cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match { messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle) case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg if stopping // receiving Terminated in response to stopping children is too common to generate noise case msg actor(msg)
if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle)
case msg actor(msg)
} }
currentMessage = null // reset current message after successful invocation currentMessage = null // reset current message after successful invocation
} catch { } catch {
@ -370,15 +372,10 @@ private[akka] class ActorCell(
def autoReceiveMessage(msg: Envelope) { def autoReceiveMessage(msg: Envelope) {
if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.path.toString, "received AutoReceiveMessage " + msg)) if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.path.toString, "received AutoReceiveMessage " + msg))
if (stopping) msg.message match { msg.message match {
case ChildTerminated handleChildTerminated(sender)
case _ system.deadLetterMailbox.enqueue(self, msg)
}
else msg.message match {
case HotSwap(code, discardOld) become(code(self), discardOld) case HotSwap(code, discardOld) become(code(self), discardOld)
case RevertHotSwap unbecome() case RevertHotSwap unbecome()
case Failed(cause) handleFailure(sender, cause) case Failed(cause) handleFailure(sender, cause)
case ChildTerminated handleChildTerminated(sender)
case Kill throw new ActorKilledException("Kill") case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop() case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender) case SelectParent(m) parent.tell(m, msg.sender)
@ -395,7 +392,7 @@ private[akka] class ActorCell(
if (a ne null) a.postStop() if (a ne null) a.postStop()
} finally { } finally {
try { try {
parent.tell(ChildTerminated, self) parent.sendSystemMessage(ChildTerminated(self))
system.deathWatch.publish(Terminated(self)) system.deathWatch.publish(Terminated(self))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped")) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped"))
} finally { } finally {

View file

@ -10,8 +10,8 @@ import scala.annotation.tailrec
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer } import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
import akka.actor.Timeout.intToTimeout import akka.actor.Timeout.intToTimeout
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise, Dispatcher, Mailbox, Envelope } import akka.dispatch._
import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter, BroadcastRouter } import akka.routing._
import akka.AkkaException import akka.AkkaException
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import akka.util.{ Duration, Switch, Helpers } import akka.util.{ Duration, Switch, Helpers }
@ -369,15 +369,15 @@ class LocalActorRefProvider(
override def isTerminated = stopped.isOn override def isTerminated = stopped.isOn
override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match {
case Failed(ex) causeOfTermination = Some(ex); sender.stop() case Failed(ex) causeOfTermination = Some(ex); sender.stop()
case ChildTerminated stop() case _ log.error(this + " received unexpected message " + message)
case _ log.error(this + " received unexpected message " + message)
}) })
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
message match { message match {
case Supervise(child) // TODO register child in some map to keep track of it and enable shutdown after all dead case Supervise(child) // TODO register child in some map to keep track of it and enable shutdown after all dead
case _ log.error(this + " received unexpected system message " + message) case ChildTerminated(child) stop()
case _ log.error(this + " received unexpected system message " + message)
} }
} }
} }

View file

@ -61,6 +61,7 @@ case class Suspend() extends SystemMessage // sent to self from ActorCell.suspen
case class Resume() extends SystemMessage // sent to self from ActorCell.resume case class Resume() extends SystemMessage // sent to self from ActorCell.resume
case class Terminate() extends SystemMessage // sent to self from ActorCell.stop case class Terminate() extends SystemMessage // sent to self from ActorCell.stop
case class Supervise(child: ActorRef) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start case class Supervise(child: ActorRef) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate
case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.startsWatching case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.startsWatching
case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.stopsWatching case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.stopsWatching

View file

@ -28,6 +28,9 @@ object Mailbox {
// secondary status: Scheduled bit may be added to Open/Suspended // secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 4 final val Scheduled = 4
// mailbox debugging helper using println (see below)
// FIXME TODO take this out before release (but please leave in until M2!)
final val debug = false
} }
/** /**
@ -164,6 +167,7 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
var processedMessages = 0 var processedMessages = 0
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0 val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0
do { do {
if (debug) println(actor.self + " processing message " + nextMessage)
actor invoke nextMessage actor invoke nextMessage
processAllSystemMessages() //After we're done, process all system messages processAllSystemMessages() //After we're done, process all system messages
@ -186,6 +190,7 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
var nextMessage = systemDrain() var nextMessage = systemDrain()
try { try {
while (nextMessage ne null) { while (nextMessage ne null) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs)
actor systemInvoke nextMessage actor systemInvoke nextMessage
nextMessage = nextMessage.next nextMessage = nextMessage.next
// dont ever execute normal message when system message present! // dont ever execute normal message when system message present!
@ -240,6 +245,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec @tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null) assert(message.next eq null)
if (Mailbox.debug) println(actor.self + " having enqueued " + message)
val head = systemQueueGet val head = systemQueueGet
/* /*
* this write is safely published by the compareAndSet contained within * this write is safely published by the compareAndSet contained within

View file

@ -224,6 +224,7 @@ class CallingThreadDispatcher(
} }
if (handle ne null) { if (handle ne null) {
try { try {
if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle)
mbox.actor.invoke(handle) mbox.actor.invoke(handle)
true true
} catch { } catch {