diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 6047d99cd7..3222edd5c8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -11,6 +11,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberRemoved +import akka.cluster.ClusterEvent.UnreachableMember import akka.remote.FailureDetectorRegistry import akka.remote.RemoteWatcher @@ -57,16 +58,20 @@ private[cluster] class ClusterRemoteWatcher( import RemoteWatcher._ + val cluster = Cluster(context.system) + import cluster.selfAddress + var clusterNodes: Set[Address] = Set.empty override def preStart(): Unit = { super.preStart() - Cluster(context.system).subscribe(self, classOf[MemberEvent]) + cluster.subscribe(self, classOf[MemberEvent]) + cluster.subscribe(self, classOf[UnreachableMember]) } override def postStop(): Unit = { super.postStop() - Cluster(context.system).unsubscribe(self) + cluster.unsubscribe(self) } override def receive = receiveClusterEvent orElse super.receive @@ -75,15 +80,27 @@ private[cluster] class ClusterRemoteWatcher( case WatchRemote(watchee, watcher) if clusterNodes(watchee.path.address) ⇒ () // cluster managed node, don't propagate to super case state: CurrentClusterState ⇒ - clusterNodes = state.members.map(_.address) + clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address } clusterNodes foreach takeOverResponsibility + unreachable = state.unreachable.collect { case m if m.address != selfAddress ⇒ m.address } case MemberUp(m) ⇒ - clusterNodes += m.address - takeOverResponsibility(m.address) + if (m.address != selfAddress) { + clusterNodes += m.address + takeOverResponsibility(m.address) + unreachable -= m.address + } + case UnreachableMember(m) ⇒ + if (m.address != selfAddress) + unreachable += m.address case MemberRemoved(m) ⇒ - clusterNodes -= m.address - quarantine(m.address, m.uniqueAddress.uid) - publishAddressTerminated(m.address) + if (m.address != selfAddress) { + clusterNodes -= m.address + if (unreachable contains m.address) { + quarantine(m.address, m.uniqueAddress.uid) + unreachable -= m.address + } + publishAddressTerminated(m.address) + } } /**