=clu #23286 filter emitted reachability event by DC
This commit is contained in:
parent
a85e4b056c
commit
9f4da87840
2 changed files with 13 additions and 10 deletions
|
|
@ -268,13 +268,12 @@ object ClusterEvent {
|
|||
private[cluster] def diffUnreachable(oldState: MembershipState, newState: MembershipState): immutable.Seq[UnreachableMember] =
|
||||
if (newState eq oldState) Nil
|
||||
else {
|
||||
val oldGossip = oldState.latestGossip
|
||||
val newGossip = newState.latestGossip
|
||||
val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated
|
||||
(newGossip.overview.reachability.allUnreachableOrTerminated.collect {
|
||||
val oldUnreachableNodes = oldState.dcReachabilityNoOutsideNodes.allUnreachableOrTerminated
|
||||
newState.dcReachabilityNoOutsideNodes.allUnreachableOrTerminated.collect {
|
||||
case node if !oldUnreachableNodes.contains(node) && node != newState.selfUniqueAddress ⇒
|
||||
UnreachableMember(newGossip.member(node))
|
||||
})(collection.breakOut)
|
||||
}(collection.breakOut)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -283,13 +282,11 @@ object ClusterEvent {
|
|||
private[cluster] def diffReachable(oldState: MembershipState, newState: MembershipState): immutable.Seq[ReachableMember] =
|
||||
if (newState eq oldState) Nil
|
||||
else {
|
||||
val oldGossip = oldState.latestGossip
|
||||
val newGossip = newState.latestGossip
|
||||
(oldState.overview.reachability.allUnreachable.collect {
|
||||
case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) && node != newState.selfUniqueAddress ⇒
|
||||
oldState.dcReachabilityNoOutsideNodes.allUnreachable.collect {
|
||||
case node if newGossip.hasMember(node) && newState.dcReachabilityNoOutsideNodes.isReachable(node) && node != newState.selfUniqueAddress ⇒
|
||||
ReachableMember(newGossip.member(node))
|
||||
})(collection.breakOut)
|
||||
|
||||
}(collection.breakOut)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -413,7 +410,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
*/
|
||||
def sendCurrentClusterState(receiver: ActorRef): Unit = {
|
||||
val unreachable: Set[Member] =
|
||||
membershipState.latestGossip.overview.reachability.allUnreachableOrTerminated.collect {
|
||||
membershipState.dcReachabilityNoOutsideNodes.allUnreachableOrTerminated.collect {
|
||||
case node if node != selfUniqueAddress ⇒ membershipState.latestGossip.member(node)
|
||||
}
|
||||
val state = CurrentClusterState(
|
||||
|
|
@ -454,6 +451,9 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
}
|
||||
|
||||
def publishDiff(oldState: MembershipState, newState: MembershipState, pub: AnyRef ⇒ Unit): Unit = {
|
||||
def inSameDc(reachabilityEvent: ReachabilityEvent): Boolean =
|
||||
reachabilityEvent.member.dataCenter == selfDc
|
||||
|
||||
diffMemberEvents(oldState, newState) foreach pub
|
||||
diffUnreachable(oldState, newState) foreach pub
|
||||
diffReachable(oldState, newState) foreach pub
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue