!act #3583 Timer based auto-down
* Replace (deprecate) akka.cluster.auto-down config setting with akka.cluster.auto-down-unreachable-after * AutoDown actor that keeps track of unreachable members and performs down from the leader node when they have been unreachable for the specified duration * Migration guide
This commit is contained in:
parent
c55189f615
commit
d5b25cbbc6
27 changed files with 355 additions and 450 deletions
|
|
@ -219,7 +219,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.{ selfAddress, scheduler, failureDetector }
|
||||
import cluster.settings._
|
||||
import cluster.settings.{ AutoDown ⇒ _, _ }
|
||||
import cluster.InfoLogger._
|
||||
|
||||
protected def selfUniqueAddress = cluster.selfUniqueAddress
|
||||
|
|
@ -267,6 +267,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
override def preStart(): Unit = {
|
||||
context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])
|
||||
|
||||
AutoDownUnreachableAfter match {
|
||||
case d: FiniteDuration ⇒
|
||||
context.actorOf(AutoDown.props(d) withDispatcher (context.props.dispatcher), name = "autoDown")
|
||||
case _ ⇒ // auto-down is disabled
|
||||
}
|
||||
|
||||
if (SeedNodes.isEmpty)
|
||||
logInfo("No seed-nodes configured, manual cluster join required")
|
||||
else
|
||||
|
|
@ -691,16 +698,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
|
||||
/**
|
||||
* Runs periodic leader actions, such as member status transitions, auto-downing unreachable nodes,
|
||||
* assigning partitions etc.
|
||||
* Runs periodic leader actions, such as member status transitions, assigning partitions etc.
|
||||
*/
|
||||
def leaderActions(): Unit =
|
||||
if (latestGossip.isLeader(selfUniqueAddress)) {
|
||||
// only run the leader actions if we are the LEADER
|
||||
|
||||
if (AutoDown)
|
||||
leaderAutoDownActions()
|
||||
|
||||
if (latestGossip.convergence)
|
||||
leaderActionsOnConvergence()
|
||||
}
|
||||
|
|
@ -812,44 +814,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When the node is in the UNREACHABLE set it can be auto-down by leader
|
||||
*/
|
||||
def leaderAutoDownActions(): Unit = {
|
||||
val localGossip = latestGossip
|
||||
val localMembers = localGossip.members
|
||||
val localOverview = localGossip.overview
|
||||
val localSeen = localOverview.seen
|
||||
|
||||
val changedUnreachableMembers = for {
|
||||
node ← localOverview.reachability.allUnreachableOrTerminated
|
||||
m = localGossip.member(node)
|
||||
if m.status != Removed && !Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)
|
||||
} yield m.copy(status = Down)
|
||||
|
||||
if (changedUnreachableMembers.nonEmpty) {
|
||||
// handle changes
|
||||
|
||||
// replace changed unreachable
|
||||
val newMembers = localMembers -- changedUnreachableMembers ++ changedUnreachableMembers
|
||||
|
||||
// removing nodes marked as Down/Exiting from the `seen` table
|
||||
val newSeen = localSeen -- changedUnreachableMembers.map(_.uniqueAddress)
|
||||
|
||||
val newOverview = localOverview copy (seen = newSeen) // update gossip overview
|
||||
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
|
||||
|
||||
updateLatestGossip(newGossip)
|
||||
|
||||
// log the auto-downing of the unreachable nodes
|
||||
changedUnreachableMembers foreach { m ⇒
|
||||
logInfo("Leader is marking unreachable node [{}] as [{}]", m.address, m.status)
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reaps the unreachable members according to the failure detector's verdict.
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue