[WIP] one leader per cluster team (#23239)

* Guarantee no sneaky type puts more teams in the role list

* Leader per team and initial tests

* MiMa filters

* Second iteration (not working though)

* Verbose gossip logging etc.

* Gossip to team-nodes even if there is inter-team unreachability

* More work ...

* Marking removed nodes with tombstones in Gossip

* More test coverage for Gossip.remove

* Bug failing other multi-node tests squashed

* Multi-node test for team-split

* Review fixes - only prune tombstones on leader ticks

* Clean code is happy code.

* All I want is for MiMa to be my friend

* These constants are internal

* Making the formatting gods happy

* I used the wrong reachability for ignoring gossip :/

* Still hadn't quite gotten how reachability was supposed to work

* Review feedback applied

* Cross-team downing should still work

* Actually prune tombstones in the prune tombstones method ...

* Another round against reachability. Reachability leading with 15 - 2 so far.
This commit is contained in:
Johan Andrén 2017-07-04 09:09:40 +01:00 committed by Patrik Nordwall
parent 0115d5fdda
commit 164387a89e
20 changed files with 1990 additions and 284 deletions

View file

@ -7,19 +7,24 @@ import language.existentials
import scala.collection.immutable
import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom
import scala.util.control.NonFatal
import akka.actor._
import akka.actor.SupervisorStrategy.Stop
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import scala.collection.breakOut
import akka.remote.QuarantinedEvent
import java.util.ArrayList
import java.util.Collections
import akka.pattern.ask
import akka.util.Timeout
import akka.Done
import akka.annotation.InternalApi
import scala.concurrent.Future
import scala.concurrent.Promise
@ -266,9 +271,22 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
/**
* INTERNAL API.
*/
@InternalApi
private[cluster] object ClusterCoreDaemon {
def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}"
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
val MaxGossipsBeforeShuttingDownMyself = 5
}
/**
* INTERNAL API.
*/
@InternalApi
private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
import ClusterCoreDaemon._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfRoles, scheduler, failureDetector }
@ -277,10 +295,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
protected def selfUniqueAddress = cluster.selfUniqueAddress
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
val MaxGossipsBeforeShuttingDownMyself = 5
def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}"
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
// note that self is not initially member,
@ -316,6 +330,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
var exitingConfirmed = Set.empty[UniqueAddress]
def selfTeam = cluster.settings.Team
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
@ -544,28 +560,28 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Received `Join` message and replies with `Welcome` message, containing
* current gossip state, including the new joining member.
*/
def joining(node: UniqueAddress, roles: Set[String]): Unit = {
def joining(joiningNode: UniqueAddress, roles: Set[String]): Unit = {
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (node.address.protocol != selfAddress.protocol)
if (joiningNode.address.protocol != selfAddress.protocol)
log.warning(
"Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol, node.address.protocol)
else if (node.address.system != selfAddress.system)
selfAddress.protocol, joiningNode.address.protocol)
else if (joiningNode.address.system != selfAddress.system)
log.warning(
"Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.system, node.address.system)
selfAddress.system, joiningNode.address.system)
else if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", node, selfStatus)
logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", joiningNode, selfStatus)
else {
val localMembers = latestGossip.members
// check by address without uid to make sure that node with same host:port is not allowed
// to join until previous node with that host:port has been removed from the cluster
localMembers.find(_.address == node.address) match {
case Some(m) if m.uniqueAddress == node
localMembers.find(_.address == joiningNode.address) match {
case Some(m) if m.uniqueAddress == joiningNode
// node retried join attempt, probably due to lost Welcome message
logInfo("Existing member [{}] is joining again.", m)
if (node != selfUniqueAddress)
if (joiningNode != selfUniqueAddress)
sender() ! Welcome(selfUniqueAddress, latestGossip)
case Some(m)
// node restarted, same host:port as existing member, but with different uid
@ -584,17 +600,17 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
case None
// remove the node from the failure detector
failureDetector.remove(node.address)
failureDetector.remove(joiningNode.address)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
val newMembers = localMembers + Member(joiningNode, roles) + Member(selfUniqueAddress, cluster.selfRoles)
val newGossip = latestGossip copy (members = newMembers)
updateLatestGossip(newGossip)
logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
if (node == selfUniqueAddress) {
logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", "))
if (joiningNode == selfUniqueAddress) {
if (localMembers.isEmpty)
leaderActions() // important for deterministic oldest when bootstrapping
} else
@ -613,8 +629,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (joinWith != from.address)
logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith)
else {
logInfo("Welcome from [{}]", from.address)
latestGossip = gossip seen selfUniqueAddress
logInfo("Welcome from [{}]", from.address)
assertLatestGossip()
publish(latestGossip)
if (from != selfUniqueAddress)
@ -663,11 +679,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits)
// send ExitingConfirmed to two potential leaders
val membersWithoutSelf = latestGossip.members.filterNot(_.uniqueAddress == selfUniqueAddress)
latestGossip.leaderOf(membersWithoutSelf, selfUniqueAddress) match {
val membersExceptSelf = latestGossip.members.filter(_.uniqueAddress != selfUniqueAddress)
latestGossip.leaderOf(selfTeam, membersExceptSelf, selfUniqueAddress) match {
case Some(node1)
clusterCore(node1.address) ! ExitingConfirmed(selfUniqueAddress)
latestGossip.leaderOf(membersWithoutSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
latestGossip.leaderOf(selfTeam, membersExceptSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
case Some(node2)
clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress)
case None // no more potential leader
@ -706,26 +723,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localReachability = localOverview.reachability
val localReachability = localGossip.teamReachability(selfTeam)
// check if the node to DOWN is in the `members` set
localMembers.find(_.address == address) match {
case Some(m) if (m.status != Down)
case Some(m) if m.status != Down
if (localReachability.isReachable(m.uniqueAddress))
logInfo("Marking node [{}] as [{}]", m.address, Down)
else
logInfo("Marking unreachable node [{}] as [{}]", m.address, Down)
// replace member (changed status)
val newMembers = localMembers - m + m.copy(status = Down)
// remove nodes marked as DOWN from the `seen` table
val newSeen = localSeen - m.uniqueAddress
// update gossip overview
val newOverview = localOverview copy (seen = newSeen)
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
val newGossip = localGossip.markAsDown(m)
updateLatestGossip(newGossip)
publish(latestGossip)
case Some(_) // already down
case None
@ -751,7 +760,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def receiveGossipStatus(status: GossipStatus): Unit = {
val from = status.from
if (!latestGossip.overview.reachability.isReachable(selfUniqueAddress, from))
if (!latestGossip.isReachable(selfUniqueAddress, from))
logInfo("Ignoring received gossip status from unreachable [{}] ", from)
else if (latestGossip.members.forall(_.uniqueAddress != from))
log.debug("Cluster Node [{}] - Ignoring received gossip status from unknown [{}]", selfAddress, from)
@ -778,6 +787,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Receive new gossip.
*/
def receiveGossip(envelope: GossipEnvelope): ReceiveGossipType = {
val from = envelope.from
val remoteGossip = envelope.gossip
val localGossip = latestGossip
@ -788,7 +798,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} else if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
Ignored
} else if (!localGossip.overview.reachability.isReachable(selfUniqueAddress, from)) {
} else if (!localGossip.isReachable(selfUniqueAddress, from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from)
Ignored
} else if (localGossip.members.forall(_.uniqueAddress != from)) {
@ -839,10 +849,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// Don't mark gossip state as seen while exiting is in progress, e.g.
// shutting down singleton actors. This delays removal of the member until
// the exiting tasks have been completed.
if (exitingTasksInProgress)
latestGossip = winningGossip
else
latestGossip = winningGossip seen selfUniqueAddress
latestGossip =
if (exitingTasksInProgress) winningGossip
else winningGossip seen selfUniqueAddress
assertLatestGossip()
// for all new joining nodes we remove them from the failure detector
@ -852,7 +861,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if (comparison == VectorClock.Concurrent) {
if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) {
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
@ -995,11 +1004,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Runs periodic leader actions, such as member status transitions, assigning partitions etc.
*/
def leaderActions(): Unit = {
if (latestGossip.isLeader(selfUniqueAddress, selfUniqueAddress)) {
// only run the leader actions if we are the LEADER
if (latestGossip.isTeamLeader(selfTeam, selfUniqueAddress, selfUniqueAddress)) {
// only run the leader actions if we are the LEADER of the team
val firstNotice = 20
val periodicNotice = 60
if (latestGossip.convergence(selfUniqueAddress, exitingConfirmed)) {
if (latestGossip.convergence(selfTeam, selfUniqueAddress, exitingConfirmed)) {
if (leaderActionCounter >= firstNotice)
logInfo("Leader can perform its duties again")
leaderActionCounter = 0
@ -1012,9 +1021,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
logInfo(
"Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
latestGossip.reachabilityExcludingDownedObservers,
latestGossip.members.map(m
s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", "))
latestGossip.teamReachabilityExcludingDownedObservers(selfTeam),
latestGossip.members.collect {
case m if m.team == selfTeam
s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}"
}.mkString(", "))
}
}
cleanupExitingConfirmed()
@ -1025,8 +1036,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (latestGossip.member(selfUniqueAddress).status == Down) {
// When all reachable have seen the state this member will shutdown itself when it has
// status Down. The down commands should spread before we shutdown.
val unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated
val downed = latestGossip.members.collect { case m if m.status == Down m.uniqueAddress }
val unreachable = latestGossip.teamReachability(selfTeam).allUnreachableOrTerminated
val downed = latestGossip.teamMembers(selfTeam).collect { case m if m.status == Down m.uniqueAddress }
if (downed.forall(node unreachable(node) || latestGossip.seenByNode(node))) {
// the reason for not shutting down immediately is to give the gossip a chance to spread
// the downing information to other downed nodes, so that they can shutdown themselves
@ -1059,95 +1070,85 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* 9. Update the state with the new gossip
*/
def leaderActionsOnConvergence(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
val removedUnreachable = for {
node localOverview.reachability.allUnreachableOrTerminated
m = localGossip.member(node)
if Gossip.removeUnreachableWithMemberStatus(m.status)
node latestGossip.teamReachability(selfTeam).allUnreachableOrTerminated
m = latestGossip.member(node)
if m.team == selfTeam && Gossip.removeUnreachableWithMemberStatus(m.status)
} yield m
val removedExitingConfirmed = exitingConfirmed.filter(n localGossip.member(n).status == Exiting)
val removedExitingConfirmed = exitingConfirmed.filter { n
val member = latestGossip.member(n)
member.team == selfTeam && member.status == Exiting
}
val changedMembers = localMembers collect {
var upNumber = 0
val changedMembers = {
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
{
case m if isJoiningToUp(m)
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = localGossip.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1
}
m.copyUp(upNumber)
latestGossip.members collect {
var upNumber = 0
case m if m.status == Leaving
// Move LEAVING => EXITING (once we have a convergence on LEAVING)
m copy (status = Exiting)
{
case m if m.team == selfTeam && isJoiningToUp(m)
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = latestGossip.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1
}
m.copyUp(upNumber)
case m if m.team == selfTeam && m.status == Leaving
// Move LEAVING => EXITING (once we have a convergence on LEAVING)
m copy (status = Exiting)
}
}
}
if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) {
// handle changes
val updatedGossip: Gossip =
if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) {
// replace changed members
val newMembers = changedMembers.union(localMembers).diff(removedUnreachable)
.filterNot(m removedExitingConfirmed(m.uniqueAddress))
// replace changed members
val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
val newGossip =
latestGossip.update(changedMembers).removeAll(removed, System.currentTimeMillis())
// removing REMOVED nodes from the `seen` table
val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
val newSeen = localSeen diff removed
// removing REMOVED nodes from the `reachability` table
val newReachability = localOverview.reachability.remove(removed)
val newOverview = localOverview copy (seen = newSeen, reachability = newReachability)
// Clear the VectorClock when member is removed. The change made by the leader is stamped
// and will propagate as is if there are no other changes on other nodes.
// If other concurrent changes on other nodes (e.g. join) the pruning is also
// taken care of when receiving gossips.
val newVersion = removed.foldLeft(localGossip.version) { (v, node)
v.prune(VectorClock.Node(vclockName(node)))
}
val newGossip = localGossip copy (members = newMembers, overview = newOverview, version = newVersion)
if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
// Leader is moving itself from Leaving to Exiting.
// ExitingCompleted will be received via CoordinatedShutdown to continue
// the leaving process. Meanwhile the gossip state is not marked as seen.
exitingTasksInProgress = true
logInfo("Exiting (leader), starting coordinated shutdown")
selfExiting.trySuccess(Done)
coordShutdown.run()
}
if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
// Leader is moving itself from Leaving to Exiting.
// ExitingCompleted will be received via CoordinatedShutdown to continue
// the leaving process. Meanwhile the gossip state is not marked as seen.
exitingTasksInProgress = true
logInfo("Exiting (leader), starting coordinated shutdown")
selfExiting.trySuccess(Done)
coordShutdown.run()
}
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
updateLatestGossip(newGossip)
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
changedMembers foreach { m
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
}
removedUnreachable foreach { m
val status = if (m.status == Exiting) "exiting" else "unreachable"
logInfo("Leader is removing {} node [{}]", status, m.address)
}
removedExitingConfirmed.foreach { n
logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
}
// log status changes
changedMembers foreach { m
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
}
newGossip
} else
latestGossip
// log the removal of the unreachable nodes
removedUnreachable foreach { m
val status = if (m.status == Exiting) "exiting" else "unreachable"
logInfo("Leader is removing {} node [{}]", status, m.address)
}
removedExitingConfirmed.foreach { n
logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
}
publish(latestGossip)
val pruned = updatedGossip.pruneTombstones(System.currentTimeMillis() - PruneGossipTombstonesAfter.toMillis)
if (pruned ne latestGossip) {
updateLatestGossip(pruned)
publish(pruned)
}
}
@ -1157,7 +1158,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
def isJoiningToWeaklyUp(m: Member): Boolean =
m.status == Joining && enoughMembers && latestGossip.reachabilityExcludingDownedObservers.isReachable(m.uniqueAddress)
m.team == selfTeam &&
m.status == Joining &&
enoughMembers &&
latestGossip.teamReachabilityExcludingDownedObservers(selfTeam).isReachable(m.uniqueAddress)
val changedMembers = localMembers.collect {
case m if isJoiningToWeaklyUp(m) m.copy(status = WeaklyUp)
}
@ -1203,10 +1207,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (newlyDetectedUnreachableMembers.nonEmpty || newlyDetectedReachableMembers.nonEmpty) {
val newReachability1 = (localOverview.reachability /: newlyDetectedUnreachableMembers) {
val newReachability1 = newlyDetectedUnreachableMembers.foldLeft(localOverview.reachability) {
(reachability, m) reachability.unreachable(selfUniqueAddress, m.uniqueAddress)
}
val newReachability2 = (newReachability1 /: newlyDetectedReachableMembers) {
val newReachability2 = newlyDetectedReachableMembers.foldLeft(newReachability1) {
(reachability, m) reachability.reachable(selfUniqueAddress, m.uniqueAddress)
}
@ -1265,8 +1269,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
def validNodeForGossip(node: UniqueAddress): Boolean =
(node != selfUniqueAddress && latestGossip.hasMember(node) &&
latestGossip.reachabilityExcludingDownedObservers.isReachable(node))
node != selfUniqueAddress && latestGossip.isReachableExcludingDownedObservers(selfTeam, node)
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes
@ -1291,6 +1294,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
throw new IllegalStateException(s"Too many vector clock entries in gossip state ${latestGossip}")
def publish(newGossip: Gossip): Unit = {
if (cluster.settings.Debug.VerboseGossipLogging)
log.debug("Cluster Node [{}] team [{}] - New gossip published [{}]", selfAddress, cluster.settings.Team, newGossip)
publisher ! PublishChanges(newGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()
}