send Supervise before attaching to dispatcher, see #2301

This commit is contained in:
Roland 2012-07-13 11:49:26 +02:00
parent 783a59fb99
commit 8517d24c3a
10 changed files with 51 additions and 23 deletions

View file

@ -273,7 +273,7 @@ private[akka] class ActorCell(
val system: ActorSystemImpl,
val self: InternalActorRef,
val props: Props,
@volatile var parent: InternalActorRef)
val parent: InternalActorRef)
extends UntypedActorContext with Cell
with cell.ReceiveTimeout
with cell.Children
@ -290,14 +290,15 @@ private[akka] class ActorCell(
protected final def lookupRoot = self
final def provider = system.provider
var actor: Actor = _
private[this] var _actor: Actor = _
def actor: Actor = _actor
protected def actor_=(a: Actor): Unit = _actor = a
var currentMessage: Envelope = _
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
/*
* MESSAGE PROCESSING
*/
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def systemInvoke(message: SystemMessage): Unit = try {
message match {

View file

@ -262,10 +262,7 @@ private[akka] class LocalActorRef private[akka] (
* that is reached).
*/
private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor)
actorCell.start()
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
_supervisor.sendSystemMessage(akka.dispatch.Supervise(this))
actorCell.start(sendSupervise = true)
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, supervisor)

View file

@ -698,19 +698,30 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
node match {
case wc: ActorRefWithCell
val cell = wc.underlying
indent + "-> " + node.path.name + " " + Logging.simpleName(node) + " " +
(if (indent.isEmpty) "-> " else indent.dropRight(1) + "⌊-> ") +
node.path.name + " " + Logging.simpleName(node) + " " +
(cell match {
case real: ActorCell if (real.actor ne null) real.actor.getClass else "null"
case _ Logging.simpleName(cell)
}) +
(cell match {
case real: ActorCell " status=" + real.mailbox.status
case _ ""
}) +
" " + (cell.childrenRefs match {
case ChildrenContainer.TerminatingChildrenContainer(_, toDie, reason)
"Terminating(" + reason + ")" +
(toDie.toSeq.sorted mkString ("\n" + indent + " toDie: ", "\n" + indent + " ", ""))
(toDie.toSeq.sorted mkString ("\n" + indent + " | toDie: ", "\n" + indent + " | ", ""))
case x @ (ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer) x.toString
case n: ChildrenContainer.NormalChildrenContainer n.c.size + " children"
case x Logging.simpleName(x)
}) +
(if (cell.childrenRefs.children.isEmpty) "" else "\n") +
(cell.childrenRefs.children.toSeq.sorted map (printNode(_, indent + " |")) mkString ("\n"))
({
val children = cell.childrenRefs.children.toSeq.sorted
val bulk = children.dropRight(1) map (printNode(_, indent + " |"))
bulk ++ (children.lastOption map (printNode(_, indent + " ")))
} mkString ("\n"))
case _
indent + node.path.name + " " + Logging.simpleName(node)
}

View file

@ -77,7 +77,7 @@ private[akka] class RepointableActorRef(
* This is called by activate() to obtain the cell which is to replace the
* unstarted cell. The cell must be fully functional.
*/
def newCell(): Cell = new ActorCell(system, this, props, supervisor).start()
def newCell(): Cell = new ActorCell(system, this, props, supervisor).start(sendSupervise = false)
def suspend(): Unit = underlying.suspend()

View file

@ -101,6 +101,8 @@ private[akka] trait Children { this: ActorCell ⇒
}
}
final protected def setTerminated(): Unit = Unsafe.instance.putObjectVolatile(this, AbstractActorCell.childrenOffset, TerminatedChildrenContainer)
/*
* ActorCell-internal API
*/

View file

@ -58,13 +58,14 @@ private[akka] object ChildrenContainer {
def shallDie(actor: ActorRef): ChildrenContainer = this
def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved))
def unreserve(name: String): ChildrenContainer = this
override def toString = "no children"
}
/**
* This is the empty container, shared among all leaf actors.
*/
object EmptyChildrenContainer extends EmptyChildrenContainer
object EmptyChildrenContainer extends EmptyChildrenContainer {
override def toString = "no children"
}
/**
* This is the empty container which is installed after the last child has
@ -77,6 +78,7 @@ private[akka] object ChildrenContainer {
throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated")
override def isTerminating: Boolean = true
override def isNormal: Boolean = false
override def toString = "terminated"
}
/**
@ -85,7 +87,7 @@ private[akka] object ChildrenContainer {
* calling context.stop(child) and processing the ChildTerminated() system
* message).
*/
class NormalChildrenContainer(c: TreeMap[String, ChildStats]) extends ChildrenContainer {
class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer {
def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child)))

View file

@ -35,7 +35,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
final def isTerminated: Boolean = mailbox.isClosed
final def start(): this.type = {
final def start(sendSupervise: Boolean): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
@ -47,6 +47,11 @@ private[akka] trait Dispatch { this: ActorCell ⇒
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())
if (sendSupervise) {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self))
}
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)

View file

@ -98,13 +98,20 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
val wasTerminating = isTerminating
if (setChildrenTerminationReason(ChildrenContainer.Termination)) {
// do not process normal messages while waiting for all children to terminate
suspendNonRecursive()
// do not propagate failures during shutdown to the supervisor
setFailed()
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
} else finishTerminate()
if (!wasTerminating) {
// do not process normal messages while waiting for all children to terminate
suspendNonRecursive()
// do not propagate failures during shutdown to the supervisor
setFailed()
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
}
} else {
setTerminated()
finishTerminate()
}
}
final def handleInvokeFailure(t: Throwable, message: String): Unit = {

View file

@ -72,7 +72,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
if (routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + routerConfig + " did not register routees!")
start()
start(sendSupervise = false)
/*
* end of construction

View file

@ -17,6 +17,7 @@ import java.util.concurrent.TimeoutException
import akka.dispatch.{ Await, MessageDispatcher }
import akka.dispatch.Dispatchers
import akka.pattern.ask
import akka.actor.ActorSystemImpl
object TimingTest extends Tag("timing")
object LongRunningTest extends Tag("long-running")
@ -78,7 +79,9 @@ abstract class AkkaSpec(_system: ActorSystem)
beforeShutdown()
system.shutdown()
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
case _: TimeoutException
system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
println(system.asInstanceOf[ActorSystemImpl].printTree)
}
atTermination()
}