diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.8.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.8.backwards.excludes
new file mode 100644
index 0000000000..28d6c525d3
--- /dev/null
+++ b/akka-cluster-sharding/src/main/mima-filters/2.5.8.backwards.excludes
@@ -0,0 +1,3 @@
+# #24191
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceInProgress")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceInProgress_=")
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 866b1d1904..a423c15ec5 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -420,7 +420,8 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   var allRegionsRegistered = false
 
   var state = State.empty.withRememberEntities(settings.rememberEntities)
-  var rebalanceInProgress = Set.empty[ShardId]
+  // shards currently being rebalanced (keys), mapped to the senders of GetShardHome requests deferred until that rebalance completes (values)
+  var rebalanceInProgress = Map.empty[ShardId, Set[ActorRef]]
   var unAckedHostShards = Map.empty[ShardId, Cancellable]
   // regions that have requested handoff, for graceful shutdown
   var gracefulShutdownInProgress = Set.empty[ActorRef]
@@ -520,7 +521,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
 
     case RebalanceTick ⇒
       if (state.regions.nonEmpty) {
-        val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress)
+        val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress.keySet)
         shardsFuture.value match {
           case Some(Success(shards)) ⇒
             continueRebalance(shards)
@@ -537,18 +538,24 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
       continueRebalance(shards)
 
     case RebalanceDone(shard, ok) ⇒
-      rebalanceInProgress -= shard
       log.debug("Rebalance shard [{}] done [{}]", shard, ok)
       // The shard could have been removed by ShardRegionTerminated
-      if (state.shards.contains(shard))
+      if (state.shards.contains(shard)) {
         if (ok) {
           update(ShardHomeDeallocated(shard)) { evt ⇒
+            log.debug("Shard [{}] deallocated after rebalance", shard)
             state = state.updated(evt)
-            log.debug("Shard [{}] deallocated", evt.shard)
+            clearRebalanceInProgress(shard)
             allocateShardHomesForRememberEntities()
           }
-        } else // rebalance not completed, graceful shutdown will be retried
+        } else {
+          // rebalance not completed, graceful shutdown will be retried
           gracefulShutdownInProgress -= state.shards(shard)
+          clearRebalanceInProgress(shard)
+        }
+      } else {
+        clearRebalanceInProgress(shard)
+      }
 
     case GracefulShutdownReq(region) ⇒
       if (!gracefulShutdownInProgress(region))
@@ -599,13 +606,31 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
 
   }: Receive).orElse[Any, Unit](receiveTerminated)
 
+  private def clearRebalanceInProgress(shard: ShardId): Unit = {
+    rebalanceInProgress.get(shard) match {
+      case Some(pendingGetShardHome) ⇒
+        val msg = GetShardHome(shard)
+        pendingGetShardHome.foreach { getShardHomeSender ⇒
+          self.tell(msg, getShardHomeSender)
+        }
+        rebalanceInProgress -= shard
+      case None ⇒
+    }
+  }
+
+  private def deferGetShardHomeRequest(shard: ShardId, from: ActorRef): Unit = {
+    log.debug("GetShardHome [{}] request from [{}] deferred, because rebalance is in progress for this shard. " +
+      "It will be handled when rebalance is done.", shard, from)
+    rebalanceInProgress = rebalanceInProgress.updated(shard, rebalanceInProgress(shard) + from)
+  }
+
   /**
    * @return `true` if the message could be handled without state update, i.e.
-   *         the shard location was known or the request was ignored
+   *         the shard location was known or the request was deferred or ignored
    */
-  def handleGetShardHome(shard: String): Boolean = {
+  def handleGetShardHome(shard: ShardId): Boolean = {
     if (rebalanceInProgress.contains(shard)) {
-      log.debug("GetShardHome [{}] request ignored, because rebalance is in progress for this shard.", shard)
+      deferGetShardHomeRequest(shard, sender())
       true
     } else if (!hasAllRegionsRegistered()) {
       log.debug("GetShardHome [{}] request ignored, because not all regions have registered yet.", shard)
@@ -689,6 +714,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   def regionTerminated(ref: ActorRef): Unit =
     if (state.regions.contains(ref)) {
       log.debug("ShardRegion terminated: [{}]", ref)
+      regionTerminationInProgress += ref
       state.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
 
       update(ShardRegionTerminated(ref)) { evt ⇒
@@ -724,7 +750,9 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   }
 
   def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit =
-    if (!rebalanceInProgress.contains(shard)) {
+    if (rebalanceInProgress.contains(shard)) {
+      deferGetShardHomeRequest(shard, getShardHomeSender)
+    } else {
       state.shards.get(shard) match {
         case Some(ref) ⇒ getShardHomeSender ! ShardHome(shard, ref)
         case None ⇒
@@ -745,10 +773,10 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
 
   def continueRebalance(shards: Set[ShardId]): Unit =
     shards.foreach { shard ⇒
-      if (!rebalanceInProgress(shard)) {
+      if (!rebalanceInProgress.contains(shard)) {
         state.shards.get(shard) match {
           case Some(rebalanceFromRegion) ⇒
-            rebalanceInProgress += shard
+            rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty)
             log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
             context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
               state.regions.keySet union state.regionProxies)
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
index e5c2b01489..2bcc90bb99 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
@@ -80,7 +80,7 @@ abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String)
   val second = role("second")
 
   commonConfig(ConfigFactory.parseString(s"""
-    akka.loglevel = INFO
+    akka.loglevel = DEBUG
     akka.actor.provider = "cluster"
    akka.remote.log-remote-lifecycle-events = off
    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
@@ -94,6 +94,8 @@ abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String)
    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    akka.persistence.snapshot-store.local.dir = "target/ClusterShardingCustomShardAllocationSpec/snapshots"
    akka.cluster.sharding.state-store-mode = "$mode"
+    akka.cluster.sharding.rebalance-interval = 1 s
+    #akka.cluster.sharding.retry-interval = 5 s
     """).withFallback(MultiNodeClusterSpec.clusterConfig))
 }
 
@@ -161,7 +163,7 @@ abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingC
       }
     }
 
-    "use specified region" in within(10.seconds) {
+    "use specified region" in within(30.seconds) {
       join(first, first)
 
       runOn(first) {
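
The heart of this patch is a defer-and-replay pattern: instead of ignoring `GetShardHome` while the requested shard is being rebalanced, the coordinator now remembers each sender in `rebalanceInProgress` and, once the rebalance finishes, `clearRebalanceInProgress` re-sends the request to itself with the original sender intact, so deferred requests go through the normal handler once the shard has a new home. The sketch below illustrates that pattern in isolation; it is not the actual `ShardCoordinator`, and all names in it (`DeferringCoordinator`, `GetHome`, `Home`, `RebalanceStarted`, `RebalanceFinished`, `DeferDemo`) are made up for the example.

```scala
import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props }

final case class GetHome(key: String)
final case class Home(key: String, owner: String)
final case class RebalanceStarted(key: String)
final case class RebalanceFinished(key: String, newOwner: String)

// Minimal sketch of the defer-and-replay pattern used by the patch above.
class DeferringCoordinator extends Actor with ActorLogging {
  private var owners = Map("a" → "region-1")
  // keys being rebalanced, mapped to the senders of deferred GetHome requests
  private var rebalanceInProgress = Map.empty[String, Set[ActorRef]]

  def receive: Receive = {
    case GetHome(key) ⇒
      if (rebalanceInProgress.contains(key)) {
        // defer instead of ignore: remember who asked
        rebalanceInProgress = rebalanceInProgress.updated(key, rebalanceInProgress(key) + sender())
        log.debug("GetHome [{}] deferred until rebalance is done", key)
      } else
        owners.get(key).foreach(owner ⇒ sender() ! Home(key, owner))

    case RebalanceStarted(key) ⇒
      rebalanceInProgress = rebalanceInProgress.updated(key, Set.empty)

    case RebalanceFinished(key, newOwner) ⇒
      owners = owners.updated(key, newOwner)
      // replay every deferred request to self with the original sender intact,
      // so it is answered by the regular GetHome handling above
      rebalanceInProgress.get(key).foreach(_.foreach(asker ⇒ self.tell(GetHome(key), asker)))
      rebalanceInProgress -= key
  }
}

object DeferDemo extends App {
  val system = ActorSystem("defer-demo")
  val coordinator = system.actorOf(Props[DeferringCoordinator], "coordinator")
  coordinator ! RebalanceStarted("a")
  coordinator ! GetHome("a") // deferred, not dropped
  // once the rebalance finishes, the deferred request is replayed and answered
  // (sent from outside an actor the Home reply goes to deadLetters; use ask or
  // a TestProbe to observe it)
  coordinator ! RebalanceFinished("a", "region-2")
}
```

Replaying via `self.tell(msg, originalSender)` rather than answering inline keeps the deferred requests on the same code path as fresh ones: they re-enter the mailbox and hit the regular handler, so the state checks live in one place.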