diff --git a/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes b/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes index e19a81f5b5..81c2ce3e97 100644 --- a/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes @@ -22,3 +22,7 @@ ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.protobuf.msg.ClusterMe # Upgrade to protobuf 3 ProblemFilters.exclude[Problem]("akka.cluster.protobuf.msg.ClusterMessages*") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.protobuf.ClusterMessageSerializer.compress") + +# #24955 publish of UnreachableDataCenter and ReachableDataCenter +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.isReachable") + diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index fa5e14a86b..d41897a3cd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -437,14 +437,13 @@ object ClusterEvent { /** * Internal API */ - private[cluster] def isReachable(state: MembershipState, oldUnreachableNodes: Set[UniqueAddress])( - otherDc: DataCenter): Boolean = { + private[cluster] def isDataCenterReachable(state: MembershipState)(otherDc: DataCenter): Boolean = { val unrelatedDcNodes = state.latestGossip.members.collect { case m if m.dataCenter != otherDc && m.dataCenter != state.selfDc => m.uniqueAddress } val reachabilityForOtherDc = state.dcReachabilityWithoutObservationsWithin.remove(unrelatedDcNodes) - reachabilityForOtherDc.allUnreachable.filterNot(oldUnreachableNodes).isEmpty + reachabilityForOtherDc.allUnreachable.isEmpty } /** @@ -457,11 +456,11 @@ object ClusterEvent { else { val otherDcs = (oldState.latestGossip.allDataCenters .union(newState.latestGossip.allDataCenters)) - newState.selfDc - otherDcs - .filterNot(isReachable(newState, oldState.dcReachability.allUnreachableOrTerminated)) - .iterator - .map(UnreachableDataCenter) - .to(immutable.IndexedSeq) + + val oldUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(oldState)) + val currentUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(newState)) + + currentUnreachableDcs.diff(oldUnreachableDcs).iterator.map(UnreachableDataCenter).to(immutable.IndexedSeq) } } @@ -476,8 +475,8 @@ object ClusterEvent { val otherDcs = (oldState.latestGossip.allDataCenters .union(newState.latestGossip.allDataCenters)) - newState.selfDc - val oldUnreachableDcs = otherDcs.filterNot(isReachable(oldState, Set())) - val currentUnreachableDcs = otherDcs.filterNot(isReachable(newState, Set())) + val oldUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(oldState)) + val currentUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(newState)) oldUnreachableDcs.diff(currentUnreachableDcs).iterator.map(ReachableDataCenter).to(immutable.IndexedSeq) } @@ -620,7 +619,7 @@ private[cluster] final class ClusterDomainEventPublisher val unreachableDataCenters: Set[DataCenter] = if (!membershipState.latestGossip.isMultiDc) Set.empty - else membershipState.latestGossip.allDataCenters.filterNot(isReachable(membershipState, Set.empty)) + else membershipState.latestGossip.allDataCenters.filterNot(isDataCenterReachable(membershipState)) val state = new CurrentClusterState( members = membershipState.latestGossip.members, diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 58a3d563a8..27e9e4b479 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -154,6 +154,73 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq()) } + "be produced correctly for scenario in issue #24955" in { + + // The scenario as seen from dc2MemberC was a sequence of reachability changes + // - empty + // - C --unreachable--> A + // - C --unreachable--> B + // - empty + // - B --unreachable--> C + + val dc1MemberA = TestMember(Address("akka", "sys", "dc2A", 2552), Up, Set.empty[String], "dc2") + val dc1MemberB = TestMember(Address("akka", "sys", "dc2B", 2552), Up, Set.empty[String], "dc2") + val dc2MemberC = TestMember(Address("akka", "sys", "dc3A", 2552), Up, Set.empty[String], "dc3") + + val members = SortedSet(dc1MemberA, dc1MemberB, dc2MemberC) + + val reachability1 = Reachability.empty + val g1 = Gossip(members, overview = GossipOverview(reachability = reachability1)) + + // - C --unreachable--> A + // cross unreachable => UnreachableDataCenter + val reachability2 = reachability1.unreachable(dc2MemberC.uniqueAddress, dc1MemberA.uniqueAddress) + val g2 = Gossip(members, overview = GossipOverview(reachability = reachability2)) + diffUnreachableDataCenter( + MembershipState(g1, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5), + MembershipState(g2, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===( + Seq(UnreachableDataCenter(dc1MemberA.dataCenter))) + diffReachableDataCenter( + MembershipState(g1, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5), + MembershipState(g2, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq()) + + // - C --unreachable--> B + // adding one more cross unreachable to same DC shouldn't publish anything new + // this was the problem in issue #24955, it published another UnreachableDataCenter + val reachability3 = reachability2.unreachable(dc2MemberC.uniqueAddress, dc1MemberB.uniqueAddress) + val g3 = Gossip(members, overview = GossipOverview(reachability = reachability3)) + diffUnreachableDataCenter( + MembershipState(g2, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5), + MembershipState(g3, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq()) + diffReachableDataCenter( + MembershipState(g2, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5), + MembershipState(g3, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq()) + + // - empty + // reachable again => ReachableDataCenter + val reachability4 = Reachability.empty + val g4 = Gossip(members, overview = GossipOverview(reachability = reachability4)) + diffUnreachableDataCenter( + MembershipState(g3, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5), + MembershipState(g4, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq()) + diffReachableDataCenter( + MembershipState(g3, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5), + MembershipState(g4, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===( + Seq(ReachableDataCenter(dc1MemberA.dataCenter))) + + // - B --unreachable--> C + // unreachable opposite direction shouldn't publish anything new + val reachability5 = reachability4.unreachable(dc1MemberB.uniqueAddress, dc2MemberC.uniqueAddress) + val g5 = Gossip(members, overview = GossipOverview(reachability = reachability5)) + diffUnreachableDataCenter( + MembershipState(g4, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5), + MembershipState(g5, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq()) + diffReachableDataCenter( + MembershipState(g4, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5), + MembershipState(g5, dc2MemberC.uniqueAddress, dc2MemberC.dataCenter, crossDcConnections = 5)) should ===(Seq()) + + } + "be produced for members becoming reachable after unreachable" in { val reachability1 = Reachability.empty .unreachable(aUp.uniqueAddress, cUp.uniqueAddress)