From d30464c452030e06bff19418f9f0ca90b5aac8ac Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 26 Dec 2017 17:23:15 +0100
Subject: [PATCH] Reply to GetShardHome requests after rebalance, #24191

* Some GetShardHome requests were ignored (by design) during rebalance
  and they would be retried later.
* This optimization keeps track of such requests and replies to them
  immediately after the rebalance has completed, so the buffered
  messages in the region don't have to wait for the next retry tick.
* Use regionTerminationInProgress also during the update, since not all
  GetShardHome requests are stashed.
---
 .../mima-filters/2.5.8.backwards.excludes         |  3 ++
 .../cluster/sharding/ShardCoordinator.scala       | 52 ++++++++++++++-----
 ...terShardingCustomShardAllocationSpec.scala     |  6 ++-
 3 files changed, 47 insertions(+), 14 deletions(-)
 create mode 100644 akka-cluster-sharding/src/main/mima-filters/2.5.8.backwards.excludes

diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.8.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.8.backwards.excludes
new file mode 100644
index 0000000000..28d6c525d3
--- /dev/null
+++ b/akka-cluster-sharding/src/main/mima-filters/2.5.8.backwards.excludes
@@ -0,0 +1,3 @@
+# #24191
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceInProgress")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceInProgress_=")
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 866b1d1904..a423c15ec5 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -420,7 +420,8 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   var allRegionsRegistered = false

   var state = State.empty.withRememberEntities(settings.rememberEntities)
-  var rebalanceInProgress = Set.empty[ShardId]
+  // rebalanceInProgress for the ShardId keys, pending GetShardHome requests by the ActorRef values
+  var rebalanceInProgress = Map.empty[ShardId, Set[ActorRef]]
   var unAckedHostShards = Map.empty[ShardId, Cancellable]
   // regions that have requested handoff, for graceful shutdown
   var gracefulShutdownInProgress = Set.empty[ActorRef]
@@ -520,7 +521,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti

     case RebalanceTick ⇒
       if (state.regions.nonEmpty) {
-        val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress)
+        val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress.keySet)
         shardsFuture.value match {
           case Some(Success(shards)) ⇒
             continueRebalance(shards)
@@ -537,18 +538,24 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
       continueRebalance(shards)

     case RebalanceDone(shard, ok) ⇒
-      rebalanceInProgress -= shard
       log.debug("Rebalance shard [{}] done [{}]", shard, ok)
       // The shard could have been removed by ShardRegionTerminated
-      if (state.shards.contains(shard))
+      if (state.shards.contains(shard)) {
         if (ok) {
           update(ShardHomeDeallocated(shard)) { evt ⇒
+            log.debug("Shard [{}] deallocated after rebalance", shard)
             state = state.updated(evt)
-            log.debug("Shard [{}] deallocated", evt.shard)
+            clearRebalanceInProgress(shard)
             allocateShardHomesForRememberEntities()
           }
-        } else // rebalance not completed, graceful shutdown will be retried
+        } else {
+          // rebalance not completed, graceful shutdown will be retried
           gracefulShutdownInProgress -= state.shards(shard)
+          clearRebalanceInProgress(shard)
+        }
+      } else {
+        clearRebalanceInProgress(shard)
+      }

     case GracefulShutdownReq(region) ⇒
       if (!gracefulShutdownInProgress(region))
@@ -599,13 +606,31 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti

   }: Receive).orElse[Any, Unit](receiveTerminated)

+  private def clearRebalanceInProgress(shard: String): Unit = {
+    rebalanceInProgress.get(shard) match {
+      case Some(pendingGetShardHome) ⇒
+        val msg = GetShardHome(shard)
+        pendingGetShardHome.foreach { getShardHomeSender ⇒
+          self.tell(msg, getShardHomeSender)
+        }
+        rebalanceInProgress -= shard
+      case None ⇒
+    }
+  }
+
+  private def deferGetShardHomeRequest(shard: ShardId, from: ActorRef): Unit = {
+    log.debug("GetShardHome [{}] request from [{}] deferred, because rebalance is in progress for this shard. " +
+      "It will be handled when rebalance is done.", shard, from)
+    rebalanceInProgress = rebalanceInProgress.updated(shard, rebalanceInProgress(shard) + from)
+  }
+
   /**
    * @return `true` if the message could be handled without state update, i.e.
-   *         the shard location was known or the request was ignored
+   *         the shard location was known or the request was deferred or ignored
    */
-  def handleGetShardHome(shard: String): Boolean = {
+  def handleGetShardHome(shard: ShardId): Boolean = {
     if (rebalanceInProgress.contains(shard)) {
-      log.debug("GetShardHome [{}] request ignored, because rebalance is in progress for this shard.", shard)
+      deferGetShardHomeRequest(shard, sender())
       true
     } else if (!hasAllRegionsRegistered()) {
       log.debug("GetShardHome [{}] request ignored, because not all regions have registered yet.", shard)
@@ -689,6 +714,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   def regionTerminated(ref: ActorRef): Unit =
     if (state.regions.contains(ref)) {
       log.debug("ShardRegion terminated: [{}]", ref)
+      regionTerminationInProgress += ref
       state.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }

       update(ShardRegionTerminated(ref)) { evt ⇒
@@ -724,7 +750,9 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   }

   def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit =
-    if (!rebalanceInProgress.contains(shard)) {
+    if (rebalanceInProgress.contains(shard)) {
+      deferGetShardHomeRequest(shard, getShardHomeSender)
+    } else {
       state.shards.get(shard) match {
         case Some(ref) ⇒ getShardHomeSender ! ShardHome(shard, ref)
         case None ⇒
@@ -745,10 +773,10 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti

   def continueRebalance(shards: Set[ShardId]): Unit =
     shards.foreach { shard ⇒
-      if (!rebalanceInProgress(shard)) {
+      if (!rebalanceInProgress.contains(shard)) {
        state.shards.get(shard) match {
          case Some(rebalanceFromRegion) ⇒
-            rebalanceInProgress += shard
+            rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty)
            log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
            context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
              state.regions.keySet union state.regionProxies)
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
index e5c2b01489..2bcc90bb99 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
@@ -80,7 +80,7 @@ abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String)
   val second = role("second")

   commonConfig(ConfigFactory.parseString(s"""
-    akka.loglevel = INFO
+    akka.loglevel = DEBUG
     akka.actor.provider = "cluster"
     akka.remote.log-remote-lifecycle-events = off
     akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
@@ -94,6 +94,8 @@ abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String)
     akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
     akka.persistence.snapshot-store.local.dir = "target/ClusterShardingCustomShardAllocationSpec/snapshots"
     akka.cluster.sharding.state-store-mode = "$mode"
+    akka.cluster.sharding.rebalance-interval = 1 s
+    #akka.cluster.sharding.retry-interval = 5 s
     """).withFallback(MultiNodeClusterSpec.clusterConfig))
 }

@@ -161,7 +163,7 @@ abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingC
       }
     }

-    "use specified region" in within(10.seconds) {
+    "use specified region" in within(30.seconds) {
      join(first, first)

      runOn(first) {
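
The change above boils down to a defer-and-flush pattern: while a shard is being rebalanced, the senders of GetShardHome requests for that shard are collected in a Map[ShardId, Set[ActorRef]], and as soon as RebalanceDone is processed the coordinator re-sends GetShardHome to itself on behalf of each collected sender. The sketch below shows only that bookkeeping in isolation; it is not the real ShardCoordinator, and the object and method names (DeferredGetShardHomeSketch, startRebalance, deferGetShardHome, rebalanceDone) as well as the String stand-in for the requester's ActorRef are illustrative only.

object DeferredGetShardHomeSketch {
  type ShardId   = String
  type Requester = String // stand-in for the requesting region's ActorRef

  // ShardId -> senders whose GetShardHome arrived while that shard was rebalancing
  private var rebalanceInProgress = Map.empty[ShardId, Set[Requester]]

  // Called when a rebalance of `shard` starts; no deferred requesters yet.
  def startRebalance(shard: ShardId): Unit =
    rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty)

  // A GetShardHome that arrives mid-rebalance is remembered instead of being ignored.
  // Returns true if the request was deferred, false if the shard is not rebalancing.
  def deferGetShardHome(shard: ShardId, from: Requester): Boolean =
    rebalanceInProgress.get(shard) match {
      case Some(pending) =>
        rebalanceInProgress = rebalanceInProgress.updated(shard, pending + from)
        true
      case None =>
        false // not rebalancing: answer via the normal allocation path instead
    }

  // When the rebalance of `shard` is done, answer every deferred requester right away
  // and drop the bookkeeping for that shard.
  def rebalanceDone(shard: ShardId)(reply: (ShardId, Requester) => Unit): Unit = {
    rebalanceInProgress.getOrElse(shard, Set.empty).foreach(reply(shard, _))
    rebalanceInProgress -= shard
  }

  def main(args: Array[String]): Unit = {
    startRebalance("shard-7")
    deferGetShardHome("shard-7", "regionA")
    deferGetShardHome("shard-7", "regionB")
    // In the real coordinator the reply is self.tell(GetShardHome(shard), sender);
    // printing is enough here to show both requesters are answered without a retry tick.
    rebalanceDone("shard-7") { (s, r) => println(s"re-deliver GetShardHome($s) for $r") }
  }
}

Keying the bookkeeping by ShardId and flushing it in clearRebalanceInProgress is also why the patch changes rebalanceInProgress from Set[ShardId] to Map[ShardId, Set[ActorRef]], which in turn is what the new mima-filters entries exclude from binary-compatibility checking.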