=clu #13875 Exclude unreachability observations from downed

* Skip observations from downed nodes (a quarantined node is marked down immediately)
  in the convergence check.
* Skip observations from downed nodes when picking "reachable" targets for gossip.
* This also means that we must accept gossip in which our own node is marked as unreachable,
  but that must not be spread to the external membership events.
This commit is contained in:
Patrik Nordwall 2015-01-30 14:30:16 +01:00
parent 8e6d81242f
commit 71ccb4c21b
8 changed files with 147 additions and 42 deletions

View file

@ -237,6 +237,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
var seedNodeProcess: Option[ActorRef] = None
var seedNodeProcessCounter = 0 // for unique names
var leaderActionCounter = 0
/**
* Looks up and returns the remote cluster command connection for the specific address.
@ -608,9 +609,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} else if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
Ignored
} else if (!remoteGossip.overview.reachability.isReachable(selfUniqueAddress)) {
logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address)
Ignored
} else if (!localGossip.overview.reachability.isReachable(selfUniqueAddress, from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from)
Ignored
@ -758,8 +756,21 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def leaderActions(): Unit =
  // Runs the leader duties when this node is the leader and the gossip has converged.
  // While convergence is missing, a counter of consecutive non-converged invocations
  // is kept so that the situation is logged without flooding the log.
  if (latestGossip.isLeader(selfUniqueAddress)) {
    // only run the leader actions if we are the LEADER
    // Log for the first time after `firstNotice` consecutive non-converged invocations,
    // and again whenever the counter is a multiple of `periodicNotice`.
    val firstNotice = 20
    val periodicNotice = 60
    if (latestGossip.convergence(selfUniqueAddress)) {
      // Only announce recovery if the "can not perform" notice was logged before.
      if (leaderActionCounter >= firstNotice)
        logInfo("Leader can perform its duties again")
      leaderActionCounter = 0
      leaderActionsOnConvergence()
    } else {
      leaderActionCounter += 1
      if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
        logInfo("Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
          latestGossip.reachabilityExcludingDownedObservers,
          latestGossip.members.map(m =>
            s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", "))
    }
  }
/**
@ -956,7 +967,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def validNodeForGossip(node: UniqueAddress): Boolean =
  // A node is a valid gossip target when it is not this node, is a member of the
  // latest gossip, and is reachable once observations made by downed members
  // have been excluded.
  (node != selfUniqueAddress && latestGossip.hasMember(node) &&
    latestGossip.reachabilityExcludingDownedObservers.isReachable(node))
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes

View file

@ -222,12 +222,12 @@ object ClusterEvent {
/**
* INTERNAL API
*/
private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[UnreachableMember] =
  // Produces an UnreachableMember event for every node that is unreachable or terminated
  // in newGossip but was not in oldGossip. The own node (selfUniqueAddress) is never
  // reported, even if the gossip marks it as unreachable.
  if (newGossip eq oldGossip) Nil
  else {
    val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated
    (newGossip.overview.reachability.allUnreachableOrTerminated.collect {
      case node if !oldUnreachableNodes.contains(node) && node != selfUniqueAddress =>
        UnreachableMember(newGossip.member(node))
    })(collection.breakOut)
  }
@ -235,11 +235,11 @@ object ClusterEvent {
/**
* INTERNAL API
*/
private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachableMember] =
private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[ReachableMember] =
if (newGossip eq oldGossip) Nil
else {
(oldGossip.overview.reachability.allUnreachable.collect {
case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node)
case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) && node != selfUniqueAddress
ReachableMember(newGossip.member(node))
})(collection.breakOut)
@ -291,12 +291,12 @@ object ClusterEvent {
/**
* INTERNAL API
*/
private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] =
  // Produces a single SeenChanged event when either the convergence state (evaluated
  // from the point of view of selfUniqueAddress) or the set of nodes that have seen
  // the gossip differs between oldGossip and newGossip; otherwise nothing.
  if (newGossip eq oldGossip) Nil
  else {
    val newConvergence = newGossip.convergence(selfUniqueAddress)
    val newSeenBy = newGossip.seenBy
    if (newConvergence != oldGossip.convergence(selfUniqueAddress) || newSeenBy != oldGossip.seenBy)
      List(SeenChanged(newConvergence, newSeenBy.map(_.address)))
    else Nil
  }
