redo ChildTerminated() processing, see #2391"
the previous patch of including the UID in the Terminated message did suffer from the system vs. normal message ordering problem, hence I reverted the previous fix and replaced it by sending a NullMessage after ChildTerminated, exactly like in the Supervise case.
This commit is contained in:
parent
d110836629
commit
43170ff168
10 changed files with 17 additions and 23 deletions
|
|
@ -214,7 +214,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
|
||||||
EventFilter[Exception]("hello", occurrences = 1) intercept {
|
EventFilter[Exception]("hello", occurrences = 1) intercept {
|
||||||
a ! "die"
|
a ! "die"
|
||||||
}
|
}
|
||||||
val t = probe.expectMsg(Terminated(a)(true, 0))
|
val t = probe.expectMsg(Terminated(a)(true))
|
||||||
t.existenceConfirmed must be(true)
|
t.existenceConfirmed must be(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -63,10 +63,7 @@ case object Kill extends Kill {
|
||||||
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated.
|
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated.
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class Terminated private[akka] (@BeanProperty actor: ActorRef)(
|
case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage
|
||||||
@BeanProperty val existenceConfirmed: Boolean,
|
|
||||||
private[akka] val uid: Int)
|
|
||||||
extends AutoReceivedMessage
|
|
||||||
|
|
||||||
abstract class ReceiveTimeout extends PossiblyHarmful
|
abstract class ReceiveTimeout extends PossiblyHarmful
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -375,14 +375,8 @@ private[akka] class ActorCell(
|
||||||
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
|
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
|
||||||
|
|
||||||
msg.message match {
|
msg.message match {
|
||||||
case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid)
|
case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid)
|
||||||
case t @ Terminated(actor) ⇒
|
case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t)
|
||||||
getChildByRef(actor) match {
|
|
||||||
case Some(crs) if crs.uid == t.uid ⇒ removeChild(actor)
|
|
||||||
case _ ⇒
|
|
||||||
}
|
|
||||||
watchedActorTerminated(t.actor)
|
|
||||||
receiveMessage(t)
|
|
||||||
case Kill ⇒ throw new ActorKilledException("Kill")
|
case Kill ⇒ throw new ActorKilledException("Kill")
|
||||||
case PoisonPill ⇒ self.stop()
|
case PoisonPill ⇒ self.stop()
|
||||||
case SelectParent(m) ⇒ parent.tell(m, msg.sender)
|
case SelectParent(m) ⇒ parent.tell(m, msg.sender)
|
||||||
|
|
|
||||||
|
|
@ -441,7 +441,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
|
||||||
protected def specialHandle(msg: Any): Boolean = msg match {
|
protected def specialHandle(msg: Any): Boolean = msg match {
|
||||||
case w: Watch ⇒
|
case w: Watch ⇒
|
||||||
if (w.watchee == this && w.watcher != this)
|
if (w.watchee == this && w.watcher != this)
|
||||||
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, 0)
|
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
|
||||||
true
|
true
|
||||||
case _: Unwatch ⇒ true // Just ignore
|
case _: Unwatch ⇒ true // Just ignore
|
||||||
case _ ⇒ false
|
case _ ⇒ false
|
||||||
|
|
@ -466,10 +466,11 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
|
||||||
override protected def specialHandle(msg: Any): Boolean = msg match {
|
override protected def specialHandle(msg: Any): Boolean = msg match {
|
||||||
case w: Watch ⇒
|
case w: Watch ⇒
|
||||||
if (w.watchee != this && w.watcher != this)
|
if (w.watchee != this && w.watcher != this)
|
||||||
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, 0)
|
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
|
||||||
true
|
true
|
||||||
case w: Unwatch ⇒ true // Just ignore
|
case w: Unwatch ⇒ true // Just ignore
|
||||||
case _ ⇒ false
|
case NullMessage ⇒ true
|
||||||
|
case _ ⇒ false
|
||||||
}
|
}
|
||||||
|
|
||||||
@throws(classOf[java.io.ObjectStreamException])
|
@throws(classOf[java.io.ObjectStreamException])
|
||||||
|
|
|
||||||
|
|
@ -94,7 +94,7 @@ private[akka] trait Children { this: ActorCell ⇒
|
||||||
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
|
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
|
||||||
}
|
}
|
||||||
|
|
||||||
@tailrec final protected def removeChild(ref: ActorRef): ChildrenContainer = {
|
@tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = {
|
||||||
val c = childrenRefs
|
val c = childrenRefs
|
||||||
val n = c.remove(ref)
|
val n = c.remove(ref)
|
||||||
if (swapChildrenRefs(c, n)) n
|
if (swapChildrenRefs(c, n)) n
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
|
||||||
|
|
||||||
protected def tellWatchersWeDied(actor: Actor): Unit = {
|
protected def tellWatchersWeDied(actor: Actor): Unit = {
|
||||||
if (!watchedBy.isEmpty) {
|
if (!watchedBy.isEmpty) {
|
||||||
val terminated = Terminated(self)(existenceConfirmed = true, uid)
|
val terminated = Terminated(self)(existenceConfirmed = true)
|
||||||
try {
|
try {
|
||||||
watchedBy foreach {
|
watchedBy foreach {
|
||||||
watcher ⇒
|
watcher ⇒
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import akka.event.Logging.{ Warning, Error, Debug }
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import akka.dispatch.SystemMessage
|
import akka.dispatch.SystemMessage
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
|
import akka.dispatch.NullMessage
|
||||||
|
|
||||||
private[akka] trait FaultHandling { this: ActorCell ⇒
|
private[akka] trait FaultHandling { this: ActorCell ⇒
|
||||||
|
|
||||||
|
|
@ -157,6 +158,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
||||||
try if (a ne null) a.postStop()
|
try if (a ne null) a.postStop()
|
||||||
finally try dispatcher.detach(this)
|
finally try dispatcher.detach(this)
|
||||||
finally try parent.sendSystemMessage(ChildTerminated(self))
|
finally try parent.sendSystemMessage(ChildTerminated(self))
|
||||||
|
finally try parent.tell(NullMessage) // read ScalaDoc of NullMessage to see why
|
||||||
finally try tellWatchersWeDied(a)
|
finally try tellWatchersWeDied(a)
|
||||||
finally try unwatchWatchedActors(a)
|
finally try unwatchWatchedActors(a)
|
||||||
finally {
|
finally {
|
||||||
|
|
|
||||||
|
|
@ -262,7 +262,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
|
||||||
case _: Terminate ⇒ stop()
|
case _: Terminate ⇒ stop()
|
||||||
case Watch(watchee, watcher) ⇒
|
case Watch(watchee, watcher) ⇒
|
||||||
if (watchee == this && watcher != this) {
|
if (watchee == this && watcher != this) {
|
||||||
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, 0)
|
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true)
|
||||||
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
|
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
|
||||||
case Unwatch(watchee, watcher) ⇒
|
case Unwatch(watchee, watcher) ⇒
|
||||||
if (watchee == this && watcher != this) remWatcher(watcher)
|
if (watchee == this && watcher != this) remWatcher(watcher)
|
||||||
|
|
@ -281,7 +281,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
|
||||||
result tryComplete Failure(new ActorKilledException("Stopped"))
|
result tryComplete Failure(new ActorKilledException("Stopped"))
|
||||||
val watchers = clearWatchers()
|
val watchers = clearWatchers()
|
||||||
if (!watchers.isEmpty) {
|
if (!watchers.isEmpty) {
|
||||||
val termination = Terminated(this)(existenceConfirmed = true, 0)
|
val termination = Terminated(this)(existenceConfirmed = true)
|
||||||
watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } }
|
watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -211,4 +211,4 @@ public class FaultHandlingTestBase {
|
||||||
}
|
}
|
||||||
//#testkit
|
//#testkit
|
||||||
}
|
}
|
||||||
//#testkit
|
//#testkit
|
||||||
|
|
|
||||||
|
|
@ -153,4 +153,4 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//#testkit
|
//#testkit
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue