diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 9fb535dcb2..bc8e8f1cbe 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -273,7 +273,7 @@ private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, - @volatile var parent: InternalActorRef) + val parent: InternalActorRef) extends UntypedActorContext with Cell with cell.ReceiveTimeout with cell.Children @@ -290,14 +290,15 @@ private[akka] class ActorCell( protected final def lookupRoot = self final def provider = system.provider - var actor: Actor = _ + private[this] var _actor: Actor = _ + def actor: Actor = _actor + protected def actor_=(a: Actor): Unit = _actor = a var currentMessage: Envelope = _ private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack /* * MESSAGE PROCESSING */ - //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status final def systemInvoke(message: SystemMessage): Unit = try { message match { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6cbd821af4..00a84f956a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -262,10 +262,7 @@ private[akka] class LocalActorRef private[akka] ( * that is reached). */ private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor) - actorCell.start() - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - _supervisor.sendSystemMessage(akka.dispatch.Supervise(this)) + actorCell.start(sendSupervise = true) protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell = new ActorCell(system, ref, props, supervisor) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index c155f9d092..a83c78e0bb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -698,19 +698,30 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, node match { case wc: ActorRefWithCell ⇒ val cell = wc.underlying - indent + "-> " + node.path.name + " " + Logging.simpleName(node) + " " + + (if (indent.isEmpty) "-> " else indent.dropRight(1) + "⌊-> ") + + node.path.name + " " + Logging.simpleName(node) + " " + (cell match { case real: ActorCell ⇒ if (real.actor ne null) real.actor.getClass else "null" case _ ⇒ Logging.simpleName(cell) }) + + (cell match { + case real: ActorCell ⇒ " status=" + real.mailbox.status + case _ ⇒ "" + }) + " " + (cell.childrenRefs match { case ChildrenContainer.TerminatingChildrenContainer(_, toDie, reason) ⇒ "Terminating(" + reason + ")" + - (toDie.toSeq.sorted mkString ("\n" + indent + " toDie: ", "\n" + indent + " ", "")) + (toDie.toSeq.sorted mkString ("\n" + indent + " | toDie: ", "\n" + indent + " | ", "")) + case x @ (ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer) ⇒ x.toString + case n: ChildrenContainer.NormalChildrenContainer ⇒ n.c.size + " children" case x ⇒ Logging.simpleName(x) }) + (if (cell.childrenRefs.children.isEmpty) "" else "\n") + - (cell.childrenRefs.children.toSeq.sorted map (printNode(_, indent + " |")) mkString ("\n")) + ({ + val children = cell.childrenRefs.children.toSeq.sorted + val bulk = children.dropRight(1) map (printNode(_, indent + " |")) + bulk ++ (children.lastOption map (printNode(_, indent + " "))) + } mkString ("\n")) case _ ⇒ indent + node.path.name + " " + Logging.simpleName(node) } diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index b08c7adbee..ca1bb2492e 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -77,7 +77,7 @@ private[akka] class RepointableActorRef( * This is called by activate() to obtain the cell which is to replace the * unstarted cell. The cell must be fully functional. */ - def newCell(): Cell = new ActorCell(system, this, props, supervisor).start() + def newCell(): Cell = new ActorCell(system, this, props, supervisor).start(sendSupervise = false) def suspend(): Unit = underlying.suspend() diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index a949297b1b..e20a8addbd 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -101,6 +101,8 @@ private[akka] trait Children { this: ActorCell ⇒ } } + final protected def setTerminated(): Unit = Unsafe.instance.putObjectVolatile(this, AbstractActorCell.childrenOffset, TerminatedChildrenContainer) + /* * ActorCell-internal API */ diff --git a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala index b31bbbd4af..8b85a22ea8 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala @@ -58,13 +58,14 @@ private[akka] object ChildrenContainer { def shallDie(actor: ActorRef): ChildrenContainer = this def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved)) def unreserve(name: String): ChildrenContainer = this - override def toString = "no children" } /** * This is the empty container, shared among all leaf actors. */ - object EmptyChildrenContainer extends EmptyChildrenContainer + object EmptyChildrenContainer extends EmptyChildrenContainer { + override def toString = "no children" + } /** * This is the empty container which is installed after the last child has @@ -77,6 +78,7 @@ private[akka] object ChildrenContainer { throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") override def isTerminating: Boolean = true override def isNormal: Boolean = false + override def toString = "terminated" } /** @@ -85,7 +87,7 @@ private[akka] object ChildrenContainer { * calling context.stop(child) and processing the ChildTerminated() system * message). */ - class NormalChildrenContainer(c: TreeMap[String, ChildStats]) extends ChildrenContainer { + class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer { def add(child: ActorRef): ChildrenContainer = new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala index 5dd9f16a6c..8c849366d8 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -35,7 +35,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ final def isTerminated: Boolean = mailbox.isClosed - final def start(): this.type = { + final def start(sendSupervise: Boolean): this.type = { /* * Create the mailbox and enqueue the Create() message to ensure that @@ -47,6 +47,11 @@ private[akka] trait Dispatch { this: ActorCell ⇒ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ mailbox.systemEnqueue(self, Create()) + if (sendSupervise) { + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + parent.sendSystemMessage(akka.dispatch.Supervise(self)) + } + // This call is expected to start off the actor by scheduling its mailbox. dispatcher.attach(this) diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 88fca60eb4..6d0c6ce283 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -98,13 +98,20 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) children foreach stop + val wasTerminating = isTerminating + if (setChildrenTerminationReason(ChildrenContainer.Termination)) { - // do not process normal messages while waiting for all children to terminate - suspendNonRecursive() - // do not propagate failures during shutdown to the supervisor - setFailed() - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping")) - } else finishTerminate() + if (!wasTerminating) { + // do not process normal messages while waiting for all children to terminate + suspendNonRecursive() + // do not propagate failures during shutdown to the supervisor + setFailed() + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping")) + } + } else { + setTerminated() + finishTerminate() + } } final def handleInvokeFailure(t: Throwable, message: String): Unit = { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index cb0f5ee09b..5f68694d8f 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -72,7 +72,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo if (routerConfig.resizer.isEmpty && _routees.isEmpty) throw new ActorInitializationException("router " + routerConfig + " did not register routees!") - start() + start(sendSupervise = false) /* * end of construction diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index f381e53013..4e9053722e 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -17,6 +17,7 @@ import java.util.concurrent.TimeoutException import akka.dispatch.{ Await, MessageDispatcher } import akka.dispatch.Dispatchers import akka.pattern.ask +import akka.actor.ActorSystemImpl object TimingTest extends Tag("timing") object LongRunningTest extends Tag("long-running") @@ -78,7 +79,9 @@ abstract class AkkaSpec(_system: ActorSystem) beforeShutdown() system.shutdown() try system.awaitTermination(5 seconds) catch { - case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) + case _: TimeoutException ⇒ + system.log.warning("Failed to stop [{}] within 5 seconds", system.name) + println(system.asInstanceOf[ActorSystemImpl].printTree) } atTermination() }