diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.10.backwards.excludes/issue-29742-unstash-GetShardHome.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.10.backwards.excludes/issue-29742-unstash-GetShardHome.excludes new file mode 100644 index 0000000000..ed65f1dacf --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.6.10.backwards.excludes/issue-29742-unstash-GetShardHome.excludes @@ -0,0 +1,2 @@ +# #29742 Unstash GetShardHome requests one-by-one +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.unstashOneGetShardHomeRequest") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 6f28ac7313..7539a686df 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -951,6 +951,7 @@ abstract class ShardCoordinator( def handleGetShardHome(shard: ShardId): Boolean = { if (rebalanceInProgress.contains(shard)) { deferGetShardHomeRequest(shard, sender()) + unstashOneGetShardHomeRequest() // continue unstashing true } else if (!hasAllRegionsRegistered()) { log.debug( @@ -970,6 +971,8 @@ abstract class ShardCoordinator( shardRegionRef) else sender() ! ShardHome(shard, shardRegionRef) + + unstashOneGetShardHomeRequest() // continue unstashing true case None => false // location not known, yet, caller will handle allocation @@ -1124,6 +1127,8 @@ abstract class ShardCoordinator( } } + protected def unstashOneGetShardHomeRequest(): Unit + private def regionAddress(region: ActorRef): Address = { if (region.path.address.host.isEmpty) cluster.selfAddress else region.path.address @@ -1310,6 +1315,8 @@ class PersistentShardCoordinator( saveSnapshot(state) } } + + override protected def unstashOneGetShardHomeRequest(): Unit = () } /** @@ -1463,7 +1470,7 @@ private[akka] class DDataShardCoordinator( // which was scheduled by previous watchStateActors def waitingForStateInitialized: Receive = { case StateInitialized => - unstashGetShardHomeRequests() + unstashOneGetShardHomeRequest() unstashAll() stateInitialized() activate() @@ -1625,7 +1632,7 @@ private[akka] class DDataShardCoordinator( afterUpdateCallback(evt) if (verboseDebug) log.debug("{}: New coordinator state after [{}]: [{}]", typeName, evt, state) - unstashGetShardHomeRequests() + unstashOneGetShardHomeRequest() unstashAll() } @@ -1639,16 +1646,21 @@ private[akka] class DDataShardCoordinator( getShardHomeRequests += (sender -> request) } - private def unstashGetShardHomeRequests(): Unit = { - getShardHomeRequests.foreach { - case (originalSender, request) => self.tell(request, sender = originalSender) + override protected def unstashOneGetShardHomeRequest(): Unit = { + if (getShardHomeRequests.nonEmpty) { + // unstash one, will continue unstash of next after receive GetShardHome or update completed + val requestTuple = getShardHomeRequests.head + val (originalSender, request) = requestTuple + self.tell(request, sender = originalSender) + getShardHomeRequests -= requestTuple } - getShardHomeRequests = Set.empty } - def activate() = { + def activate(): Unit = { context.become(active.orElse(receiveLateRememberedEntities)) - log.info("{}: ShardCoordinator was moved to the active state {}", typeName, state) + log.info("{}: ShardCoordinator was moved to the active state with [{}] shards", typeName, state.shards.size) + if (verboseDebug) + log.debug("{}: Full ShardCoordinator initial state {}", typeName, state) } // only used once the coordinator is initialized