=clu Improve cluster downing

* prevent Down and Exiting members from being used for joining
* delay shutdown of a Down member until the information has been spread
  to all reachable members, e.g. when downing several nodes via one node
* add akka.cluster.down-removal-margin setting (see the configuration
  sketch below): margin until shards or singletons that belonged to a
  downed/removed partition are created in the surviving partition.
  Used by singleton and sharding.
* remove the retry count parameters/settings for singleton in
  favor of deriving them from the removal-margin
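
Below is a minimal, hypothetical sketch of how the new setting can be configured and read. Only the `akka.cluster.down-removal-margin` key and the `Cluster(system).settings.DownRemovalMargin` accessor come from this commit; the system name, the 5s value and the surrounding setup are illustrative assumptions.

    import akka.actor.ActorSystem
    import akka.cluster.Cluster
    import com.typesafe.config.ConfigFactory

    object DownRemovalMarginExample extends App {
      // 5s matches the test configs changed in this commit; a real cluster
      // should use a margin that covers gossip convergence after a down.
      val config = ConfigFactory.parseString(
        """
        akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
        akka.cluster.down-removal-margin = 5s
        """).withFallback(ConfigFactory.load())

      val system = ActorSystem("DownRemovalMarginExample", config)

      // Same accessor the ShardCoordinator uses below to pick up the margin.
      val removalMargin = Cluster(system).settings.DownRemovalMargin
      println(s"down-removal-margin = $removalMargin")
    }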
Patrik Nordwall 2015-05-30 16:12:22 +02:00
parent bf28260cd0
commit 2a88f4fb29
12 changed files with 227 additions and 101 deletions


@@ -1569,6 +1569,8 @@ object ShardCoordinator {
*/
private final case class ResendShardHost(shard: ShardId, region: ActorRef)
private final case class DelayedShardRegionTerminated(region: ActorRef)
/**
* INTERNAL API. Rebalancing process is performed by this actor.
* It sends `BeginHandOff` to all `ShardRegion` actors followed by
@@ -1632,11 +1634,14 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
override def snapshotPluginId: String = settings.snapshotPluginId
val removalMargin = Cluster(context.system).settings.DownRemovalMargin
var persistentState = State.empty
var rebalanceInProgress = Set.empty[ShardId]
var unAckedHostShards = Map.empty[ShardId, Cancellable]
// regions that have requested handoff, for graceful shutdown
var gracefulShutdownInProgress = Set.empty[ActorRef]
var aliveRegions = Set.empty[ActorRef]
var persistCount = 0
import context.dispatcher
@@ -1694,6 +1699,7 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
override def receiveCommand: Receive = {
case Register(region) ⇒
log.debug("ShardRegion registered: [{}]", region)
aliveRegions += region
if (persistentState.regions.contains(region))
sender() ! RegisterAck(self)
else {
@@ -1724,20 +1730,12 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
}
}
case Terminated(ref) ⇒
case t @ Terminated(ref) ⇒
if (persistentState.regions.contains(ref)) {
log.debug("ShardRegion terminated: [{}]", ref)
require(persistentState.regions.contains(ref), s"Terminated region $ref not registered")
persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
gracefulShutdownInProgress -= ref
saveSnapshotWhenNeeded()
persist(ShardRegionTerminated(ref)) { evt ⇒
persistentState = persistentState.updated(evt)
allocateShardHomes()
}
if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref))
context.system.scheduler.scheduleOnce(removalMargin, self, DelayedShardRegionTerminated(ref))
else
regionTerminated(ref)
} else if (persistentState.regionProxies.contains(ref)) {
log.debug("ShardRegion proxy terminated: [{}]", ref)
saveSnapshotWhenNeeded()
@@ -1746,6 +1744,9 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
}
}
case DelayedShardRegionTerminated(ref) ⇒
regionTerminated(ref)
case GetShardHome(shard) ⇒
if (!rebalanceInProgress.contains(shard)) {
persistentState.shards.get(shard) match {
@@ -1859,6 +1860,20 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
case _: CurrentClusterState ⇒
}
def regionTerminated(ref: ActorRef): Unit =
if (persistentState.regions.contains(ref)) {
log.debug("ShardRegion terminated: [{}]", ref)
persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
gracefulShutdownInProgress -= ref
saveSnapshotWhenNeeded()
persist(ShardRegionTerminated(ref)) { evt ⇒
persistentState = persistentState.updated(evt)
allocateShardHomes()
}
}
def shuttingDown: Receive = {
case _ ⇒ // ignore all
}
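
To make the new flow easier to follow outside the diff, here is a condensed, self-contained sketch of the pattern introduced above: when a watched region's node has been removed from the cluster (addressTerminated) and a non-zero removal margin is configured, acting on Terminated is deferred via the scheduler; otherwise the region is removed right away. This is an illustrative reduction under those assumptions, not the real ShardCoordinator; persistence, rebalancing and the registration protocol are omitted and the actor/message names are made up.

    import akka.actor.{ Actor, ActorRef, Terminated }
    import scala.concurrent.duration._

    class DelayedTerminationSketch(removalMargin: FiniteDuration) extends Actor {
      import context.dispatcher

      // hypothetical stand-in for DelayedShardRegionTerminated
      private case class DelayedRegionTerminated(region: ActorRef)

      private var aliveRegions = Set.empty[ActorRef]

      def receive: Receive = {
        case region: ActorRef ⇒
          // stand-in for Register: watch the region and remember that it is alive
          aliveRegions += region
          context.watch(region)

        case t @ Terminated(region) ⇒
          if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(region))
            // the node hosting the region was downed/removed: wait for the
            // removal margin before reallocating anything that lived there
            context.system.scheduler.scheduleOnce(removalMargin, self, DelayedRegionTerminated(region))
          else
            regionTerminated(region)

        case DelayedRegionTerminated(region) ⇒
          regionTerminated(region)
      }

      private def regionTerminated(region: ActorRef): Unit =
        // the real coordinator also persists ShardRegionTerminated and reallocates shards
        aliveRegions -= region
    }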


@@ -32,6 +32,7 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.cluster.roles = ["backend"]
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {


@@ -37,6 +37,7 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s


@@ -42,6 +42,7 @@ object ClusterShardingSpec extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.cluster.roles = ["backend"]
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared.store {