diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index cf50f8a54f..7bb523024f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -236,7 +236,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with var gossipStats = GossipStats() var seedNodeProcess: Option[ActorRef] = None - var seedNodeProcessCounter = 0 // for unique names + var seedNodeProcessCounter = 0 // for unique names + var leaderActionCounter = 0 /** * Looks up and returns the remote cluster command connection for the specific address. @@ -608,9 +609,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } else if (envelope.to != selfUniqueAddress) { logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) Ignored - } else if (!remoteGossip.overview.reachability.isReachable(selfUniqueAddress)) { - logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address) - Ignored } else if (!localGossip.overview.reachability.isReachable(selfUniqueAddress, from)) { logInfo("Ignoring received gossip from unreachable [{}] ", from) Ignored @@ -758,8 +756,21 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def leaderActions(): Unit = if (latestGossip.isLeader(selfUniqueAddress)) { // only run the leader actions if we are the LEADER - if (latestGossip.convergence) + val firstNotice = 20 + val periodicNotice = 60 + if (latestGossip.convergence(selfUniqueAddress)) { + if (leaderActionCounter >= firstNotice) + logInfo("Leader can perform its duties again") + leaderActionCounter = 0 leaderActionsOnConvergence() + } else { + leaderActionCounter += 1 + if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0) + logInfo("Leader can currently not perform its duties, reachability status: [{}], member status: [{}]", + latestGossip.reachabilityExcludingDownedObservers, + latestGossip.members.map(m ⇒ + s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", ")) + } } /** @@ -956,7 +967,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def validNodeForGossip(node: UniqueAddress): Boolean = (node != selfUniqueAddress && latestGossip.hasMember(node) && - latestGossip.overview.reachability.isReachable(node)) + latestGossip.reachabilityExcludingDownedObservers.isReachable(node)) def updateLatestGossip(newGossip: Gossip): Unit = { // Updating the vclock version for the changes diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 78d3218939..367c4d1333 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -222,12 +222,12 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[UnreachableMember] = + private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[UnreachableMember] = if (newGossip eq oldGossip) Nil else { val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated (newGossip.overview.reachability.allUnreachableOrTerminated.collect { - case node if !oldUnreachableNodes.contains(node) ⇒ + case node if !oldUnreachableNodes.contains(node) && node != selfUniqueAddress ⇒ UnreachableMember(newGossip.member(node)) })(collection.breakOut) } @@ -235,11 +235,11 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachableMember] = + private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[ReachableMember] = if (newGossip eq oldGossip) Nil else { (oldGossip.overview.reachability.allUnreachable.collect { - case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) ⇒ + case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) && node != selfUniqueAddress ⇒ ReachableMember(newGossip.member(node)) })(collection.breakOut) @@ -291,12 +291,12 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[SeenChanged] = + private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] = if (newGossip eq oldGossip) Nil else { - val newConvergence = newGossip.convergence + val newConvergence = newGossip.convergence(selfUniqueAddress) val newSeenBy = newGossip.seenBy - if (newConvergence != oldGossip.convergence || newSeenBy != oldGossip.seenBy) + if (newConvergence != oldGossip.convergence(selfUniqueAddress) || newSeenBy != oldGossip.seenBy) List(SeenChanged(newConvergence, newSeenBy.map(_.address))) else Nil } @@ -319,6 +319,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ + val selfUniqueAddress = Cluster(context.system).selfUniqueAddress var latestGossip: Gossip = Gossip.empty override def preRestart(reason: Throwable, message: Option[Any]) { @@ -346,9 +347,12 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto * to mimic what you would have seen if you were listening to the events. */ def sendCurrentClusterState(receiver: ActorRef): Unit = { + val unreachable: Set[Member] = latestGossip.overview.reachability.allUnreachableOrTerminated.collect { + case node if node != selfUniqueAddress ⇒ latestGossip.member(node) + } val state = CurrentClusterState( members = latestGossip.members, - unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member, + unreachable = unreachable, seenBy = latestGossip.seenBy.map(_.address), leader = latestGossip.leader.map(_.address), roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut)) @@ -384,12 +388,12 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def publishDiff(oldGossip: Gossip, newGossip: Gossip, pub: AnyRef ⇒ Unit): Unit = { diffMemberEvents(oldGossip, newGossip) foreach pub - diffUnreachable(oldGossip, newGossip) foreach pub - diffReachable(oldGossip, newGossip) foreach pub + diffUnreachable(oldGossip, newGossip, selfUniqueAddress) foreach pub + diffReachable(oldGossip, newGossip, selfUniqueAddress) foreach pub diffLeader(oldGossip, newGossip) foreach pub diffRolesLeader(oldGossip, newGossip) foreach pub // publish internal SeenState for testing purposes - diffSeen(oldGossip, newGossip) foreach pub + diffSeen(oldGossip, newGossip, selfUniqueAddress) foreach pub diffReachability(oldGossip, newGossip) foreach pub } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index d5054f3978..313fad160f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -20,7 +20,7 @@ private[cluster] object Gossip { if (members.isEmpty) empty else empty.copy(members = members) private val leaderMemberStatus = Set[MemberStatus](Up, Leaving) - private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) + val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) // FIXME private val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) @@ -159,19 +159,26 @@ private[cluster] final case class Gossip( * * @return true if convergence have been reached and false if not */ - def convergence: Boolean = { + def convergence(selfUniqueAddress: UniqueAddress): Boolean = { // First check that: - // 1. we don't have any members that are unreachable, or + // 1. we don't have any members that are unreachable, excluding observations from members + // that have status DOWN, or // 2. all unreachable members in the set have status DOWN or EXITING // Else we can't continue to check for convergence // When that is done we check that all members with a convergence - // status is in the seen table and has the latest vector clock - // version - val unreachable = overview.reachability.allUnreachableOrTerminated map member + // status is in the seen table, i.e. has seen this version + val unreachable = reachabilityExcludingDownedObservers.allUnreachableOrTerminated.collect { + case node if (node != selfUniqueAddress) ⇒ member(node) + } unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) && !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress)) } + lazy val reachabilityExcludingDownedObservers: Reachability = { + val downed = members.collect { case m if m.status == Down ⇒ m } + overview.reachability.removeObservers(downed.map(_.uniqueAddress)) + } + def isLeader(node: UniqueAddress): Boolean = leader == Some(node) def leader: Option[UniqueAddress] = leaderOf(members) diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala index 64211f488a..a3ba198045 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala @@ -187,6 +187,18 @@ private[cluster] class Reachability private ( } } + def removeObservers(nodes: Set[UniqueAddress]): Reachability = + if (nodes.isEmpty) + this + else { + val newRecords = records.filterNot(r ⇒ nodes(r.observer)) + if (newRecords.size == records.size) this + else { + val newVersions = versions -- nodes + Reachability(newRecords, newVersions) + } + } + def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus = observerRows(observer) match { case None ⇒ Reachable diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 1f24e401b1..de7a981606 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -19,8 +19,15 @@ import akka.testkit.ImplicitSender import akka.actor.ActorRef import akka.testkit.TestProbe +object ClusterDomainEventPublisherSpec { + val config = """ + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.netty.tcp.port = 0 + """ +} + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ClusterDomainEventPublisherSpec extends AkkaSpec +class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config) with BeforeAndAfterEach with ImplicitSender { var publisher: ActorRef = _ diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index efc86d6b55..204c4baecc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -34,6 +34,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { val eJoining = TestMember(Address("akka.tcp", "sys", "e", 2552), Joining, eRoles) val eUp = TestMember(Address("akka.tcp", "sys", "e", 2552), Up, eRoles) val eDown = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, eRoles) + val selfDummyAddress = UniqueAddress(Address("akka.tcp", "sys", "selfDummy", 2552), 17) private[cluster] def converge(gossip: Gossip): (Gossip, Set[UniqueAddress]) = ((gossip, Set.empty[UniqueAddress]) /: gossip.members) { case ((gs, as), m) ⇒ (gs.seen(m.uniqueAddress), as + m.uniqueAddress) } @@ -43,7 +44,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { "be empty for the same gossip" in { val g1 = Gossip(members = SortedSet(aUp)) - diffUnreachable(g1, g1) should be(Seq.empty) + diffUnreachable(g1, g1, selfDummyAddress) should be(Seq.empty) } "be produced for new members" in { @@ -51,8 +52,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining))) diffMemberEvents(g1, g2) should be(Seq(MemberUp(bUp))) - diffUnreachable(g1, g2) should be(Seq.empty) - diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty) + diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for changed status of members" in { @@ -60,8 +61,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, cLeaving, eJoining))) diffMemberEvents(g1, g2) should be(Seq(MemberUp(aUp))) - diffUnreachable(g1, g2) should be(Seq.empty) - diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty) + diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for members in unreachable" in { @@ -73,8 +74,10 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { unreachable(aUp.uniqueAddress, bDown.uniqueAddress) val g2 = Gossip(members = SortedSet(aUp, cUp, bDown, eDown), overview = GossipOverview(reachability = reachability2)) - diffUnreachable(g1, g2) should be(Seq(UnreachableMember(bDown))) - diffSeen(g1, g2) should be(Seq.empty) + diffUnreachable(g1, g2, selfDummyAddress) should be(Seq(UnreachableMember(bDown))) + // never include self member in unreachable + diffUnreachable(g1, g2, bDown.uniqueAddress) should be(Seq()) + diffSeen(g1, g2, selfDummyAddress) should be(Seq.empty) } "be produced for members becoming reachable after unreachable" in { @@ -88,8 +91,12 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { reachable(aUp.uniqueAddress, bUp.uniqueAddress) val g2 = Gossip(members = SortedSet(aUp, cUp, bUp, eUp), overview = GossipOverview(reachability = reachability2)) - diffUnreachable(g1, g2) should be(Seq(UnreachableMember(cUp))) - diffReachable(g1, g2) should be(Seq(ReachableMember(bUp))) + diffUnreachable(g1, g2, selfDummyAddress) should be(Seq(UnreachableMember(cUp))) + // never include self member in unreachable + diffUnreachable(g1, g2, cUp.uniqueAddress) should be(Seq()) + diffReachable(g1, g2, selfDummyAddress) should be(Seq(ReachableMember(bUp))) + // never include self member in reachable + diffReachable(g1, g2, bUp.uniqueAddress) should be(Seq()) } "be produced for removed members" in { @@ -97,8 +104,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { val (g2, s2) = converge(Gossip(members = SortedSet(aUp))) diffMemberEvents(g1, g2) should be(Seq(MemberRemoved(dRemoved, Exiting))) - diffUnreachable(g1, g2) should be(Seq.empty) - diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty) + diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for convergence changes" in { @@ -106,11 +113,11 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { val g2 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.uniqueAddress).seen(bUp.uniqueAddress) diffMemberEvents(g1, g2) should be(Seq.empty) - diffUnreachable(g1, g2) should be(Seq.empty) - diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address)))) + diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty) + diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address)))) diffMemberEvents(g2, g1) should be(Seq.empty) - diffUnreachable(g2, g1) should be(Seq.empty) - diffSeen(g2, g1) should be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address)))) + diffUnreachable(g2, g1, selfDummyAddress) should be(Seq.empty) + diffSeen(g2, g1, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address)))) } "be produced for leader changes" in { @@ -118,8 +125,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { val (g2, s2) = converge(Gossip(members = SortedSet(bUp, eJoining))) diffMemberEvents(g1, g2) should be(Seq(MemberRemoved(aRemoved, Up))) - diffUnreachable(g1, g2) should be(Seq.empty) - diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty) + diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) diffLeader(g1, g2) should be(Seq(LeaderChanged(Some(bUp.address)))) } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 4de5e17d1a..9f4f61517b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -29,7 +29,51 @@ class GossipSpec extends WordSpec with Matchers { "A Gossip" must { "reach convergence when it's empty" in { - Gossip.empty.convergence should be(true) + Gossip.empty.convergence(a1.uniqueAddress) should be(true) + } + + "reach convergence for one node" in { + val g1 = (Gossip(members = SortedSet(a1))).seen(a1.uniqueAddress) + g1.convergence(a1.uniqueAddress) should be(true) + } + + "not reach convergence until all have seen version" in { + val g1 = (Gossip(members = SortedSet(a1, b1))).seen(a1.uniqueAddress) + g1.convergence(a1.uniqueAddress) should be(false) + } + + "reach convergence for two nodes" in { + val g1 = (Gossip(members = SortedSet(a1, b1))).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(a1.uniqueAddress) should be(true) + } + + "reach convergence, skipping joining" in { + // e1 is joining + val g1 = (Gossip(members = SortedSet(a1, b1, e1))).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(a1.uniqueAddress) should be(true) + } + + "reach convergence, skipping down" in { + // e3 is down + val g1 = (Gossip(members = SortedSet(a1, b1, e3))).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(a1.uniqueAddress) should be(true) + } + + "not reach convergence when unreachable" in { + val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress) + val g1 = (Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1))) + .seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(b1.uniqueAddress) should be(false) + // but from a1's point of view (it knows that itself is not unreachable) + g1.convergence(a1.uniqueAddress) should be(true) + } + + "reach convergence when downed node has observed unreachable" in { + // e3 is Down + val r1 = Reachability.empty.unreachable(e3.uniqueAddress, a1.uniqueAddress) + val g1 = (Gossip(members = SortedSet(a1, b1, e3), overview = GossipOverview(reachability = r1))) + .seen(a1.uniqueAddress).seen(b1.uniqueAddress).seen(e3.uniqueAddress) + g1.convergence(b1.uniqueAddress) should be(true) } "merge members by status priority" in { diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala index f7bc4de786..8b10f8fe61 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala @@ -64,6 +64,19 @@ class ReachabilitySpec extends WordSpec with Matchers { r.isReachable(nodeA) should be(true) } + "exclude observations from specific (downed) nodes" in { + val r = Reachability.empty. + unreachable(nodeC, nodeA).reachable(nodeC, nodeA). + unreachable(nodeC, nodeB). + unreachable(nodeB, nodeA).unreachable(nodeB, nodeC) + + r.isReachable(nodeA) should be(false) + r.isReachable(nodeB) should be(false) + r.isReachable(nodeC) should be(false) + r.allUnreachableOrTerminated should be(Set(nodeA, nodeB, nodeC)) + r.removeObservers(Set(nodeB)).allUnreachableOrTerminated should be(Set(nodeB)) + } + "be pruned when all records of an observer are Reachable" in { val r = Reachability.empty. unreachable(nodeB, nodeA).unreachable(nodeB, nodeC).