Send extra DeathWatchNotification from RemoteWatcher, see #3368
This commit is contained in:
parent
c616fc43a6
commit
2dd0141bfc
2 changed files with 19 additions and 5 deletions
|
|
@ -35,7 +35,8 @@ private[akka] class RemoteDeploymentWatcher extends Actor with RequiresMessageQu
|
|||
|
||||
case t @ Terminated(a) if supervisors isDefinedAt a ⇒
|
||||
// send extra DeathWatchNotification to the supervisor so that it will remove the child
|
||||
supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true))
|
||||
supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = t.existenceConfirmed,
|
||||
addressTerminated = t.addressTerminated))
|
||||
supervisors -= a
|
||||
|
||||
case _: Terminated ⇒
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@ import akka.actor.Terminated
|
|||
import akka.actor.ExtendedActorSystem
|
||||
import akka.ConfigurationException
|
||||
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
|
||||
import akka.actor.InternalActorRef
|
||||
import akka.dispatch.sysmsg.DeathWatchNotification
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -124,7 +126,7 @@ private[akka] class RemoteWatcher(
|
|||
case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
|
||||
case WatchRemote(watchee, watcher) ⇒ watchRemote(watchee, watcher)
|
||||
case UnwatchRemote(watchee, watcher) ⇒ unwatchRemote(watchee, watcher)
|
||||
case Terminated(watchee) ⇒ terminated(watchee)
|
||||
case t @ Terminated(watchee) ⇒ terminated(watchee, t.existenceConfirmed, t.addressTerminated)
|
||||
|
||||
// test purpose
|
||||
case Stats ⇒
|
||||
|
|
@ -209,11 +211,22 @@ private[akka] class RemoteWatcher(
|
|||
log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path)
|
||||
}
|
||||
|
||||
def terminated(watchee: ActorRef): Unit = {
|
||||
def terminated(watchee: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = {
|
||||
log.debug("Watchee terminated: [{}]", watchee.path)
|
||||
watching = watching.filterNot {
|
||||
case (wee, _) ⇒ wee == watchee
|
||||
|
||||
// When watchee is stopped it sends DeathWatchNotification to the watcher and to this RemoteWatcher,
|
||||
// which is also watching. Send extra DeathWatchNotification to the watcher in case the
|
||||
// DeathWatchNotification message is only delivered to RemoteWatcher. Otherwise there is a risk that
|
||||
// the monitoring is removed, subsequent node failure is not detected and the original watcher is
|
||||
// never notified. This may occur for normal system shutdown of the watchee system when not all remote
|
||||
// messages are flushed at shutdown.
|
||||
watching --= watching collect {
|
||||
case tuple @ (wee, wer: InternalActorRef) if wee == watchee ⇒
|
||||
if (!addressTerminated && wer != self)
|
||||
wer.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed, addressTerminated))
|
||||
tuple
|
||||
}
|
||||
|
||||
checkLastUnwatchOfNode(watchee.path.address)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue