Avoid publication of duplicate UnreachableDataCenter, #24955
* see scenario in added test, extracted from debug logs "New gossip published" from MultiDcSplitBrainSpec * it publised UnreachableDataCenter even though it had already published that, when a new cross link is added as unreachable in the reachability table * don't know why the implementation of diffUnreachableDataCenter/isReachable was so complicated, but no tests are failing for the changed implementation
This commit is contained in:
parent
9a9e21a62b
commit
cf4c954009
3 changed files with 81 additions and 11 deletions
|
|
@ -22,3 +22,7 @@ ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.protobuf.msg.ClusterMe
|
||||||
# Upgrade to protobuf 3
|
# Upgrade to protobuf 3
|
||||||
ProblemFilters.exclude[Problem]("akka.cluster.protobuf.msg.ClusterMessages*")
|
ProblemFilters.exclude[Problem]("akka.cluster.protobuf.msg.ClusterMessages*")
|
||||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.protobuf.ClusterMessageSerializer.compress")
|
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.protobuf.ClusterMessageSerializer.compress")
|
||||||
|
|
||||||
|
# #24955 publish of UnreachableDataCenter and ReachableDataCenter
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.isReachable")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -437,14 +437,13 @@ object ClusterEvent {
|
||||||
/**
|
/**
|
||||||
* Internal API
|
* Internal API
|
||||||
*/
|
*/
|
||||||
private[cluster] def isReachable(state: MembershipState, oldUnreachableNodes: Set[UniqueAddress])(
|
private[cluster] def isDataCenterReachable(state: MembershipState)(otherDc: DataCenter): Boolean = {
|
||||||
otherDc: DataCenter): Boolean = {
|
|
||||||
val unrelatedDcNodes = state.latestGossip.members.collect {
|
val unrelatedDcNodes = state.latestGossip.members.collect {
|
||||||
case m if m.dataCenter != otherDc && m.dataCenter != state.selfDc => m.uniqueAddress
|
case m if m.dataCenter != otherDc && m.dataCenter != state.selfDc => m.uniqueAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
val reachabilityForOtherDc = state.dcReachabilityWithoutObservationsWithin.remove(unrelatedDcNodes)
|
val reachabilityForOtherDc = state.dcReachabilityWithoutObservationsWithin.remove(unrelatedDcNodes)
|
||||||
reachabilityForOtherDc.allUnreachable.filterNot(oldUnreachableNodes).isEmpty
|
reachabilityForOtherDc.allUnreachable.isEmpty
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -457,11 +456,11 @@ object ClusterEvent {
|
||||||
else {
|
else {
|
||||||
val otherDcs = (oldState.latestGossip.allDataCenters
|
val otherDcs = (oldState.latestGossip.allDataCenters
|
||||||
.union(newState.latestGossip.allDataCenters)) - newState.selfDc
|
.union(newState.latestGossip.allDataCenters)) - newState.selfDc
|
||||||
otherDcs
|
|
||||||
.filterNot(isReachable(newState, oldState.dcReachability.allUnreachableOrTerminated))
|
val oldUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(oldState))
|
||||||
.iterator
|
val currentUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(newState))
|
||||||
.map(UnreachableDataCenter)
|
|
||||||
.to(immutable.IndexedSeq)
|
currentUnreachableDcs.diff(oldUnreachableDcs).iterator.map(UnreachableDataCenter).to(immutable.IndexedSeq)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -476,8 +475,8 @@ object ClusterEvent {
|
||||||
val otherDcs = (oldState.latestGossip.allDataCenters
|
val otherDcs = (oldState.latestGossip.allDataCenters
|
||||||
.union(newState.latestGossip.allDataCenters)) - newState.selfDc
|
.union(newState.latestGossip.allDataCenters)) - newState.selfDc
|
||||||
|
|
||||||
val oldUnreachableDcs = otherDcs.filterNot(isReachable(oldState, Set()))
|
val oldUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(oldState))
|
||||||
val currentUnreachableDcs = otherDcs.filterNot(isReachable(newState, Set()))
|
val currentUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(newState))
|
||||||
|
|
||||||
oldUnreachableDcs.diff(currentUnreachableDcs).iterator.map(ReachableDataCenter).to(immutable.IndexedSeq)
|
oldUnreachableDcs.diff(currentUnreachableDcs).iterator.map(ReachableDataCenter).to(immutable.IndexedSeq)
|
||||||
}
|
}
|
||||||
|
|
@ -620,7 +619,7 @@ private[cluster] final class ClusterDomainEventPublisher
|
||||||
|
|
||||||
val unreachableDataCenters: Set[DataCenter] =
|
val unreachableDataCenters: Set[DataCenter] =
|
||||||
if (!membershipState.latestGossip.isMultiDc) Set.empty
|
if (!membershipState.latestGossip.isMultiDc) Set.empty
|
||||||
else membershipState.latestGossip.allDataCenters.filterNot(isReachable(membershipState, Set.empty))
|
else membershipState.latestGossip.allDataCenters.filterNot(isDataCenterReachable(membershipState))
|
||||||
|
|
||||||
val state = new CurrentClusterState(
|
val state = new CurrentClusterState(
|
||||||
members = membershipState.latestGossip.members,
|
members = membershipState.latestGossip.members,
|
||||||
|
|
|
||||||
|
|
@ -154,6 +154,73 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
|
||||||
MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"be produced correctly for scenario in issue #24955" in {
|
||||||
|
|
||||||
|
// The scenario as seen from dc2MemberC was a sequence of reachability changes
|
||||||
|
// - empty
|
||||||
|
// - C --unreachable--> A
|
||||||
|
// - C --unreachable--> B
|
||||||
|
// - empty
|
||||||
|
// - B --unreachable--> C
|
||||||
|
|
||||||
|
val dc1MemberA = TestMember(Address("akka", "sys", "dc2A", 2552), Up, Set.empty[String], "dc2")
|
||||||
|
val dc1MemberB = TestMember(Address("akka", "sys", "dc2B", 2552), Up, Set.empty[String], "dc2")
|
||||||
|
val dc2MemberC = TestMember(Address("akka", "sys", "dc3A", 2552), Up, Set.empty[String], "dc3")
|
||||||
|
|
||||||
|
val members = SortedSet(dc1MemberA, dc1MemberB, dc2MemberC)
|
||||||
|
|
||||||
|
val reachability1 = Reachability.empty
|
||||||
|
val g1 = Gossip(members, overview = GossipOverview(reachability = reachability1))
|
||||||
|
|
||||||
|
// - C --unreachable--> A
|
||||||
|
// cross unreachable => UnreachableDataCenter
|
||||||
|
val reachability2 = reachability1.unreachable(dc2MemberC.uniqueAddress, dc1MemberA.uniqueAddress)
|
||||||
|
val g2 = Gossip(members, overview = GossipOverview(reachability = reachability2))
|
||||||
|
diffUnreachableDataCenter(
|
||||||
|
MembershipState(g1, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g2, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(
|
||||||
|
Seq(UnreachableDataCenter(dc1MemberA.dataCenter)))
|
||||||
|
diffReachableDataCenter(
|
||||||
|
MembershipState(g1, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g2, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
|
||||||
|
// - C --unreachable--> B
|
||||||
|
// adding one more cross unreachable to same DC shouldn't publish anything new
|
||||||
|
// this was the problem in issue #24955, it published another UnreachableDataCenter
|
||||||
|
val reachability3 = reachability2.unreachable(dc2MemberC.uniqueAddress, dc1MemberB.uniqueAddress)
|
||||||
|
val g3 = Gossip(members, overview = GossipOverview(reachability = reachability3))
|
||||||
|
diffUnreachableDataCenter(
|
||||||
|
MembershipState(g2, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g3, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
diffReachableDataCenter(
|
||||||
|
MembershipState(g2, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g3, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
|
||||||
|
// - empty
|
||||||
|
// reachable again => ReachableDataCenter
|
||||||
|
val reachability4 = Reachability.empty
|
||||||
|
val g4 = Gossip(members, overview = GossipOverview(reachability = reachability4))
|
||||||
|
diffUnreachableDataCenter(
|
||||||
|
MembershipState(g3, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g4, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
diffReachableDataCenter(
|
||||||
|
MembershipState(g3, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g4, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(
|
||||||
|
Seq(ReachableDataCenter(dc1MemberA.dataCenter)))
|
||||||
|
|
||||||
|
// - B --unreachable--> C
|
||||||
|
// unreachable opposite direction shouldn't publish anything new
|
||||||
|
val reachability5 = reachability4.unreachable(dc1MemberB.uniqueAddress, dc2MemberC.uniqueAddress)
|
||||||
|
val g5 = Gossip(members, overview = GossipOverview(reachability = reachability5))
|
||||||
|
diffUnreachableDataCenter(
|
||||||
|
MembershipState(g4, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g5, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
diffReachableDataCenter(
|
||||||
|
MembershipState(g4, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g5, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
"be produced for members becoming reachable after unreachable" in {
|
"be produced for members becoming reachable after unreachable" in {
|
||||||
val reachability1 = Reachability.empty
|
val reachability1 = Reachability.empty
|
||||||
.unreachable(aUp.uniqueAddress, cUp.uniqueAddress)
|
.unreachable(aUp.uniqueAddress, cUp.uniqueAddress)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue