=clt #18314 Reduce message loss when region terminates

The new akka.cluster.down-removal-margin setting comes into play.
During that margin, messages are still routed to the old location, even though the Terminated message for the region has already been received.

We can reduce the message loss (best effort) by not replying to GetShardHome requests for that region while the margin is in effect.
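To make the idea concrete, here is a small, self-contained sketch (illustrative only; ShardId, Region and answerGetShardHome are made-up names, not the coordinator's API): while a region's termination is still inside the removal margin, the coordinator simply does not answer GetShardHome for shards hosted there, instead of handing out a home that is about to disappear.

// Minimal sketch, assuming a map of shard -> region plus a set of regions whose
// Terminated has been observed but whose removal is still delayed by the margin.
object GetShardHomeSketch {
  type ShardId = String
  type Region  = String

  // Shard allocations currently known to the coordinator
  val shards: Map[ShardId, Region] = Map("shard-1" -> "regionA", "shard-2" -> "regionB")

  // Regions in the "termination in progress" window (margin not yet elapsed)
  var regionTerminationInProgress: Set[Region] = Set("regionA")

  // Reply with the shard's home only if its region is not about to be removed;
  // staying silent makes the requester retry later, after re-allocation.
  def answerGetShardHome(shard: ShardId): Option[Region] =
    shards.get(shard).filterNot(regionTerminationInProgress.contains)

  def main(args: Array[String]): Unit = {
    println(answerGetShardHome("shard-1")) // None: regionA is terminating, no reply
    println(answerGetShardHome("shard-2")) // Some(regionB): safe to reply
  }
}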
Patrik Nordwall 2015-08-25 12:07:12 +02:00
parent 27b54627d8
commit ba135e8e34

@@ -382,6 +382,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   var gracefulShutdownInProgress = Set.empty[ActorRef]
   var aliveRegions = Set.empty[ActorRef]
   var members = Set.empty[Address]
+  var regionTerminationInProgress = Set.empty[ActorRef]
   import context.dispatcher
   val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
@@ -437,9 +438,10 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
     case t @ Terminated(ref) ⇒
       if (state.regions.contains(ref)) {
-        if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref))
+        if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref)) {
           context.system.scheduler.scheduleOnce(removalMargin, self, DelayedShardRegionTerminated(ref))
-        else
+          regionTerminationInProgress += ref
+        } else
           regionTerminated(ref)
       } else if (state.regionProxies.contains(ref)) {
         log.debug("ShardRegion proxy terminated: [{}]", ref)
@@ -454,7 +456,11 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
     case GetShardHome(shard) ⇒
       if (!rebalanceInProgress.contains(shard)) {
         state.shards.get(shard) match {
-          case Some(ref) ⇒ sender() ! ShardHome(shard, ref)
+          case Some(ref) ⇒
+            if (regionTerminationInProgress(ref))
+              log.debug("GetShardHome [{}] request ignored, due to region [{}] termination in progress.", shard, ref)
+            else
+              sender() ! ShardHome(shard, ref)
           case None ⇒
             val activeRegions = state.regions -- gracefulShutdownInProgress
             if (activeRegions.nonEmpty) {
@@ -564,10 +570,10 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
       log.debug("ShardRegion terminated: [{}]", ref)
       state.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
-      gracefulShutdownInProgress -= ref
       update(ShardRegionTerminated(ref)) { evt ⇒
         state = state.updated(evt)
+        gracefulShutdownInProgress -= ref
+        regionTerminationInProgress -= ref
         allocateShardHomes()
       }
     }
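Taken together the change is a small lifecycle: mark the region as terminating when Terminated arrives, wait out the removal margin, then persist the removal, clear the in-progress flag and re-allocate its shards. The sketch below models that flow with a plain ScheduledExecutorService standing in for context.system.scheduler, and an invented removalMarginMillis parameter standing in for akka.cluster.down-removal-margin; it is not the coordinator's actual code.

import java.util.concurrent.{Executors, TimeUnit}

object DelayedRemovalSketch {
  type Region = String

  // Regions whose Terminated has been seen but whose removal is still delayed
  var regionTerminationInProgress = Set.empty[Region]
  // Regions that still count as hosting shards
  var allocatedRegions = Set("regionA", "regionB")

  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Step 1: Terminated observed -> remember it and postpone the actual removal.
  def onTerminated(region: Region, removalMarginMillis: Long): Unit = {
    regionTerminationInProgress += region
    scheduler.schedule(
      new Runnable { def run(): Unit = regionTerminated(region) },
      removalMarginMillis, TimeUnit.MILLISECONDS)
  }

  // Step 2: after the margin, remove the region and clear the flag, so shard-home
  // requests for its shards are answered again once they are re-allocated.
  def regionTerminated(region: Region): Unit = {
    allocatedRegions -= region
    regionTerminationInProgress -= region
    println(s"$region removed; its shards can be re-allocated")
  }

  def main(args: Array[String]): Unit = {
    onTerminated("regionA", removalMarginMillis = 500)
    println(regionTerminationInProgress) // Set(regionA): inside the margin window
    Thread.sleep(1000)
    println(regionTerminationInProgress) // Set(): margin elapsed, removal done
    scheduler.shutdown()
  }
}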