Incorporate review feedback, see #2550

This commit is contained in:
Patrik Nordwall 2012-09-28 11:18:15 +02:00
parent c6dd37607c
commit ba7a18dde5
7 changed files with 29 additions and 11 deletions

View file

@ -215,7 +215,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
EventFilter[Exception]("hello", occurrences = 1) intercept {
a ! "die"
}
val t = probe.expectMsg(Terminated(a)(true))
val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false))
t.existenceConfirmed must be(true)
}

View file

@ -65,11 +65,18 @@ case object Kill extends Kill {
* Terminated message can't be forwarded to another actor, since that actor
* might not be watching the subject. Instead, if you need to forward Terminated
* to another actor you should send the information in your own message.
*
* @param actor the watched actor that terminated
* @param existenceConfirmed is false when the Terminated message was not sent
* directly from the watched actor, but derived from another source, such as
* when watching a non-local ActorRef, which might not have been resolved
* @param addressTerminated the Terminated message was derived from
* that the remote node hosting the watched actor was detected as unreachable
*/
@SerialVersionUID(1L)
case class Terminated private[akka] (@BeanProperty actor: ActorRef)(
@BeanProperty val existenceConfirmed: Boolean,
@BeanProperty val addressTerminated: Boolean = false) extends AutoReceivedMessage
@BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage
/**
* INTERNAL API

View file

@ -382,10 +382,7 @@ private[akka] class ActorCell(
msg.message match {
case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated
// when a parent is watching a child and it terminates due to AddressTerminated,
// it should be removed to support immediate creation of child with same name
if (t.addressTerminated)
childrenRefs.getByRef(t.actor) foreach { crs removeChildAndGetStateChange(crs.child) }
if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor)
watchedActorTerminated(t)
case AddressTerminated(address) addressTerminated(address)
case Kill throw new ActorKilledException("Kill")
@ -396,6 +393,18 @@ private[akka] class ActorCell(
}
}
/**
* When a parent is watching a child and it terminates due to AddressTerminated,
* it should be removed to support immediate creation of child with same name.
*
* For remote deployed actors ChildTerminated should be sent to the supervisor
* to clean up child references of remote deployed actors when remote node
* goes down, i.e. triggered by AddressTerminated, but that is the responsibility
* of the ActorRefProvider to handle that scenario.
*/
private def removeChildWhenToAddressTerminated(child: ActorRef): Unit =
childrenRefs.getByRef(child) foreach { crs removeChildAndGetStateChange(crs.child) }
final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
/*

View file

@ -442,7 +442,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
protected def specialHandle(msg: Any): Boolean = msg match {
case w: Watch
if (w.watchee == this && w.watcher != this)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false)
true
case _: Unwatch true // Just ignore
case _ false
@ -467,7 +467,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
override protected def specialHandle(msg: Any): Boolean = msg match {
case w: Watch
if (w.watchee != this && w.watcher != this)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false)
true
case w: Unwatch true // Just ignore
case NullMessage true

View file

@ -49,7 +49,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
protected def tellWatchersWeDied(actor: Actor): Unit = {
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false)
try {
watchedBy foreach {
watcher

View file

@ -261,7 +261,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
case _: Terminate stop()
case Watch(watchee, watcher)
if (watchee == this && watcher != this) {
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true)
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, addressTerminated = false)
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
case Unwatch(watchee, watcher)
if (watchee == this && watcher != this) remWatcher(watcher)
@ -280,7 +280,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
result tryComplete Failure(new ActorKilledException("Stopped"))
val watchers = clearWatchers()
if (!watchers.isEmpty) {
val termination = Terminated(this)(existenceConfirmed = true)
val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false)
watchers foreach { w try w.tell(termination, this) catch { case NonFatal(t) /* FIXME LOG THIS */ } }
}
}

View file

@ -76,6 +76,8 @@ private[akka] class RemoteDeploymentWatcher extends Actor {
// send extra ChildTerminated to the supervisor so that it will remove the child
if (t.addressTerminated) supervisors(a).sendSystemMessage(ChildTerminated(a))
supervisors -= a
case _: Terminated
}
}