Merge pull request #17646 from akka/wip-down-improvements-patriknw

=clu Improve cluster downing

commit 57da65fa06
12 changed files with 227 additions and 101 deletions
@@ -1569,6 +1569,8 @@ object ShardCoordinator {
   */

  private final case class ResendShardHost(shard: ShardId, region: ActorRef)

  private final case class DelayedShardRegionTerminated(region: ActorRef)

  /**
   * INTERNAL API. Rebalancing process is performed by this actor.
   * It sends `BeginHandOff` to all `ShardRegion` actors followed by

@@ -1632,11 +1634,14 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,

  override def snapshotPluginId: String = settings.snapshotPluginId

  val removalMargin = Cluster(context.system).settings.DownRemovalMargin

  var persistentState = State.empty
  var rebalanceInProgress = Set.empty[ShardId]
  var unAckedHostShards = Map.empty[ShardId, Cancellable]
  // regions that have requested handoff, for graceful shutdown
  var gracefulShutdownInProgress = Set.empty[ActorRef]
  var aliveRegions = Set.empty[ActorRef]
  var persistCount = 0

  import context.dispatcher

@@ -1694,6 +1699,7 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
  override def receiveCommand: Receive = {
    case Register(region) ⇒
      log.debug("ShardRegion registered: [{}]", region)
      aliveRegions += region
      if (persistentState.regions.contains(region))
        sender() ! RegisterAck(self)
      else {

@@ -1724,20 +1730,12 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
        }
      }

    case Terminated(ref) ⇒
    case t @ Terminated(ref) ⇒
      if (persistentState.regions.contains(ref)) {
        log.debug("ShardRegion terminated: [{}]", ref)

        require(persistentState.regions.contains(ref), s"Terminated region $ref not registered")
        persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }

        gracefulShutdownInProgress -= ref

        saveSnapshotWhenNeeded()
        persist(ShardRegionTerminated(ref)) { evt ⇒
          persistentState = persistentState.updated(evt)
          allocateShardHomes()
        }
        if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref))
          context.system.scheduler.scheduleOnce(removalMargin, self, DelayedShardRegionTerminated(ref))
        else
          regionTerminated(ref)
      } else if (persistentState.regionProxies.contains(ref)) {
        log.debug("ShardRegion proxy terminated: [{}]", ref)
        saveSnapshotWhenNeeded()

@@ -1746,6 +1744,9 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
        }
      }

    case DelayedShardRegionTerminated(ref) ⇒
      regionTerminated(ref)

    case GetShardHome(shard) ⇒
      if (!rebalanceInProgress.contains(shard)) {
        persistentState.shards.get(shard) match {

@@ -1859,6 +1860,20 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
    case _: CurrentClusterState ⇒
  }

  def regionTerminated(ref: ActorRef): Unit =
    if (persistentState.regions.contains(ref)) {
      log.debug("ShardRegion terminated: [{}]", ref)
      persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }

      gracefulShutdownInProgress -= ref

      saveSnapshotWhenNeeded()
      persist(ShardRegionTerminated(ref)) { evt ⇒
        persistentState = persistentState.updated(evt)
        allocateShardHomes()
      }
    }

  def shuttingDown: Receive = {
    case _ ⇒ // ignore all
  }

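The coordinator now postpones reacting to a Terminated region when the whole node was downed (addressTerminated) and a removal margin is configured, by scheduling the new DelayedShardRegionTerminated message to itself. A minimal, self-contained sketch of that delayed-Terminated pattern outside the coordinator; the DelayedCleanup actor, its DelayedTerminated message and the parent notification are illustrative and not part of this commit:

import akka.actor.{ Actor, ActorRef, Props, Terminated }
import scala.concurrent.duration._

object DelayedCleanup {
  // hypothetical message, mirrors DelayedShardRegionTerminated above
  final case class DelayedTerminated(ref: ActorRef)
  def props(removalMargin: FiniteDuration): Props = Props(new DelayedCleanup(removalMargin))
}

class DelayedCleanup(removalMargin: FiniteDuration) extends Actor {
  import DelayedCleanup._
  import context.dispatcher

  def receive = {
    case ref: ActorRef ⇒
      context.watch(ref) // start monitoring the region-like actor
    case Terminated(ref) ⇒
      if (removalMargin > Duration.Zero)
        // wait out the removal margin before acting, as the coordinator does
        context.system.scheduler.scheduleOnce(removalMargin, self, DelayedTerminated(ref))
      else
        cleanup(ref)
    case DelayedTerminated(ref) ⇒
      cleanup(ref)
  }

  def cleanup(ref: ActorRef): Unit =
    context.parent ! s"cleaned up ${ref.path}" // placeholder for real cleanup
}
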
@@ -32,6 +32,7 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.log-remote-lifecycle-events = off
    akka.cluster.auto-down-unreachable-after = 0s
    akka.cluster.down-removal-margin = 5s
    akka.cluster.roles = ["backend"]
    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
    akka.persistence.journal.leveldb-shared {

@@ -37,6 +37,7 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig {
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.log-remote-lifecycle-events = off
    akka.cluster.auto-down-unreachable-after = 0s
    akka.cluster.down-removal-margin = 5s
    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
    akka.persistence.journal.leveldb-shared {
      timeout = 5s

@@ -42,6 +42,7 @@ object ClusterShardingSpec extends MultiNodeConfig {
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.log-remote-lifecycle-events = off
    akka.cluster.auto-down-unreachable-after = 0s
    akka.cluster.down-removal-margin = 5s
    akka.cluster.roles = ["backend"]
    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
    akka.persistence.journal.leveldb-shared.store {

@@ -110,29 +110,11 @@ akka.cluster.singleton {
  # If the role is not specified it's a singleton among all nodes in the cluster.
  role = ""

  # When a node is becoming oldest it sends hand-over request to previous oldest.
  # This is retried with the 'retry-interval' until the previous oldest confirms
  # that the hand over has started, or this 'max-hand-over-retries' limit has been
  # reached. If the retry limit is reached it takes the decision to be the new oldest
  # if previous oldest is unknown (typically removed), otherwise it initiates a new
  # round by throwing 'akka.cluster.singleton.ClusterSingletonManagerIsStuck' and expecting
  # restart with fresh state. For a cluster with many members you might need to increase
  # this retry limit because it takes longer time to propagate changes across all nodes.
  max-hand-over-retries = 10

  # When a oldest node leaves the cluster it is not oldest any more and then it sends
  # take over request to the new oldest to initiate the hand-over process. This is
  # retried with the 'retry-interval' until this retry limit has been reached. If the
  # retry limit is reached it initiates a new round by throwing
  # 'akka.cluster.singleton.ClusterSingletonManagerIsStuck' and expecting restart with
  # fresh state. This will also cause the singleton actor to be stopped.
  # 'max-take-over-retries' must be less than 'max-hand-over-retries' to ensure that
  # new oldest doesn't start singleton actor before previous is stopped for certain
  # corner cases.
  max-take-over-retries = 5

  # Interval for hand over and take over messages
  retry-interval = 1s
  # When a node is becoming oldest it sends hand-over request to previous oldest,
  # that might be leaving the cluster. This is retried with this interval until
  # the previous oldest confirms that the hand over has started or the previous
  # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
  hand-over-retry-interval = 1s
}

akka.cluster.singleton-proxy {

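With this change the singleton's per-message retry counts disappear from reference.conf; only hand-over-retry-interval remains, and the retry budget is derived from the removal margin (see the ClusterSingletonManager hunk below). A minimal sketch of tuning the new knobs in code, using the withRemovalMargin and withHandOverRetryInterval setters added in this commit; the system, Props and actor name are illustrative placeholders:

import akka.actor.{ ActorSystem, PoisonPill, Props }
import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings }
import scala.concurrent.duration._

val system = ActorSystem("example") // assumes cluster/remote config is supplied elsewhere

// Start from the akka.cluster.singleton config section, then override the new settings.
val settings = ClusterSingletonManagerSettings(system)
  .withRemovalMargin(10.seconds)          // zero means: fall back to akka.cluster.down-removal-margin
  .withHandOverRetryInterval(2.seconds)

system.actorOf(
  ClusterSingletonManager.props(
    singletonProps = Props.empty,         // placeholder for the real singleton Props
    terminationMessage = PoisonPill,
    settings = settings),
  name = "exampleSingletonManager")
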
@@ -33,6 +33,7 @@ object ClusterSingletonManagerSettings {
   */
  def apply(system: ActorSystem): ClusterSingletonManagerSettings =
    apply(system.settings.config.getConfig("akka.cluster.singleton"))
      .withRemovalMargin(Cluster(system).settings.DownRemovalMargin)

  /**
   * Create settings from a configuration with the same layout as

@@ -42,9 +43,8 @@ object ClusterSingletonManagerSettings {
    new ClusterSingletonManagerSettings(
      singletonName = config.getString("singleton-name"),
      role = roleOption(config.getString("role")),
      maxHandOverRetries = config.getInt("max-hand-over-retries"),
      maxTakeOverRetries = config.getInt("max-take-over-retries"),
      retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis)
      removalMargin = Duration.Zero, // defaults to ClusterSettings.DownRemovalMargin
      handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis)

  /**
   * Java API: Create settings from the default configuration

@@ -73,40 +73,24 @@ object ClusterSingletonManagerSettings {
 * If the role is not specified it's a singleton among all nodes in
 * the cluster.
 *
 * @param maxHandOverRetries When a node is becoming oldest it sends
 * hand-over request to previous oldest. This is retried with the
 * `retryInterval` until the previous oldest confirms that the hand
 * over has started, or this `maxHandOverRetries` limit has been
 * reached. If the retry limit is reached it takes the decision to be
 * the new oldest if previous oldest is unknown (typically removed),
 * otherwise it initiates a new round by throwing
 * [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] and expecting
 * restart with fresh state. For a cluster with many members you might
 * need to increase this retry limit because it takes longer time to
 * propagate changes across all nodes.
 * @param removalMargin Margin until the singleton instance that belonged to
 * a downed/removed partition is created in surviving partition. The purpose of
 * this margin is that in case of a network partition the singleton actors
 * in the non-surviving partitions must be stopped before corresponding actors
 * are started somewhere else. This is especially important for persistent
 * actors.
 *
 * @param maxTakeOverRetries When a oldest node leaves the cluster it is
 * not oldest any more and then it sends take over request to the new oldest to
 * initiate the hand-over process. This is retried with the `retryInterval` until
 * this retry limit has been reached. If the retry limit is reached it initiates
 * a new round by throwing [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]]
 * and expecting restart with fresh state. This will also cause the singleton actor
 * to be stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to
 * ensure that new oldest doesn't start singleton actor before previous is
 * stopped for certain corner cases.
 *
 * @param retryInterval Interval for hand over and take over messages
 * @param handOverRetryInterval When a node is becoming oldest it sends hand-over
 * request to previous oldest, that might be leaving the cluster. This is
 * retried with this interval until the previous oldest confirms that the hand
 * over has started or the previous oldest member is removed from the cluster
 * (+ `removalMargin`).
 */
final class ClusterSingletonManagerSettings(
  val singletonName: String,
  val role: Option[String],
  val maxHandOverRetries: Int,
  val maxTakeOverRetries: Int,
  val retryInterval: FiniteDuration) extends NoSerializationVerificationNeeded {

  // to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases
  require(maxTakeOverRetries < maxHandOverRetries,
    s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]")
  val removalMargin: FiniteDuration,
  val handOverRetryInterval: FiniteDuration) extends NoSerializationVerificationNeeded {

  def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name)

@@ -114,17 +98,17 @@ final class ClusterSingletonManagerSettings(

  def withRole(role: Option[String]) = copy(role = role)

  def withRetry(maxHandOverRetries: Int, maxTakeOverRetries: Int, retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
    copy(maxHandOverRetries = maxHandOverRetries,
      maxTakeOverRetries = maxTakeOverRetries,
      retryInterval = retryInterval)
  def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings =
    copy(removalMargin = removalMargin)

  def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
    copy(handOverRetryInterval = retryInterval)

  private def copy(singletonName: String = singletonName,
    role: Option[String] = role,
    maxHandOverRetries: Int = maxHandOverRetries,
    maxTakeOverRetries: Int = maxTakeOverRetries,
    retryInterval: FiniteDuration = retryInterval): ClusterSingletonManagerSettings =
    new ClusterSingletonManagerSettings(singletonName, role, maxHandOverRetries, maxTakeOverRetries, retryInterval)
    removalMargin: FiniteDuration = removalMargin,
    handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings =
    new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
}

object ClusterSingletonManager {

@@ -202,6 +186,7 @@ object ClusterSingletonManager {
    newOldestOption: Option[Address]) extends Data
  final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
  case object EndData extends Data
  final case class DelayedMemberRemoved(member: Member)

  val HandOverRetryTimer = "hand-over-retry"
  val TakeOverRetryTimer = "take-over-retry"

@@ -390,6 +375,15 @@ class ClusterSingletonManager(
  require(role.forall(cluster.selfRoles.contains),
    s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")

  val removalMargin =
    if (settings.removalMargin <= Duration.Zero) cluster.settings.DownRemovalMargin
    else settings.removalMargin

  val (maxHandOverRetries, maxTakeOverRetries) = {
    val n = (removalMargin.toMillis / handOverRetryInterval.toMillis).toInt
    (n + 3, math.max(1, n - 3))
  }

  // started when self member is Up
  var oldestChangedBuffer: ActorRef = _
  // Previous GetNext request delivered event and new GetNext is to be sent

@@ -482,16 +476,19 @@ class ClusterSingletonManager(
      stay using YoungerData(oldestOption)
    }

    case Event(MemberRemoved(m, _), YoungerData(Some(previousOldest))) if m.address == previousOldest ⇒
      logInfo("Previous oldest removed [{}]", m.address)
      addRemoved(m.address)
      // transition when OldestChanged
      stay using YoungerData(None)

    case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒
      logInfo("Self removed, stopping ClusterSingletonManager")
      stop()

    case Event(MemberRemoved(m, _), _) ⇒
      scheduleDelayedMemberRemoved(m)
      stay

    case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.address == previousOldest ⇒
      logInfo("Previous oldest removed [{}]", m.address)
      addRemoved(m.address)
      // transition when OldestChanged
      stay using YoungerData(None)
  }

  when(BecomingOldest) {

@@ -511,10 +508,18 @@ class ClusterSingletonManager(
      stay
    }

    case Event(MemberRemoved(m, _), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest ⇒
    case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒
      logInfo("Self removed, stopping ClusterSingletonManager")
      stop()

    case Event(MemberRemoved(m, _), _) ⇒
      scheduleDelayedMemberRemoved(m)
      stay

    case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest ⇒
      logInfo("Previous oldest [{}] removed", previousOldest)
      addRemoved(m.address)
      stay
      gotoOldest()

    case Event(TakeOverFromMe, BecomingOldestData(None)) ⇒
      sender() ! HandOverToMe

@@ -530,17 +535,23 @@ class ClusterSingletonManager(
      if (count <= maxHandOverRetries) {
        logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption)
        previousOldestOption foreach { peer(_) ! HandOverToMe }
        setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false)
        setTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval, repeat = false)
        stay()
      } else if (previousOldestOption forall removed.contains) {
        // can't send HandOverToMe, previousOldest unknown for new node (or restart)
        // previous oldest might be down or removed, so no TakeOverFromMe message is received
        logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.")
        gotoOldest()
      } else
      } else if (cluster.isTerminated)
        stop()
      else
        throw new ClusterSingletonManagerIsStuck(
          s"Becoming singleton oldest was stuck because previous oldest [${previousOldestOption}] is unresponsive")
  }

  def scheduleDelayedMemberRemoved(m: Member): Unit = {
    log.debug("Schedule DelayedMemberRemoved for [{}]", m.address)
    context.system.scheduler.scheduleOnce(removalMargin, self, DelayedMemberRemoved(m))(context.dispatcher)
  }

  def gotoOldest(): State = {

@@ -562,11 +573,11 @@ class ClusterSingletonManager(
      case Some(a) ⇒
        // send TakeOver request in case the new oldest doesn't know previous oldest
        peer(a) ! TakeOverFromMe
        setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false)
        setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false)
        goto(WasOldest) using WasOldestData(singleton, singletonTerminated, newOldestOption = Some(a))
      case None ⇒
        // new oldest will initiate the hand-over
        setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false)
        setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false)
        goto(WasOldest) using WasOldestData(singleton, singletonTerminated, newOldestOption = None)
    }

@@ -582,14 +593,20 @@ class ClusterSingletonManager(
      if (count <= maxTakeOverRetries) {
        logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption)
        newOldestOption foreach { peer(_) ! TakeOverFromMe }
        setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), retryInterval, repeat = false)
        setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false)
        stay
      } else
      } else if (cluster.isTerminated)
        stop()
      else
        throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occured")

    case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) ⇒
      gotoHandingOver(singleton, singletonTerminated, Some(sender()))

    case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited ⇒
      logInfo("Self removed, stopping ClusterSingletonManager")
      stop()

    case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest))) if !selfExited && m.address == newOldest ⇒
      addRemoved(m.address)
      gotoHandingOver(singleton, singletonTerminated, None)

@@ -654,6 +671,10 @@ class ClusterSingletonManager(
      if (!selfExited) logInfo("Member removed [{}]", m.address)
      addRemoved(m.address)
      stay
    case Event(DelayedMemberRemoved(m), _) ⇒
      if (!selfExited) logInfo("Member removed [{}]", m.address)
      addRemoved(m.address)
      stay
    case Event(TakeOverFromMe, _) ⇒
      logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender().path.address)
      stay

@@ -667,7 +688,7 @@ class ClusterSingletonManager(
  }

  onTransition {
    case _ -> BecomingOldest ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), retryInterval, repeat = false)
    case _ -> BecomingOldest ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), handOverRetryInterval, repeat = false)
  }

  onTransition {

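With maxHandOverRetries and maxTakeOverRetries gone from the public settings, the ClusterSingletonManager derives them from removalMargin and handOverRetryInterval as shown above. A quick worked example of that formula using the new defaults (20s margin, 1s retry interval); the values are illustrative:

import scala.concurrent.duration._

// Same arithmetic as the (maxHandOverRetries, maxTakeOverRetries) derivation above.
val removalMargin = 20.seconds         // akka.cluster.down-removal-margin default
val handOverRetryInterval = 1.second   // akka.cluster.singleton.hand-over-retry-interval default

val n = (removalMargin.toMillis / handOverRetryInterval.toMillis).toInt // 20
val maxHandOverRetries = n + 3                                          // 23
val maxTakeOverRetries = math.max(1, n - 3)                             // 17
// maxTakeOverRetries < maxHandOverRetries still holds, so the new oldest keeps
// retrying after the previous oldest has given up, as before.
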
@@ -41,8 +41,7 @@ object ClusterSingletonProxySpec {
    system.actorOf(ClusterSingletonManager.props(
      singletonProps = Props[Singleton],
      terminationMessage = PoisonPill,
      settings = ClusterSingletonManagerSettings(system)
        .withRetry(maxHandOverRetries = 5, maxTakeOverRetries = 2, retryInterval = 1.second)),
      settings = ClusterSingletonManagerSettings(system).withRemovalMargin(5.seconds)),
      name = "singletonManager")
  }

@@ -29,6 +29,12 @@ akka {
    # Disable with "off" or specify a duration to enable auto-down.
    auto-down-unreachable-after = off

    # Margin until shards or singletons that belonged to a downed/removed
    # partition are created in surviving partition. The purpose of this margin is that
    # in case of a network partition the persistent actors in the non-surviving partitions
    # must be stopped before corresponding persistent actors are started somewhere else.
    down-removal-margin = 20s

    # The roles of this member. List of strings, e.g. roles = ["A", "B"].
    # The roles are part of the membership information and can be used by
    # routers or other services to distribute work to certain member types,

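The new down-removal-margin setting is exposed to code as ClusterSettings.DownRemovalMargin (see the ClusterSettings hunk further down), which both the shard coordinator and the singleton manager read. A minimal sketch of overriding and inspecting it, assuming the akka-cluster reference.conf is on the classpath; the 30s value and system name are illustrative:

import com.typesafe.config.ConfigFactory
import akka.cluster.ClusterSettings

// Override the margin; the new ClusterSettings code requires it to be > 0s.
val config = ConfigFactory
  .parseString("akka.cluster.down-removal-margin = 30s")
  .withFallback(ConfigFactory.load())

val settings = new ClusterSettings(config, "example")
println(settings.DownRemovalMargin) // 30 seconds
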
@@ -228,6 +228,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
  protected def selfUniqueAddress = cluster.selfUniqueAddress

  val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
  val MaxGossipsBeforeShuttingDownMyself = 5

  def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid
  val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))

@@ -371,7 +372,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
    case other ⇒ super.unhandled(other)
  }

  def initJoin(): Unit = sender() ! InitJoinAck(selfAddress)
  def initJoin(): Unit = {
    val selfStatus = latestGossip.member(selfUniqueAddress).status
    if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
      // prevents a Down and Exiting node from being used for joining
      sender() ! InitJoinNack(selfAddress)
    else
      sender() ! InitJoinAck(selfAddress)
  }

  def joinSeedNodes(newSeedNodes: immutable.IndexedSeq[Address]): Unit = {
    if (newSeedNodes.nonEmpty) {

@@ -444,12 +452,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
   * current gossip state, including the new joining member.
   */
  def joining(node: UniqueAddress, roles: Set[String]): Unit = {
    val selfStatus = latestGossip.member(selfUniqueAddress).status
    if (node.address.protocol != selfAddress.protocol)
      log.warning("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
        selfAddress.protocol, node.address.protocol)
    else if (node.address.system != selfAddress.system)
      log.warning("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
        selfAddress.system, node.address.system)
    else if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
      logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", node, selfStatus)
    else {
      val localMembers = latestGossip.members

@@ -544,15 +555,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
    val localReachability = localOverview.reachability

    // check if the node to DOWN is in the `members` set
    localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) } match {
      case Some(m) ⇒
    localMembers.find(_.address == address) match {
      case Some(m) if (m.status != Down) ⇒
        if (localReachability.isReachable(m.uniqueAddress))
          logInfo("Marking node [{}] as [{}]", m.address, Down)
        else
          logInfo("Marking unreachable node [{}] as [{}]", m.address, Down)

        // replace member (changed status)
        val newMembers = localMembers - m + m
        val newMembers = localMembers - m + m.copy(status = Down)
        // remove nodes marked as DOWN from the `seen` table
        val newSeen = localSeen - m.uniqueAddress

@@ -562,6 +573,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
        updateLatestGossip(newGossip)

        publish(latestGossip)
      case Some(_) ⇒ // already down
      case None ⇒
        logInfo("Ignoring down of unknown node [{}] as [{}]", address)
    }

@@ -676,7 +688,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
      publish(latestGossip)

      val selfStatus = latestGossip.member(selfUniqueAddress).status
      if (selfStatus == Exiting || selfStatus == Down)
      if (selfStatus == Exiting)
        shutdown()
      else if (talkback) {
        // send back gossip to sender() when sender() had different view, i.e. merge, or sender() had

@@ -766,7 +778,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
  /**
   * Runs periodic leader actions, such as member status transitions, assigning partitions etc.
   */
  def leaderActions(): Unit =
  def leaderActions(): Unit = {
    if (latestGossip.isLeader(selfUniqueAddress, selfUniqueAddress)) {
      // only run the leader actions if we are the LEADER
      val firstNotice = 20

@@ -785,6 +797,27 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
          s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", "))
      }
    }
    shutdownSelfWhenDown()
  }

  def shutdownSelfWhenDown(): Unit = {
    if (latestGossip.member(selfUniqueAddress).status == Down) {
      // When all reachable have seen the state this member will shutdown itself when it has
      // status Down. The down commands should spread before we shutdown.
      val unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated
      val downed = latestGossip.members.collect { case m if m.status == Down ⇒ m.uniqueAddress }
      if (downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) {
        // the reason for not shutting down immediately is to give the gossip a chance to spread
        // the downing information to other downed nodes, so that they can shutdown themselves
        logInfo("Shutting down myself")
        // not crucial to send gossip, but may speedup removal since fallback to failure detection is not needed
        // if other downed know that this node has seen the version
        downed.filterNot(n ⇒ unreachable(n) || n == selfUniqueAddress).take(MaxGossipsBeforeShuttingDownMyself)
          .foreach(gossipTo)
        shutdown()
      }
    }
  }

  /**
   * Leader actions are as follows:

@@ -67,6 +67,11 @@ final class ClusterSettings(val config: Config, val systemName: String) {
    }
  }

  val DownRemovalMargin: FiniteDuration = {
    val key = "down-removal-margin"
    cc.getMillisDuration(key) requiring (_ > Duration.Zero, key + " > 0s")
  }

  val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet
  val MinNrOfMembers: Int = {
    cc.getInt("min-nr-of-members")

@@ -20,8 +20,7 @@ import MemberStatus._
@SerialVersionUID(1L)
class Member private[cluster] (
  val uniqueAddress: UniqueAddress,
  /** INTERNAL API **/
  private[cluster] val upNumber: Int,
  private[cluster] val upNumber: Int, // INTERNAL API
  val status: MemberStatus,
  val roles: Set[String]) extends Serializable {

@@ -0,0 +1,63 @@
/**
 * Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import scala.collection.immutable.SortedSet
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.duration._

object NodeDowningAndBeingRemovedMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(
    "akka.cluster.auto-down-unreachable-after = off").withFallback(MultiNodeClusterSpec.clusterConfig)))
}

class NodeDowningAndBeingRemovedMultiJvmNode1 extends NodeDowningAndBeingRemovedSpec
class NodeDowningAndBeingRemovedMultiJvmNode2 extends NodeDowningAndBeingRemovedSpec
class NodeDowningAndBeingRemovedMultiJvmNode3 extends NodeDowningAndBeingRemovedSpec

abstract class NodeDowningAndBeingRemovedSpec
  extends MultiNodeSpec(NodeDowningAndBeingRemovedMultiJvmSpec)
  with MultiNodeClusterSpec {

  import NodeDowningAndBeingRemovedMultiJvmSpec._

  "A node that is downed" must {

    "eventually be removed from membership" taggedAs LongRunningTest in {

      awaitClusterUp(first, second, third)

      within(30.seconds) {
        runOn(first) {
          cluster.down(second)
          cluster.down(third)
        }
        enterBarrier("second-and-third-down")

        runOn(second, third) {
          // verify that the node is shut down
          awaitCond(cluster.isTerminated)
        }
        enterBarrier("second-and-third-shutdown")

        runOn(first) {
          // verify that the nodes are no longer part of the 'members' set
          awaitAssert {
            clusterView.members.map(_.address) should not contain (address(second))
            clusterView.members.map(_.address) should not contain (address(third))
          }
        }
      }

      enterBarrier("finished")
    }
  }
}