diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 16c9b21c7e..91b6859f75 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -268,13 +268,12 @@ object ClusterEvent { private[cluster] def diffUnreachable(oldState: MembershipState, newState: MembershipState): immutable.Seq[UnreachableMember] = if (newState eq oldState) Nil else { - val oldGossip = oldState.latestGossip val newGossip = newState.latestGossip - val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated - (newGossip.overview.reachability.allUnreachableOrTerminated.collect { + val oldUnreachableNodes = oldState.dcReachabilityNoOutsideNodes.allUnreachableOrTerminated + newState.dcReachabilityNoOutsideNodes.allUnreachableOrTerminated.collect { case node if !oldUnreachableNodes.contains(node) && node != newState.selfUniqueAddress ⇒ UnreachableMember(newGossip.member(node)) - })(collection.breakOut) + }(collection.breakOut) } /** @@ -283,13 +282,11 @@ object ClusterEvent { private[cluster] def diffReachable(oldState: MembershipState, newState: MembershipState): immutable.Seq[ReachableMember] = if (newState eq oldState) Nil else { - val oldGossip = oldState.latestGossip val newGossip = newState.latestGossip - (oldState.overview.reachability.allUnreachable.collect { - case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) && node != newState.selfUniqueAddress ⇒ + oldState.dcReachabilityNoOutsideNodes.allUnreachable.collect { + case node if newGossip.hasMember(node) && newState.dcReachabilityNoOutsideNodes.isReachable(node) && node != newState.selfUniqueAddress ⇒ ReachableMember(newGossip.member(node)) - })(collection.breakOut) - + }(collection.breakOut) } /** @@ -413,7 +410,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto */ def sendCurrentClusterState(receiver: ActorRef): Unit = { val unreachable: Set[Member] = - membershipState.latestGossip.overview.reachability.allUnreachableOrTerminated.collect { + membershipState.dcReachabilityNoOutsideNodes.allUnreachableOrTerminated.collect { case node if node != selfUniqueAddress ⇒ membershipState.latestGossip.member(node) } val state = CurrentClusterState( @@ -454,6 +451,9 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } def publishDiff(oldState: MembershipState, newState: MembershipState, pub: AnyRef ⇒ Unit): Unit = { + def inSameDc(reachabilityEvent: ReachabilityEvent): Boolean = + reachabilityEvent.member.dataCenter == selfDc + diffMemberEvents(oldState, newState) foreach pub diffUnreachable(oldState, newState) foreach pub diffReachable(oldState, newState) foreach pub diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala index 99e10a273c..0c893f4e55 100644 --- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala +++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala @@ -90,6 +90,9 @@ import scala.util.Random overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress }) } + lazy val dcReachabilityNoOutsideNodes: Reachability = + overview.reachability.remove(members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress }) + /** * @return Up to `crossDcConnections` oldest members for each DC */