diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index a5df66b723..5dcf6173cd 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -90,7 +90,9 @@ object ClusterShardingSettings { entityRecoveryConstantRateStrategyNumberOfEntities = settings.tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities, coordinatorStateWriteMajorityPlus = settings.tuningParameters.coordinatorStateWriteMajorityPlus, - coordinatorStateReadMajorityPlus = settings.tuningParameters.coordinatorStateReadMajorityPlus), + coordinatorStateReadMajorityPlus = settings.tuningParameters.coordinatorStateReadMajorityPlus, + leastShardAllocationAbsoluteLimit = settings.tuningParameters.leastShardAllocationAbsoluteLimit, + leastShardAllocationRelativeLimit = settings.tuningParameters.leastShardAllocationRelativeLimit), new ClassicClusterSingletonManagerSettings( settings.coordinatorSingletonSettings.singletonName, settings.coordinatorSingletonSettings.role, @@ -175,7 +177,9 @@ object ClusterShardingSettings { val updatingStateTimeout: FiniteDuration, val waitingForStateTimeout: FiniteDuration, val coordinatorStateWriteMajorityPlus: Int, - val coordinatorStateReadMajorityPlus: Int) { + val coordinatorStateReadMajorityPlus: Int, + val leastShardAllocationAbsoluteLimit: Int, + val leastShardAllocationRelativeLimit: Double) { def this(classic: ClassicShardingSettings.TuningParameters) = this( @@ -197,7 +201,9 @@ object ClusterShardingSettings { entityRecoveryConstantRateStrategyFrequency = classic.entityRecoveryConstantRateStrategyFrequency, entityRecoveryConstantRateStrategyNumberOfEntities = classic.entityRecoveryConstantRateStrategyNumberOfEntities, coordinatorStateWriteMajorityPlus = classic.coordinatorStateWriteMajorityPlus, - coordinatorStateReadMajorityPlus = classic.coordinatorStateReadMajorityPlus) + coordinatorStateReadMajorityPlus = classic.coordinatorStateReadMajorityPlus, + leastShardAllocationAbsoluteLimit = classic.leastShardAllocationAbsoluteLimit, + leastShardAllocationRelativeLimit = classic.leastShardAllocationRelativeLimit) require( entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant", @@ -241,6 +247,10 @@ object ClusterShardingSettings { copy(coordinatorStateWriteMajorityPlus = value) def withCoordinatorStateReadMajorityPlus(value: Int): TuningParameters = copy(coordinatorStateReadMajorityPlus = value) + def withLeastShardAllocationAbsoluteLimit(value: Int): TuningParameters = + copy(leastShardAllocationAbsoluteLimit = value) + def withLeastShardAllocationRelativeLimit(value: Double): TuningParameters = + copy(leastShardAllocationRelativeLimit = value) private def copy( bufferSize: Int = bufferSize, @@ -261,7 +271,9 @@ object ClusterShardingSettings { updatingStateTimeout: FiniteDuration = updatingStateTimeout, waitingForStateTimeout: FiniteDuration = waitingForStateTimeout, coordinatorStateWriteMajorityPlus: Int = coordinatorStateWriteMajorityPlus, - coordinatorStateReadMajorityPlus: Int = coordinatorStateReadMajorityPlus): TuningParameters = + coordinatorStateReadMajorityPlus: Int = coordinatorStateReadMajorityPlus, + leastShardAllocationAbsoluteLimit: Int = leastShardAllocationAbsoluteLimit, + leastShardAllocationRelativeLimit: Double = leastShardAllocationRelativeLimit): TuningParameters = new TuningParameters( bufferSize = bufferSize, coordinatorFailureBackoff = coordinatorFailureBackoff, @@ -281,10 +293,12 @@ object ClusterShardingSettings { updatingStateTimeout = updatingStateTimeout, waitingForStateTimeout = waitingForStateTimeout, coordinatorStateWriteMajorityPlus = coordinatorStateWriteMajorityPlus, - coordinatorStateReadMajorityPlus = coordinatorStateReadMajorityPlus) + coordinatorStateReadMajorityPlus = coordinatorStateReadMajorityPlus, + leastShardAllocationAbsoluteLimit = leastShardAllocationAbsoluteLimit, + leastShardAllocationRelativeLimit = leastShardAllocationRelativeLimit) override def toString = - s"""TuningParameters($bufferSize,$coordinatorFailureBackoff,$entityRecoveryConstantRateStrategyFrequency,$entityRecoveryConstantRateStrategyNumberOfEntities,$entityRecoveryStrategy,$entityRestartBackoff,$handOffTimeout,$keepNrOfBatches,$leastShardAllocationMaxSimultaneousRebalance,$leastShardAllocationRebalanceThreshold,$rebalanceInterval,$retryInterval,$shardFailureBackoff,$shardStartTimeout,$snapshotAfter,$updatingStateTimeout,$waitingForStateTimeout,$coordinatorStateReadMajorityPlus,$coordinatorStateReadMajorityPlus)""" + s"""TuningParameters($bufferSize,$coordinatorFailureBackoff,$entityRecoveryConstantRateStrategyFrequency,$entityRecoveryConstantRateStrategyNumberOfEntities,$entityRecoveryStrategy,$entityRestartBackoff,$handOffTimeout,$keepNrOfBatches,$leastShardAllocationMaxSimultaneousRebalance,$leastShardAllocationRebalanceThreshold,$rebalanceInterval,$retryInterval,$shardFailureBackoff,$shardStartTimeout,$snapshotAfter,$updatingStateTimeout,$waitingForStateTimeout,$coordinatorStateReadMajorityPlus,$coordinatorStateReadMajorityPlus,$leastShardAllocationAbsoluteLimit,$leastShardAllocationRelativeLimit)""" } } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index f7a7138b02..4b9f376b94 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -12,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.compat.java8.FutureConverters._ import scala.concurrent.Future + import akka.actor.ActorRefProvider import akka.actor.ExtendedActorSystem import akka.actor.InternalActorRef @@ -28,7 +29,7 @@ import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.Behaviors import akka.annotation.{ InternalApi, InternalStableApi } import akka.cluster.ClusterSettings.DataCenter -import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.{ StartEntity => ClassicStartEntity } @@ -279,9 +280,17 @@ import akka.util.JavaDurationConverters._ } override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { - val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold - val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance - new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) + if (settings.tuningParameters.leastShardAllocationAbsoluteLimit > 0) { + // new algorithm + val absoluteLimit = settings.tuningParameters.leastShardAllocationAbsoluteLimit + val relativeLimit = settings.tuningParameters.leastShardAllocationRelativeLimit + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit, relativeLimit) + } else { + // old algorithm + val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold + val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance + new ShardCoordinator.LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) + } } override lazy val shardState: ActorRef[ClusterShardingQuery] = { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 140631eddd..e6f556d74c 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -119,11 +119,10 @@ object ClusterSharding { * location. * * The logic that decides which shards to rebalance is defined in a plugable shard - * allocation strategy. The default implementation [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] - * picks shards for handoff from the `ShardRegion` with most number of previously allocated shards. + * allocation strategy. The default implementation `LeastShardAllocationStrategy` + * picks shards for handoff from the ShardRegion` with most number of previously allocated shards. * They will then be allocated to the `ShardRegion` with least number of previously allocated shards, - * i.e. new members in the cluster. There is a configurable threshold of how large the difference - * must be to begin the rebalancing. This strategy can be replaced by an application specific + * i.e. new members in the cluster. This strategy can be replaced by an application specific * implementation. * * The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or @@ -213,8 +212,7 @@ abstract class ClusterSharding { def shardState: ActorRef[ClusterShardingQuery] /** - * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the - * given `settings`. This could be changed in the future. + * The default `ShardAllocationStrategy` is configured by `least-shard-allocation-strategy` properties. */ def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 5bd2287750..5ef8b61db6 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -213,8 +213,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding = def shardState: ActorRef[ClusterShardingQuery] /** - * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the - * given `settings`. This could be changed in the future. + * The default `ShardAllocationStrategy` is configured by `least-shard-allocation-strategy` properties. */ def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/testkit/scaladsl/TestEntityRefSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/testkit/scaladsl/TestEntityRefSpec.scala index 1839e6641c..a1960e5d3e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/testkit/scaladsl/TestEntityRefSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/testkit/scaladsl/TestEntityRefSpec.scala @@ -14,7 +14,7 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.ShardCoordinator -import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.typed.ClusterShardingQuery import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.sharding.typed.javadsl @@ -73,7 +73,7 @@ class TestEntityRefSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike w override def defaultShardAllocationStrategy( settings: ClusterShardingSettings): ShardCoordinator.ShardAllocationStrategy = - new LeastShardAllocationStrategy(1, 1) + ShardAllocationStrategy.leastShardAllocationStrategy(1, 0.1) // below are for javadsl override def init[M, E](entity: javadsl.Entity[M, E]): ActorRef[E] = ??? diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 519fc8f6eb..617ac625ea 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -98,8 +98,28 @@ akka.cluster.sharding { # Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot) keep-nr-of-batches = 2 - # Setting for the default shard allocation strategy + # Settings for LeastShardAllocationStrategy. + # + # A new rebalance algorithm was included in Akka 2.6.10. It can reach optimal balance in + # less rebalance rounds (typically 1 or 2 rounds). The amount of shards to rebalance in each + # round can still be limited to make it progress slower. For backwards compatibility + # the new algorithm is not enabled by default. Enable the new algorithm by setting + # `rebalance-absolute-limit` > 0, for example: + # akka.cluster.sharding.least-shard-allocation-strategy.rebalance-absolute-limit=20 + # The new algorithm is recommended and will become the default in future versions of Akka. least-shard-allocation-strategy { + # Maximum number of shards that will be rebalanced in one rebalance round. + # The lower of this and `rebalance-relative-limit` will be used. + rebalance-absolute-limit = 0 + + # Maximum number of shards that will be rebalanced in one rebalance round. + # Fraction of total number of (known) shards. + # The lower of this and `rebalance-absolute-limit` will be used. + rebalance-relative-limit = 0.1 + + # Deprecated: Use rebalance-absolute-limit and rebalance-relative-limit instead. This property is not used + # when rebalance-absolute-limit > 0. + # # Threshold of how large the difference between most and least number of # allocated shards must be to begin the rebalancing. # The difference between number of shards in the region with most shards and @@ -112,6 +132,9 @@ akka.cluster.sharding { # on different nodes before rebalance will occur. rebalance-threshold = 1 + # Deprecated: Use rebalance-absolute-limit and rebalance-relative-limit instead. This property is not used + # when rebalance-absolute-limit > 0. + # # The number of ongoing rebalancing processes is limited to this number. max-simultaneous-rebalance = 3 } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index b21b890397..7fdf4727f8 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -113,11 +113,10 @@ import akka.util.ccompat.JavaConverters._ * * '''Shard Allocation''': * The logic deciding which shards to rebalance is defined in a plugable shard allocation - * strategy. The default implementation [[ShardCoordinator.LeastShardAllocationStrategy]] + * strategy. The default implementation `LeastShardAllocationStrategy` * picks shards for handoff from the `ShardRegion` with highest number of previously allocated shards. * They will then be allocated to the `ShardRegion` with lowest number of previously allocated shards, - * i.e. new members in the cluster. There is a configurable `rebalance-threshold` of how large - * the difference must be to begin the rebalancing. This strategy can be replaced by an application + * i.e. new members in the cluster. This strategy can be replaced by an application * specific implementation. * * '''Recovery''': @@ -172,7 +171,6 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv */ class ClusterSharding(system: ExtendedActorSystem) extends Extension { import ClusterShardingGuardian._ - import ShardCoordinator.LeastShardAllocationStrategy import ShardCoordinator.ShardAllocationStrategy private val log = Logging(system, this.getClass) @@ -656,13 +654,20 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { } /** - * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the - * given `settings`. This could be changed in the future. + * The default `ShardAllocationStrategy` is configured by `least-shard-allocation-strategy` properties. */ def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { - val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold - val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance - new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) + if (settings.tuningParameters.leastShardAllocationAbsoluteLimit > 0) { + // new algorithm + val absoluteLimit = settings.tuningParameters.leastShardAllocationAbsoluteLimit + val relativeLimit = settings.tuningParameters.leastShardAllocationRelativeLimit + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit, relativeLimit) + } else { + // old algorithm + val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold + val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance + new ShardCoordinator.LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) + } } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 1065d7b39c..d4020f2c89 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -84,7 +84,9 @@ object ClusterShardingSettings { entityRecoveryConstantRateStrategyNumberOfEntities = config.getInt("entity-recovery-constant-rate-strategy.number-of-entities"), coordinatorStateWriteMajorityPlus = configMajorityPlus("coordinator-state.write-majority-plus"), - coordinatorStateReadMajorityPlus = configMajorityPlus("coordinator-state.read-majority-plus")) + coordinatorStateReadMajorityPlus = configMajorityPlus("coordinator-state.read-majority-plus"), + leastShardAllocationAbsoluteLimit = config.getInt("least-shard-allocation-strategy.rebalance-absolute-limit"), + leastShardAllocationRelativeLimit = config.getDouble("least-shard-allocation-strategy.rebalance-relative-limit")) val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton")) @@ -148,12 +150,62 @@ object ClusterShardingSettings { val entityRecoveryConstantRateStrategyFrequency: FiniteDuration, val entityRecoveryConstantRateStrategyNumberOfEntities: Int, val coordinatorStateWriteMajorityPlus: Int, - val coordinatorStateReadMajorityPlus: Int) { + val coordinatorStateReadMajorityPlus: Int, + val leastShardAllocationAbsoluteLimit: Int, + val leastShardAllocationRelativeLimit: Double) { require( entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant", s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'") + // included for binary compatibility + @deprecated( + "Use the ClusterShardingSettings factory methods or the constructor including " + + "leastShardAllocationAbsoluteLimit and leastShardAllocationRelativeLimit instead", + since = "2.6.10") + def this( + coordinatorFailureBackoff: FiniteDuration, + retryInterval: FiniteDuration, + bufferSize: Int, + handOffTimeout: FiniteDuration, + shardStartTimeout: FiniteDuration, + shardFailureBackoff: FiniteDuration, + entityRestartBackoff: FiniteDuration, + rebalanceInterval: FiniteDuration, + snapshotAfter: Int, + keepNrOfBatches: Int, + leastShardAllocationRebalanceThreshold: Int, + leastShardAllocationMaxSimultaneousRebalance: Int, + waitingForStateTimeout: FiniteDuration, + updatingStateTimeout: FiniteDuration, + entityRecoveryStrategy: String, + entityRecoveryConstantRateStrategyFrequency: FiniteDuration, + entityRecoveryConstantRateStrategyNumberOfEntities: Int, + coordinatorStateWriteMajorityPlus: Int, + coordinatorStateReadMajorityPlus: Int) = + this( + coordinatorFailureBackoff, + retryInterval, + bufferSize, + handOffTimeout, + shardStartTimeout, + shardFailureBackoff, + entityRestartBackoff, + rebalanceInterval, + snapshotAfter, + keepNrOfBatches, + leastShardAllocationRebalanceThreshold, + leastShardAllocationMaxSimultaneousRebalance, + waitingForStateTimeout, + updatingStateTimeout, + entityRecoveryStrategy, + entityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities, + coordinatorStateWriteMajorityPlus, + coordinatorStateReadMajorityPlus, + leastShardAllocationAbsoluteLimit = 100, + leastShardAllocationRelativeLimit = 0.1) + // included for binary compatibility @deprecated( "Use the ClusterShardingSettings factory methods or the constructor including " + diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 081c600749..e5e4b6f43f 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -73,6 +73,51 @@ object ShardCoordinator { majorityMinCap, rememberEntitiesStoreProvider)).withDeploy(Deploy.local) + /** + * Java API: `ShardAllocationStrategy` that allocates new shards to the `ShardRegion` (node) with least + * number of previously allocated shards. + * + * When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node. + * The `LeastShardAllocationStrategy` picks shards for rebalancing from the `ShardRegion`s with most number + * of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of + * previously allocated shards, i.e. new members in the cluster. The amount of shards to rebalance in each + * round can be limited to make it progress slower since rebalancing too many shards at the same time could + * result in additional load on the system. For example, causing many Event Sourced entites to be started + * at the same time. + * + * It will not rebalance when there is already an ongoing rebalance in progress. + * + * @param absoluteLimit the maximum number of shards that will be rebalanced in one rebalance round + * @param relativeLimit fraction (< 1.0) of total number of (known) shards that will be rebalanced + * in one rebalance round + */ + def leastShardAllocationStrategy(absoluteLimit: Int, relativeLimit: Double): ShardAllocationStrategy = + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit, relativeLimit) + + object ShardAllocationStrategy { + + /** + * Scala API: `ShardAllocationStrategy` that allocates new shards to the `ShardRegion` (node) with least + * number of previously allocated shards. + * + * When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node. + * The `LeastShardAllocationStrategy` picks shards for rebalancing from the `ShardRegion`s with most number + * of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of + * previously allocated shards, i.e. new members in the cluster. The amount of shards to rebalance in each + * round can be limited to make it progress slower since rebalancing too many shards at the same time could + * result in additional load on the system. For example, causing many Event Sourced entites to be started + * at the same time. + * + * It will not rebalance when there is already an ongoing rebalance in progress. + * + * @param absoluteLimit the maximum number of shards that will be rebalanced in one rebalance round + * @param relativeLimit fraction (< 1.0) of total number of (known) shards that will be rebalanced + * in one rebalance round + */ + def leastShardAllocationStrategy(absoluteLimit: Int, relativeLimit: Double): ShardAllocationStrategy = + new internal.LeastShardAllocationStrategy(absoluteLimit, relativeLimit) + } + /** * Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]]. * @@ -177,7 +222,12 @@ object ShardCoordinator { private val emptyRebalanceResult = Future.successful(Set.empty[ShardId]) /** - * The default implementation of [[ShardCoordinator.LeastShardAllocationStrategy]] + * Use [[akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy.leastShardAllocationStrategy]] instead. + * The new rebalance algorithm was included in Akka 2.6.10. It can reach optimal balance in + * less rebalance rounds (typically 1 or 2 rounds). The amount of shards to rebalance in each + * round can still be limited to make it progress slower. + * + * This implementation of [[ShardCoordinator.ShardAllocationStrategy]] * allocates new shards to the `ShardRegion` with least number of previously allocated shards. * * When a node is removed from the cluster the shards on that node will be started on the remaining nodes, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/LeastShardAllocationStrategy.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/LeastShardAllocationStrategy.scala new file mode 100644 index 0000000000..31c6f29842 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/LeastShardAllocationStrategy.scala @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.internal + +import scala.collection.immutable +import scala.concurrent.Future + +import akka.actor.ActorRef +import akka.annotation.InternalApi +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.ShardRegion.ShardId + +/** + * INTERNAL API + */ +@InternalApi private[akka] object LeastShardAllocationStrategy { + private val emptyRebalanceResult = Future.successful(Set.empty[ShardId]) +} + +/** + * INTERNAL API: Use `ShardCoordinator.ShardAllocationStrategy.leastShardAllocationStrategy` factory method. + * + * `ShardAllocationStrategy` that allocates new shards to the `ShardRegion` (node) with least + * number of previously allocated shards. + * + * When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node. + * The `LeastShardAllocationStrategy` picks shards for rebalancing from the `ShardRegion`s with most number + * of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of + * previously allocated shards, i.e. new members in the cluster. The amount of shards to rebalance in each + * round can be limited to make it progress slower since rebalancing too many shards at the same time could + * result in additional load on the system. For example, causing many Event Sourced entites to be started + * at the same time. + * + * It will not rebalance when there is already an ongoing rebalance in progress. + * + * @param absoluteLimit the maximum number of shards that will be rebalanced in one rebalance round + * @param relativeLimit fraction (< 1.0) of total number of (known) shards that will be rebalanced + * in one rebalance round + */ +@InternalApi private[akka] final class LeastShardAllocationStrategy(absoluteLimit: Int, relativeLimit: Double) + extends ShardAllocationStrategy { + import LeastShardAllocationStrategy.emptyRebalanceResult + + override def allocateShard( + requester: ActorRef, + shardId: ShardId, + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = { + val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, v) => v.size } + Future.successful(regionWithLeastShards) + } + + override def rebalance( + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], + rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = { + import math.max + import math.min + + def limit(numberOfShards: Int): Int = + max(1, min((relativeLimit * numberOfShards).toInt, absoluteLimit)) + + def rebalancePhase1( + numberOfShards: Int, + optimalPerRegion: Int, + sortedAllocations: Vector[immutable.IndexedSeq[ShardId]]): Set[ShardId] = { + val selected = Vector.newBuilder[ShardId] + sortedAllocations.foreach { shards => + if (shards.size > optimalPerRegion) { + selected ++= shards.take(shards.size - optimalPerRegion) + } + } + val result = selected.result() + result.take(limit(numberOfShards)).toSet + } + + def rebalancePhase2( + numberOfShards: Int, + optimalPerRegion: Int, + sortedAllocations: Vector[immutable.IndexedSeq[ShardId]]): Future[Set[ShardId]] = { + // In the first phase the optimalPerRegion is rounded up, and depending on number of shards per region and number + // of regions that might not be the exact optimal. + // In second phase we look for diff of >= 2 below optimalPerRegion and rebalance that number of shards. + val countBelowOptimal = + sortedAllocations.iterator.map(shards => max(0, (optimalPerRegion - 1) - shards.size)).sum + if (countBelowOptimal == 0) { + emptyRebalanceResult + } else { + val selected = Vector.newBuilder[ShardId] + sortedAllocations.foreach { shards => + if (shards.size >= optimalPerRegion) { + selected += shards.head + } + } + val result = selected.result().take(min(countBelowOptimal, limit(numberOfShards))).toSet + Future.successful(result) + } + } + + if (rebalanceInProgress.nonEmpty) { + // one rebalance at a time + emptyRebalanceResult + } else { + val numberOfShards = currentShardAllocations.valuesIterator.map(_.size).sum + val numberOfRegions = currentShardAllocations.size + if (numberOfRegions == 0 || numberOfShards == 0) { + emptyRebalanceResult + } else { + val sortedAllocations = currentShardAllocations.valuesIterator.toVector.sortBy(_.size) + val optimalPerRegion = numberOfShards / numberOfRegions + (if (numberOfShards % numberOfRegions == 0) 0 else 1) + + val result1 = rebalancePhase1(numberOfShards, optimalPerRegion, sortedAllocations) + + if (result1.nonEmpty) { + Future.successful(result1) + } else { + rebalancePhase2(numberOfShards, optimalPerRegion, sortedAllocations) + } + } + } + } + + override def toString: ShardId = + s"LeastShardAllocationStrategy($absoluteLimit,$relativeLimit)" +} diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index 1018d2dc75..c0b8afc8c6 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -7,6 +7,7 @@ package akka.cluster.sharding import scala.concurrent.duration._ import akka.actor._ +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion.GracefulShutdown import akka.remote.testconductor.RoleName import akka.testkit._ @@ -59,8 +60,7 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard entityProps = Props[ShardedEntity](), extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId, extractShardId = MultiNodeClusterShardingSpec.intExtractShardId, - allocationStrategy = - new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1), + allocationStrategy = ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 2, relativeLimit = 1.0), handOffStopMessage = ShardedEntity.Stop) lazy val region = ClusterSharding(system).shardRegion(typeName) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala index 339da2728a..8e5b65e834 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import akka.actor._ import akka.cluster.MemberStatus +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, GetClusterShardingStats } import akka.testkit._ import akka.util.ccompat._ @@ -57,8 +58,7 @@ abstract class ClusterShardingMinMembersSpec(multiNodeConfig: ClusterShardingMin entityProps = TestActors.echoActorProps, extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId, extractShardId = MultiNodeClusterShardingSpec.intExtractShardId, - allocationStrategy = - new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1), + allocationStrategy = ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 2, relativeLimit = 1.0), handOffStopMessage = ShardedEntity.Stop) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 1d4b20033c..fe14bb4fc7 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -13,6 +13,7 @@ import akka.actor._ import akka.cluster.Cluster import akka.cluster.ddata.{ Replicator, ReplicatorSettings } import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped } +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate } import akka.cluster.sharding.internal.{ DDataRememberEntitiesProvider, EventSourcedRememberEntitiesProvider } import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings } @@ -150,8 +151,8 @@ abstract class ClusterShardingSpecConfig( number-of-entities = 1 } least-shard-allocation-strategy { - rebalance-threshold = 1 - max-simultaneous-rebalance = 1 + rebalance-absolute-limit = 1 + rebalance-relative-limit = 1.0 } } akka.testconductor.barrier-timeout = 70s @@ -299,7 +300,7 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig) def coordinatorProps(typeName: String, rebalanceEnabled: Boolean, rememberEntities: Boolean): Props = { val allocationStrategy = - new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 2, relativeLimit = 1.0) val cfg = ConfigFactory.parseString(s""" handoff-timeout = 10s shard-start-timeout = 10s diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala index 0499ffe829..5cb4c79c4c 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.duration._ import akka.actor.{ Actor, ActorRef, ExtendedActorSystem, NoSerializationVerificationNeeded, PoisonPill, Props } import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.ShardCoordinator.Internal.ShardStopped -import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion.{ ExtractEntityId, ExtractShardId, HandOffStopper, Msg } import akka.testkit.WithLogCapturing import akka.testkit.{ AkkaSpec, TestProbe } @@ -74,7 +74,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec(""" settings = settingsWithRole, extractEntityId = extractEntityId, extractShardId = extractShardId, - allocationStrategy = new LeastShardAllocationStrategy(3, 4), + allocationStrategy = ShardAllocationStrategy.leastShardAllocationStrategy(3, 0.1), handOffStopMessage = PoisonPill) probe.expectMsg(StartingProxy(typeName, settingsWithRole.role, None, extractEntityId, extractShardId)) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/DeprecatedLeastShardAllocationStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/DeprecatedLeastShardAllocationStrategySpec.scala new file mode 100644 index 0000000000..c2f056f4c3 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/DeprecatedLeastShardAllocationStrategySpec.scala @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.actor.ActorRef +import akka.actor.Props +import akka.testkit.AkkaSpec + +class DeprecatedLeastShardAllocationStrategySpec extends AkkaSpec { + import ShardCoordinator._ + + val regionA = system.actorOf(Props.empty, "regionA") + val regionB = system.actorOf(Props.empty, "regionB") + val regionC = system.actorOf(Props.empty, "regionC") + + def createAllocations(aCount: Int, bCount: Int = 0, cCount: Int = 0): Map[ActorRef, Vector[String]] = { + val shards = (1 to (aCount + bCount + cCount)).map(n => ("00" + n.toString).takeRight(3)) + Map( + regionA -> shards.take(aCount).toVector, + regionB -> shards.slice(aCount, aCount + bCount).toVector, + regionC -> shards.takeRight(cCount).toVector) + } + + "DeprecatedLeastShardAllocationStrategy" must { + "allocate to region with least number of shards" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 1, bCount = 1) + allocationStrategy.allocateShard(regionA, "003", allocations).futureValue should ===(regionC) + } + + "rebalance from region with most number of shards [2, 0, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 2) + + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set.empty[String]) + } + + "not rebalance when diff equal to threshold, [1, 1, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 1, bCount = 1) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) + } + + "rebalance from region with most number of shards [1, 2, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 1, bCount = 2) + + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002")) + allocationStrategy.rebalance(allocations, Set("002")).futureValue should ===(Set.empty[String]) + } + + "rebalance from region with most number of shards [3, 0, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 3) + + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("002")) + } + + "rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 4, bCount = 4) + + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005")) + } + + "rebalance from region with most number of shards [4, 4, 2], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 4, bCount = 4, cCount = 2) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + // not optimal, 005 stopped and started again, but ok + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005")) + } + + "rebalance from region with most number of shards [1, 3, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 1, bCount = 2) + + // so far regionB has 2 shards and regionC has 0 shards, but the diff is <= rebalanceThreshold + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) + + val allocations2 = createAllocations(aCount = 1, bCount = 3) + allocationStrategy.rebalance(allocations2, Set.empty).futureValue should ===(Set("002")) + allocationStrategy.rebalance(allocations2, Set("002")).futureValue should ===(Set.empty[String]) + } + + "not rebalance when diff equal to threshold, [2, 2, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 2, bCount = 2) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) + } + + "rebalance from region with most number of shards [3, 3, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 3, bCount = 3) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("004")) + allocationStrategy.rebalance(allocations, Set("001", "004")).futureValue should ===(Set.empty) + } + + "rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 4, bCount = 4) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) + allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("005", "006")) + allocationStrategy.rebalance(allocations, Set("001", "002", "005", "006")).futureValue should ===(Set.empty) + } + + "rebalance from region with most number of shards [5, 5, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 5, bCount = 5) + // optimal would => [4, 4, 2] or even => [3, 4, 3] + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) + // if 001 and 002 are not started quickly enough this is stopping more than optimal + allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("006", "007")) + allocationStrategy.rebalance(allocations, Set("001", "002", "006", "007")).futureValue should ===(Set("003")) + } + + "rebalance from region with most number of shards [50, 50, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 100) + val allocations = createAllocations(aCount = 50, cCount = 50) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) + allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("051", "052")) + allocationStrategy.rebalance(allocations, Set("001", "002", "051", "052")).futureValue should ===( + Set("003", "004")) + } + + "limit number of simultaneous rebalance" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 2) + val allocations = createAllocations(aCount = 1, bCount = 10) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002", "003")) + allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set.empty[String]) + } + + "not pick shards that are in progress" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 4) + val allocations = createAllocations(aCount = 10) + allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set("001", "004")) + } + + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategyRandomizedSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategyRandomizedSpec.scala new file mode 100644 index 0000000000..e8eebd5bfe --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategyRandomizedSpec.scala @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.cluster.sharding + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.util.Random + +import akka.actor.ActorRef +import akka.actor.Props +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.ShardRegion.ShardId +import akka.testkit.AkkaSpec + +class LeastShardAllocationStrategyRandomizedSpec extends AkkaSpec("akka.loglevel = INFO") { + import LeastShardAllocationStrategySpec.{ afterRebalance, countShards, countShardsPerRegion } + + def createAllocations(countPerRegion: Map[ActorRef, Int]): Map[ActorRef, immutable.IndexedSeq[ShardId]] = { + countPerRegion.map { + case (region, count) => + region -> (1 to count).map(n => ("00" + n.toString).takeRight(3)).map(n => s"${region.path.name}-$n").toVector + } + } + + private val strategyWithoutLimits = + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 100000, relativeLimit = 1.0) + + private val rndSeed = System.currentTimeMillis() + private val rnd = new Random(rndSeed) + info(s"Random seed: $rndSeed") + + private var iteration = 1 + private val iterationsPerTest = 10 + + private def testRebalance( + allocationStrategy: ShardAllocationStrategy, + maxRegions: Int, + maxShardsPerRegion: Int, + expectedMaxSteps: Int): Unit = { + (1 to iterationsPerTest).foreach { _ => + iteration += 1 + val numberOfRegions = rnd.nextInt(maxRegions) + 1 + val regions = (1 to numberOfRegions).map(n => system.actorOf(Props.empty, s"$iteration-R$n")) + val countPerRegion = regions.map { region => + region -> rnd.nextInt(maxShardsPerRegion) + }.toMap + val allocations = createAllocations(countPerRegion) + withClue(s"test $allocationStrategy [${countShardsPerRegion(allocations).mkString(",")}]: ") { + testRebalance(allocationStrategy, allocations, Vector(allocations), expectedMaxSteps) + } + regions.foreach(system.stop) + } + } + + @tailrec private def testRebalance( + allocationStrategy: ShardAllocationStrategy, + allocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], + steps: Vector[Map[ActorRef, immutable.IndexedSeq[ShardId]]], + maxSteps: Int): Unit = { + val round = steps.size + val rebalanceResult = allocationStrategy.rebalance(allocations, Set.empty).value.get.get + val newAllocations = afterRebalance(allocationStrategy, allocations, rebalanceResult) + + countShards(newAllocations) should ===(countShards(allocations)) + val min = countShardsPerRegion(newAllocations).min + val max = countShardsPerRegion(newAllocations).max + val diff = max - min + val newSteps = steps :+ newAllocations + if (diff <= 1) { + if (round >= 3 && maxSteps <= 10) { + // Should be very rare (I have not seen it) + system.log.info( + s"rebalance solved in round $round, [${newSteps.map(step => countShardsPerRegion(step).mkString(",")).mkString(" => ")}]") + } + () + } else if (round == maxSteps) { + fail( + s"Couldn't solve rebalance in $round rounds, [${newSteps.map(step => countShardsPerRegion(step).mkString(",")).mkString(" => ")}]") + } else { + testRebalance(allocationStrategy, newAllocations, newSteps, maxSteps) + } + } + + "LeastShardAllocationStrategy with random scenario" must { + + "rebalance shards with max 5 regions / 5 shards" in { + testRebalance(strategyWithoutLimits, maxRegions = 5, maxShardsPerRegion = 5, expectedMaxSteps = 2) + } + + "rebalance shards with max 5 regions / 100 shards" in { + testRebalance(strategyWithoutLimits, maxRegions = 5, maxShardsPerRegion = 100, expectedMaxSteps = 2) + } + + "rebalance shards with max 20 regions / 5 shards" in { + testRebalance(strategyWithoutLimits, maxRegions = 20, maxShardsPerRegion = 5, expectedMaxSteps = 2) + } + + "rebalance shards with max 20 regions / 20 shards" in { + testRebalance(strategyWithoutLimits, maxRegions = 20, maxShardsPerRegion = 20, expectedMaxSteps = 2) + } + + "rebalance shards with max 20 regions / 200 shards" in { + testRebalance(strategyWithoutLimits, maxRegions = 20, maxShardsPerRegion = 200, expectedMaxSteps = 5) + } + + "rebalance shards with max 100 regions / 100 shards" in { + testRebalance(strategyWithoutLimits, maxRegions = 100, maxShardsPerRegion = 100, expectedMaxSteps = 5) + } + + "rebalance shards with max 100 regions / 1000 shards" in { + testRebalance(strategyWithoutLimits, maxRegions = 100, maxShardsPerRegion = 1000, expectedMaxSteps = 5) + } + + "rebalance shards with max 20 regions / 20 shards and limits" in { + val absoluteLimit = 3 + rnd.nextInt(7) + 3 + val relativeLimit = 0.05 + (rnd.nextDouble() * 0.95) + + val strategy = ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit, relativeLimit) + testRebalance(strategy, maxRegions = 20, maxShardsPerRegion = 20, expectedMaxSteps = 20) + } + + } + +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala index 616b0f2f65..175d69edcb 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala @@ -4,142 +4,185 @@ package akka.cluster.sharding +import scala.collection.immutable + +import akka.actor.ActorPath import akka.actor.ActorRef +import akka.actor.ActorRefProvider +import akka.actor.Address +import akka.actor.MinimalActorRef import akka.actor.Props +import akka.actor.RootActorPath +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.ShardRegion.ShardId import akka.testkit.AkkaSpec -class LeastShardAllocationStrategySpec extends AkkaSpec { - import ShardCoordinator._ +object LeastShardAllocationStrategySpec { - val regionA = system.actorOf(Props.empty, "regionA") - val regionB = system.actorOf(Props.empty, "regionB") - val regionC = system.actorOf(Props.empty, "regionC") + private object DummyActorRef extends MinimalActorRef { + override val path: ActorPath = RootActorPath(Address("akka", "myapp")) / "system" / "fake" + + override def provider: ActorRefProvider = ??? + } + + def afterRebalance( + allocationStrategy: ShardAllocationStrategy, + allocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], + rebalance: Set[ShardId]): Map[ActorRef, immutable.IndexedSeq[ShardId]] = { + val allocationsAfterRemoval = allocations.map { + case (region, shards) => region -> shards.filterNot(rebalance) + } + + rebalance.toList.sorted.foldLeft(allocationsAfterRemoval) { + case (acc, shard) => + val region = allocationStrategy.allocateShard(DummyActorRef, shard, acc).value.get.get + acc.updated(region, acc(region) :+ shard) + } + } + + def countShardsPerRegion(newAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Vector[Int] = { + newAllocations.valuesIterator.map(_.size).toVector + } + + def countShards(allocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Int = { + countShardsPerRegion(allocations).sum + } + + def allocationCountsAfterRebalance( + allocationStrategy: ShardAllocationStrategy, + allocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], + rebalance: Set[ShardId]): Vector[Int] = { + countShardsPerRegion(afterRebalance(allocationStrategy, allocations, rebalance)) + } +} + +class LeastShardAllocationStrategySpec extends AkkaSpec { + import LeastShardAllocationStrategySpec.{ afterRebalance, allocationCountsAfterRebalance } + + private val regionA = system.actorOf(Props.empty, "regionA") + private val regionB = system.actorOf(Props.empty, "regionB") + private val regionC = system.actorOf(Props.empty, "regionC") + + private val shards = (1 to 999).map(n => ("00" + n.toString).takeRight(3)) def createAllocations(aCount: Int, bCount: Int = 0, cCount: Int = 0): Map[ActorRef, Vector[String]] = { - val shards = (1 to (aCount + bCount + cCount)).map(n => ("00" + n.toString).takeRight(3)) Map( regionA -> shards.take(aCount).toVector, regionB -> shards.slice(aCount, aCount + bCount).toVector, - regionC -> shards.takeRight(cCount).toVector) + regionC -> shards.slice(aCount + bCount, aCount + bCount + cCount).toVector) } + private val strategyWithoutLimits = + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 1000, relativeLimit = 1.0) + "LeastShardAllocationStrategy" must { "allocate to region with least number of shards" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 10) + val allocationStrategy = strategyWithoutLimits val allocations = createAllocations(aCount = 1, bCount = 1) allocationStrategy.allocateShard(regionA, "003", allocations).futureValue should ===(regionC) } - "rebalance from region with most number of shards [2, 0, 0], rebalanceThreshold=1" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) - val allocations = createAllocations(aCount = 2) - - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) - allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set.empty[String]) + "rebalance shards [1, 2, 0]" in { + val allocationStrategy = strategyWithoutLimits + val allocations = createAllocations(aCount = 1, bCount = 2) + val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result should ===(Set("002")) + allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(1, 1, 1)) } - "not rebalance when diff equal to threshold, [1, 1, 0], rebalanceThreshold=1" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + "rebalance shards [2, 0, 0]" in { + val allocationStrategy = strategyWithoutLimits + val allocations = createAllocations(aCount = 2) + val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result should ===(Set("001")) + allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(1, 1, 0)) + } + + "not rebalance shards [1, 1, 0]" in { + val allocationStrategy = strategyWithoutLimits val allocations = createAllocations(aCount = 1, bCount = 1) allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) } - "rebalance from region with most number of shards [1, 2, 0], rebalanceThreshold=1" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) - val allocations = createAllocations(aCount = 1, bCount = 2) - - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002")) - allocationStrategy.rebalance(allocations, Set("002")).futureValue should ===(Set.empty[String]) - } - - "rebalance from region with most number of shards [3, 0, 0], rebalanceThreshold=1" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + "rebalance shards [3, 0, 0]" in { + val allocationStrategy = strategyWithoutLimits val allocations = createAllocations(aCount = 3) - - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) - allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("002")) + val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result should ===(Set("001", "002")) + allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(1, 1, 1)) } - "rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=1" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + "rebalance shards [4, 4, 0]" in { + val allocationStrategy = strategyWithoutLimits val allocations = createAllocations(aCount = 4, bCount = 4) - - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) - allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005")) + val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result should ===(Set("001", "005")) + allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(3, 3, 2)) } - "rebalance from region with most number of shards [4, 4, 2], rebalanceThreshold=1" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + "rebalance shards [4, 4, 2]" in { + // this is handled by phase 2, to find diff of 2 + val allocationStrategy = strategyWithoutLimits val allocations = createAllocations(aCount = 4, bCount = 4, cCount = 2) - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) - // not optimal, 005 stopped and started again, but ok - allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005")) + val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result should ===(Set("001")) + allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(3, 4, 3)) } - "rebalance from region with most number of shards [1, 3, 0], rebalanceThreshold=2" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) - val allocations = createAllocations(aCount = 1, bCount = 2) - - // so far regionB has 2 shards and regionC has 0 shards, but the diff is <= rebalanceThreshold - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) - - val allocations2 = createAllocations(aCount = 1, bCount = 3) - allocationStrategy.rebalance(allocations2, Set.empty).futureValue should ===(Set("002")) - allocationStrategy.rebalance(allocations2, Set("002")).futureValue should ===(Set.empty[String]) - } - - "not rebalance when diff equal to threshold, [2, 2, 0], rebalanceThreshold=2" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) - val allocations = createAllocations(aCount = 2, bCount = 2) - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) - } - - "rebalance from region with most number of shards [3, 3, 0], rebalanceThreshold=2" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) - val allocations = createAllocations(aCount = 3, bCount = 3) - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) - allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("004")) - allocationStrategy.rebalance(allocations, Set("001", "004")).futureValue should ===(Set.empty) - } - - "rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=2" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) - val allocations = createAllocations(aCount = 4, bCount = 4) - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) - allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("005", "006")) - allocationStrategy.rebalance(allocations, Set("001", "002", "005", "006")).futureValue should ===(Set.empty) - } - - "rebalance from region with most number of shards [5, 5, 0], rebalanceThreshold=2" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + "rebalance shards [5, 5, 0]" in { + val allocationStrategy = strategyWithoutLimits val allocations = createAllocations(aCount = 5, bCount = 5) - // optimal would => [4, 4, 2] or even => [3, 4, 3] - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) - // if 001 and 002 are not started quickly enough this is stopping more than optimal - allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("006", "007")) - allocationStrategy.rebalance(allocations, Set("001", "002", "006", "007")).futureValue should ===(Set("003")) + val result1 = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result1 should ===(Set("001", "006")) + + // so far [4, 4, 2] + allocationCountsAfterRebalance(allocationStrategy, allocations, result1) should ===(Vector(4, 4, 2)) + val allocations2 = afterRebalance(allocationStrategy, allocations, result1) + // second phase will find the diff of 2, resulting in [3, 4, 3] + val result2 = allocationStrategy.rebalance(allocations2, Set.empty).futureValue + result2 should ===(Set("002")) + allocationCountsAfterRebalance(allocationStrategy, allocations2, result2) should ===(Vector(3, 4, 3)) } - "rebalance from region with most number of shards [50, 50, 0], rebalanceThreshold=2" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 100) + "rebalance shards [50, 50, 0]" in { + val allocationStrategy = strategyWithoutLimits val allocations = createAllocations(aCount = 50, cCount = 50) - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) - allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("051", "052")) - allocationStrategy.rebalance(allocations, Set("001", "002", "051", "052")).futureValue should ===( - Set("003", "004")) + val result1 = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result1 should ===(shards.take(50 - 34).toSet ++ shards.drop(50).take(50 - 34)) + + // so far [34, 34, 32] + allocationCountsAfterRebalance(allocationStrategy, allocations, result1).sorted should ===( + Vector(34, 34, 32).sorted) + val allocations2 = afterRebalance(allocationStrategy, allocations, result1) + // second phase will find the diff of 2, resulting in [33, 34, 33] + val result2 = allocationStrategy.rebalance(allocations2, Set.empty).futureValue + result2 should ===(Set("017")) + allocationCountsAfterRebalance(allocationStrategy, allocations2, result2).sorted should ===( + Vector(33, 34, 33).sorted) } - "limit number of simultaneous rebalance" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 2) - val allocations = createAllocations(aCount = 1, bCount = 10) - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002", "003")) - allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set.empty[String]) + "respect absolute limit of number shards" in { + val allocationStrategy = + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 3, relativeLimit = 1.0) + val allocations = createAllocations(aCount = 1, bCount = 9) + val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result should ===(Set("002", "003", "004")) + allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(2, 6, 2)) } - "not pick shards that are in progress" in { - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 4) + "respect relative limit of number shards" in { + val allocationStrategy = + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 5, relativeLimit = 0.3) + val allocations = createAllocations(aCount = 1, bCount = 9) + val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue + result should ===(Set("002", "003", "004")) + allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(2, 6, 2)) + } + + "not rebalance when in progress" in { + val allocationStrategy = strategyWithoutLimits val allocations = createAllocations(aCount = 10) - allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set("001", "004")) + allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set.empty[String]) } } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala index f34aefb0f1..3caaba0b31 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala @@ -19,9 +19,10 @@ import akka.testkit.WithLogCapturing import com.github.ghik.silencer.silent import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike - import scala.concurrent.duration._ +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy + object RememberEntitiesFailureSpec { val config = ConfigFactory.parseString(s""" akka.loglevel = DEBUG @@ -320,7 +321,7 @@ class RememberEntitiesFailureSpec ClusterShardingSettings(system).withRememberEntities(true), extractEntityId, extractShardId, - new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 3), + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 1, relativeLimit = 0.1), "graceful-stop") val probe = TestProbe() @@ -361,7 +362,7 @@ class RememberEntitiesFailureSpec ClusterShardingSettings(system).withRememberEntities(true), extractEntityId, extractShardId, - new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 3), + ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 1, relativeLimit = 0.1), "graceful-stop") val probe = TestProbe() diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 5975e6fa94..3161c5254c 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -164,9 +164,33 @@ The `number-of-shards` configuration value must be the same for all nodes in the configuration check when joining. Changing the value requires stopping all nodes in the cluster. The shards are allocated to the nodes in the cluster. The decision of where to allocate a shard is done -by a shard allocation strategy. The default implementation @apidoc[ShardCoordinator.LeastShardAllocationStrategy] -allocates new shards to the `ShardRegion` (node) with least number of previously allocated shards. -This strategy can be replaced by an application specific implementation. +by a shard allocation strategy. + +The default implementation `LeastShardAllocationStrategy` allocates new shards to the `ShardRegion` (node) with least +number of previously allocated shards. This strategy can be replaced by an application specific implementation. + +When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node. +The `LeastShardAllocationStrategy` picks shards for rebalancing from the `ShardRegion`s with most number +of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of +previously allocated shards, i.e. new members in the cluster. The amount of shards to rebalance in each +round can be limited to make it progress slower since rebalancing too many shards at the same time could +result in additional load on the system. For example, causing many Event Sourced entites to be started +at the same time. + +A new rebalance algorithm was included in Akka 2.6.10. It can reach optimal balance in a few rebalance rounds +(typically 1 or 2 rounds). For backwards compatibility the new algorithm is not enabled by default. +The new algorithm is recommended and will become the default in future versions of Akka. +You enable the new algorithm by setting `rebalance-absolute-limit` > 0, for example: + +``` +akka.cluster.sharding.least-shard-allocation-strategy.rebalance-absolute-limit = 20 +``` + +The `rebalance-absolute-limit` is the maximum number of shards that will be rebalanced in one rebalance round. + +You may also want to tune the `akka.cluster.sharding.least-shard-allocation-strategy.rebalance-relative-limit`. +The `rebalance-relative-limit` is a fraction (< 1.0) of total number of (known) shards that will be rebalanced +in one rebalance round. The lower result of `rebalance-relative-limit` and `rebalance-absolute-limit` will be used. ### External shard allocation @@ -538,7 +562,10 @@ properties are read by the `ClusterShardingSettings` when created with an ActorS It is also possible to amend the `ClusterShardingSettings` or create it from another config section with the same layout as below. -One important configuration property is `number-of-shards` as described in @ref:[Shard allocation](#shard-allocation) +One important configuration property is `number-of-shards` as described in @ref:[Shard allocation](#shard-allocation). + +You may also need to tune the configuration properties is `rebalance-absolute-limit` and `rebalance-relative-limit` +as described in @ref:[Shard allocation](#shard-allocation). @@snip [reference.conf](/akka-cluster-sharding/src/main/resources/reference.conf) { #sharding-ext-config }