diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 9c5dd95dc5..e18e0d56d6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -215,7 +215,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt EventFilter[Exception]("hello", occurrences = 1) intercept { a ! "die" } - val t = probe.expectMsg(Terminated(a)(true)) + val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false)) t.existenceConfirmed must be(true) } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 08a4a84598..633458953e 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -65,11 +65,18 @@ case object Kill extends Kill { * Terminated message can't be forwarded to another actor, since that actor * might not be watching the subject. Instead, if you need to forward Terminated * to another actor you should send the information in your own message. + * + * @param actor the watched actor that terminated + * @param existenceConfirmed is false when the Terminated message was not sent + * directly from the watched actor, but derived from another source, such as + * when watching a non-local ActorRef, which might not have been resolved + * @param addressTerminated the Terminated message was derived from + * that the remote node hosting the watched actor was detected as unreachable */ @SerialVersionUID(1L) case class Terminated private[akka] (@BeanProperty actor: ActorRef)( @BeanProperty val existenceConfirmed: Boolean, - @BeanProperty val addressTerminated: Boolean = false) extends AutoReceivedMessage + @BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage /** * INTERNAL API diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 2a28e4e8be..5ec4545fd1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -382,10 +382,7 @@ private[akka] class ActorCell( msg.message match { case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) case t: Terminated ⇒ - // when a parent is watching a child and it terminates due to AddressTerminated, - // it should be removed to support immediate creation of child with same name - if (t.addressTerminated) - childrenRefs.getByRef(t.actor) foreach { crs ⇒ removeChildAndGetStateChange(crs.child) } + if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor) watchedActorTerminated(t) case AddressTerminated(address) ⇒ addressTerminated(address) case Kill ⇒ throw new ActorKilledException("Kill") @@ -396,6 +393,18 @@ private[akka] class ActorCell( } } + /** + * When a parent is watching a child and it terminates due to AddressTerminated, + * it should be removed to support immediate creation of child with same name. + * + * For remote deployed actors ChildTerminated should be sent to the supervisor + * to clean up child references of remote deployed actors when remote node + * goes down, i.e. triggered by AddressTerminated, but that is the responsibility + * of the ActorRefProvider to handle that scenario. + */ + private def removeChildWhenToAddressTerminated(child: ActorRef): Unit = + childrenRefs.getByRef(child) foreach { crs ⇒ removeChildAndGetStateChange(crs.child) } + final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled) /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 615c4ef92e..90030bd565 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -442,7 +442,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ if (w.watchee == this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false) true case _: Unwatch ⇒ true // Just ignore case _ ⇒ false @@ -467,7 +467,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, override protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ if (w.watchee != this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false) true case w: Unwatch ⇒ true // Just ignore case NullMessage ⇒ true diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 2b8ea2322b..70f79f1d48 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -49,7 +49,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(existenceConfirmed = true) + val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false) try { watchedBy foreach { watcher ⇒ diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 92e34a2294..704ce43d8d 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -261,7 +261,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide case _: Terminate ⇒ stop() case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) { - if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true) + if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, addressTerminated = false) } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this)) case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher) @@ -280,7 +280,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide result tryComplete Failure(new ActorKilledException("Stopped")) val watchers = clearWatchers() if (!watchers.isEmpty) { - val termination = Terminated(this)(existenceConfirmed = true) + val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false) watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 5a679fe555..024dfdc00c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -76,6 +76,8 @@ private[akka] class RemoteDeploymentWatcher extends Actor { // send extra ChildTerminated to the supervisor so that it will remove the child if (t.addressTerminated) supervisors(a).sendSystemMessage(ChildTerminated(a)) supervisors -= a + + case _: Terminated ⇒ } }