diff --git a/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala b/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala index 2685920277..187ef31a52 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala @@ -101,7 +101,7 @@ private[cluster] abstract class AutoDownBase(autoDownUnreachableAfter: FiniteDur import context.dispatcher - val skipMemberStatus = Gossip.convergenceSkipUnreachableWithMemberStatus + val skipMemberStatus = MembershipState.convergenceSkipUnreachableWithMemberStatus var scheduledUnreachable: Map[UniqueAddress, Cancellable] = Map.empty var pendingUnreachable: Set[UniqueAddress] = Set.empty diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index ae5815c978..5f8d539235 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -13,6 +13,7 @@ import akka.actor._ import akka.actor.SupervisorStrategy.Stop import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ +import akka.cluster.ClusterSettings.DataCenter import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import scala.collection.breakOut @@ -157,7 +158,7 @@ private[cluster] object InternalClusterAction { final case class SendCurrentClusterState(receiver: ActorRef) extends SubscriptionMessage sealed trait PublishMessage - final case class PublishChanges(newGossip: Gossip) extends PublishMessage + final case class PublishChanges(state: MembershipState) extends PublishMessage final case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage final case object ExitingCompleted @@ -277,6 +278,7 @@ private[cluster] object ClusterCoreDaemon { val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5 val MaxGossipsBeforeShuttingDownMyself = 5 + } /** @@ -287,6 +289,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ import ClusterCoreDaemon._ + import MembershipState._ val cluster = Cluster(context.system) import cluster.{ selfAddress, selfRoles, scheduler, failureDetector } @@ -299,7 +302,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet - var latestGossip: Gossip = Gossip.empty + var membershipState = MembershipState(Gossip.empty, cluster.selfUniqueAddress, cluster.settings.DataCenter) + def latestGossip: Gossip = membershipState.latestGossip val statsEnabled = PublishStatsInterval.isFinite var gossipStats = GossipStats() @@ -478,7 +482,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def initJoin(): Unit = { val selfStatus = latestGossip.member(selfUniqueAddress).status - if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus)) { + if (removeUnreachableWithMemberStatus.contains(selfStatus)) { // prevents a Down and Exiting node from being used for joining logInfo("Sending InitJoinNack message from node [{}] to [{}]", selfAddress, sender()) sender() ! InitJoinNack(selfAddress) @@ -570,7 +574,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with log.warning( "Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]", selfAddress.system, joiningNode.address.system) - else if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus)) + else if (removeUnreachableWithMemberStatus.contains(selfStatus)) logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", joiningNode, selfStatus) else { val localMembers = latestGossip.members @@ -616,7 +620,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } else sender() ! Welcome(selfUniqueAddress, latestGossip) - publish(latestGossip) + publishMembershipState() } } } @@ -629,10 +633,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (joinWith != from.address) logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith) else { - latestGossip = gossip seen selfUniqueAddress + membershipState = membershipState.copy(latestGossip = gossip).seen() logInfo("Welcome from [{}]", from.address) assertLatestGossip() - publish(latestGossip) + publishMembershipState() if (from != selfUniqueAddress) gossipTo(from, sender()) becomeInitialized() @@ -653,7 +657,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with updateLatestGossip(newGossip) logInfo("Marked address [{}] as [{}]", address, Leaving) - publish(latestGossip) + publishMembershipState() // immediate gossip to speed up the leaving process gossip() } @@ -664,9 +668,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // ExitingCompleted sent via CoordinatedShutdown to continue the leaving process. exitingTasksInProgress = false // mark as seen - latestGossip = latestGossip seen selfUniqueAddress + membershipState = membershipState.seen() assertLatestGossip() - publish(latestGossip) + publishMembershipState() // Let others know (best effort) before shutdown. Otherwise they will not see // convergence of the Exiting state until they have detected this node as @@ -681,10 +685,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // send ExitingConfirmed to two potential leaders val membersExceptSelf = latestGossip.members.filter(_.uniqueAddress != selfUniqueAddress) - latestGossip.leaderOf(selfDc, membersExceptSelf, selfUniqueAddress) match { + membershipState.leaderOf(membersExceptSelf) match { case Some(node1) ⇒ clusterCore(node1.address) ! ExitingConfirmed(selfUniqueAddress) - latestGossip.leaderOf(selfDc, membersExceptSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match { + membershipState.leaderOf(membersExceptSelf.filterNot(_.uniqueAddress == node1)) match { case Some(node2) ⇒ clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress) case None ⇒ // no more potential leader @@ -723,7 +727,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val localMembers = localGossip.members val localOverview = localGossip.overview val localSeen = localOverview.seen - val localReachability = localGossip.dcReachability(selfDc) + val localReachability = membershipState.dcReachability // check if the node to DOWN is in the `members` set localMembers.find(_.address == address) match { @@ -735,7 +739,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val newGossip = localGossip.markAsDown(m) updateLatestGossip(newGossip) - publish(latestGossip) + publishMembershipState() case Some(_) ⇒ // already down case None ⇒ logInfo("Ignoring down of unknown node [{}]", address) @@ -753,7 +757,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with log.warning( "Cluster Node [{}] - Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]", selfAddress, node.address, selfRoles.mkString(",")) - publish(latestGossip) + publishMembershipState() downing(node.address) } } @@ -829,14 +833,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // Perform the same pruning (clear of VectorClock) as the leader did when removing a member. // Removal of member itself is handled in merge (pickHighestPriority) val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) ⇒ - if (Gossip.removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) { + if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) { log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m) g.prune(VectorClock.Node(vclockName(m.uniqueAddress))) } else g } val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) ⇒ - if (Gossip.removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) { + if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) { log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m) g.prune(VectorClock.Node(vclockName(m.uniqueAddress))) } else @@ -849,9 +853,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // Don't mark gossip state as seen while exiting is in progress, e.g. // shutting down singleton actors. This delays removal of the member until // the exiting tasks have been completed. - latestGossip = + membershipState = membershipState.copy(latestGossip = if (exitingTasksInProgress) winningGossip - else winningGossip seen selfUniqueAddress + else winningGossip seen selfUniqueAddress) assertLatestGossip() // for all new joining nodes we remove them from the failure detector @@ -877,7 +881,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } } - publish(latestGossip) + publishMembershipState() val selfStatus = latestGossip.member(selfUniqueAddress).status if (selfStatus == Exiting && !exitingTasksInProgress) { @@ -1004,11 +1008,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with * Runs periodic leader actions, such as member status transitions, assigning partitions etc. */ def leaderActions(): Unit = { - if (latestGossip.isDcLeader(selfDc, selfUniqueAddress, selfUniqueAddress)) { + if (membershipState.isLeader(selfUniqueAddress)) { // only run the leader actions if we are the LEADER of the data center val firstNotice = 20 val periodicNotice = 60 - if (latestGossip.convergence(selfDc, selfUniqueAddress, exitingConfirmed)) { + if (membershipState.convergence(exitingConfirmed)) { if (leaderActionCounter >= firstNotice) logInfo("Leader can perform its duties again") leaderActionCounter = 0 @@ -1021,7 +1025,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0) logInfo( "Leader can currently not perform its duties, reachability status: [{}], member status: [{}]", - latestGossip.dcReachabilityExcludingDownedObservers(selfDc), + membershipState.dcReachabilityExcludingDownedObservers, latestGossip.members.collect { case m if m.dataCenter == selfDc ⇒ s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}" @@ -1036,8 +1040,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (latestGossip.member(selfUniqueAddress).status == Down) { // When all reachable have seen the state this member will shutdown itself when it has // status Down. The down commands should spread before we shutdown. - val unreachable = latestGossip.dcReachability(selfDc).allUnreachableOrTerminated - val downed = latestGossip.dcMembers(selfDc).collect { case m if m.status == Down ⇒ m.uniqueAddress } + val unreachable = membershipState.dcReachability.allUnreachableOrTerminated + val downed = membershipState.dcMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress } if (downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) { // the reason for not shutting down immediately is to give the gossip a chance to spread // the downing information to other downed nodes, so that they can shutdown themselves @@ -1072,9 +1076,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def leaderActionsOnConvergence(): Unit = { val removedUnreachable = for { - node ← latestGossip.dcReachability(selfDc).allUnreachableOrTerminated + node ← membershipState.dcReachability.allUnreachableOrTerminated m = latestGossip.member(node) - if m.dataCenter == selfDc && Gossip.removeUnreachableWithMemberStatus(m.status) + if m.dataCenter == selfDc && removeUnreachableWithMemberStatus(m.status) } yield m val removedExitingConfirmed = exitingConfirmed.filter { n ⇒ @@ -1148,7 +1152,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val pruned = updatedGossip.pruneTombstones(System.currentTimeMillis() - PruneGossipTombstonesAfter.toMillis) if (pruned ne latestGossip) { updateLatestGossip(pruned) - publish(pruned) + publishMembershipState() } } @@ -1161,7 +1165,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with m.dataCenter == selfDc && m.status == Joining && enoughMembers && - latestGossip.dcReachabilityExcludingDownedObservers(selfDc).isReachable(m.uniqueAddress) + membershipState.dcReachabilityExcludingDownedObservers.isReachable(m.uniqueAddress) val changedMembers = localMembers.collect { case m if isJoiningToWeaklyUp(m) ⇒ m.copy(status = WeaklyUp) } @@ -1177,7 +1181,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with logInfo("Leader is moving node [{}] to [{}]", m.address, m.status) } - publish(latestGossip) + publishMembershipState() } } @@ -1230,7 +1234,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (newlyDetectedReachableMembers.nonEmpty) logInfo("Marking node(s) as REACHABLE [{}]. Node roles [{}]", newlyDetectedReachableMembers.mkString(", "), selfRoles.mkString(",")) - publish(latestGossip) + publishMembershipState() } } } @@ -1269,23 +1273,25 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version) def validNodeForGossip(node: UniqueAddress): Boolean = - node != selfUniqueAddress && latestGossip.isReachableExcludingDownedObservers(selfDc, node) + node != selfUniqueAddress && membershipState.isReachableExcludingDownedObservers(node) - def updateLatestGossip(newGossip: Gossip): Unit = { + def updateLatestGossip(gossip: Gossip): Unit = { // Updating the vclock version for the changes - val versionedGossip = newGossip :+ vclockNode + val versionedGossip = gossip :+ vclockNode // Don't mark gossip state as seen while exiting is in progress, e.g. // shutting down singleton actors. This delays removal of the member until // the exiting tasks have been completed. - if (exitingTasksInProgress) - latestGossip = versionedGossip.clearSeen() - else { - // Nobody else has seen this gossip but us - val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress) - // Update the state with the new gossip - latestGossip = seenVersionedGossip - } + val newGossip = + if (exitingTasksInProgress) + versionedGossip.clearSeen() + else { + // Nobody else has seen this gossip but us + val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress) + // Update the state with the new gossip + seenVersionedGossip + } + membershipState = membershipState.copy(newGossip) assertLatestGossip() } @@ -1293,11 +1299,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (Cluster.isAssertInvariantsEnabled && latestGossip.version.versions.size > latestGossip.members.size) throw new IllegalStateException(s"Too many vector clock entries in gossip state ${latestGossip}") - def publish(newGossip: Gossip): Unit = { + def publishMembershipState(): Unit = { if (cluster.settings.Debug.VerboseGossipLogging) - log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.DataCenter, newGossip) + log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.DataCenter, membershipState.latestGossip) - publisher ! PublishChanges(newGossip) + publisher ! PublishChanges(membershipState) if (PublishStatsInterval == Duration.Zero) publishInternalStats() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index c2728d8eb1..b0b72633a1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -265,12 +265,14 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[UnreachableMember] = - if (newGossip eq oldGossip) Nil + private[cluster] def diffUnreachable(oldState: MembershipState, newState: MembershipState): immutable.Seq[UnreachableMember] = + if (newState eq oldState) Nil else { + val oldGossip = oldState.latestGossip + val newGossip = newState.latestGossip val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated (newGossip.overview.reachability.allUnreachableOrTerminated.collect { - case node if !oldUnreachableNodes.contains(node) && node != selfUniqueAddress ⇒ + case node if !oldUnreachableNodes.contains(node) && node != newState.selfUniqueAddress ⇒ UnreachableMember(newGossip.member(node)) })(collection.breakOut) } @@ -278,11 +280,13 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[ReachableMember] = - if (newGossip eq oldGossip) Nil + private[cluster] def diffReachable(oldState: MembershipState, newState: MembershipState): immutable.Seq[ReachableMember] = + if (newState eq oldState) Nil else { - (oldGossip.overview.reachability.allUnreachable.collect { - case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) && node != selfUniqueAddress ⇒ + val oldGossip = oldState.latestGossip + val newGossip = newState.latestGossip + (oldState.overview.reachability.allUnreachable.collect { + case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) && node != newState.selfUniqueAddress ⇒ ReachableMember(newGossip.member(node)) })(collection.breakOut) @@ -291,9 +295,11 @@ object ClusterEvent { /** * INTERNAL API. */ - private[cluster] def diffMemberEvents(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[MemberEvent] = - if (newGossip eq oldGossip) Nil + private[cluster] def diffMemberEvents(oldState: MembershipState, newState: MembershipState): immutable.Seq[MemberEvent] = + if (newState eq oldState) Nil else { + val oldGossip = oldState.latestGossip + val newGossip = newState.latestGossip val newMembers = newGossip.members diff oldGossip.members val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.uniqueAddress) val changedMembers = membersGroupedByAddress collect { @@ -319,9 +325,9 @@ object ClusterEvent { * INTERNAL API */ @InternalApi - private[cluster] def diffLeader(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = { - val newLeader = newGossip.dcLeader(dc, selfUniqueAddress) - if (newLeader != oldGossip.dcLeader(dc, selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address))) + private[cluster] def diffLeader(oldState: MembershipState, newState: MembershipState): immutable.Seq[LeaderChanged] = { + val newLeader = newState.leader + if (newLeader != oldState.leader) List(LeaderChanged(newLeader.map(_.address))) else Nil } @@ -329,11 +335,11 @@ object ClusterEvent { * INTERNAL API */ @InternalApi - private[cluster] def diffRolesLeader(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = { + private[cluster] def diffRolesLeader(oldState: MembershipState, newState: MembershipState): Set[RoleLeaderChanged] = { for { - role ← oldGossip.allRoles union newGossip.allRoles - newLeader = newGossip.roleLeader(dc, role, selfUniqueAddress) - if newLeader != oldGossip.roleLeader(dc, role, selfUniqueAddress) + role ← oldState.latestGossip.allRoles union newState.latestGossip.allRoles + newLeader = newState.roleLeader(role) + if newLeader != oldState.roleLeader(role) } yield RoleLeaderChanged(role, newLeader.map(_.address)) } @@ -341,12 +347,12 @@ object ClusterEvent { * INTERNAL API */ @InternalApi - private[cluster] def diffSeen(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] = - if (newGossip eq oldGossip) Nil + private[cluster] def diffSeen(oldState: MembershipState, newState: MembershipState): immutable.Seq[SeenChanged] = + if (oldState eq newState) Nil else { - val newConvergence = newGossip.convergence(dc, selfUniqueAddress, Set.empty) - val newSeenBy = newGossip.seenBy - if (newConvergence != oldGossip.convergence(dc, selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy) + val newConvergence = newState.convergence(Set.empty) + val newSeenBy = newState.latestGossip.seenBy + if (newConvergence != oldState.convergence(Set.empty) || newSeenBy != oldState.latestGossip.seenBy) List(SeenChanged(newConvergence, newSeenBy.map(_.address))) else Nil } @@ -355,9 +361,9 @@ object ClusterEvent { * INTERNAL API */ @InternalApi - private[cluster] def diffReachability(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachabilityChanged] = - if (newGossip.overview.reachability eq oldGossip.overview.reachability) Nil - else List(ReachabilityChanged(newGossip.overview.reachability)) + private[cluster] def diffReachability(oldState: MembershipState, newState: MembershipState): immutable.Seq[ReachabilityChanged] = + if (newState.overview.reachability eq oldState.overview.reachability) Nil + else List(ReachabilityChanged(newState.overview.reachability)) } @@ -372,7 +378,8 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto val cluster = Cluster(context.system) val selfUniqueAddress = cluster.selfUniqueAddress - var latestGossip: Gossip = Gossip.empty + val emptyMembershipState = MembershipState(Gossip.empty, cluster.selfUniqueAddress, cluster.settings.DataCenter) + var membershipState: MembershipState = emptyMembershipState def selfDc = cluster.settings.DataCenter override def preRestart(reason: Throwable, message: Option[Any]) { @@ -382,11 +389,11 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto override def postStop(): Unit = { // publish the final removed state before shutting down publish(ClusterShuttingDown) - publishChanges(Gossip.empty) + publishChanges(emptyMembershipState) } def receive = { - case PublishChanges(newGossip) ⇒ publishChanges(newGossip) + case PublishChanges(newState) ⇒ publishChanges(newState) case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) case SendCurrentClusterState(receiver) ⇒ sendCurrentClusterState(receiver) case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to) @@ -401,16 +408,17 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto * to mimic what you would have seen if you were listening to the events. */ def sendCurrentClusterState(receiver: ActorRef): Unit = { - val unreachable: Set[Member] = latestGossip.overview.reachability.allUnreachableOrTerminated.collect { - case node if node != selfUniqueAddress ⇒ latestGossip.member(node) - } + val unreachable: Set[Member] = + membershipState.latestGossip.overview.reachability.allUnreachableOrTerminated.collect { + case node if node != selfUniqueAddress ⇒ membershipState.latestGossip.member(node) + } val state = CurrentClusterState( - members = latestGossip.members, + members = membershipState.latestGossip.members, unreachable = unreachable, - seenBy = latestGossip.seenBy.map(_.address), - leader = latestGossip.dcLeader(selfDc, selfUniqueAddress).map(_.address), - roleLeaderMap = latestGossip.allRoles.map(r ⇒ - r → latestGossip.roleLeader(selfDc, r, selfUniqueAddress).map(_.address))(collection.breakOut)) + seenBy = membershipState.latestGossip.seenBy.map(_.address), + leader = membershipState.leader.map(_.address), + roleLeaderMap = membershipState.latestGossip.allRoles.map(r ⇒ + r → membershipState.roleLeader(r).map(_.address))(collection.breakOut)) receiver ! state } @@ -421,7 +429,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto if (to.exists(_.isAssignableFrom(event.getClass))) subscriber ! event } - publishDiff(Gossip.empty, latestGossip, pub) + publishDiff(emptyMembershipState, membershipState, pub) case InitialStateAsSnapshot ⇒ sendCurrentClusterState(subscriber) } @@ -434,22 +442,22 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case Some(c) ⇒ eventStream.unsubscribe(subscriber, c) } - def publishChanges(newGossip: Gossip): Unit = { - val oldGossip = latestGossip - // keep the latestGossip to be sent to new subscribers - latestGossip = newGossip - publishDiff(oldGossip, newGossip, publish) + def publishChanges(newState: MembershipState): Unit = { + val oldState = membershipState + // keep the latest state to be sent to new subscribers + membershipState = newState + publishDiff(oldState, newState, publish) } - def publishDiff(oldGossip: Gossip, newGossip: Gossip, pub: AnyRef ⇒ Unit): Unit = { - diffMemberEvents(oldGossip, newGossip) foreach pub - diffUnreachable(oldGossip, newGossip, selfUniqueAddress) foreach pub - diffReachable(oldGossip, newGossip, selfUniqueAddress) foreach pub - diffLeader(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub - diffRolesLeader(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub + def publishDiff(oldState: MembershipState, newState: MembershipState, pub: AnyRef ⇒ Unit): Unit = { + diffMemberEvents(oldState, newState) foreach pub + diffUnreachable(oldState, newState) foreach pub + diffReachable(oldState, newState) foreach pub + diffLeader(oldState, newState) foreach pub + diffRolesLeader(oldState, newState) foreach pub // publish internal SeenState for testing purposes - diffSeen(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub - diffReachability(oldGossip, newGossip) foreach pub + diffSeen(oldState, newState) foreach pub + diffReachability(oldState, newState) foreach pub } def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats) @@ -457,6 +465,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def publish(event: AnyRef): Unit = eventStream publish event def clearState(): Unit = { - latestGossip = Gossip.empty + membershipState = emptyMembershipState } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index a5e835113c..dcf3982355 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -22,11 +22,6 @@ private[cluster] object Gossip { def apply(members: immutable.SortedSet[Member]) = if (members.isEmpty) empty else empty.copy(members = members) - private val leaderMemberStatus = Set[MemberStatus](Up, Leaving) - private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) - val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) - val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) - } /** @@ -75,7 +70,7 @@ private[cluster] final case class Gossip( private def assertInvariants(): Unit = { if (members.exists(_.status == Removed)) - throw new IllegalArgumentException(s"Live members must have status [${Removed}], " + + throw new IllegalArgumentException(s"Live members must not have status [${Removed}], " + s"got [${members.filter(_.status == Removed)}]") val inReachabilityButNotMember = overview.reachability.allObservers diff members.map(_.uniqueAddress) @@ -168,103 +163,17 @@ private[cluster] final case class Gossip( Gossip(mergedMembers, GossipOverview(mergedSeen, mergedReachability), mergedVClock, mergedTombstones) } - /** - * Checks if we have a cluster convergence. If there are any in data center node pairs that cannot reach each other - * then we can't have a convergence until those nodes reach each other again or one of them is downed - * - * @return true if convergence have been reached and false if not - */ - def convergence(dc: DataCenter, selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = { - // Find cluster members in the data center that are unreachable from other members of the data center - // excluding observations from members outside of the data center, that have status DOWN or is passed in as confirmed exiting. - val unreachableInDc = dcReachabilityExcludingDownedObservers(dc).allUnreachableOrTerminated.collect { - case node if node != selfUniqueAddress && !exitingConfirmed(node) ⇒ member(node) - } - - // If another member in the data center that is UP or LEAVING and has not seen this gossip or is exiting - // convergence cannot be reached - def memberHinderingConvergenceExists = - members.exists(member ⇒ - member.dataCenter == dc && - Gossip.convergenceMemberStatus(member.status) && - !(seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress))) - - // unreachables outside of the data center or with status DOWN or EXITING does not affect convergence - def allUnreachablesCanBeIgnored = - unreachableInDc.forall(unreachable ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(unreachable.status)) - - allUnreachablesCanBeIgnored && !memberHinderingConvergenceExists - } - lazy val reachabilityExcludingDownedObservers: Reachability = { val downed = members.collect { case m if m.status == Down ⇒ m } overview.reachability.removeObservers(downed.map(_.uniqueAddress)) } - /** - * @return Reachability excluding observations from nodes outside of the data center, but including observed unreachable - * nodes outside of the data center - */ - def dcReachability(dc: DataCenter): Reachability = - overview.reachability.removeObservers(members.collect { case m if m.dataCenter != dc ⇒ m.uniqueAddress }) - - /** - * @return reachability for data center nodes, with observations from outside the data center or from downed nodes filtered out - */ - def dcReachabilityExcludingDownedObservers(dc: DataCenter): Reachability = { - val membersToExclude = members.collect { case m if m.status == Down || m.dataCenter != dc ⇒ m.uniqueAddress } - overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.dataCenter != dc ⇒ m.uniqueAddress }) - } - - def dcMembers(dc: DataCenter): SortedSet[Member] = - members.filter(_.dataCenter == dc) - - def isDcLeader(dc: DataCenter, node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean = - dcLeader(dc, selfUniqueAddress).contains(node) - - def dcLeader(dc: DataCenter, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = - leaderOf(dc, members, selfUniqueAddress) - - def roleLeader(dc: DataCenter, role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = - leaderOf(dc, members.filter(_.hasRole(role)), selfUniqueAddress) - - def leaderOf(dc: DataCenter, mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = { - val reachability = dcReachability(dc) - - val reachableMembersInDc = - if (reachability.isAllReachable) mbrs.filter(m ⇒ m.dataCenter == dc && m.status != Down) - else mbrs.filter(m ⇒ - m.dataCenter == dc && - m.status != Down && - (reachability.isReachable(m.uniqueAddress) || m.uniqueAddress == selfUniqueAddress)) - if (reachableMembersInDc.isEmpty) None - else reachableMembersInDc.find(m ⇒ Gossip.leaderMemberStatus(m.status)) - .orElse(Some(reachableMembersInDc.min(Member.leaderStatusOrdering))) - .map(_.uniqueAddress) - } - def allDataCenters: Set[DataCenter] = members.map(_.dataCenter) def allRoles: Set[String] = members.flatMap(_.roles) def isSingletonCluster: Boolean = members.size == 1 - /** - * @return true if toAddress should be reachable from the fromDc in general, within a data center - * this means only caring about data center local observations, across data centers it - * means caring about all observations for the toAddress. - */ - def isReachableExcludingDownedObservers(fromDc: DataCenter, toAddress: UniqueAddress): Boolean = - if (!hasMember(toAddress)) false - else { - val to = member(toAddress) - - // if member is in the same data center, we ignore cross data center unreachability - if (fromDc == to.dataCenter) dcReachabilityExcludingDownedObservers(fromDc).isReachable(toAddress) - // if not it is enough that any non-downed node observed it as unreachable - else reachabilityExcludingDownedObservers.isReachable(toAddress) - } - /** * @return true if fromAddress should be able to reach toAddress based on the unreachability data and their * respective data centers diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 4fb0fbc73e..c258e0a871 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -163,7 +163,7 @@ object Member { if (members.size == 2) acc + members.reduceLeft(highestPriorityOf) else { val m = members.head - if (tombstones.contains(m.uniqueAddress) || Gossip.removeUnreachableWithMemberStatus(m.status)) acc // removed + if (tombstones.contains(m.uniqueAddress) || MembershipState.removeUnreachableWithMemberStatus(m.status)) acc // removed else acc + m } } diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala new file mode 100644 index 0000000000..e2375a4d5b --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.cluster + +import scala.collection.immutable +import scala.collection.SortedSet +import akka.cluster.ClusterSettings.DataCenter +import akka.cluster.MemberStatus._ +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi private[akka] object MembershipState { + import MemberStatus._ + private val leaderMemberStatus = Set[MemberStatus](Up, Leaving) + private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) + val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) + val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class MembershipState(latestGossip: Gossip, selfUniqueAddress: UniqueAddress, selfDc: DataCenter) { + import MembershipState._ + + def members: immutable.SortedSet[Member] = latestGossip.members + + def overview: GossipOverview = latestGossip.overview + + def seen(): MembershipState = copy(latestGossip = latestGossip.seen(selfUniqueAddress)) + + /** + * Checks if we have a cluster convergence. If there are any in data center node pairs that cannot reach each other + * then we can't have a convergence until those nodes reach each other again or one of them is downed + * + * @return true if convergence have been reached and false if not + */ + def convergence(exitingConfirmed: Set[UniqueAddress]): Boolean = { + + // If another member in the data center that is UP or LEAVING and has not seen this gossip or is exiting + // convergence cannot be reached + def memberHinderingConvergenceExists = + members.exists(member ⇒ + member.dataCenter == selfDc && + convergenceMemberStatus(member.status) && + !(latestGossip.seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress))) + + // Find cluster members in the data center that are unreachable from other members of the data center + // excluding observations from members outside of the data center, that have status DOWN or is passed in as confirmed exiting. + val unreachableInDc = dcReachabilityExcludingDownedObservers.allUnreachableOrTerminated.collect { + case node if node != selfUniqueAddress && !exitingConfirmed(node) ⇒ latestGossip.member(node) + } + // unreachables outside of the data center or with status DOWN or EXITING does not affect convergence + val allUnreachablesCanBeIgnored = + unreachableInDc.forall(unreachable ⇒ convergenceSkipUnreachableWithMemberStatus(unreachable.status)) + + allUnreachablesCanBeIgnored && !memberHinderingConvergenceExists + } + + /** + * @return Reachability excluding observations from nodes outside of the data center, but including observed unreachable + * nodes outside of the data center + */ + lazy val dcReachability: Reachability = + overview.reachability.removeObservers( + members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress }) + + /** + * @return reachability for data center nodes, with observations from outside the data center or from downed nodes filtered out + */ + lazy val dcReachabilityExcludingDownedObservers: Reachability = { + val membersToExclude = members.collect { case m if m.status == Down || m.dataCenter != selfDc ⇒ m.uniqueAddress } + overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress }) + } + + /** + * @return true if toAddress should be reachable from the fromDc in general, within a data center + * this means only caring about data center local observations, across data centers it + * means caring about all observations for the toAddress. + */ + def isReachableExcludingDownedObservers(toAddress: UniqueAddress): Boolean = + if (!latestGossip.hasMember(toAddress)) false + else { + val to = latestGossip.member(toAddress) + + // if member is in the same data center, we ignore cross data center unreachability + if (selfDc == to.dataCenter) dcReachabilityExcludingDownedObservers.isReachable(toAddress) + // if not it is enough that any non-downed node observed it as unreachable + else latestGossip.reachabilityExcludingDownedObservers.isReachable(toAddress) + } + + def dcMembers: SortedSet[Member] = + members.filter(_.dataCenter == selfDc) + + def isLeader(node: UniqueAddress): Boolean = + leader.contains(node) + + def leader: Option[UniqueAddress] = + leaderOf(members) + + def roleLeader(role: String): Option[UniqueAddress] = + leaderOf(members.filter(_.hasRole(role))) + + def leaderOf(mbrs: immutable.SortedSet[Member]): Option[UniqueAddress] = { + val reachability = dcReachability + + val reachableMembersInDc = + if (reachability.isAllReachable) mbrs.filter(m ⇒ m.dataCenter == selfDc && m.status != Down) + else mbrs.filter(m ⇒ + m.dataCenter == selfDc && + m.status != Down && + (reachability.isReachable(m.uniqueAddress) || m.uniqueAddress == selfUniqueAddress)) + if (reachableMembersInDc.isEmpty) None + else reachableMembersInDc.find(m ⇒ leaderMemberStatus(m.status)) + .orElse(Some(reachableMembersInDc.min(Member.leaderStatusOrdering))) + .map(_.uniqueAddress) + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 538546c50d..68471ff5b8 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -19,6 +19,7 @@ import akka.testkit.ImplicitSender import akka.actor.ActorRef import akka.remote.RARP import akka.testkit.TestProbe +import akka.cluster.ClusterSettings.DefaultDataCenter object ClusterDomainEventPublisherSpec { val config = """ @@ -36,6 +37,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish else "akka.tcp" var publisher: ActorRef = _ + val aUp = TestMember(Address(protocol, "sys", "a", 2552), Up) val aLeaving = aUp.copy(status = Leaving) val aExiting = aLeaving.copy(status = Exiting) @@ -48,16 +50,27 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val a51Up = TestMember(Address(protocol, "sys", "a", 2551), Up) val dUp = TestMember(Address(protocol, "sys", "d", 2552), Up, Set("GRP")) + val emptyMembershipState = MembershipState(Gossip.empty, aUp.uniqueAddress, DefaultDataCenter) + val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.uniqueAddress) + val state0 = MembershipState(g0, aUp.uniqueAddress, DefaultDataCenter) val g1 = Gossip(members = SortedSet(aUp, cJoining)).seen(aUp.uniqueAddress).seen(cJoining.uniqueAddress) + val state1 = MembershipState(g1, aUp.uniqueAddress, DefaultDataCenter) val g2 = Gossip(members = SortedSet(aUp, bExiting, cUp)).seen(aUp.uniqueAddress) + val state2 = MembershipState(g2, aUp.uniqueAddress, DefaultDataCenter) val g3 = g2.seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress) + val state3 = MembershipState(g3, aUp.uniqueAddress, DefaultDataCenter) val g4 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress) + val state4 = MembershipState(g4, aUp.uniqueAddress, DefaultDataCenter) val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress).seen(a51Up.uniqueAddress) + val state5 = MembershipState(g5, aUp.uniqueAddress, DefaultDataCenter) val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.uniqueAddress) + val state6 = MembershipState(g6, aUp.uniqueAddress, DefaultDataCenter) val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.uniqueAddress) + val state7 = MembershipState(g7, aUp.uniqueAddress, DefaultDataCenter) val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability = Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress) + val state8 = MembershipState(g8, aUp.uniqueAddress, DefaultDataCenter) // created in beforeEach var memberSubscriber: TestProbe = _ @@ -69,7 +82,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish system.eventStream.subscribe(memberSubscriber.ref, ClusterShuttingDown.getClass) publisher = system.actorOf(Props[ClusterDomainEventPublisher]) - publisher ! PublishChanges(g0) + publisher ! PublishChanges(state0) memberSubscriber.expectMsg(MemberUp(aUp)) memberSubscriber.expectMsg(LeaderChanged(Some(aUp.address))) } @@ -77,19 +90,19 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish "ClusterDomainEventPublisher" must { "publish MemberJoined" in { - publisher ! PublishChanges(g1) + publisher ! PublishChanges(state1) memberSubscriber.expectMsg(MemberJoined(cJoining)) } "publish MemberUp" in { - publisher ! PublishChanges(g2) - publisher ! PublishChanges(g3) + publisher ! PublishChanges(state2) + publisher ! PublishChanges(state3) memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberUp(cUp)) } "publish leader changed" in { - publisher ! PublishChanges(g4) + publisher ! PublishChanges(state4) memberSubscriber.expectMsg(MemberUp(a51Up)) memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberUp(cUp)) @@ -98,17 +111,17 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish } "publish leader changed when old leader leaves and is removed" in { - publisher ! PublishChanges(g3) + publisher ! PublishChanges(state3) memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberUp(cUp)) - publisher ! PublishChanges(g6) + publisher ! PublishChanges(state6) memberSubscriber.expectMsg(MemberLeft(aLeaving)) - publisher ! PublishChanges(g7) + publisher ! PublishChanges(state7) memberSubscriber.expectMsg(MemberExited(aExiting)) memberSubscriber.expectMsg(LeaderChanged(Some(cUp.address))) memberSubscriber.expectNoMsg(500 millis) // at the removed member a an empty gossip is the last thing - publisher ! PublishChanges(Gossip.empty) + publisher ! PublishChanges(emptyMembershipState) memberSubscriber.expectMsg(MemberRemoved(aRemoved, Exiting)) memberSubscriber.expectMsg(MemberRemoved(bRemoved, Exiting)) memberSubscriber.expectMsg(MemberRemoved(cRemoved, Up)) @@ -116,13 +129,13 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish } "not publish leader changed when same leader" in { - publisher ! PublishChanges(g4) + publisher ! PublishChanges(state4) memberSubscriber.expectMsg(MemberUp(a51Up)) memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) - publisher ! PublishChanges(g5) + publisher ! PublishChanges(state5) memberSubscriber.expectNoMsg(500 millis) } @@ -130,12 +143,11 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[RoleLeaderChanged])) subscriber.expectMsgType[CurrentClusterState] - publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp))) + publisher ! PublishChanges(MembershipState(Gossip(members = SortedSet(cJoining, dUp)), dUp.uniqueAddress, DefaultDataCenter)) subscriber.expectMsgAllOf( RoleLeaderChanged("GRP", Some(dUp.address)), - RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(dUp.address)) - ) - publisher ! PublishChanges(Gossip(members = SortedSet(cUp, dUp))) + RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(dUp.address))) + publisher ! PublishChanges(MembershipState(Gossip(members = SortedSet(cUp, dUp)), dUp.uniqueAddress, DefaultDataCenter)) subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address))) } @@ -150,7 +162,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish "send events corresponding to current state when subscribe" in { val subscriber = TestProbe() - publisher ! PublishChanges(g8) + publisher ! PublishChanges(state8) publisher ! Subscribe(subscriber.ref, InitialStateAsEvents, Set(classOf[MemberEvent], classOf[ReachabilityEvent])) subscriber.receiveN(4).toSet should be(Set(MemberUp(aUp), MemberUp(cUp), MemberUp(dUp), MemberExited(bExiting))) subscriber.expectMsg(UnreachableMember(dUp)) @@ -162,7 +174,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[MemberEvent])) subscriber.expectMsgType[CurrentClusterState] publisher ! Unsubscribe(subscriber.ref, Some(classOf[MemberEvent])) - publisher ! PublishChanges(g3) + publisher ! PublishChanges(state3) subscriber.expectNoMsg(500 millis) // but memberSubscriber is still subscriber memberSubscriber.expectMsg(MemberExited(bExiting)) @@ -173,10 +185,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[SeenChanged])) subscriber.expectMsgType[CurrentClusterState] - publisher ! PublishChanges(g2) + publisher ! PublishChanges(state2) subscriber.expectMsgType[SeenChanged] subscriber.expectNoMsg(500 millis) - publisher ! PublishChanges(g3) + publisher ! PublishChanges(state3) subscriber.expectMsgType[SeenChanged] subscriber.expectNoMsg(500 millis) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 785b813d44..420a39d18a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -38,30 +38,33 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { private[cluster] def converge(gossip: Gossip): (Gossip, Set[UniqueAddress]) = ((gossip, Set.empty[UniqueAddress]) /: gossip.members) { case ((gs, as), m) ⇒ (gs.seen(m.uniqueAddress), as + m.uniqueAddress) } + private def state(g: Gossip): MembershipState = + MembershipState(g, selfDummyAddress, ClusterSettings.DefaultDataCenter) + "Domain events" must { "be empty for the same gossip" in { val g1 = Gossip(members = SortedSet(aUp)) - diffUnreachable(g1, g1, selfDummyAddress) should ===(Seq.empty) + diffUnreachable(state(g1), state(g1)) should ===(Seq.empty) } "be produced for new members" in { val (g1, _) = converge(Gossip(members = SortedSet(aUp))) val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining))) - diffMemberEvents(g1, g2) should ===(Seq(MemberUp(bUp), MemberJoined(eJoining))) - diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffMemberEvents(state(g1), state(g2)) should ===(Seq(MemberUp(bUp), MemberJoined(eJoining))) + diffUnreachable(state(g1), state(g2)) should ===(Seq.empty) + diffSeen(state(g1), state(g2)) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for changed status of members" in { val (g1, _) = converge(Gossip(members = SortedSet(aJoining, bUp, cUp))) val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, cLeaving, eJoining))) - diffMemberEvents(g1, g2) should ===(Seq(MemberUp(aUp), MemberLeft(cLeaving), MemberJoined(eJoining))) - diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffMemberEvents(state(g1), state(g2)) should ===(Seq(MemberUp(aUp), MemberLeft(cLeaving), MemberJoined(eJoining))) + diffUnreachable(state(g1), state(g2)) should ===(Seq.empty) + diffSeen(state(g1), state(g2)) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for members in unreachable" in { @@ -73,10 +76,13 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { unreachable(aUp.uniqueAddress, bDown.uniqueAddress) val g2 = Gossip(members = SortedSet(aUp, cUp, bDown, eDown), overview = GossipOverview(reachability = reachability2)) - diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq(UnreachableMember(bDown))) + diffUnreachable(state(g1), state(g2)) should ===(Seq(UnreachableMember(bDown))) // never include self member in unreachable - diffUnreachable(g1, g2, bDown.uniqueAddress) should ===(Seq()) - diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq.empty) + + diffUnreachable( + MembershipState(g1, bDown.uniqueAddress, ClusterSettings.DefaultDataCenter), + MembershipState(g2, bDown.uniqueAddress, ClusterSettings.DefaultDataCenter)) should ===(Seq()) + diffSeen(state(g1), state(g2)) should ===(Seq.empty) } "be produced for members becoming reachable after unreachable" in { @@ -90,50 +96,54 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { reachable(aUp.uniqueAddress, bUp.uniqueAddress) val g2 = Gossip(members = SortedSet(aUp, cUp, bUp, eUp), overview = GossipOverview(reachability = reachability2)) - diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq(UnreachableMember(cUp))) + diffUnreachable(state(g1), state(g2)) should ===(Seq(UnreachableMember(cUp))) // never include self member in unreachable - diffUnreachable(g1, g2, cUp.uniqueAddress) should ===(Seq()) - diffReachable(g1, g2, selfDummyAddress) should ===(Seq(ReachableMember(bUp))) + diffUnreachable( + MembershipState(g1, cUp.uniqueAddress, ClusterSettings.DefaultDataCenter), + MembershipState(g2, cUp.uniqueAddress, ClusterSettings.DefaultDataCenter)) should ===(Seq()) + diffReachable(state(g1), state(g2)) should ===(Seq(ReachableMember(bUp))) // never include self member in reachable - diffReachable(g1, g2, bUp.uniqueAddress) should ===(Seq()) + diffReachable( + MembershipState(g1, bUp.uniqueAddress, ClusterSettings.DefaultDataCenter), + MembershipState(g2, bUp.uniqueAddress, ClusterSettings.DefaultDataCenter)) should ===(Seq()) } "be produced for removed members" in { val (g1, _) = converge(Gossip(members = SortedSet(aUp, dExiting))) val (g2, s2) = converge(Gossip(members = SortedSet(aUp))) - diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(dRemoved, Exiting))) - diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffMemberEvents(state(g1), state(g2)) should ===(Seq(MemberRemoved(dRemoved, Exiting))) + diffUnreachable(state(g1), state(g2)) should ===(Seq.empty) + diffSeen(state(g1), state(g2)) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for convergence changes" in { val g1 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.uniqueAddress).seen(bUp.uniqueAddress).seen(eJoining.uniqueAddress) val g2 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.uniqueAddress).seen(bUp.uniqueAddress) - diffMemberEvents(g1, g2) should ===(Seq.empty) - diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address)))) - diffMemberEvents(g2, g1) should ===(Seq.empty) - diffUnreachable(g2, g1, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultDataCenter, g2, g1, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address)))) + diffMemberEvents(state(g1), state(g2)) should ===(Seq.empty) + diffUnreachable(state(g1), state(g2)) should ===(Seq.empty) + diffSeen(state(g1), state(g2)) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address)))) + diffMemberEvents(state(g2), state(g1)) should ===(Seq.empty) + diffUnreachable(state(g2), state(g1)) should ===(Seq.empty) + diffSeen(state(g2), state(g1)) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address)))) } "be produced for leader changes" in { val (g1, _) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining))) val (g2, s2) = converge(Gossip(members = SortedSet(bUp, eJoining))) - diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(aRemoved, Up))) - diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) - diffLeader(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address)))) + diffMemberEvents(state(g1), state(g2)) should ===(Seq(MemberRemoved(aRemoved, Up))) + diffUnreachable(state(g1), state(g2)) should ===(Seq.empty) + diffSeen(state(g1), state(g2)) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffLeader(state(g1), state(g2)) should ===(Seq(LeaderChanged(Some(bUp.address)))) } "be produced for role leader changes in the same data center" in { val g0 = Gossip.empty val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining)) val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining)) - diffRolesLeader(ClusterSettings.DefaultDataCenter, g0, g1, selfDummyAddress) should ===( + diffRolesLeader(state(g0), state(g1)) should ===( Set( // since this role is implicitly added RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(aUp.address)), @@ -143,7 +153,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { RoleLeaderChanged("DD", Some(dLeaving.address)), RoleLeaderChanged("DE", Some(dLeaving.address)), RoleLeaderChanged("EE", Some(eUp.address)))) - diffRolesLeader(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===( + diffRolesLeader(state(g1), state(g2)) should ===( Set( RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(bUp.address)), RoleLeaderChanged("AA", None), @@ -153,10 +163,14 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { "not be produced for role leader changes in other data centers" in { val g0 = Gossip.empty + val s0 = state(g0).copy(selfDc = "dc2") val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining)) + val s1 = state(g1).copy(selfDc = "dc2") val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining)) - diffRolesLeader("dc2", g0, g1, selfDummyAddress) should ===(Set.empty) - diffRolesLeader("dc2", g1, g2, selfDummyAddress) should ===(Set.empty) + val s2 = state(g2).copy(selfDc = "dc2") + + diffRolesLeader(s0, s1) should ===(Set.empty) + diffRolesLeader(s1, s2) should ===(Set.empty) } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index ab3f9a484e..25e17d91c6 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -7,6 +7,7 @@ package akka.cluster import org.scalatest.WordSpec import org.scalatest.Matchers import akka.actor.Address +import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterSettings.DefaultDataCenter import scala.collection.immutable.SortedSet @@ -33,6 +34,9 @@ class GossipSpec extends WordSpec with Matchers { val dc2d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set.empty, dataCenter = "dc2") val dc2d2 = TestMember(dc2d1.address, status = Down, roles = Set.empty, dataCenter = dc2d1.dataCenter) + private def state(g: Gossip, selfMember: Member = a1): MembershipState = + MembershipState(g, selfMember.uniqueAddress, selfMember.dataCenter) + "A Gossip" must { "have correct test setup" in { @@ -41,40 +45,40 @@ class GossipSpec extends WordSpec with Matchers { } "reach convergence when it's empty" in { - Gossip.empty.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) + state(Gossip.empty).convergence(Set.empty) should ===(true) } "reach convergence for one node" in { val g1 = Gossip(members = SortedSet(a1)).seen(a1.uniqueAddress) - g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) + state(g1).convergence(Set.empty) should ===(true) } "not reach convergence until all have seen version" in { val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress) - g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(false) + state(g1).convergence(Set.empty) should ===(false) } "reach convergence for two nodes" in { val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) + state(g1).convergence(Set.empty) should ===(true) } "reach convergence, skipping joining" in { // e1 is joining val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) + state(g1).convergence(Set.empty) should ===(true) } "reach convergence, skipping down" in { // e3 is down val g1 = Gossip(members = SortedSet(a1, b1, e3)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) + state(g1).convergence(Set.empty) should ===(true) } "reach convergence, skipping Leaving with exitingConfirmed" in { // c1 is Leaving val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true) + state(g1).convergence(Set(c1.uniqueAddress)) should ===(true) } "reach convergence, skipping unreachable Leaving with exitingConfirmed" in { @@ -82,16 +86,16 @@ class GossipSpec extends WordSpec with Matchers { val r1 = Reachability.empty.unreachable(b1.uniqueAddress, c1.uniqueAddress) val g1 = Gossip(members = SortedSet(a1, b1, c1), overview = GossipOverview(reachability = r1)) .seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true) + state(g1).convergence(Set(c1.uniqueAddress)) should ===(true) } "not reach convergence when unreachable" in { val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress) val g1 = (Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1))) .seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultDataCenter, b1.uniqueAddress, Set.empty) should ===(false) + state(g1, b1).convergence(Set.empty) should ===(false) // but from a1's point of view (it knows that itself is not unreachable) - g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) + state(g1).convergence(Set.empty) should ===(true) } "reach convergence when downed node has observed unreachable" in { @@ -99,7 +103,7 @@ class GossipSpec extends WordSpec with Matchers { val r1 = Reachability.empty.unreachable(e3.uniqueAddress, a1.uniqueAddress) val g1 = (Gossip(members = SortedSet(a1, b1, e3), overview = GossipOverview(reachability = r1))) .seen(a1.uniqueAddress).seen(b1.uniqueAddress).seen(e3.uniqueAddress) - g1.convergence(DefaultDataCenter, b1.uniqueAddress, Set.empty) should ===(true) + state(g1, b1).convergence(Set.empty) should ===(true) } "merge members by status priority" in { @@ -146,37 +150,33 @@ class GossipSpec extends WordSpec with Matchers { } "have leader as first member based on ordering, except Exiting status" in { - Gossip(members = SortedSet(c2, e2)).dcLeader(DefaultDataCenter, c2.uniqueAddress) should ===(Some(c2.uniqueAddress)) - Gossip(members = SortedSet(c3, e2)).dcLeader(DefaultDataCenter, c3.uniqueAddress) should ===(Some(e2.uniqueAddress)) - Gossip(members = SortedSet(c3)).dcLeader(DefaultDataCenter, c3.uniqueAddress) should ===(Some(c3.uniqueAddress)) + state(Gossip(members = SortedSet(c2, e2)), c2).leader should ===(Some(c2.uniqueAddress)) + state(Gossip(members = SortedSet(c3, e2)), c3).leader should ===(Some(e2.uniqueAddress)) + state(Gossip(members = SortedSet(c3)), c3).leader should ===(Some(c3.uniqueAddress)) } "have leader as first reachable member based on ordering" in { val r1 = Reachability.empty.unreachable(e2.uniqueAddress, c2.uniqueAddress) val g1 = Gossip(members = SortedSet(c2, e2), overview = GossipOverview(reachability = r1)) - g1.dcLeader(DefaultDataCenter, e2.uniqueAddress) should ===(Some(e2.uniqueAddress)) + state(g1, e2).leader should ===(Some(e2.uniqueAddress)) // but when c2 is selfUniqueAddress - g1.dcLeader(DefaultDataCenter, c2.uniqueAddress) should ===(Some(c2.uniqueAddress)) + state(g1, c2).leader should ===(Some(c2.uniqueAddress)) } "not have Down member as leader" in { - Gossip(members = SortedSet(e3)).dcLeader(DefaultDataCenter, e3.uniqueAddress) should ===(None) + state(Gossip(members = SortedSet(e3)), e3).leader should ===(None) } "have a leader per data center" in { val g1 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1)) - // everybodys point of view is dc1a1 being leader of dc1 - g1.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g1.dcLeader("dc1", dc1b1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g1.dcLeader("dc1", dc2c1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g1.dcLeader("dc1", dc2d1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + // dc1a1 being leader of dc1 + state(g1, dc1a1).leader should ===(Some(dc1a1.uniqueAddress)) + state(g1, dc1b1).leader should ===(Some(dc1a1.uniqueAddress)) // and dc2c1 being leader of dc2 - g1.dcLeader("dc2", dc1a1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g1.dcLeader("dc2", dc1b1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g1.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g1.dcLeader("dc2", dc2d1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + state(g1, dc2c1).leader should ===(Some(dc2c1.uniqueAddress)) + state(g1, dc2d1).leader should ===(Some(dc2c1.uniqueAddress)) } "merge seen table correctly" in { @@ -218,11 +218,11 @@ class GossipSpec extends WordSpec with Matchers { .seen(dc1b1.uniqueAddress) .seen(dc2c1.uniqueAddress) .seen(dc2d1.uniqueAddress) - g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true) + state(g, dc1a1).leader should ===(Some(dc1a1.uniqueAddress)) + state(g, dc1a1).convergence(Set.empty) should ===(true) - g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(true) + state(g, dc2c1).leader should ===(Some(dc2c1.uniqueAddress)) + state(g, dc2c1).convergence(Set.empty) should ===(true) } "reach convergence per data center even if members of another data center has not seen the gossip" in { @@ -233,12 +233,12 @@ class GossipSpec extends WordSpec with Matchers { // dc2d1 has not seen the gossip // so dc1 can reach convergence - g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true) + state(g, dc1a1).leader should ===(Some(dc1a1.uniqueAddress)) + state(g, dc1a1).convergence(Set.empty) should ===(true) // but dc2 cannot - g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(false) + state(g, dc2c1).leader should ===(Some(dc2c1.uniqueAddress)) + state(g, dc2c1).convergence(Set.empty) should ===(false) } "reach convergence per data center even if another data center contains unreachable" in { @@ -251,12 +251,12 @@ class GossipSpec extends WordSpec with Matchers { .seen(dc2d1.uniqueAddress) // this data center doesn't care about dc2 having reachability problems and can reach convergence - g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true) + state(g, dc1a1).leader should ===(Some(dc1a1.uniqueAddress)) + state(g, dc1a1).convergence(Set.empty) should ===(true) // this data center is cannot reach convergence because of unreachability within the data center - g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(false) + state(g, dc2c1).leader should ===(Some(dc2c1.uniqueAddress)) + state(g, dc2c1).convergence(Set.empty) should ===(false) } "reach convergence per data center even if there is unreachable nodes in another data center" in { @@ -271,11 +271,11 @@ class GossipSpec extends WordSpec with Matchers { .seen(dc2d1.uniqueAddress) // neither data center is affected by the inter data center unreachability as far as convergence goes - g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true) + state(g, dc1a1).leader should ===(Some(dc1a1.uniqueAddress)) + state(g, dc1a1).convergence(Set.empty) should ===(true) - g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(true) + state(g, dc2c1).leader should ===(Some(dc2c1.uniqueAddress)) + state(g, dc2c1).convergence(Set.empty) should ===(true) } "ignore cross data center unreachability when determining inside of data center reachability" in { @@ -291,10 +291,10 @@ class GossipSpec extends WordSpec with Matchers { g.isReachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress) should ===(true) g.isReachable(dc2d1.uniqueAddress, dc2c1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc1a1.dataCenter, dc1b1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc1a1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc2c1.dataCenter, dc2d1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc2c1.uniqueAddress) should ===(true) + state(g, dc1a1).isReachableExcludingDownedObservers(dc1b1.uniqueAddress) should ===(true) + state(g, dc1b1).isReachableExcludingDownedObservers(dc1a1.uniqueAddress) should ===(true) + state(g, dc2c1).isReachableExcludingDownedObservers(dc2d1.uniqueAddress) should ===(true) + state(g, dc2d1).isReachableExcludingDownedObservers(dc2c1.uniqueAddress) should ===(true) // between data centers it matters though g.isReachable(dc1a1.uniqueAddress, dc2c1.uniqueAddress) should ===(false) @@ -304,22 +304,22 @@ class GossipSpec extends WordSpec with Matchers { g.isReachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress) should ===(true) // this one looks at all unreachable-entries for the to-address - g.isReachableExcludingDownedObservers(dc1a1.dataCenter, dc2c1.uniqueAddress) should ===(false) - g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc2c1.uniqueAddress) should ===(false) - g.isReachableExcludingDownedObservers(dc2c1.dataCenter, dc1a1.uniqueAddress) should ===(false) - g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc1a1.uniqueAddress) should ===(false) + state(g, dc1a1).isReachableExcludingDownedObservers(dc2c1.uniqueAddress) should ===(false) + state(g, dc1b1).isReachableExcludingDownedObservers(dc2c1.uniqueAddress) should ===(false) + state(g, dc2c1).isReachableExcludingDownedObservers(dc1a1.uniqueAddress) should ===(false) + state(g, dc2d1).isReachableExcludingDownedObservers(dc1a1.uniqueAddress) should ===(false) // between the two other nodes there is no unreachability g.isReachable(dc1b1.uniqueAddress, dc2d1.uniqueAddress) should ===(true) g.isReachable(dc2d1.uniqueAddress, dc1b1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc2d1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc1b1.uniqueAddress) should ===(true) + state(g, dc1b1).isReachableExcludingDownedObservers(dc2d1.uniqueAddress) should ===(true) + state(g, dc2d1).isReachableExcludingDownedObservers(dc1b1.uniqueAddress) should ===(true) } "not returning a downed data center leader" in { val g = Gossip(members = SortedSet(dc1a1.copy(Down), dc1b1)) - g.leaderOf("dc1", g.members, dc1b1.uniqueAddress) should ===(Some(dc1b1.uniqueAddress)) + state(g, dc1b1).leaderOf(g.members) should ===(Some(dc1b1.uniqueAddress)) } "ignore cross data center unreachability when determining data center leader" in { @@ -329,15 +329,11 @@ class GossipSpec extends WordSpec with Matchers { val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1)) - g.leaderOf("dc1", g.members, dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g.leaderOf("dc1", g.members, dc1b1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g.leaderOf("dc1", g.members, dc2c1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g.leaderOf("dc1", g.members, dc2d1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + state(g, dc1a1).leaderOf(g.members) should ===(Some(dc1a1.uniqueAddress)) + state(g, dc1b1).leaderOf(g.members) should ===(Some(dc1a1.uniqueAddress)) - g.leaderOf("dc2", g.members, dc1a1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g.leaderOf("dc2", g.members, dc1b1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g.leaderOf("dc2", g.members, dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g.leaderOf("dc2", g.members, dc2d1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + state(g, dc2c1).leaderOf(g.members) should ===(Some(dc2c1.uniqueAddress)) + state(g, dc2d1).leaderOf(g.members) should ===(Some(dc2c1.uniqueAddress)) } // TODO test coverage for when leaderOf returns None - I have not been able to figure it out diff --git a/project/MiMa.scala b/project/MiMa.scala index 73bf9c5897..92e229c589 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1221,14 +1221,6 @@ object MiMa extends AutoPlugin { // #22881 Make sure connections are aborted correctly on Windows ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancel"), - // #23231 multi-DC Sharding - ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.leader"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveLeaderChanged"), - ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.leader_="), - FilterAnyProblemStartingWith("akka.cluster.sharding.ClusterShardingGuardian"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.proxyProps"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.this"), - // #23144 recoverWithRetries cleanup ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"), @@ -1237,23 +1229,29 @@ object MiMa extends AutoPlugin { // #23023 added a new overload with implementation to trait, so old transport implementations compiled against // older versions will be missing the method. We accept that incompatibility for now. - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate"), - + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate") + ), + "2.5.3" -> Seq( + // #23231 multi-DC Sharding + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.leader"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveLeaderChanged"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.leader_="), + FilterAnyProblemStartingWith("akka.cluster.sharding.ClusterShardingGuardian"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.proxyProps"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.this"), + // #23228 single leader per cluster data center - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.apply"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.copy"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.convergence"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.isLeader"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.leader"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.leaderOf"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.roleLeader"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.NumberOfGossipsBeforeShutdownWhenLeaderExits"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.vclockName"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.MaxGossipsBeforeShuttingDownMyself"), + FilterAnyProblemStartingWith("akka.cluster.Gossip"), + FilterAnyProblemStartingWith("akka.cluster.ClusterCoreDaemon"), + FilterAnyProblemStartingWith("akka.cluster.ClusterDomainEventPublisher"), + FilterAnyProblemStartingWith("akka.cluster.InternalClusterAction"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.diffReachable"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.diffLeader"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.diffRolesLeader"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.diffSeen"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterEvent.diffReachability"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.diffUnreachable"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterEvent.diffMemberEvents"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#GossipOrBuilder.getTombstonesCount"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#GossipOrBuilder.getTombstones"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#GossipOrBuilder.getTombstonesList"),