From 212de410b04f1b32a2f0c1bf5112ff4217705651 Mon Sep 17 00:00:00 2001
From: Christopher Batey
Date: Fri, 17 Jul 2020 12:44:12 +0100
Subject: [PATCH] Avoid stuck rebalances during coordinator leaving (#29385)

* Watch all regions as they may shutdown after rebalance starts
* Send graceful shutdown to selection if no coordinator found
* mima
* Add missing new line
* Make log markers consistent for rebalance worker
---
 .../29385-sharding-leaving-fix.excludes       |  2 ++
 .../cluster/sharding/ShardCoordinator.scala   | 30 ++++++++++---------
 .../akka/cluster/sharding/ShardRegion.scala   |  4 ++-
 3 files changed, 21 insertions(+), 15 deletions(-)
 create mode 100644 akka-cluster-sharding/src/main/mima-filters/2.6.8.backwards.excludes/29385-sharding-leaving-fix.excludes

diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.8.backwards.excludes/29385-sharding-leaving-fix.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.8.backwards.excludes/29385-sharding-leaving-fix.excludes
new file mode 100644
index 0000000000..32fad4ba8c
--- /dev/null
+++ b/akka-cluster-sharding/src/main/mima-filters/2.6.8.backwards.excludes/29385-sharding-leaving-fix.excludes
@@ -0,0 +1,2 @@
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps")
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 2e45332175..4b4adb3e19 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -460,27 +460,32 @@ object ShardCoordinator {
       shard: String,
       from: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef],
-      shuttingDownRegions: Set[ActorRef])
+      regions: Set[ActorRef])
       extends Actor
       with ActorLogging
       with Timers {
     import Internal._
 
-    shuttingDownRegions.foreach(context.watch)
-    regions.foreach(_ ! BeginHandOff(shard))
+    regions.foreach { region =>
+      context.watch(region)
+      region ! BeginHandOff(shard)
+    }
     var remaining = regions
 
+    log.debug("Rebalance [{}] from region [{}]", shard, regions)
+
     timers.startSingleTimer("hand-off-timeout", ReceiveTimeout, handOffTimeout)
 
     def receive = {
       case BeginHandOffAck(`shard`) =>
-        log.debug("BeginHandOffAck for shard [{}] received from {}.", shard, sender())
+        log.debug("BeginHandOffAck for shard [{}] received from [{}].", shard, sender())
         acked(sender())
       case Terminated(shardRegion) =>
-        log.debug("ShardRegion {} terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
+        log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
         acked(shardRegion)
-      case ReceiveTimeout => done(ok = false)
+      case ReceiveTimeout =>
+        log.debug("Rebalance of [{}] from [{}] timed out", shard, from)
+        done(ok = false)
     }
 
     private def acked(shardRegion: ActorRef) = {
@@ -508,11 +513,8 @@ object ShardCoordinator {
       shard: String,
       from: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef],
-      // Note: must be a subset of regions
-      shuttingDownRegions: Set[ActorRef]): Props = {
-    require(shuttingDownRegions.size <= regions.size, "'shuttingDownRegions' must be a subset of 'regions'.")
-    Props(new RebalanceWorker(shard, from, handOffTimeout, regions, shuttingDownRegions))
+      regions: Set[ActorRef]): Props = {
+    Props(new RebalanceWorker(shard, from, handOffTimeout, regions))
   }
 }
 
@@ -715,6 +717,7 @@ abstract class ShardCoordinator(
            gracefulShutdownInProgress += region
            continueRebalance(shards.toSet)
          case None =>
+          log.debug("Unknown region requested graceful shutdown [{}]", region)
        }
 
    case ShardRegion.GetClusterShardingStats(waitMax) =>
@@ -976,8 +979,7 @@ abstract class ShardCoordinator(
                shard,
                rebalanceFromRegion,
                handOffTimeout,
-                state.regions.keySet.union(state.regionProxies),
-                gracefulShutdownInProgress).withDispatcher(context.props.dispatcher))
+                state.regions.keySet.union(state.regionProxies)).withDispatcher(context.props.dispatcher))
          case None => log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
        }
      }
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index 66507dad6a..01c86bfaba 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -1140,7 +1140,9 @@ private[akka] class ShardRegion(
   }
 
   def sendGracefulShutdownToCoordinator(): Unit = {
-    if (gracefulShutdownInProgress)
+    if (gracefulShutdownInProgress) {
+      log.debug("Sending graceful shutdown to {}", coordinatorSelection)
       coordinatorSelection.foreach(_ ! GracefulShutdownReq(self))
+    }
   }
 }
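
The essence of the change above is that the RebalanceWorker now watches every region it asks to hand off a shard, and treats a Terminated region as an implicit BeginHandOffAck, so a region that stops after the rebalance has started can no longer leave the rebalance stuck waiting for an acknowledgement. What follows is a minimal, self-contained sketch of that idea and is not part of the patch: DemoRebalanceWorker and its message classes are hypothetical stand-ins for the real sharding protocol, and the actual hand-off step after all acks is elided.

// Sketch only: hypothetical stand-ins for the real sharding protocol messages.
import akka.actor.{ Actor, ActorLogging, ActorRef, ReceiveTimeout, Terminated, Timers }
import scala.concurrent.duration.FiniteDuration

final case class BeginHandOff(shard: String)
final case class BeginHandOffAck(shard: String)

class DemoRebalanceWorker(shard: String, regions: Set[ActorRef], handOffTimeout: FiniteDuration)
    extends Actor
    with ActorLogging
    with Timers {

  // Watch every region, not only those already known to be shutting down,
  // because any region may stop after the rebalance has started.
  regions.foreach { region =>
    context.watch(region)
    region ! BeginHandOff(shard)
  }
  private var remaining: Set[ActorRef] = regions

  // Give up after the hand-off timeout instead of hanging forever.
  timers.startSingleTimer("hand-off-timeout", ReceiveTimeout, handOffTimeout)

  def receive: Receive = {
    case BeginHandOffAck(`shard`) => acked(sender())
    case Terminated(region)       => acked(region) // a stopped region counts as acknowledged
    case ReceiveTimeout =>
      log.debug("Rebalance of [{}] timed out", shard)
      context.stop(self)
  }

  private def acked(region: ActorRef): Unit = {
    remaining -= region
    if (remaining.isEmpty) {
      log.debug("All regions acknowledged hand-off of [{}]", shard)
      context.stop(self) // the real worker proceeds to hand off the shard here
    }
  }
}

Treating Terminated as an ack is what removes the need for the separate shuttingDownRegions set (and its mima-filtered constructor), which is why the patch deletes that parameter rather than extending it.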