diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
index 21910fa43e..969badd471 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
@@ -1569,6 +1569,8 @@ object ShardCoordinator {
    */
   private final case class ResendShardHost(shard: ShardId, region: ActorRef)
 
+  private final case class DelayedShardRegionTerminated(region: ActorRef)
+
   /**
    * INTERNAL API. Rebalancing process is performed by this actor.
    * It sends `BeginHandOff` to all `ShardRegion` actors followed by
@@ -1632,11 +1634,14 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
 
   override def snapshotPluginId: String = settings.snapshotPluginId
 
+  val removalMargin = Cluster(context.system).settings.DownRemovalMargin
+
   var persistentState = State.empty
   var rebalanceInProgress = Set.empty[ShardId]
   var unAckedHostShards = Map.empty[ShardId, Cancellable]
   // regions that have requested handoff, for graceful shutdown
   var gracefulShutdownInProgress = Set.empty[ActorRef]
+  var aliveRegions = Set.empty[ActorRef]
   var persistCount = 0
 
   import context.dispatcher
@@ -1694,6 +1699,7 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   override def receiveCommand: Receive = {
     case Register(region) ⇒
       log.debug("ShardRegion registered: [{}]", region)
+      aliveRegions += region
       if (persistentState.regions.contains(region))
         sender() ! RegisterAck(self)
       else {
@@ -1724,20 +1730,12 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
         }
       }
 
-    case Terminated(ref) ⇒
+    case t @ Terminated(ref) ⇒
       if (persistentState.regions.contains(ref)) {
-        log.debug("ShardRegion terminated: [{}]", ref)
-
-        require(persistentState.regions.contains(ref), s"Terminated region $ref not registered")
-        persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
-
-        gracefulShutdownInProgress -= ref
-
-        saveSnapshotWhenNeeded()
-        persist(ShardRegionTerminated(ref)) { evt ⇒
-          persistentState = persistentState.updated(evt)
-          allocateShardHomes()
-        }
+        if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref))
+          context.system.scheduler.scheduleOnce(removalMargin, self, DelayedShardRegionTerminated(ref))
+        else
+          regionTerminated(ref)
       } else if (persistentState.regionProxies.contains(ref)) {
         log.debug("ShardRegion proxy terminated: [{}]", ref)
         saveSnapshotWhenNeeded()
@@ -1746,6 +1744,9 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
         }
       }
 
+    case DelayedShardRegionTerminated(ref) ⇒
+      regionTerminated(ref)
+
     case GetShardHome(shard) ⇒
       if (!rebalanceInProgress.contains(shard)) {
         persistentState.shards.get(shard) match {
@@ -1859,6 +1860,20 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
     case _: CurrentClusterState ⇒
   }
 
+  def regionTerminated(ref: ActorRef): Unit =
+    if (persistentState.regions.contains(ref)) {
+      log.debug("ShardRegion terminated: [{}]", ref)
+      persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
+
+      gracefulShutdownInProgress -= ref
+
+      saveSnapshotWhenNeeded()
+      persist(ShardRegionTerminated(ref)) { evt ⇒
+        persistentState = persistentState.updated(evt)
+        allocateShardHomes()
+      }
+    }
+
   def shuttingDown: Receive = {
     case _ ⇒ // ignore all
   }
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala
index 458e673e7e..e07f6b62bb 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala
@@ -32,6 +32,7 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
     akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
     akka.remote.log-remote-lifecycle-events = off
     akka.cluster.auto-down-unreachable-after = 0s
+    akka.cluster.down-removal-margin = 5s
     akka.cluster.roles = ["backend"]
     akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
     akka.persistence.journal.leveldb-shared {
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala
index e9bdc2195a..def0d585bf 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala
@@ -37,6 +37,7 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig {
     akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
     akka.remote.log-remote-lifecycle-events = off
     akka.cluster.auto-down-unreachable-after = 0s
+    akka.cluster.down-removal-margin = 5s
     akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
     akka.persistence.journal.leveldb-shared {
       timeout = 5s
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
index b5b0dc9f3e..644b6adf83 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
@@ -42,6 +42,7 @@ object ClusterShardingSpec extends MultiNodeConfig {
     akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
     akka.remote.log-remote-lifecycle-events = off
     akka.cluster.auto-down-unreachable-after = 0s
+    akka.cluster.down-removal-margin = 5s
     akka.cluster.roles = ["backend"]
     akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
     akka.persistence.journal.leveldb-shared.store {
diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf
index 30e2c6a8b6..6b8a34cf14 100644
--- a/akka-cluster-tools/src/main/resources/reference.conf
+++ b/akka-cluster-tools/src/main/resources/reference.conf
@@ -110,29 +110,11 @@ akka.cluster.singleton {
   # If the role is not specified it's a singleton among all nodes in the cluster.
   role = ""
 
-  # When a node is becoming oldest it sends hand-over request to previous oldest.
-  # This is retried with the 'retry-interval' until the previous oldest confirms
-  # that the hand over has started, or this 'max-hand-over-retries' limit has been
-  # reached. If the retry limit is reached it takes the decision to be the new oldest
-  # if previous oldest is unknown (typically removed), otherwise it initiates a new
-  # round by throwing 'akka.cluster.singleton.ClusterSingletonManagerIsStuck' and expecting
-  # restart with fresh state. For a cluster with many members you might need to increase
-  # this retry limit because it takes longer time to propagate changes across all nodes.
-  max-hand-over-retries = 10
-
-  # When a oldest node leaves the cluster it is not oldest any more and then it sends
-  # take over request to the new oldest to initiate the hand-over process. This is
-  # retried with the 'retry-interval' until this retry limit has been reached. If the
-  # retry limit is reached it initiates a new round by throwing
-  # 'akka.cluster.singleton.ClusterSingletonManagerIsStuck' and expecting restart with
-  # fresh state. This will also cause the singleton actor to be stopped.
-  # 'max-take-over-retries' must be less than 'max-hand-over-retries' to ensure that
-  # new oldest doesn't start singleton actor before previous is stopped for certain
-  # corner cases.
-  max-take-over-retries = 5
-
-  # Interval for hand over and take over messages
-  retry-interval = 1s
+  # When a node is becoming oldest it sends hand-over request to previous oldest,
+  # that might be leaving the cluster. This is retried with this interval until
+  # the previous oldest confirms that the hand over has started or the previous
+  # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
+  hand-over-retry-interval = 1s
 }
 
 akka.cluster.singleton-proxy {
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
index 1ac63338b7..9d0aa6edae 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
@@ -33,6 +33,7 @@ object ClusterSingletonManagerSettings {
    */
   def apply(system: ActorSystem): ClusterSingletonManagerSettings =
     apply(system.settings.config.getConfig("akka.cluster.singleton"))
+      .withRemovalMargin(Cluster(system).settings.DownRemovalMargin)
 
   /**
    * Create settings from a configuration with the same layout as
@@ -42,9 +43,8 @@ object ClusterSingletonManagerSettings {
     new ClusterSingletonManagerSettings(
       singletonName = config.getString("singleton-name"),
       role = roleOption(config.getString("role")),
-      maxHandOverRetries = config.getInt("max-hand-over-retries"),
-      maxTakeOverRetries = config.getInt("max-take-over-retries"),
-      retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis)
+      removalMargin = Duration.Zero, // defaults to ClusterSettins.DownRemovalMargin
+      handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis)
 
   /**
    * Java API: Create settings from the default configuration
@@ -73,40 +73,24 @@ object ClusterSingletonManagerSettings {
  *   If the role is not specified it's a singleton among all nodes in
  *   the cluster.
  *
- * @param maxHandOverRetries When a node is becoming oldest it sends
- *   hand-over request to previous oldest. This is retried with the
- *   `retryInterval` until the previous oldest confirms that the hand
- *   over has started, or this `maxHandOverRetries` limit has been
- *   reached. If the retry limit is reached it takes the decision to be
- *   the new oldest if previous oldest is unknown (typically removed),
- *   otherwise it initiates a new round by throwing
- *   [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] and expecting
- *   restart with fresh state. For a cluster with many members you might
- *   need to increase this retry limit because it takes longer time to
- *   propagate changes across all nodes.
+ * @param removalMargin Margin until the singleton instance that belonged to
+ *   a downed/removed partition is created in surviving partition. The purpose of
+ *   this margin is that in case of a network partition the singleton actors
+ *   in the non-surviving partitions must be stopped before corresponding actors
+ *   are started somewhere else. This is especially important for persistent
+ *   actors.
 *
- * @param maxTakeOverRetries When a oldest node leaves the cluster it is
- *   not oldest any more and then it sends take over request to the new oldest to
- *   initiate the hand-over process. This is retried with the `retryInterval` until
- *   this retry limit has been reached. If the retry limit is reached it initiates
- *   a new round by throwing [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]]
- *   and expecting restart with fresh state. This will also cause the singleton actor
- *   to be stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to
- *   ensure that new oldest doesn't start singleton actor before previous is
- *   stopped for certain corner cases.
- *
- * @param retryInterval Interval for hand over and take over messages
+ * @param handOverRetryInterval When a node is becoming oldest it sends hand-over
+ *   request to previous oldest, that might be leaving the cluster. This is
+ *   retried with this interval until the previous oldest confirms that the hand
+ *   over has started or the previous oldest member is removed from the cluster
+ *   (+ `removalMargin`).
 */
final class ClusterSingletonManagerSettings(
  val singletonName: String,
  val role: Option[String],
-  val maxHandOverRetries: Int,
-  val maxTakeOverRetries: Int,
-  val retryInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
-
-  // to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases
-  require(maxTakeOverRetries < maxHandOverRetries,
-    s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]")
+  val removalMargin: FiniteDuration,
+  val handOverRetryInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
 
   def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name)
 
@@ -114,17 +98,17 @@ final class ClusterSingletonManagerSettings(
 
   def withRole(role: Option[String]) = copy(role = role)
 
-  def withRetry(maxHandOverRetries: Int, maxTakeOverRetries: Int, retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
-    copy(maxHandOverRetries = maxHandOverRetries,
-      maxTakeOverRetries = maxTakeOverRetries,
-      retryInterval = retryInterval)
+  def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings =
+    copy(removalMargin = removalMargin)
+
+  def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
+    copy(handOverRetryInterval = retryInterval)
 
   private def copy(singletonName: String = singletonName,
                    role: Option[String] = role,
-                   maxHandOverRetries: Int = maxHandOverRetries,
-                   maxTakeOverRetries: Int = maxTakeOverRetries,
-                   retryInterval: FiniteDuration = retryInterval): ClusterSingletonManagerSettings =
-    new ClusterSingletonManagerSettings(singletonName, role, maxHandOverRetries, maxTakeOverRetries, retryInterval)
+                   removalMargin: FiniteDuration = removalMargin,
+                   handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings =
+    new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
 }
 
 object ClusterSingletonManager {
@@ -202,6 +186,7 @@ object ClusterSingletonManager {
       newOldestOption: Option[Address]) extends Data
     final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
     case object EndData extends Data
+    final case class DelayedMemberRemoved(member: Member)
 
     val HandOverRetryTimer = "hand-over-retry"
     val TakeOverRetryTimer = "take-over-retry"
@@ -390,6 +375,15 @@ class ClusterSingletonManager(
   require(role.forall(cluster.selfRoles.contains),
     s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
 
+  val removalMargin =
+    if (settings.removalMargin <= Duration.Zero) cluster.settings.DownRemovalMargin
+    else settings.removalMargin
+
+  val (maxHandOverRetries, maxTakeOverRetries) = {
+    val n = (removalMargin.toMillis / handOverRetryInterval.toMillis).toInt
+    (n + 3, math.max(1, n - 3))
+  }
+
   // started when when self member is Up
   var oldestChangedBuffer: ActorRef = _
   // Previous GetNext request delivered event and new GetNext is to be sent
@@ -482,16 +476,19 @@ class ClusterSingletonManager(
         stay using YoungerData(oldestOption)
       }
 
-    case Event(MemberRemoved(m, _), YoungerData(Some(previousOldest))) if m.address == previousOldest ⇒
-      logInfo("Previous oldest removed [{}]", m.address)
-      addRemoved(m.address)
-      // transition when OldestChanged
-      stay using YoungerData(None)
-
     case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒
       logInfo("Self removed, stopping ClusterSingletonManager")
       stop()
 
+    case Event(MemberRemoved(m, _), _) ⇒
+      scheduleDelayedMemberRemoved(m)
+      stay
+
+    case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.address == previousOldest ⇒
+      logInfo("Previous oldest removed [{}]", m.address)
+      addRemoved(m.address)
+      // transition when OldestChanged
+      stay using YoungerData(None)
   }
 
   when(BecomingOldest) {
@@ -511,10 +508,18 @@ class ClusterSingletonManager(
           stay
       }
 
-    case Event(MemberRemoved(m, _), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest ⇒
+    case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒
+      logInfo("Self removed, stopping ClusterSingletonManager")
+      stop()
+
+    case Event(MemberRemoved(m, _), _) ⇒
+      scheduleDelayedMemberRemoved(m)
+      stay
+
+    case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest ⇒
       logInfo("Previous oldest [{}] removed", previousOldest)
      addRemoved(m.address)
-      stay
+      gotoOldest()
 
     case Event(TakeOverFromMe, BecomingOldestData(None)) ⇒
       sender() ! HandOverToMe
@@ -530,17 +535,23 @@ class ClusterSingletonManager(
      if (count <= maxHandOverRetries) {
        logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption)
        previousOldestOption foreach { peer(_) ! HandOverToMe }
-        setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false)
+        setTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval, repeat = false)
        stay()
      } else if (previousOldestOption forall removed.contains) {
        // can't send HandOverToMe, previousOldest unknown for new node (or restart)
        // previous oldest might be down or removed, so no TakeOverFromMe message is received
        logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.")
        gotoOldest()
-      } else
+      } else if (cluster.isTerminated)
+        stop()
+      else
        throw new ClusterSingletonManagerIsStuck(
          s"Becoming singleton oldest was stuck because previous oldest [${previousOldestOption}] is unresponsive")
+  }
+  def scheduleDelayedMemberRemoved(m: Member): Unit = {
+    log.debug("Schedule DelayedMemberRemoved for [{}]", m.address)
+    context.system.scheduler.scheduleOnce(removalMargin, self, DelayedMemberRemoved(m))(context.dispatcher)
  }
 
  def gotoOldest(): State = {
@@ -562,11 +573,11 @@ class ClusterSingletonManager(
      case Some(a) ⇒
        // send TakeOver request in case the new oldest doesn't know previous oldest
        peer(a) ! TakeOverFromMe
-        setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false)
+        setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false)
        goto(WasOldest) using WasOldestData(singleton, singletonTerminated, newOldestOption = Some(a))
      case None ⇒
        // new oldest will initiate the hand-over
-        setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false)
+        setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false)
        goto(WasOldest) using WasOldestData(singleton, singletonTerminated, newOldestOption = None)
    }
@@ -582,14 +593,20 @@ class ClusterSingletonManager(
      if (count <= maxTakeOverRetries) {
        logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption)
        newOldestOption foreach { peer(_) ! TakeOverFromMe }
-        setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), retryInterval, repeat = false)
+        setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false)
        stay
-      } else
+      } else if (cluster.isTerminated)
+        stop()
+      else
        throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occured")
 
    case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) ⇒
      gotoHandingOver(singleton, singletonTerminated, Some(sender()))
 
+    case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited ⇒
+      logInfo("Self removed, stopping ClusterSingletonManager")
+      stop()
+
    case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest))) if !selfExited && m.address == newOldest ⇒
      addRemoved(m.address)
      gotoHandingOver(singleton, singletonTerminated, None)
@@ -654,6 +671,10 @@ class ClusterSingletonManager(
      if (!selfExited) logInfo("Member removed [{}]", m.address)
      addRemoved(m.address)
      stay
+    case Event(DelayedMemberRemoved(m), _) ⇒
+      if (!selfExited) logInfo("Member removed [{}]", m.address)
+      addRemoved(m.address)
+      stay
    case Event(TakeOverFromMe, _) ⇒
      logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender().path.address)
      stay
@@ -667,7 +688,7 @@ class ClusterSingletonManager(
  }
 
  onTransition {
-    case _ -> BecomingOldest ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), retryInterval, repeat = false)
+    case _ -> BecomingOldest ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), handOverRetryInterval, repeat = false)
  }
 
  onTransition {
diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonProxySpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonProxySpec.scala
index 227cd5d46d..312ae7816f 100644
--- a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonProxySpec.scala
+++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonProxySpec.scala
@@ -41,8 +41,7 @@ object ClusterSingletonProxySpec {
      system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props[Singleton],
        terminationMessage = PoisonPill,
-        settings = ClusterSingletonManagerSettings(system)
-          .withRetry(maxHandOverRetries = 5, maxTakeOverRetries = 2, retryInterval = 1.second)),
+        settings = ClusterSingletonManagerSettings(system).withRemovalMargin(5.seconds)),
        name = "singletonManager")
    }
diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 1bf7f8ec23..0316215cd3 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -29,6 +29,12 @@ akka {
    # Disable with "off" or specify a duration to enable auto-down.
    auto-down-unreachable-after = off
 
+    # Margin until shards or singletons that belonged to a downed/removed
+    # partition are created in surviving partition. The purpose of this margin is that
+    # in case of a network partition the persistent actors in the non-surviving partitions
+    # must be stopped before corresponding persistent actors are started somewhere else.
+    down-removal-margin = 20s
+
    # The roles of this member. List of strings, e.g. roles = ["A", "B"].
    # The roles are part of the membership information and can be used by
    # routers or other services to distribute work to certain member types,
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 0682a1cd3d..ba2b81e044 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -228,6 +228,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
  protected def selfUniqueAddress = cluster.selfUniqueAddress
 
  val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
+  val MaxGossipsBeforeShuttingDownMyself = 5
 
  def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid
  val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
@@ -371,7 +372,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
    case other ⇒ super.unhandled(other)
  }
 
-  def initJoin(): Unit = sender() ! InitJoinAck(selfAddress)
+  def initJoin(): Unit = {
+    val selfStatus = latestGossip.member(selfUniqueAddress).status
+    if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
+      // prevents a Down and Exiting node from being used for joining
+      sender() ! InitJoinNack(selfAddress)
+    else
+      sender() ! InitJoinAck(selfAddress)
+  }
 
  def joinSeedNodes(newSeedNodes: immutable.IndexedSeq[Address]): Unit = {
    if (newSeedNodes.nonEmpty) {
@@ -444,12 +452,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
   * current gossip state, including the new joining member.
   */
  def joining(node: UniqueAddress, roles: Set[String]): Unit = {
+    val selfStatus = latestGossip.member(selfUniqueAddress).status
    if (node.address.protocol != selfAddress.protocol)
      log.warning("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
        selfAddress.protocol, node.address.protocol)
    else if (node.address.system != selfAddress.system)
      log.warning("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
        selfAddress.system, node.address.system)
+    else if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
+      logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", node, selfStatus)
    else {
      val localMembers = latestGossip.members
 
@@ -544,15 +555,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
    val localReachability = localOverview.reachability
 
    // check if the node to DOWN is in the `members` set
-    localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) } match {
-      case Some(m) ⇒
+    localMembers.find(_.address == address) match {
+      case Some(m) if (m.status != Down) ⇒
        if (localReachability.isReachable(m.uniqueAddress))
          logInfo("Marking node [{}] as [{}]", m.address, Down)
        else
          logInfo("Marking unreachable node [{}] as [{}]", m.address, Down)
 
        // replace member (changed status)
-        val newMembers = localMembers - m + m
+        val newMembers = localMembers - m + m.copy(status = Down)
 
        // remove nodes marked as DOWN from the `seen` table
        val newSeen = localSeen - m.uniqueAddress
@@ -562,6 +573,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
        updateLatestGossip(newGossip)
        publish(latestGossip)
 
+      case Some(_) ⇒ // already down
      case None ⇒
        logInfo("Ignoring down of unknown node [{}] as [{}]", address)
    }
@@ -676,7 +688,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
      publish(latestGossip)
 
      val selfStatus = latestGossip.member(selfUniqueAddress).status
-      if (selfStatus == Exiting || selfStatus == Down)
+      if (selfStatus == Exiting)
        shutdown()
      else if (talkback) {
        // send back gossip to sender() when sender() had different view, i.e. merge, or sender() had
@@ -766,7 +778,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
  /**
   * Runs periodic leader actions, such as member status transitions, assigning partitions etc.
   */
-  def leaderActions(): Unit =
+  def leaderActions(): Unit = {
    if (latestGossip.isLeader(selfUniqueAddress, selfUniqueAddress)) {
      // only run the leader actions if we are the LEADER
      val firstNotice = 20
@@ -785,6 +797,27 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
            s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", "))
      }
    }
+    shutdownSelfWhenDown()
+  }
+
+  def shutdownSelfWhenDown(): Unit = {
+    if (latestGossip.member(selfUniqueAddress).status == Down) {
+      // When all reachable have seen the state this member will shutdown itself when it has
+      // status Down. The down commands should spread before we shutdown.
+      val unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated
+      val downed = latestGossip.members.collect { case m if m.status == Down ⇒ m.uniqueAddress }
+      if (downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) {
+        // the reason for not shutting down immediately is to give the gossip a chance to spread
+        // the downing information to other downed nodes, so that they can shutdown themselves
+        logInfo("Shutting down myself")
+        // not crucial to send gossip, but may speedup removal since fallback to failure detection is not needed
+        // if other downed know that this node has seen the version
+        downed.filterNot(n ⇒ unreachable(n) || n == selfUniqueAddress).take(MaxGossipsBeforeShuttingDownMyself)
+          .foreach(gossipTo)
+        shutdown()
+      }
+    }
+  }
 
  /**
   * Leader actions are as follows:
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index 6e354b3388..c37eaa87b8 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -67,6 +67,11 @@ final class ClusterSettings(val config: Config, val systemName: String) {
    }
  }
 
+  val DownRemovalMargin: FiniteDuration = {
+    val key = "down-removal-margin"
+    cc.getMillisDuration(key) requiring (_ > Duration.Zero, key + " > 0s")
+  }
+
  val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet
  val MinNrOfMembers: Int = {
    cc.getInt("min-nr-of-members")
diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala
index 82e4558c7f..1b33eeaa54 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Member.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala
@@ -20,8 +20,7 @@ import MemberStatus._
 @SerialVersionUID(1L)
 class Member private[cluster] (
  val uniqueAddress: UniqueAddress,
-  /** INTERNAL API **/
-  private[cluster] val upNumber: Int,
+  private[cluster] val upNumber: Int, // INTERNAL API
  val status: MemberStatus,
  val roles: Set[String]) extends Serializable {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeDowningAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeDowningAndBeingRemovedSpec.scala
new file mode 100644
index 0000000000..6b8240b53a
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeDowningAndBeingRemovedSpec.scala
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2009-2015 Typesafe Inc.
+ */
+package akka.cluster
+
+import scala.collection.immutable.SortedSet
+import com.typesafe.config.ConfigFactory
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import scala.concurrent.duration._
+
+object NodeDowningAndBeingRemovedMultiJvmSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(
+    "akka.cluster.auto-down-unreachable-after = off").withFallback(MultiNodeClusterSpec.clusterConfig)))
+}
+
+class NodeDowningAndBeingRemovedMultiJvmNode1 extends NodeDowningAndBeingRemovedSpec
+class NodeDowningAndBeingRemovedMultiJvmNode2 extends NodeDowningAndBeingRemovedSpec
+class NodeDowningAndBeingRemovedMultiJvmNode3 extends NodeDowningAndBeingRemovedSpec
+
+abstract class NodeDowningAndBeingRemovedSpec
+  extends MultiNodeSpec(NodeDowningAndBeingRemovedMultiJvmSpec)
+  with MultiNodeClusterSpec {
+
+  import NodeDowningAndBeingRemovedMultiJvmSpec._
+
+  "A node that is downed" must {
+
+    "eventually be removed from membership" taggedAs LongRunningTest in {
+
+      awaitClusterUp(first, second, third)
+
+      within(30.seconds) {
+        runOn(first) {
+          cluster.down(second)
+          cluster.down(third)
+        }
+        enterBarrier("second-and-third-down")
+
+        runOn(second, third) {
+          // verify that the node is shut down
+          awaitCond(cluster.isTerminated)
+        }
+        enterBarrier("second-and-third-shutdown")
+
+        runOn(first) {
+          // verify that the nodes are no longer part of the 'members' set
+          awaitAssert {
+            clusterView.members.map(_.address) should not contain (address(second))
+            clusterView.members.map(_.address) should not contain (address(third))
+          }
+        }
+      }
+
+      enterBarrier("finished")
+    }
+  }
+}
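
Usage sketch for the configuration introduced by the reference.conf hunks above. The values are illustrative only (the 20s default and the 1s retry interval come from this diff, the override itself is hypothetical); an application.conf could tune them like this:

    akka.cluster {
      # how long to wait before shards/singletons of a downed or removed node
      # are started on a surviving node (reference.conf default in this patch is 20s)
      down-removal-margin = 20s
    }
    akka.cluster.singleton {
      # retry interval for the hand-over request sent by the node that is becoming oldest
      hand-over-retry-interval = 1s
    }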
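
On the programmatic side, withRetry is replaced by withRemovalMargin and withHandOverRetryInterval on ClusterSingletonManagerSettings. A minimal sketch mirroring the ClusterSingletonProxySpec change above; the system name, the durations and the Singleton placeholder actor are assumptions for illustration, not part of the patch:

    import scala.concurrent.duration._
    import akka.actor.{ Actor, ActorSystem, PoisonPill, Props }
    import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings }

    object SingletonExample extends App {
      // placeholder singleton actor, stands in for any real singleton
      class Singleton extends Actor {
        def receive = Actor.emptyBehavior
      }

      val system = ActorSystem("ClusterSystem")

      // a removalMargin left at Duration.Zero falls back to akka.cluster.down-removal-margin
      val settings = ClusterSingletonManagerSettings(system)
        .withRemovalMargin(5.seconds)
        .withHandOverRetryInterval(1.second)

      system.actorOf(
        ClusterSingletonManager.props(
          singletonProps = Props[Singleton],
          terminationMessage = PoisonPill,
          settings = settings),
        name = "singletonManager")
    }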