From ba135e8e349a6729e498364c5b7d91cd58ff7c82 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 25 Aug 2015 12:07:12 +0200
Subject: [PATCH] =clt #18314 Reduce message loss when region terminates

The new akka.cluster.down-removal-margin comes into play. During that
period messages are still routed to the old location, even though we
have received the Terminated message. We can reduce (best effort) the
message loss by not replying to GetShardHome during that period.
---
 .../akka/cluster/sharding/ShardCoordinator.scala | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index d1ae87073b..fcde41e6d4 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -382,6 +382,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   var gracefulShutdownInProgress = Set.empty[ActorRef]
   var aliveRegions = Set.empty[ActorRef]
   var members = Set.empty[Address]
+  var regionTerminationInProgress = Set.empty[ActorRef]
 
   import context.dispatcher
   val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
@@ -437,9 +438,10 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
 
     case t @ Terminated(ref) ⇒
       if (state.regions.contains(ref)) {
-        if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref))
+        if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref)) {
           context.system.scheduler.scheduleOnce(removalMargin, self, DelayedShardRegionTerminated(ref))
-        else
+          regionTerminationInProgress += ref
+        } else
           regionTerminated(ref)
       } else if (state.regionProxies.contains(ref)) {
         log.debug("ShardRegion proxy terminated: [{}]", ref)
@@ -454,7 +456,11 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
     case GetShardHome(shard) ⇒
       if (!rebalanceInProgress.contains(shard)) {
         state.shards.get(shard) match {
-          case Some(ref) ⇒ sender() ! ShardHome(shard, ref)
+          case Some(ref) ⇒
+            if (regionTerminationInProgress(ref))
+              log.debug("GetShardHome [{}] request ignored, due to region [{}] termination in progress.", shard, ref)
+            else
+              sender() ! ShardHome(shard, ref)
           case None ⇒
             val activeRegions = state.regions -- gracefulShutdownInProgress
             if (activeRegions.nonEmpty) {
@@ -564,10 +570,10 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
       log.debug("ShardRegion terminated: [{}]", ref)
       state.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
 
-      gracefulShutdownInProgress -= ref
-
       update(ShardRegionTerminated(ref)) { evt ⇒
         state = state.updated(evt)
+        gracefulShutdownInProgress -= ref
+        regionTerminationInProgress -= ref
         allocateShardHomes()
       }
     }
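
For context, a minimal sketch of how the removal margin this patch depends on
might be enabled on a node. Only the setting name akka.cluster.down-removal-margin
is taken from the commit message above; the system name, the 10 second value and
the object name RemovalMarginExample are illustrative assumptions, not part of
the patch.

// Illustrative sketch only, not part of the patch above.
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RemovalMarginExample extends App {
  // The margin value is an arbitrary example; the default is normally derived
  // from the downing configuration of the cluster.
  val config = ConfigFactory.parseString(
    "akka.cluster.down-removal-margin = 10s")
    .withFallback(ConfigFactory.load())

  // While that margin elapses after a region's node terminates, the patched
  // coordinator withholds ShardHome answers for shards hosted on the dying
  // region instead of pointing senders at a location that is about to go away.
  val system = ActorSystem("ClusterSystem", config)
}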