diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.x.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.x.backwards.excludes
index 11fadf550f..6961a5c6a8 100644
--- a/akka-cluster-sharding/src/main/mima-filters/2.5.x.backwards.excludes
+++ b/akka-cluster-sharding/src/main/mima-filters/2.5.x.backwards.excludes
@@ -4,6 +4,11 @@ ProblemFilters.exclude[Problem]("akka.cluster.sharding.Shard.*")
 # #25191
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.retryTask")
 
+# Internal API change https://github.com/akka/akka/pull/27261
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps")
+
+
 # #27100 Productionize: GetShardRegionStats returns empty shard set on ask timeout
 # askAllShards, an internal function, was renamed and changed to query all or a subset of shards to try failures only
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.askAllShards")
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index c3f71a6566..d4e2bd3aae 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -424,9 +424,13 @@ object ShardCoordinator {
       shard: String,
       from: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef])
-      extends Actor {
+      regions: Set[ActorRef],
+      shuttingDownRegions: Set[ActorRef])
+      extends Actor
+      with ActorLogging {
     import Internal._
+
+    shuttingDownRegions.foreach(context.watch)
     regions.foreach(_ ! BeginHandOff(shard))
     var remaining = regions
 
@@ -435,14 +439,24 @@ object ShardCoordinator {
 
     def receive = {
       case BeginHandOffAck(`shard`) =>
-        remaining -= sender()
-        if (remaining.isEmpty) {
-          from ! HandOff(shard)
-          context.become(stoppingShard, discardOld = true)
-        }
+        log.debug("BeginHandOffAck for shard [{}] received from {}.", shard, sender())
+        acked(sender())
+      case Terminated(shardRegion) =>
+        log.debug("ShardRegion {} terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
+        acked(shardRegion)
       case ReceiveTimeout => done(ok = false)
     }
 
+    private def acked(shardRegion: ActorRef) = {
+      context.unwatch(shardRegion)
+      remaining -= shardRegion
+      if (remaining.isEmpty) {
+        log.debug("All shard regions acked, handing off shard [{}].", shard)
+        from ! HandOff(shard)
+        context.become(stoppingShard, discardOld = true)
+      }
+    }
+
     def stoppingShard: Receive = {
       case ShardStopped(`shard`) => done(ok = true)
       case ReceiveTimeout        => done(ok = false)
@@ -458,9 +472,12 @@ object ShardCoordinator {
       shard: String,
       from: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef]): Props =
-    Props(new RebalanceWorker(shard, from, handOffTimeout, regions))
-
+      regions: Set[ActorRef],
+      // Note: must be a subset of regions
+      shuttingDownRegions: Set[ActorRef]): Props = {
+    require(shuttingDownRegions.size <= regions.size, "'shuttingDownRegions' must be a subset of 'regions'.")
+    Props(new RebalanceWorker(shard, from, handOffTimeout, regions, shuttingDownRegions))
+  }
 }
 
 /**
@@ -886,7 +903,8 @@ abstract class ShardCoordinator(
                 shard,
                 rebalanceFromRegion,
                 handOffTimeout,
-                state.regions.keySet.union(state.regionProxies)).withDispatcher(context.props.dispatcher))
+                state.regions.keySet.union(state.regionProxies),
+                gracefulShutdownInProgress).withDispatcher(context.props.dispatcher))
           case None =>
             log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
         }
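
Below is a minimal, self-contained sketch of the deathwatch-as-acknowledgement pattern the changed RebalanceWorker relies on: regions already known to be shutting down are watched, and a Terminated signal from one of them is treated the same as an explicit ack, so the worker does not wait out the hand-off timeout for a reply that will never arrive. The Begin/Ack messages and the AckOrTerminateWorker/props names are made up for illustration and are not the actual Akka sharding internals shown in the diff above.

import akka.actor.{ Actor, ActorLogging, ActorRef, Props, ReceiveTimeout, Terminated }
import scala.concurrent.duration.FiniteDuration

// Hypothetical stand-ins for the sharding-internal BeginHandOff/BeginHandOffAck messages.
final case class Begin(shard: String)
final case class Ack(shard: String)

class AckOrTerminateWorker(
    shard: String,
    regions: Set[ActorRef],
    shuttingDownRegions: Set[ActorRef],
    timeout: FiniteDuration)
    extends Actor
    with ActorLogging {

  // Watch the regions that are already shutting down: if one of them terminates
  // before replying, the Terminated message counts as its acknowledgement.
  shuttingDownRegions.foreach(context.watch)
  regions.foreach(_ ! Begin(shard))
  private var remaining = regions
  context.setReceiveTimeout(timeout)

  def receive: Receive = {
    case Ack(`shard`)       => acked(sender())
    case Terminated(region) => acked(region) // a terminated region can never ack explicitly
    case ReceiveTimeout     => context.stop(self) // gave up waiting for the remaining regions
  }

  private def acked(region: ActorRef): Unit = {
    context.unwatch(region)
    remaining -= region
    if (remaining.isEmpty) {
      log.debug("All regions acked or terminated for shard [{}]", shard)
      context.stop(self)
    }
  }
}

object AckOrTerminateWorker {
  def props(
      shard: String,
      regions: Set[ActorRef],
      shuttingDownRegions: Set[ActorRef],
      timeout: FiniteDuration): Props = {
    require(shuttingDownRegions.subsetOf(regions), "'shuttingDownRegions' must be a subset of 'regions'")
    Props(new AckOrTerminateWorker(shard, regions, shuttingDownRegions, timeout))
  }
}

Two simplifications relative to the patch: the sketch checks the subset requirement with subsetOf, whereas the PR only compares sizes (shuttingDownRegions.size <= regions.size), and once every region has answered the sketch simply stops itself, whereas the real worker sends HandOff to the coordinator and switches to a stoppingShard state to await ShardStopped.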