@ -319,6 +319,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
val selfUniqueAddress = Cluster(context.system).selfUniqueAddress
var latestGossip: Gossip = Gossip.empty
override def preRestart(reason: Throwable, message: Option[Any]) {
@ -346,9 +347,12 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
* to mimic what you would have seen if you were listening to the events.
*/
def sendCurrentClusterState(receiver: ActorRef): Unit = {
val unreachable: Set[Member] = latestGossip.overview.reachability.allUnreachableOrTerminated.collect {
case node if node != selfUniqueAddress latestGossip.member(node)
}
val state = CurrentClusterState(
members = latestGossip.members,
unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member,
unreachable = unreachable,
seenBy = latestGossip.seenBy.map(_.address),
leader = latestGossip.leader.map(_.address),
roleLeaderMap = latestGossip.allRoles.map(r r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut))
@ -384,12 +388,12 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
def publishDiff(oldGossip: Gossip, newGossip: Gossip, pub: AnyRef => Unit): Unit = {
  // Computes every kind of difference between the two gossips and hands each
  // resulting event to `pub`, in the order listed below.
  diffMemberEvents(oldGossip, newGossip) foreach pub
  // Reachability events take selfUniqueAddress so that the own node is never
  // reported as unreachable or reachable.
  diffUnreachable(oldGossip, newGossip, selfUniqueAddress) foreach pub
  diffReachable(oldGossip, newGossip, selfUniqueAddress) foreach pub
  diffLeader(oldGossip, newGossip) foreach pub
  diffRolesLeader(oldGossip, newGossip) foreach pub
  // publish internal SeenState for testing purposes
  diffSeen(oldGossip, newGossip, selfUniqueAddress) foreach pub
  diffReachability(oldGossip, newGossip) foreach pub
}

View file

@ -20,7 +20,7 @@ private[cluster] object Gossip {
if (members.isEmpty) empty else empty.copy(members = members)
// NOTE(review): presumably the statuses a member must have to be eligible as leader;
// the usage is not visible here, confirm against leaderOf.
private val leaderMemberStatus = Set[MemberStatus](Up, Leaving)
// Members with one of these statuses must have seen the gossip for convergence to be reached.
val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) // FIXME private
// Unreachable members with one of these statuses do not prevent convergence.
val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
// NOTE(review): presumably the statuses with which an unreachable member may be removed;
// the usage is not visible here, confirm against the leader actions.
val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
@ -159,19 +159,26 @@ private[cluster] final case class Gossip(
*
* @return true if convergence have been reached and false if not
*/
def convergence(selfUniqueAddress: UniqueAddress): Boolean = {
  // First check that:
  //   1. we don't have any members that are unreachable, excluding observations from members
  //      that have status DOWN, or
  //   2. all unreachable members in the set have status DOWN or EXITING
  // Else we can't continue to check for convergence
  // When that is done we check that all members with a convergence
  // status is in the seen table, i.e. has seen this version
  //
  // The own node is left out of the unreachable set: a gossip that marks
  // selfUniqueAddress as unreachable must not block convergence as seen from this node.
  val unreachable = reachabilityExcludingDownedObservers.allUnreachableOrTerminated.collect {
    case node if (node != selfUniqueAddress) => member(node)
  }
  unreachable.forall(m => Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) &&
    !members.exists(m => Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress))
}
// The reachability table of this gossip with every observation made by a member
// that has status Down removed. Computed once, on first access.
lazy val reachabilityExcludingDownedObservers: Reachability = {
  val downed = members.collect { case m if m.status == Down => m }
  overview.reachability.removeObservers(downed.map(_.uniqueAddress))
}
// True when a leader is defined and it is exactly the given node.
def isLeader(node: UniqueAddress): Boolean = leader.exists(_ == node)
// The leader as determined by `leaderOf` over all members of this gossip, if any.
def leader: Option[UniqueAddress] = leaderOf(members)

View file

@ -187,6 +187,18 @@ private[cluster] class Reachability private (
}
}
// Returns a Reachability without any record whose observer is one of `nodes`,
// and without the version entries of those nodes. The same instance is returned
// when `nodes` is empty or when none of the records were observed by `nodes`.
def removeObservers(nodes: Set[UniqueAddress]): Reachability =
  if (nodes.isEmpty)
    this
  else {
    val newRecords = records.filterNot(r => nodes(r.observer))
    // Nothing was filtered out, so there is nothing to rebuild.
    if (newRecords.size == records.size) this
    else {
      val newVersions = versions -- nodes
      Reachability(newRecords, newVersions)
    }
  }
def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus =
observerRows(observer) match {
case None Reachable

View file

@ -19,8 +19,15 @@ import akka.testkit.ImplicitSender
import akka.actor.ActorRef
import akka.testkit.TestProbe
// Configuration for the spec's actor system: sets the actor provider to
// ClusterActorRefProvider and the netty tcp port to 0.
object ClusterDomainEventPublisherSpec {
val config = """
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.tcp.port = 0
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterDomainEventPublisherSpec extends AkkaSpec
class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config)
with BeforeAndAfterEach with ImplicitSender {
var publisher: ActorRef = _

View file

@ -34,6 +34,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val eJoining = TestMember(Address("akka.tcp", "sys", "e", 2552), Joining, eRoles)
val eUp = TestMember(Address("akka.tcp", "sys", "e", 2552), Up, eRoles)
val eDown = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, eRoles)
val selfDummyAddress = UniqueAddress(Address("akka.tcp", "sys", "selfDummy", 2552), 17)
// Marks the gossip as seen by every one of its members and returns the resulting
// gossip together with the set of unique addresses that were marked.
private[cluster] def converge(gossip: Gossip): (Gossip, Set[UniqueAddress]) =
  gossip.members.foldLeft((gossip, Set.empty[UniqueAddress])) {
    case ((gs, as), m) => (gs.seen(m.uniqueAddress), as + m.uniqueAddress)
  }
@ -43,7 +44,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
"be empty for the same gossip" in {
  // Diffing a gossip against itself must not produce any unreachable events.
  val g1 = Gossip(members = SortedSet(aUp))
  diffUnreachable(g1, g1, selfDummyAddress) should be(Seq.empty)
}
"be produced for new members" in {
@ -51,8 +52,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining)))
diffMemberEvents(g1, g2) should be(Seq(MemberUp(bUp)))
diffUnreachable(g1, g2) should be(Seq.empty)
diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty)
diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for changed status of members" in {
@ -60,8 +61,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, cLeaving, eJoining)))
diffMemberEvents(g1, g2) should be(Seq(MemberUp(aUp)))
diffUnreachable(g1, g2) should be(Seq.empty)
diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty)
diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for members in unreachable" in {
@ -73,8 +74,10 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
unreachable(aUp.uniqueAddress, bDown.uniqueAddress)
val g2 = Gossip(members = SortedSet(aUp, cUp, bDown, eDown), overview = GossipOverview(reachability = reachability2))
diffUnreachable(g1, g2) should be(Seq(UnreachableMember(bDown)))
diffSeen(g1, g2) should be(Seq.empty)
diffUnreachable(g1, g2, selfDummyAddress) should be(Seq(UnreachableMember(bDown)))
// never include self member in unreachable
diffUnreachable(g1, g2, bDown.uniqueAddress) should be(Seq())
diffSeen(g1, g2, selfDummyAddress) should be(Seq.empty)
}
"be produced for members becoming reachable after unreachable" in {
@ -88,8 +91,12 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
reachable(aUp.uniqueAddress, bUp.uniqueAddress)
val g2 = Gossip(members = SortedSet(aUp, cUp, bUp, eUp), overview = GossipOverview(reachability = reachability2))
diffUnreachable(g1, g2) should be(Seq(UnreachableMember(cUp)))
diffReachable(g1, g2) should be(Seq(ReachableMember(bUp)))
diffUnreachable(g1, g2, selfDummyAddress) should be(Seq(UnreachableMember(cUp)))
// never include self member in unreachable
diffUnreachable(g1, g2, cUp.uniqueAddress) should be(Seq())
diffReachable(g1, g2, selfDummyAddress) should be(Seq(ReachableMember(bUp)))
// never include self member in reachable
diffReachable(g1, g2, bUp.uniqueAddress) should be(Seq())
}
"be produced for removed members" in {
@ -97,8 +104,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val (g2, s2) = converge(Gossip(members = SortedSet(aUp)))
diffMemberEvents(g1, g2) should be(Seq(MemberRemoved(dRemoved, Exiting)))
diffUnreachable(g1, g2) should be(Seq.empty)
diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty)
diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for convergence changes" in {
@ -106,11 +113,11 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val g2 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.uniqueAddress).seen(bUp.uniqueAddress)
diffMemberEvents(g1, g2) should be(Seq.empty)
diffUnreachable(g1, g2) should be(Seq.empty)
diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address))))
diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty)
diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address))))
diffMemberEvents(g2, g1) should be(Seq.empty)
diffUnreachable(g2, g1) should be(Seq.empty)
diffSeen(g2, g1) should be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address))))
diffUnreachable(g2, g1, selfDummyAddress) should be(Seq.empty)
diffSeen(g2, g1, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address))))
}
"be produced for leader changes" in {
@ -118,8 +125,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val (g2, s2) = converge(Gossip(members = SortedSet(bUp, eJoining)))
diffMemberEvents(g1, g2) should be(Seq(MemberRemoved(aRemoved, Up)))
diffUnreachable(g1, g2) should be(Seq.empty)
diffSeen(g1, g2) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffUnreachable(g1, g2, selfDummyAddress) should be(Seq.empty)
diffSeen(g1, g2, selfDummyAddress) should be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffLeader(g1, g2) should be(Seq(LeaderChanged(Some(bUp.address))))
}

View file

@ -29,7 +29,51 @@ class GossipSpec extends WordSpec with Matchers {
"A Gossip" must {
"reach convergence when it's empty" in {
  // An empty gossip has no members that could block convergence.
  Gossip.empty.convergence(a1.uniqueAddress) should be(true)
}
"reach convergence for one node" in {
  // The only member has seen the gossip.
  val gossip = Gossip(members = SortedSet(a1)).seen(a1.uniqueAddress)
  gossip.convergence(a1.uniqueAddress) should be(true)
}
"not reach convergence until all have seen version" in {
  // b1 is a member but has not seen the gossip yet.
  val gossip = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress)
  gossip.convergence(a1.uniqueAddress) should be(false)
}
"reach convergence for two nodes" in {
  // Both members have seen the gossip.
  val seenByBoth = Gossip(members = SortedSet(a1, b1))
    .seen(a1.uniqueAddress)
    .seen(b1.uniqueAddress)
  seenByBoth.convergence(a1.uniqueAddress) should be(true)
}
"reach convergence, skipping joining" in {
  // e1 is joining and has not seen the gossip
  val gossip = Gossip(members = SortedSet(a1, b1, e1))
    .seen(a1.uniqueAddress)
    .seen(b1.uniqueAddress)
  gossip.convergence(a1.uniqueAddress) should be(true)
}
"reach convergence, skipping down" in {
  // e3 is down and has not seen the gossip
  val gossip = Gossip(members = SortedSet(a1, b1, e3))
    .seen(a1.uniqueAddress)
    .seen(b1.uniqueAddress)
  gossip.convergence(a1.uniqueAddress) should be(true)
}
"not reach convergence when unreachable" in {
  // b1 has observed a1 as unreachable; both have seen the gossip
  val reachability = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress)
  val overview = GossipOverview(reachability = reachability)
  val gossip = Gossip(members = SortedSet(a1, b1), overview = overview)
    .seen(a1.uniqueAddress)
    .seen(b1.uniqueAddress)
  gossip.convergence(b1.uniqueAddress) should be(false)
  // but from a1's point of view (it knows that itself is not unreachable)
  gossip.convergence(a1.uniqueAddress) should be(true)
}
"reach convergence when downed node has observed unreachable" in {
  // e3 is Down, and it is the only observer of a1 being unreachable
  val reachability = Reachability.empty.unreachable(e3.uniqueAddress, a1.uniqueAddress)
  val overview = GossipOverview(reachability = reachability)
  val gossip = Gossip(members = SortedSet(a1, b1, e3), overview = overview)
    .seen(a1.uniqueAddress)
    .seen(b1.uniqueAddress)
    .seen(e3.uniqueAddress)
  gossip.convergence(b1.uniqueAddress) should be(true)
}
"merge members by status priority" in {

View file

@ -64,6 +64,19 @@ class ReachabilitySpec extends WordSpec with Matchers {
r.isReachable(nodeA) should be(true)
}
"exclude observations from specific (downed) nodes" in {
// nodeC marks nodeA unreachable and then reachable again, and marks nodeB unreachable;
// nodeB marks both nodeA and nodeC unreachable.
val r = Reachability.empty.
unreachable(nodeC, nodeA).reachable(nodeC, nodeA).
unreachable(nodeC, nodeB).
unreachable(nodeB, nodeA).unreachable(nodeB, nodeC)
// With all observers included, every node is unreachable according to some observer.
r.isReachable(nodeA) should be(false)
r.isReachable(nodeB) should be(false)
r.isReachable(nodeC) should be(false)
r.allUnreachableOrTerminated should be(Set(nodeA, nodeB, nodeC))
// Without nodeB's observations only nodeC's unreachable record for nodeB remains.
r.removeObservers(Set(nodeB)).allUnreachableOrTerminated should be(Set(nodeB))
}
"be pruned when all records of an observer are Reachable" in {
val r = Reachability.empty.
unreachable(nodeB, nodeA).unreachable(nodeB, nodeC).