From d6f42ca344a730575e62390753e652d40a477802 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 19 Aug 2012 20:25:12 +0200 Subject: [PATCH] remove child in Terminated() processing, see #2391 --- .../src/test/scala/akka/actor/ActorSystemSpec.scala | 3 ++- akka-actor/src/main/scala/akka/actor/Actor.scala | 2 +- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 10 ++++++++-- akka-actor/src/main/scala/akka/actor/ActorRef.scala | 4 ++-- .../src/main/scala/akka/actor/cell/Children.scala | 2 +- .../src/main/scala/akka/actor/cell/DeathWatch.scala | 2 +- .../src/main/scala/akka/pattern/AskSupport.scala | 4 ++-- .../java/code/docs/actor/FaultHandlingTestBase.java | 4 ++-- .../scala/code/docs/actor/FaultHandlingDocSpec.scala | 6 ++++-- 9 files changed, 23 insertions(+), 14 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 7dfdf83e6e..1daddfb5b1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -214,7 +214,8 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt EventFilter[Exception]("hello", occurrences = 1) intercept { a ! "die" } - probe.expectMsg(Terminated(a)(true)) + val t = probe.expectMsg(Terminated(a)(true, 0)) + t.existenceConfirmed must be(true) } "shut down when /user escalates" in { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 6a9bdb0801..bfb805d690 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -63,7 +63,7 @@ case object Kill extends Kill { * When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. */ @SerialVersionUID(1L) -case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage +case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean, val uid: Int) extends AutoReceivedMessage abstract class ReceiveTimeout extends PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 2e1c7cd610..90500cfc88 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -365,8 +365,14 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { - case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) - case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t) + case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) + case t @ Terminated(actor) ⇒ + getChildByRef(actor) match { + case Some(crs) if crs.uid == t.uid ⇒ removeChild(actor) + case _ ⇒ + } + watchedActorTerminated(t.actor) + receiveMessage(t) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 30aacac0d8..13225165f1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -441,7 +441,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ if (w.watchee == this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, 0) true case _: Unwatch ⇒ true // Just ignore case _ ⇒ false @@ -466,7 +466,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, override protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ if (w.watchee != this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, 0) true case w: Unwatch ⇒ true // Just ignore case _ ⇒ false diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index 5704a6b22b..edbb3489fc 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -88,7 +88,7 @@ private[akka] trait Children { this: ActorCell ⇒ swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) } - @tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = { + @tailrec final protected def removeChild(ref: ActorRef): ChildrenContainer = { val c = childrenRefs val n = c.remove(ref) if (swapChildrenRefs(c, n)) n diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index 031019f3f6..2c78075764 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -36,7 +36,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(existenceConfirmed = true) + val terminated = Terminated(self)(existenceConfirmed = true, uid) try { watchedBy foreach { watcher ⇒ diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 2909034934..42bc9fcb9b 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -262,7 +262,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide case _: Terminate ⇒ stop() case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) { - if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true) + if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, 0) } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this)) case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher) @@ -281,7 +281,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide result.tryComplete(Left(new ActorKilledException("Stopped"))) val watchers = clearWatchers() if (!watchers.isEmpty) { - val termination = Terminated(this)(existenceConfirmed = true) + val termination = Terminated(this)(existenceConfirmed = true, 0) watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } } } } diff --git a/akka-docs/java/code/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/docs/actor/FaultHandlingTestBase.java index b4779676ee..5dde14afdb 100644 --- a/akka-docs/java/code/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/java/code/docs/actor/FaultHandlingTestBase.java @@ -182,7 +182,7 @@ public class FaultHandlingTestBase { final TestProbe probe = new TestProbe(system); probe.watch(child); child.tell(new IllegalArgumentException()); - probe.expectMsg(new Terminated(child, true)); + probe.expectMsg(new Terminated(child, true, 0)); //#stop //#escalate-kill @@ -190,7 +190,7 @@ public class FaultHandlingTestBase { probe.watch(child); assert Await.result(ask(child, "get", 5000), timeout).equals(0); child.tell(new Exception()); - probe.expectMsg(new Terminated(child, true)); + probe.expectMsg(new Terminated(child, true, 0)); //#escalate-kill //#escalate-restart diff --git a/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala b/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala index f291eb0132..e140e262a5 100644 --- a/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala @@ -113,7 +113,8 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { //#stop watch(child) // have testActor watch “child” child ! new IllegalArgumentException // break it - expectMsg(Terminated(child)(existenceConfirmed = true)) + val t = expectMsg(Terminated(child)(true, 0)) + t.existenceConfirmed must be(true) child.isTerminated must be(true) //#stop } @@ -127,7 +128,8 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { expectMsg(0) child2 ! new Exception("CRASH") // escalate failure - expectMsg(Terminated(child2)(existenceConfirmed = true)) + val t = expectMsg(Terminated(child2)(true, 0)) + t.existenceConfirmed must be(true) //#escalate-kill //#escalate-restart val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")