diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala index 448640d253..ce2afb20e5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala @@ -35,7 +35,8 @@ private[akka] class RemoteDeploymentWatcher extends Actor with RequiresMessageQu case t @ Terminated(a) if supervisors isDefinedAt a ⇒ // send extra DeathWatchNotification to the supervisor so that it will remove the child - supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true)) + supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = t.existenceConfirmed, + addressTerminated = t.addressTerminated)) supervisors -= a case _: Terminated ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index a9c877dab6..68bc4c683d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -15,6 +15,8 @@ import akka.actor.Terminated import akka.actor.ExtendedActorSystem import akka.ConfigurationException import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.actor.InternalActorRef +import akka.dispatch.sysmsg.DeathWatchNotification /** * INTERNAL API @@ -124,7 +126,7 @@ private[akka] class RemoteWatcher( case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) case WatchRemote(watchee, watcher) ⇒ watchRemote(watchee, watcher) case UnwatchRemote(watchee, watcher) ⇒ unwatchRemote(watchee, watcher) - case Terminated(watchee) ⇒ terminated(watchee) + case t @ Terminated(watchee) ⇒ terminated(watchee, t.existenceConfirmed, t.addressTerminated) // test purpose case Stats ⇒ @@ -209,11 +211,22 @@ private[akka] class RemoteWatcher( log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path) } - def terminated(watchee: ActorRef): Unit = { + def terminated(watchee: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = { log.debug("Watchee terminated: [{}]", watchee.path) - watching = watching.filterNot { - case (wee, _) ⇒ wee == watchee + + // When watchee is stopped it sends DeathWatchNotification to the watcher and to this RemoteWatcher, + // which is also watching. Send extra DeathWatchNotification to the watcher in case the + // DeathWatchNotification message is only delivered to RemoteWatcher. Otherwise there is a risk that + // the monitoring is removed, subsequent node failure is not detected and the original watcher is + // never notified. This may occur for normal system shutdown of the watchee system when not all remote + // messages are flushed at shutdown. + watching --= watching collect { + case tuple @ (wee, wer: InternalActorRef) if wee == watchee ⇒ + if (!addressTerminated && wer != self) + wer.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed, addressTerminated)) + tuple } + checkLastUnwatchOfNode(watchee.path.address) }