diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index d18b1f97bb..eefd7d9b2a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -107,7 +107,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende case class FF(fail: Failed) val supervisor = actorOf(Props[Supervisor] .withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) { - override def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]) = { + override def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { testActor.tell(FF(Failed(cause)), child) super.handleFailure(child, cause, stats, children) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index b67b1892d1..90011faa52 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -54,9 +54,7 @@ private[akka] object ActorCell { override def initialValue = Stack[ActorContext]() } - val emptyChildrenRefs = TreeMap[String, ActorRef]() - - val emptyChildrenStats = TreeMap[ActorRef, ChildRestartStats]() + val emptyChildrenRefs = TreeMap[String, ChildRestartStats]() } //vars don't need volatile since it's protected with the mailbox status @@ -81,9 +79,7 @@ private[akka] class ActorCell( var futureTimeout: Option[Cancellable] = None - //FIXME TODO Coalesce childrenRefs and ChildrenStats into one field, this to conserve memory - var childrenRefs = emptyChildrenRefs - var childrenStats = emptyChildrenStats + var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs var currentMessage: Envelope = null @@ -138,13 +134,10 @@ private[akka] class ActorCell( subject } - final def children: Iterable[ActorRef] = childrenStats.keys + final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child) - final def getChild(name: String): Option[ActorRef] = { - val isClosed = mailbox.isClosed // fence plus volatile read - if (isClosed) None - else childrenRefs.get(name) - } + final def getChild(name: String): Option[ActorRef] = + if (isShutdown) None else childrenRefs.get(name).map(_.child) final def tell(message: Any, sender: ActorRef): Unit = dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)) @@ -242,12 +235,16 @@ private[akka] class ActorCell( } def supervise(child: ActorRef): Unit = { - val stats = childrenStats - if (!stats.contains(child)) { - childrenRefs = childrenRefs.updated(child.name, child) - childrenStats = childrenStats.updated(child, ChildRestartStats()) + val stat = childrenRefs.get(child.name) + if (stat.isDefined) { + if (stat.get.child == child) + system.eventStream.publish(Warning(self.toString, "Already supervising " + child)) + else + system.eventStream.publish(Warning(self.toString, "Already supervising other child with same name '" + child.name + "', old: " + stat.get + " new: " + child)) + } else { + childrenRefs = childrenRefs.updated(child.name, ChildRestartStats(child)) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now supervising " + child)) - } else system.eventStream.publish(Warning(self.toString, "Already supervising " + child)) + } } try { @@ -286,13 +283,9 @@ private[akka] class ActorCell( cancelReceiveTimeout() // FIXME: leave this here? messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ - if (stopping) { - // receiving Terminated in response to stopping children is too common to generate noise - if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle) - } else { - actor(msg) - } + case msg if stopping ⇒ // receiving Terminated in response to stopping children is too common to generate noise + if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle) + case msg ⇒ actor(msg) } currentMessage = null // reset current message after successful invocation } catch { @@ -371,16 +364,16 @@ private[akka] class ActorCell( } } - final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenStats.get(child) match { - case Some(stats) ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenStats)) throw cause - case None ⇒ system.eventStream.publish(Warning(self.toString, "dropping Failed(" + cause + ") from unknown child")) + final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.name) match { + case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenRefs.values)) throw cause + case Some(stats) ⇒ system.eventStream.publish(Warning(self.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) + case None ⇒ system.eventStream.publish(Warning(self.toString, "dropping Failed(" + cause + ") from unknown child " + child)) } final def handleChildTerminated(child: ActorRef): Unit = { childrenRefs -= child.name - childrenStats -= child props.faultHandler.handleChildTerminated(child, children) - if (stopping && childrenStats.isEmpty) doTerminate() + if (stopping && childrenRefs.isEmpty) doTerminate() } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 90da6dd10a..4656f5a3e3 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -9,7 +9,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import java.lang.{ Iterable ⇒ JIterable } -case class ChildRestartStats(var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { +case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = retriesWindow match { @@ -118,7 +118,7 @@ abstract class FaultHandlingStrategy { /** * This method is called to act on the failure of a child: restart if the flag is true, stop otherwise. */ - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit + def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) @@ -133,7 +133,7 @@ abstract class FaultHandlingStrategy { /** * Returns whether it processed the failure or not */ - def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Boolean = { + def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate action match { case Resume ⇒ child.resume(); true @@ -191,12 +191,12 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider, //TODO optimization to drop all children here already? } - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = { + def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (children.nonEmpty) { - if (restart && children.forall(_._2.requestRestartPermission(retriesWindow))) - children.foreach(_._1.restart(cause)) + if (restart && children.forall(_.requestRestartPermission(retriesWindow))) + children.foreach(_.child.restart(cause)) else - children.foreach(_._1.stop()) + children.foreach(_.child.stop()) } } } @@ -245,7 +245,7 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider, def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {} - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = { + def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) child.restart(cause) else diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 5d70286226..7d3b3c3d8b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -188,7 +188,7 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes var nextMessage = systemDrain() try { while (nextMessage ne null) { - if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs + "/" + actor.childrenStats) + if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present!