remove child in Terminated() processing, see #2391
This commit is contained in:
parent
7f1a4d3ab6
commit
d6f42ca344
9 changed files with 23 additions and 14 deletions
|
|
@ -214,7 +214,8 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
|
|||
EventFilter[Exception]("hello", occurrences = 1) intercept {
|
||||
a ! "die"
|
||||
}
|
||||
probe.expectMsg(Terminated(a)(true))
|
||||
val t = probe.expectMsg(Terminated(a)(true, 0))
|
||||
t.existenceConfirmed must be(true)
|
||||
}
|
||||
|
||||
"shut down when /user escalates" in {
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ case object Kill extends Kill {
|
|||
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage
|
||||
case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean, val uid: Int) extends AutoReceivedMessage
|
||||
|
||||
abstract class ReceiveTimeout extends PossiblyHarmful
|
||||
|
||||
|
|
|
|||
|
|
@ -365,8 +365,14 @@ private[akka] class ActorCell(
|
|||
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
|
||||
|
||||
msg.message match {
|
||||
case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid)
|
||||
case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t)
|
||||
case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid)
|
||||
case t @ Terminated(actor) ⇒
|
||||
getChildByRef(actor) match {
|
||||
case Some(crs) if crs.uid == t.uid ⇒ removeChild(actor)
|
||||
case _ ⇒
|
||||
}
|
||||
watchedActorTerminated(t.actor)
|
||||
receiveMessage(t)
|
||||
case Kill ⇒ throw new ActorKilledException("Kill")
|
||||
case PoisonPill ⇒ self.stop()
|
||||
case SelectParent(m) ⇒ parent.tell(m, msg.sender)
|
||||
|
|
|
|||
|
|
@ -441,7 +441,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
|
|||
protected def specialHandle(msg: Any): Boolean = msg match {
|
||||
case w: Watch ⇒
|
||||
if (w.watchee == this && w.watcher != this)
|
||||
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
|
||||
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, 0)
|
||||
true
|
||||
case _: Unwatch ⇒ true // Just ignore
|
||||
case _ ⇒ false
|
||||
|
|
@ -466,7 +466,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
|
|||
override protected def specialHandle(msg: Any): Boolean = msg match {
|
||||
case w: Watch ⇒
|
||||
if (w.watchee != this && w.watcher != this)
|
||||
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
|
||||
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, 0)
|
||||
true
|
||||
case w: Unwatch ⇒ true // Just ignore
|
||||
case _ ⇒ false
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
|
||||
}
|
||||
|
||||
@tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = {
|
||||
@tailrec final protected def removeChild(ref: ActorRef): ChildrenContainer = {
|
||||
val c = childrenRefs
|
||||
val n = c.remove(ref)
|
||||
if (swapChildrenRefs(c, n)) n
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
|
|||
|
||||
protected def tellWatchersWeDied(actor: Actor): Unit = {
|
||||
if (!watchedBy.isEmpty) {
|
||||
val terminated = Terminated(self)(existenceConfirmed = true)
|
||||
val terminated = Terminated(self)(existenceConfirmed = true, uid)
|
||||
try {
|
||||
watchedBy foreach {
|
||||
watcher ⇒
|
||||
|
|
|
|||
|
|
@ -262,7 +262,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
|
|||
case _: Terminate ⇒ stop()
|
||||
case Watch(watchee, watcher) ⇒
|
||||
if (watchee == this && watcher != this) {
|
||||
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true)
|
||||
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, 0)
|
||||
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
|
||||
case Unwatch(watchee, watcher) ⇒
|
||||
if (watchee == this && watcher != this) remWatcher(watcher)
|
||||
|
|
@ -281,7 +281,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
|
|||
result.tryComplete(Left(new ActorKilledException("Stopped")))
|
||||
val watchers = clearWatchers()
|
||||
if (!watchers.isEmpty) {
|
||||
val termination = Terminated(this)(existenceConfirmed = true)
|
||||
val termination = Terminated(this)(existenceConfirmed = true, 0)
|
||||
watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -182,7 +182,7 @@ public class FaultHandlingTestBase {
|
|||
final TestProbe probe = new TestProbe(system);
|
||||
probe.watch(child);
|
||||
child.tell(new IllegalArgumentException());
|
||||
probe.expectMsg(new Terminated(child, true));
|
||||
probe.expectMsg(new Terminated(child, true, 0));
|
||||
//#stop
|
||||
|
||||
//#escalate-kill
|
||||
|
|
@ -190,7 +190,7 @@ public class FaultHandlingTestBase {
|
|||
probe.watch(child);
|
||||
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
|
||||
child.tell(new Exception());
|
||||
probe.expectMsg(new Terminated(child, true));
|
||||
probe.expectMsg(new Terminated(child, true, 0));
|
||||
//#escalate-kill
|
||||
|
||||
//#escalate-restart
|
||||
|
|
|
|||
|
|
@ -113,7 +113,8 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
|
|||
//#stop
|
||||
watch(child) // have testActor watch “child”
|
||||
child ! new IllegalArgumentException // break it
|
||||
expectMsg(Terminated(child)(existenceConfirmed = true))
|
||||
val t = expectMsg(Terminated(child)(true, 0))
|
||||
t.existenceConfirmed must be(true)
|
||||
child.isTerminated must be(true)
|
||||
//#stop
|
||||
}
|
||||
|
|
@ -127,7 +128,8 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
|
|||
expectMsg(0)
|
||||
|
||||
child2 ! new Exception("CRASH") // escalate failure
|
||||
expectMsg(Terminated(child2)(existenceConfirmed = true))
|
||||
val t = expectMsg(Terminated(child2)(true, 0))
|
||||
t.existenceConfirmed must be(true)
|
||||
//#escalate-kill
|
||||
//#escalate-restart
|
||||
val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue