Holistic shard allocation strategy, #29543 (#29555)

* The rebalance in the LeastShardAllocationStrategy is only comparing the region
  with most shards with the one with least shards. Makes the rebalance rather
  slow. By default it's only rebalancing 1 shard at a time.
* This new strategy looks at all current allocations to find the optimal
  number of shards per region and tries to adjust towards that value.
  Picking from all regions with more shards than the optimal.
* Absolute and relative limit on how many shards that can be rebalanced
  in one round.
* It's also not starting a new rebalance round until the previous has
  completed.
* unit tests
* second phase for fine grained rebalance, due to rounding it will not be perfect in the first phase
* randomized unit test
* configuration settings
* docs
This commit is contained in:
Patrik Nordwall 2020-09-11 08:49:45 +02:00 committed by GitHub
parent 1b4e6c2d30
commit f0b3c9089b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 762 additions and 143 deletions

View file

@ -90,7 +90,9 @@ object ClusterShardingSettings {
entityRecoveryConstantRateStrategyNumberOfEntities =
settings.tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities,
coordinatorStateWriteMajorityPlus = settings.tuningParameters.coordinatorStateWriteMajorityPlus,
coordinatorStateReadMajorityPlus = settings.tuningParameters.coordinatorStateReadMajorityPlus),
coordinatorStateReadMajorityPlus = settings.tuningParameters.coordinatorStateReadMajorityPlus,
leastShardAllocationAbsoluteLimit = settings.tuningParameters.leastShardAllocationAbsoluteLimit,
leastShardAllocationRelativeLimit = settings.tuningParameters.leastShardAllocationRelativeLimit),
new ClassicClusterSingletonManagerSettings(
settings.coordinatorSingletonSettings.singletonName,
settings.coordinatorSingletonSettings.role,
@ -175,7 +177,9 @@ object ClusterShardingSettings {
val updatingStateTimeout: FiniteDuration,
val waitingForStateTimeout: FiniteDuration,
val coordinatorStateWriteMajorityPlus: Int,
val coordinatorStateReadMajorityPlus: Int) {
val coordinatorStateReadMajorityPlus: Int,
val leastShardAllocationAbsoluteLimit: Int,
val leastShardAllocationRelativeLimit: Double) {
def this(classic: ClassicShardingSettings.TuningParameters) =
this(
@ -197,7 +201,9 @@ object ClusterShardingSettings {
entityRecoveryConstantRateStrategyFrequency = classic.entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities = classic.entityRecoveryConstantRateStrategyNumberOfEntities,
coordinatorStateWriteMajorityPlus = classic.coordinatorStateWriteMajorityPlus,
coordinatorStateReadMajorityPlus = classic.coordinatorStateReadMajorityPlus)
coordinatorStateReadMajorityPlus = classic.coordinatorStateReadMajorityPlus,
leastShardAllocationAbsoluteLimit = classic.leastShardAllocationAbsoluteLimit,
leastShardAllocationRelativeLimit = classic.leastShardAllocationRelativeLimit)
require(
entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant",
@ -241,6 +247,10 @@ object ClusterShardingSettings {
copy(coordinatorStateWriteMajorityPlus = value)
def withCoordinatorStateReadMajorityPlus(value: Int): TuningParameters =
copy(coordinatorStateReadMajorityPlus = value)
def withLeastShardAllocationAbsoluteLimit(value: Int): TuningParameters =
copy(leastShardAllocationAbsoluteLimit = value)
def withLeastShardAllocationRelativeLimit(value: Double): TuningParameters =
copy(leastShardAllocationRelativeLimit = value)
private def copy(
bufferSize: Int = bufferSize,
@ -261,7 +271,9 @@ object ClusterShardingSettings {
updatingStateTimeout: FiniteDuration = updatingStateTimeout,
waitingForStateTimeout: FiniteDuration = waitingForStateTimeout,
coordinatorStateWriteMajorityPlus: Int = coordinatorStateWriteMajorityPlus,
coordinatorStateReadMajorityPlus: Int = coordinatorStateReadMajorityPlus): TuningParameters =
coordinatorStateReadMajorityPlus: Int = coordinatorStateReadMajorityPlus,
leastShardAllocationAbsoluteLimit: Int = leastShardAllocationAbsoluteLimit,
leastShardAllocationRelativeLimit: Double = leastShardAllocationRelativeLimit): TuningParameters =
new TuningParameters(
bufferSize = bufferSize,
coordinatorFailureBackoff = coordinatorFailureBackoff,
@ -281,10 +293,12 @@ object ClusterShardingSettings {
updatingStateTimeout = updatingStateTimeout,
waitingForStateTimeout = waitingForStateTimeout,
coordinatorStateWriteMajorityPlus = coordinatorStateWriteMajorityPlus,
coordinatorStateReadMajorityPlus = coordinatorStateReadMajorityPlus)
coordinatorStateReadMajorityPlus = coordinatorStateReadMajorityPlus,
leastShardAllocationAbsoluteLimit = leastShardAllocationAbsoluteLimit,
leastShardAllocationRelativeLimit = leastShardAllocationRelativeLimit)
override def toString =
s"""TuningParameters($bufferSize,$coordinatorFailureBackoff,$entityRecoveryConstantRateStrategyFrequency,$entityRecoveryConstantRateStrategyNumberOfEntities,$entityRecoveryStrategy,$entityRestartBackoff,$handOffTimeout,$keepNrOfBatches,$leastShardAllocationMaxSimultaneousRebalance,$leastShardAllocationRebalanceThreshold,$rebalanceInterval,$retryInterval,$shardFailureBackoff,$shardStartTimeout,$snapshotAfter,$updatingStateTimeout,$waitingForStateTimeout,$coordinatorStateReadMajorityPlus,$coordinatorStateReadMajorityPlus)"""
s"""TuningParameters($bufferSize,$coordinatorFailureBackoff,$entityRecoveryConstantRateStrategyFrequency,$entityRecoveryConstantRateStrategyNumberOfEntities,$entityRecoveryStrategy,$entityRestartBackoff,$handOffTimeout,$keepNrOfBatches,$leastShardAllocationMaxSimultaneousRebalance,$leastShardAllocationRebalanceThreshold,$rebalanceInterval,$retryInterval,$shardFailureBackoff,$shardStartTimeout,$snapshotAfter,$updatingStateTimeout,$waitingForStateTimeout,$coordinatorStateReadMajorityPlus,$coordinatorStateReadMajorityPlus,$leastShardAllocationAbsoluteLimit,$leastShardAllocationRelativeLimit)"""
}
}

View file

@ -12,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.compat.java8.FutureConverters._
import scala.concurrent.Future
import akka.actor.ActorRefProvider
import akka.actor.ExtendedActorSystem
import akka.actor.InternalActorRef
@ -28,7 +29,7 @@ import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ StartEntity => ClassicStartEntity }
@ -279,9 +280,17 @@ import akka.util.JavaDurationConverters._
}
override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
if (settings.tuningParameters.leastShardAllocationAbsoluteLimit > 0) {
// new algorithm
val absoluteLimit = settings.tuningParameters.leastShardAllocationAbsoluteLimit
val relativeLimit = settings.tuningParameters.leastShardAllocationRelativeLimit
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit, relativeLimit)
} else {
// old algorithm
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance
new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
new ShardCoordinator.LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
}
}
override lazy val shardState: ActorRef[ClusterShardingQuery] = {

View file

@ -119,11 +119,10 @@ object ClusterSharding {
* location.
*
* The logic that decides which shards to rebalance is defined in a plugable shard
* allocation strategy. The default implementation [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]]
* picks shards for handoff from the `ShardRegion` with most number of previously allocated shards.
* allocation strategy. The default implementation `LeastShardAllocationStrategy`
* picks shards for handoff from the ShardRegion` with most number of previously allocated shards.
* They will then be allocated to the `ShardRegion` with least number of previously allocated shards,
* i.e. new members in the cluster. There is a configurable threshold of how large the difference
* must be to begin the rebalancing. This strategy can be replaced by an application specific
* i.e. new members in the cluster. This strategy can be replaced by an application specific
* implementation.
*
* The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or
@ -213,8 +212,7 @@ abstract class ClusterSharding {
def shardState: ActorRef[ClusterShardingQuery]
/**
* The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the
* given `settings`. This could be changed in the future.
* The default `ShardAllocationStrategy` is configured by `least-shard-allocation-strategy` properties.
*/
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}

View file

@ -213,8 +213,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding =
def shardState: ActorRef[ClusterShardingQuery]
/**
* The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the
* given `settings`. This could be changed in the future.
* The default `ShardAllocationStrategy` is configured by `least-shard-allocation-strategy` properties.
*/
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy

View file

@ -14,7 +14,7 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.ClusterShardingQuery
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.javadsl
@ -73,7 +73,7 @@ class TestEntityRefSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike w
override def defaultShardAllocationStrategy(
settings: ClusterShardingSettings): ShardCoordinator.ShardAllocationStrategy =
new LeastShardAllocationStrategy(1, 1)
ShardAllocationStrategy.leastShardAllocationStrategy(1, 0.1)
// below are for javadsl
override def init[M, E](entity: javadsl.Entity[M, E]): ActorRef[E] = ???

View file

@ -98,8 +98,28 @@ akka.cluster.sharding {
# Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot)
keep-nr-of-batches = 2
# Setting for the default shard allocation strategy
# Settings for LeastShardAllocationStrategy.
#
# A new rebalance algorithm was included in Akka 2.6.10. It can reach optimal balance in
# less rebalance rounds (typically 1 or 2 rounds). The amount of shards to rebalance in each
# round can still be limited to make it progress slower. For backwards compatibility
# the new algorithm is not enabled by default. Enable the new algorithm by setting
# `rebalance-absolute-limit` > 0, for example:
# akka.cluster.sharding.least-shard-allocation-strategy.rebalance-absolute-limit=20
# The new algorithm is recommended and will become the default in future versions of Akka.
least-shard-allocation-strategy {
# Maximum number of shards that will be rebalanced in one rebalance round.
# The lower of this and `rebalance-relative-limit` will be used.
rebalance-absolute-limit = 0
# Maximum number of shards that will be rebalanced in one rebalance round.
# Fraction of total number of (known) shards.
# The lower of this and `rebalance-absolute-limit` will be used.
rebalance-relative-limit = 0.1
# Deprecated: Use rebalance-absolute-limit and rebalance-relative-limit instead. This property is not used
# when rebalance-absolute-limit > 0.
#
# Threshold of how large the difference between most and least number of
# allocated shards must be to begin the rebalancing.
# The difference between number of shards in the region with most shards and
@ -112,6 +132,9 @@ akka.cluster.sharding {
# on different nodes before rebalance will occur.
rebalance-threshold = 1
# Deprecated: Use rebalance-absolute-limit and rebalance-relative-limit instead. This property is not used
# when rebalance-absolute-limit > 0.
#
# The number of ongoing rebalancing processes is limited to this number.
max-simultaneous-rebalance = 3
}

View file

@ -113,11 +113,10 @@ import akka.util.ccompat.JavaConverters._
*
* '''Shard Allocation''':
* The logic deciding which shards to rebalance is defined in a plugable shard allocation
* strategy. The default implementation [[ShardCoordinator.LeastShardAllocationStrategy]]
* strategy. The default implementation `LeastShardAllocationStrategy`
* picks shards for handoff from the `ShardRegion` with highest number of previously allocated shards.
* They will then be allocated to the `ShardRegion` with lowest number of previously allocated shards,
* i.e. new members in the cluster. There is a configurable `rebalance-threshold` of how large
* the difference must be to begin the rebalancing. This strategy can be replaced by an application
* i.e. new members in the cluster. This strategy can be replaced by an application
* specific implementation.
*
* '''Recovery''':
@ -172,7 +171,6 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv
*/
class ClusterSharding(system: ExtendedActorSystem) extends Extension {
import ClusterShardingGuardian._
import ShardCoordinator.LeastShardAllocationStrategy
import ShardCoordinator.ShardAllocationStrategy
private val log = Logging(system, this.getClass)
@ -656,13 +654,20 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
}
/**
* The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the
* given `settings`. This could be changed in the future.
* The default `ShardAllocationStrategy` is configured by `least-shard-allocation-strategy` properties.
*/
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
if (settings.tuningParameters.leastShardAllocationAbsoluteLimit > 0) {
// new algorithm
val absoluteLimit = settings.tuningParameters.leastShardAllocationAbsoluteLimit
val relativeLimit = settings.tuningParameters.leastShardAllocationRelativeLimit
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit, relativeLimit)
} else {
// old algorithm
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance
new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
new ShardCoordinator.LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
}
}
}

View file

@ -84,7 +84,9 @@ object ClusterShardingSettings {
entityRecoveryConstantRateStrategyNumberOfEntities =
config.getInt("entity-recovery-constant-rate-strategy.number-of-entities"),
coordinatorStateWriteMajorityPlus = configMajorityPlus("coordinator-state.write-majority-plus"),
coordinatorStateReadMajorityPlus = configMajorityPlus("coordinator-state.read-majority-plus"))
coordinatorStateReadMajorityPlus = configMajorityPlus("coordinator-state.read-majority-plus"),
leastShardAllocationAbsoluteLimit = config.getInt("least-shard-allocation-strategy.rebalance-absolute-limit"),
leastShardAllocationRelativeLimit = config.getDouble("least-shard-allocation-strategy.rebalance-relative-limit"))
val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton"))
@ -148,12 +150,62 @@ object ClusterShardingSettings {
val entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
val entityRecoveryConstantRateStrategyNumberOfEntities: Int,
val coordinatorStateWriteMajorityPlus: Int,
val coordinatorStateReadMajorityPlus: Int) {
val coordinatorStateReadMajorityPlus: Int,
val leastShardAllocationAbsoluteLimit: Int,
val leastShardAllocationRelativeLimit: Double) {
require(
entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant",
s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'")
// included for binary compatibility
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including " +
"leastShardAllocationAbsoluteLimit and leastShardAllocationRelativeLimit instead",
since = "2.6.10")
def this(
coordinatorFailureBackoff: FiniteDuration,
retryInterval: FiniteDuration,
bufferSize: Int,
handOffTimeout: FiniteDuration,
shardStartTimeout: FiniteDuration,
shardFailureBackoff: FiniteDuration,
entityRestartBackoff: FiniteDuration,
rebalanceInterval: FiniteDuration,
snapshotAfter: Int,
keepNrOfBatches: Int,
leastShardAllocationRebalanceThreshold: Int,
leastShardAllocationMaxSimultaneousRebalance: Int,
waitingForStateTimeout: FiniteDuration,
updatingStateTimeout: FiniteDuration,
entityRecoveryStrategy: String,
entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
entityRecoveryConstantRateStrategyNumberOfEntities: Int,
coordinatorStateWriteMajorityPlus: Int,
coordinatorStateReadMajorityPlus: Int) =
this(
coordinatorFailureBackoff,
retryInterval,
bufferSize,
handOffTimeout,
shardStartTimeout,
shardFailureBackoff,
entityRestartBackoff,
rebalanceInterval,
snapshotAfter,
keepNrOfBatches,
leastShardAllocationRebalanceThreshold,
leastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout,
updatingStateTimeout,
entityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities,
coordinatorStateWriteMajorityPlus,
coordinatorStateReadMajorityPlus,
leastShardAllocationAbsoluteLimit = 100,
leastShardAllocationRelativeLimit = 0.1)
// included for binary compatibility
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including " +

View file

@ -73,6 +73,51 @@ object ShardCoordinator {
majorityMinCap,
rememberEntitiesStoreProvider)).withDeploy(Deploy.local)
/**
* Java API: `ShardAllocationStrategy` that allocates new shards to the `ShardRegion` (node) with least
* number of previously allocated shards.
*
* When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node.
* The `LeastShardAllocationStrategy` picks shards for rebalancing from the `ShardRegion`s with most number
* of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of
* previously allocated shards, i.e. new members in the cluster. The amount of shards to rebalance in each
* round can be limited to make it progress slower since rebalancing too many shards at the same time could
* result in additional load on the system. For example, causing many Event Sourced entites to be started
* at the same time.
*
* It will not rebalance when there is already an ongoing rebalance in progress.
*
* @param absoluteLimit the maximum number of shards that will be rebalanced in one rebalance round
* @param relativeLimit fraction (< 1.0) of total number of (known) shards that will be rebalanced
* in one rebalance round
*/
def leastShardAllocationStrategy(absoluteLimit: Int, relativeLimit: Double): ShardAllocationStrategy =
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit, relativeLimit)
object ShardAllocationStrategy {
/**
* Scala API: `ShardAllocationStrategy` that allocates new shards to the `ShardRegion` (node) with least
* number of previously allocated shards.
*
* When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node.
* The `LeastShardAllocationStrategy` picks shards for rebalancing from the `ShardRegion`s with most number
* of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of
* previously allocated shards, i.e. new members in the cluster. The amount of shards to rebalance in each
* round can be limited to make it progress slower since rebalancing too many shards at the same time could
* result in additional load on the system. For example, causing many Event Sourced entites to be started
* at the same time.
*
* It will not rebalance when there is already an ongoing rebalance in progress.
*
* @param absoluteLimit the maximum number of shards that will be rebalanced in one rebalance round
* @param relativeLimit fraction (< 1.0) of total number of (known) shards that will be rebalanced
* in one rebalance round
*/
def leastShardAllocationStrategy(absoluteLimit: Int, relativeLimit: Double): ShardAllocationStrategy =
new internal.LeastShardAllocationStrategy(absoluteLimit, relativeLimit)
}
/**
* Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]].
*
@ -177,7 +222,12 @@ object ShardCoordinator {
private val emptyRebalanceResult = Future.successful(Set.empty[ShardId])
/**
* The default implementation of [[ShardCoordinator.LeastShardAllocationStrategy]]
* Use [[akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy.leastShardAllocationStrategy]] instead.
* The new rebalance algorithm was included in Akka 2.6.10. It can reach optimal balance in
* less rebalance rounds (typically 1 or 2 rounds). The amount of shards to rebalance in each
* round can still be limited to make it progress slower.
*
* This implementation of [[ShardCoordinator.ShardAllocationStrategy]]
* allocates new shards to the `ShardRegion` with least number of previously allocated shards.
*
* When a node is removed from the cluster the shards on that node will be started on the remaining nodes,

View file

@ -0,0 +1,125 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.internal
import scala.collection.immutable
import scala.concurrent.Future
import akka.actor.ActorRef
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.ShardId
/**
* INTERNAL API
*/
@InternalApi private[akka] object LeastShardAllocationStrategy {
private val emptyRebalanceResult = Future.successful(Set.empty[ShardId])
}
/**
* INTERNAL API: Use `ShardCoordinator.ShardAllocationStrategy.leastShardAllocationStrategy` factory method.
*
* `ShardAllocationStrategy` that allocates new shards to the `ShardRegion` (node) with least
* number of previously allocated shards.
*
* When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node.
* The `LeastShardAllocationStrategy` picks shards for rebalancing from the `ShardRegion`s with most number
* of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of
* previously allocated shards, i.e. new members in the cluster. The amount of shards to rebalance in each
* round can be limited to make it progress slower since rebalancing too many shards at the same time could
* result in additional load on the system. For example, causing many Event Sourced entites to be started
* at the same time.
*
* It will not rebalance when there is already an ongoing rebalance in progress.
*
* @param absoluteLimit the maximum number of shards that will be rebalanced in one rebalance round
* @param relativeLimit fraction (< 1.0) of total number of (known) shards that will be rebalanced
* in one rebalance round
*/
@InternalApi private[akka] final class LeastShardAllocationStrategy(absoluteLimit: Int, relativeLimit: Double)
extends ShardAllocationStrategy {
import LeastShardAllocationStrategy.emptyRebalanceResult
override def allocateShard(
requester: ActorRef,
shardId: ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = {
val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, v) => v.size }
Future.successful(regionWithLeastShards)
}
override def rebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
import math.max
import math.min
def limit(numberOfShards: Int): Int =
max(1, min((relativeLimit * numberOfShards).toInt, absoluteLimit))
def rebalancePhase1(
numberOfShards: Int,
optimalPerRegion: Int,
sortedAllocations: Vector[immutable.IndexedSeq[ShardId]]): Set[ShardId] = {
val selected = Vector.newBuilder[ShardId]
sortedAllocations.foreach { shards =>
if (shards.size > optimalPerRegion) {
selected ++= shards.take(shards.size - optimalPerRegion)
}
}
val result = selected.result()
result.take(limit(numberOfShards)).toSet
}
def rebalancePhase2(
numberOfShards: Int,
optimalPerRegion: Int,
sortedAllocations: Vector[immutable.IndexedSeq[ShardId]]): Future[Set[ShardId]] = {
// In the first phase the optimalPerRegion is rounded up, and depending on number of shards per region and number
// of regions that might not be the exact optimal.
// In second phase we look for diff of >= 2 below optimalPerRegion and rebalance that number of shards.
val countBelowOptimal =
sortedAllocations.iterator.map(shards => max(0, (optimalPerRegion - 1) - shards.size)).sum
if (countBelowOptimal == 0) {
emptyRebalanceResult
} else {
val selected = Vector.newBuilder[ShardId]
sortedAllocations.foreach { shards =>
if (shards.size >= optimalPerRegion) {
selected += shards.head
}
}
val result = selected.result().take(min(countBelowOptimal, limit(numberOfShards))).toSet
Future.successful(result)
}
}
if (rebalanceInProgress.nonEmpty) {
// one rebalance at a time
emptyRebalanceResult
} else {
val numberOfShards = currentShardAllocations.valuesIterator.map(_.size).sum
val numberOfRegions = currentShardAllocations.size
if (numberOfRegions == 0 || numberOfShards == 0) {
emptyRebalanceResult
} else {
val sortedAllocations = currentShardAllocations.valuesIterator.toVector.sortBy(_.size)
val optimalPerRegion = numberOfShards / numberOfRegions + (if (numberOfShards % numberOfRegions == 0) 0 else 1)
val result1 = rebalancePhase1(numberOfShards, optimalPerRegion, sortedAllocations)
if (result1.nonEmpty) {
Future.successful(result1)
} else {
rebalancePhase2(numberOfShards, optimalPerRegion, sortedAllocations)
}
}
}
}
override def toString: ShardId =
s"LeastShardAllocationStrategy($absoluteLimit,$relativeLimit)"
}

View file

@ -7,6 +7,7 @@ package akka.cluster.sharding
import scala.concurrent.duration._
import akka.actor._
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.GracefulShutdown
import akka.remote.testconductor.RoleName
import akka.testkit._
@ -59,8 +60,7 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard
entityProps = Props[ShardedEntity](),
extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
extractShardId = MultiNodeClusterShardingSpec.intExtractShardId,
allocationStrategy =
new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1),
allocationStrategy = ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 2, relativeLimit = 1.0),
handOffStopMessage = ShardedEntity.Stop)
lazy val region = ClusterSharding(system).shardRegion(typeName)

View file

@ -8,6 +8,7 @@ import scala.concurrent.duration._
import akka.actor._
import akka.cluster.MemberStatus
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, GetClusterShardingStats }
import akka.testkit._
import akka.util.ccompat._
@ -57,8 +58,7 @@ abstract class ClusterShardingMinMembersSpec(multiNodeConfig: ClusterShardingMin
entityProps = TestActors.echoActorProps,
extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
extractShardId = MultiNodeClusterShardingSpec.intExtractShardId,
allocationStrategy =
new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1),
allocationStrategy = ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 2, relativeLimit = 1.0),
handOffStopMessage = ShardedEntity.Stop)
}

View file

@ -13,6 +13,7 @@ import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped }
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate }
import akka.cluster.sharding.internal.{ DDataRememberEntitiesProvider, EventSourcedRememberEntitiesProvider }
import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings }
@ -150,8 +151,8 @@ abstract class ClusterShardingSpecConfig(
number-of-entities = 1
}
least-shard-allocation-strategy {
rebalance-threshold = 1
max-simultaneous-rebalance = 1
rebalance-absolute-limit = 1
rebalance-relative-limit = 1.0
}
}
akka.testconductor.barrier-timeout = 70s
@ -299,7 +300,7 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig)
def coordinatorProps(typeName: String, rebalanceEnabled: Boolean, rememberEntities: Boolean): Props = {
val allocationStrategy =
new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 2, relativeLimit = 1.0)
val cfg = ConfigFactory.parseString(s"""
handoff-timeout = 10s
shard-start-timeout = 10s

View file

@ -9,7 +9,7 @@ import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef, ExtendedActorSystem, NoSerializationVerificationNeeded, PoisonPill, Props }
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.ShardCoordinator.Internal.ShardStopped
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.{ ExtractEntityId, ExtractShardId, HandOffStopper, Msg }
import akka.testkit.WithLogCapturing
import akka.testkit.{ AkkaSpec, TestProbe }
@ -74,7 +74,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec("""
settings = settingsWithRole,
extractEntityId = extractEntityId,
extractShardId = extractShardId,
allocationStrategy = new LeastShardAllocationStrategy(3, 4),
allocationStrategy = ShardAllocationStrategy.leastShardAllocationStrategy(3, 0.1),
handOffStopMessage = PoisonPill)
probe.expectMsg(StartingProxy(typeName, settingsWithRole.role, None, extractEntityId, extractShardId))

View file

@ -0,0 +1,146 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.ActorRef
import akka.actor.Props
import akka.testkit.AkkaSpec
class DeprecatedLeastShardAllocationStrategySpec extends AkkaSpec {
import ShardCoordinator._
val regionA = system.actorOf(Props.empty, "regionA")
val regionB = system.actorOf(Props.empty, "regionB")
val regionC = system.actorOf(Props.empty, "regionC")
def createAllocations(aCount: Int, bCount: Int = 0, cCount: Int = 0): Map[ActorRef, Vector[String]] = {
val shards = (1 to (aCount + bCount + cCount)).map(n => ("00" + n.toString).takeRight(3))
Map(
regionA -> shards.take(aCount).toVector,
regionB -> shards.slice(aCount, aCount + bCount).toVector,
regionC -> shards.takeRight(cCount).toVector)
}
"DeprecatedLeastShardAllocationStrategy" must {
"allocate to region with least number of shards" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 1, bCount = 1)
allocationStrategy.allocateShard(regionA, "003", allocations).futureValue should ===(regionC)
}
"rebalance from region with most number of shards [2, 0, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 2)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set.empty[String])
}
"not rebalance when diff equal to threshold, [1, 1, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 1, bCount = 1)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String])
}
"rebalance from region with most number of shards [1, 2, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 1, bCount = 2)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002"))
allocationStrategy.rebalance(allocations, Set("002")).futureValue should ===(Set.empty[String])
}
"rebalance from region with most number of shards [3, 0, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 3)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("002"))
}
"rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 4, bCount = 4)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005"))
}
"rebalance from region with most number of shards [4, 4, 2], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 4, bCount = 4, cCount = 2)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
// not optimal, 005 stopped and started again, but ok
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005"))
}
"rebalance from region with most number of shards [1, 3, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 1, bCount = 2)
// so far regionB has 2 shards and regionC has 0 shards, but the diff is <= rebalanceThreshold
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String])
val allocations2 = createAllocations(aCount = 1, bCount = 3)
allocationStrategy.rebalance(allocations2, Set.empty).futureValue should ===(Set("002"))
allocationStrategy.rebalance(allocations2, Set("002")).futureValue should ===(Set.empty[String])
}
"not rebalance when diff equal to threshold, [2, 2, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 2, bCount = 2)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String])
}
"rebalance from region with most number of shards [3, 3, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 3, bCount = 3)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("004"))
allocationStrategy.rebalance(allocations, Set("001", "004")).futureValue should ===(Set.empty)
}
"rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 4, bCount = 4)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002"))
allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("005", "006"))
allocationStrategy.rebalance(allocations, Set("001", "002", "005", "006")).futureValue should ===(Set.empty)
}
"rebalance from region with most number of shards [5, 5, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 5, bCount = 5)
// optimal would => [4, 4, 2] or even => [3, 4, 3]
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002"))
// if 001 and 002 are not started quickly enough this is stopping more than optimal
allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("006", "007"))
allocationStrategy.rebalance(allocations, Set("001", "002", "006", "007")).futureValue should ===(Set("003"))
}
"rebalance from region with most number of shards [50, 50, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 100)
val allocations = createAllocations(aCount = 50, cCount = 50)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002"))
allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("051", "052"))
allocationStrategy.rebalance(allocations, Set("001", "002", "051", "052")).futureValue should ===(
Set("003", "004"))
}
"limit number of simultaneous rebalance" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 2)
val allocations = createAllocations(aCount = 1, bCount = 10)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002", "003"))
allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set.empty[String])
}
"not pick shards that are in progress" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 4)
val allocations = createAllocations(aCount = 10)
allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set("001", "004"))
}
}
}

View file

@ -0,0 +1,126 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.Random
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.ShardId
import akka.testkit.AkkaSpec
class LeastShardAllocationStrategyRandomizedSpec extends AkkaSpec("akka.loglevel = INFO") {
import LeastShardAllocationStrategySpec.{ afterRebalance, countShards, countShardsPerRegion }
def createAllocations(countPerRegion: Map[ActorRef, Int]): Map[ActorRef, immutable.IndexedSeq[ShardId]] = {
countPerRegion.map {
case (region, count) =>
region -> (1 to count).map(n => ("00" + n.toString).takeRight(3)).map(n => s"${region.path.name}-$n").toVector
}
}
private val strategyWithoutLimits =
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 100000, relativeLimit = 1.0)
private val rndSeed = System.currentTimeMillis()
private val rnd = new Random(rndSeed)
info(s"Random seed: $rndSeed")
private var iteration = 1
private val iterationsPerTest = 10
private def testRebalance(
allocationStrategy: ShardAllocationStrategy,
maxRegions: Int,
maxShardsPerRegion: Int,
expectedMaxSteps: Int): Unit = {
(1 to iterationsPerTest).foreach { _ =>
iteration += 1
val numberOfRegions = rnd.nextInt(maxRegions) + 1
val regions = (1 to numberOfRegions).map(n => system.actorOf(Props.empty, s"$iteration-R$n"))
val countPerRegion = regions.map { region =>
region -> rnd.nextInt(maxShardsPerRegion)
}.toMap
val allocations = createAllocations(countPerRegion)
withClue(s"test $allocationStrategy [${countShardsPerRegion(allocations).mkString(",")}]: ") {
testRebalance(allocationStrategy, allocations, Vector(allocations), expectedMaxSteps)
}
regions.foreach(system.stop)
}
}
@tailrec private def testRebalance(
allocationStrategy: ShardAllocationStrategy,
allocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
steps: Vector[Map[ActorRef, immutable.IndexedSeq[ShardId]]],
maxSteps: Int): Unit = {
val round = steps.size
val rebalanceResult = allocationStrategy.rebalance(allocations, Set.empty).value.get.get
val newAllocations = afterRebalance(allocationStrategy, allocations, rebalanceResult)
countShards(newAllocations) should ===(countShards(allocations))
val min = countShardsPerRegion(newAllocations).min
val max = countShardsPerRegion(newAllocations).max
val diff = max - min
val newSteps = steps :+ newAllocations
if (diff <= 1) {
if (round >= 3 && maxSteps <= 10) {
// Should be very rare (I have not seen it)
system.log.info(
s"rebalance solved in round $round, [${newSteps.map(step => countShardsPerRegion(step).mkString(",")).mkString(" => ")}]")
}
()
} else if (round == maxSteps) {
fail(
s"Couldn't solve rebalance in $round rounds, [${newSteps.map(step => countShardsPerRegion(step).mkString(",")).mkString(" => ")}]")
} else {
testRebalance(allocationStrategy, newAllocations, newSteps, maxSteps)
}
}
"LeastShardAllocationStrategy with random scenario" must {
"rebalance shards with max 5 regions / 5 shards" in {
testRebalance(strategyWithoutLimits, maxRegions = 5, maxShardsPerRegion = 5, expectedMaxSteps = 2)
}
"rebalance shards with max 5 regions / 100 shards" in {
testRebalance(strategyWithoutLimits, maxRegions = 5, maxShardsPerRegion = 100, expectedMaxSteps = 2)
}
"rebalance shards with max 20 regions / 5 shards" in {
testRebalance(strategyWithoutLimits, maxRegions = 20, maxShardsPerRegion = 5, expectedMaxSteps = 2)
}
"rebalance shards with max 20 regions / 20 shards" in {
testRebalance(strategyWithoutLimits, maxRegions = 20, maxShardsPerRegion = 20, expectedMaxSteps = 2)
}
"rebalance shards with max 20 regions / 200 shards" in {
testRebalance(strategyWithoutLimits, maxRegions = 20, maxShardsPerRegion = 200, expectedMaxSteps = 5)
}
"rebalance shards with max 100 regions / 100 shards" in {
testRebalance(strategyWithoutLimits, maxRegions = 100, maxShardsPerRegion = 100, expectedMaxSteps = 5)
}
"rebalance shards with max 100 regions / 1000 shards" in {
testRebalance(strategyWithoutLimits, maxRegions = 100, maxShardsPerRegion = 1000, expectedMaxSteps = 5)
}
"rebalance shards with max 20 regions / 20 shards and limits" in {
val absoluteLimit = 3 + rnd.nextInt(7) + 3
val relativeLimit = 0.05 + (rnd.nextDouble() * 0.95)
val strategy = ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit, relativeLimit)
testRebalance(strategy, maxRegions = 20, maxShardsPerRegion = 20, expectedMaxSteps = 20)
}
}
}

View file

@ -4,143 +4,186 @@
package akka.cluster.sharding
import scala.collection.immutable
import akka.actor.ActorPath
import akka.actor.ActorRef
import akka.actor.ActorRefProvider
import akka.actor.Address
import akka.actor.MinimalActorRef
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.ShardId
import akka.testkit.AkkaSpec
class LeastShardAllocationStrategySpec extends AkkaSpec {
import ShardCoordinator._
object LeastShardAllocationStrategySpec {
val regionA = system.actorOf(Props.empty, "regionA")
val regionB = system.actorOf(Props.empty, "regionB")
val regionC = system.actorOf(Props.empty, "regionC")
private object DummyActorRef extends MinimalActorRef {
override val path: ActorPath = RootActorPath(Address("akka", "myapp")) / "system" / "fake"
override def provider: ActorRefProvider = ???
}
def afterRebalance(
allocationStrategy: ShardAllocationStrategy,
allocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
rebalance: Set[ShardId]): Map[ActorRef, immutable.IndexedSeq[ShardId]] = {
val allocationsAfterRemoval = allocations.map {
case (region, shards) => region -> shards.filterNot(rebalance)
}
rebalance.toList.sorted.foldLeft(allocationsAfterRemoval) {
case (acc, shard) =>
val region = allocationStrategy.allocateShard(DummyActorRef, shard, acc).value.get.get
acc.updated(region, acc(region) :+ shard)
}
}
def countShardsPerRegion(newAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Vector[Int] = {
newAllocations.valuesIterator.map(_.size).toVector
}
def countShards(allocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Int = {
countShardsPerRegion(allocations).sum
}
def allocationCountsAfterRebalance(
allocationStrategy: ShardAllocationStrategy,
allocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
rebalance: Set[ShardId]): Vector[Int] = {
countShardsPerRegion(afterRebalance(allocationStrategy, allocations, rebalance))
}
}
class LeastShardAllocationStrategySpec extends AkkaSpec {
import LeastShardAllocationStrategySpec.{ afterRebalance, allocationCountsAfterRebalance }
private val regionA = system.actorOf(Props.empty, "regionA")
private val regionB = system.actorOf(Props.empty, "regionB")
private val regionC = system.actorOf(Props.empty, "regionC")
private val shards = (1 to 999).map(n => ("00" + n.toString).takeRight(3))
def createAllocations(aCount: Int, bCount: Int = 0, cCount: Int = 0): Map[ActorRef, Vector[String]] = {
val shards = (1 to (aCount + bCount + cCount)).map(n => ("00" + n.toString).takeRight(3))
Map(
regionA -> shards.take(aCount).toVector,
regionB -> shards.slice(aCount, aCount + bCount).toVector,
regionC -> shards.takeRight(cCount).toVector)
regionC -> shards.slice(aCount + bCount, aCount + bCount + cCount).toVector)
}
private val strategyWithoutLimits =
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 1000, relativeLimit = 1.0)
"LeastShardAllocationStrategy" must {
"allocate to region with least number of shards" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 10)
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 1, bCount = 1)
allocationStrategy.allocateShard(regionA, "003", allocations).futureValue should ===(regionC)
}
"rebalance from region with most number of shards [2, 0, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 2)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set.empty[String])
"rebalance shards [1, 2, 0]" in {
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 1, bCount = 2)
val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result should ===(Set("002"))
allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(1, 1, 1))
}
"not rebalance when diff equal to threshold, [1, 1, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
"rebalance shards [2, 0, 0]" in {
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 2)
val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result should ===(Set("001"))
allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(1, 1, 0))
}
"not rebalance shards [1, 1, 0]" in {
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 1, bCount = 1)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String])
}
"rebalance from region with most number of shards [1, 2, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 1, bCount = 2)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002"))
allocationStrategy.rebalance(allocations, Set("002")).futureValue should ===(Set.empty[String])
}
"rebalance from region with most number of shards [3, 0, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
"rebalance shards [3, 0, 0]" in {
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 3)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("002"))
val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result should ===(Set("001", "002"))
allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(1, 1, 1))
}
"rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
"rebalance shards [4, 4, 0]" in {
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 4, bCount = 4)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005"))
val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result should ===(Set("001", "005"))
allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(3, 3, 2))
}
"rebalance from region with most number of shards [4, 4, 2], rebalanceThreshold=1" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10)
"rebalance shards [4, 4, 2]" in {
// this is handled by phase 2, to find diff of 2
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 4, bCount = 4, cCount = 2)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
// not optimal, 005 stopped and started again, but ok
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005"))
val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result should ===(Set("001"))
allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(3, 4, 3))
}
"rebalance from region with most number of shards [1, 3, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 1, bCount = 2)
// so far regionB has 2 shards and regionC has 0 shards, but the diff is <= rebalanceThreshold
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String])
val allocations2 = createAllocations(aCount = 1, bCount = 3)
allocationStrategy.rebalance(allocations2, Set.empty).futureValue should ===(Set("002"))
allocationStrategy.rebalance(allocations2, Set("002")).futureValue should ===(Set.empty[String])
}
"not rebalance when diff equal to threshold, [2, 2, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 2, bCount = 2)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String])
}
"rebalance from region with most number of shards [3, 3, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 3, bCount = 3)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001"))
allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("004"))
allocationStrategy.rebalance(allocations, Set("001", "004")).futureValue should ===(Set.empty)
}
"rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
val allocations = createAllocations(aCount = 4, bCount = 4)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002"))
allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("005", "006"))
allocationStrategy.rebalance(allocations, Set("001", "002", "005", "006")).futureValue should ===(Set.empty)
}
"rebalance from region with most number of shards [5, 5, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10)
"rebalance shards [5, 5, 0]" in {
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 5, bCount = 5)
// optimal would => [4, 4, 2] or even => [3, 4, 3]
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002"))
// if 001 and 002 are not started quickly enough this is stopping more than optimal
allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("006", "007"))
allocationStrategy.rebalance(allocations, Set("001", "002", "006", "007")).futureValue should ===(Set("003"))
val result1 = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result1 should ===(Set("001", "006"))
// so far [4, 4, 2]
allocationCountsAfterRebalance(allocationStrategy, allocations, result1) should ===(Vector(4, 4, 2))
val allocations2 = afterRebalance(allocationStrategy, allocations, result1)
// second phase will find the diff of 2, resulting in [3, 4, 3]
val result2 = allocationStrategy.rebalance(allocations2, Set.empty).futureValue
result2 should ===(Set("002"))
allocationCountsAfterRebalance(allocationStrategy, allocations2, result2) should ===(Vector(3, 4, 3))
}
"rebalance from region with most number of shards [50, 50, 0], rebalanceThreshold=2" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 100)
"rebalance shards [50, 50, 0]" in {
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 50, cCount = 50)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002"))
allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("051", "052"))
allocationStrategy.rebalance(allocations, Set("001", "002", "051", "052")).futureValue should ===(
Set("003", "004"))
val result1 = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result1 should ===(shards.take(50 - 34).toSet ++ shards.drop(50).take(50 - 34))
// so far [34, 34, 32]
allocationCountsAfterRebalance(allocationStrategy, allocations, result1).sorted should ===(
Vector(34, 34, 32).sorted)
val allocations2 = afterRebalance(allocationStrategy, allocations, result1)
// second phase will find the diff of 2, resulting in [33, 34, 33]
val result2 = allocationStrategy.rebalance(allocations2, Set.empty).futureValue
result2 should ===(Set("017"))
allocationCountsAfterRebalance(allocationStrategy, allocations2, result2).sorted should ===(
Vector(33, 34, 33).sorted)
}
"limit number of simultaneous rebalance" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 2)
val allocations = createAllocations(aCount = 1, bCount = 10)
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002", "003"))
"respect absolute limit of number shards" in {
val allocationStrategy =
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 3, relativeLimit = 1.0)
val allocations = createAllocations(aCount = 1, bCount = 9)
val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result should ===(Set("002", "003", "004"))
allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(2, 6, 2))
}
"respect relative limit of number shards" in {
val allocationStrategy =
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 5, relativeLimit = 0.3)
val allocations = createAllocations(aCount = 1, bCount = 9)
val result = allocationStrategy.rebalance(allocations, Set.empty).futureValue
result should ===(Set("002", "003", "004"))
allocationCountsAfterRebalance(allocationStrategy, allocations, result) should ===(Vector(2, 6, 2))
}
"not rebalance when in progress" in {
val allocationStrategy = strategyWithoutLimits
val allocations = createAllocations(aCount = 10)
allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set.empty[String])
}
"not pick shards that are in progress" in {
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 4)
val allocations = createAllocations(aCount = 10)
allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set("001", "004"))
}
}
}

View file

@ -19,9 +19,10 @@ import akka.testkit.WithLogCapturing
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration._
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
object RememberEntitiesFailureSpec {
val config = ConfigFactory.parseString(s"""
akka.loglevel = DEBUG
@ -320,7 +321,7 @@ class RememberEntitiesFailureSpec
ClusterShardingSettings(system).withRememberEntities(true),
extractEntityId,
extractShardId,
new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 3),
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 1, relativeLimit = 0.1),
"graceful-stop")
val probe = TestProbe()
@ -361,7 +362,7 @@ class RememberEntitiesFailureSpec
ClusterShardingSettings(system).withRememberEntities(true),
extractEntityId,
extractShardId,
new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 3),
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 1, relativeLimit = 0.1),
"graceful-stop")
val probe = TestProbe()

View file

@ -164,9 +164,33 @@ The `number-of-shards` configuration value must be the same for all nodes in the
configuration check when joining. Changing the value requires stopping all nodes in the cluster.
The shards are allocated to the nodes in the cluster. The decision of where to allocate a shard is done
by a shard allocation strategy. The default implementation @apidoc[ShardCoordinator.LeastShardAllocationStrategy]
allocates new shards to the `ShardRegion` (node) with least number of previously allocated shards.
This strategy can be replaced by an application specific implementation.
by a shard allocation strategy.
The default implementation `LeastShardAllocationStrategy` allocates new shards to the `ShardRegion` (node) with least
number of previously allocated shards. This strategy can be replaced by an application specific implementation.
When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node.
The `LeastShardAllocationStrategy` picks shards for rebalancing from the `ShardRegion`s with most number
of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of
previously allocated shards, i.e. new members in the cluster. The amount of shards to rebalance in each
round can be limited to make it progress slower since rebalancing too many shards at the same time could
result in additional load on the system. For example, causing many Event Sourced entites to be started
at the same time.
A new rebalance algorithm was included in Akka 2.6.10. It can reach optimal balance in a few rebalance rounds
(typically 1 or 2 rounds). For backwards compatibility the new algorithm is not enabled by default.
The new algorithm is recommended and will become the default in future versions of Akka.
You enable the new algorithm by setting `rebalance-absolute-limit` > 0, for example:
```
akka.cluster.sharding.least-shard-allocation-strategy.rebalance-absolute-limit = 20
```
The `rebalance-absolute-limit` is the maximum number of shards that will be rebalanced in one rebalance round.
You may also want to tune the `akka.cluster.sharding.least-shard-allocation-strategy.rebalance-relative-limit`.
The `rebalance-relative-limit` is a fraction (< 1.0) of total number of (known) shards that will be rebalanced
in one rebalance round. The lower result of `rebalance-relative-limit` and `rebalance-absolute-limit` will be used.
### External shard allocation
@ -538,7 +562,10 @@ properties are read by the `ClusterShardingSettings` when created with an ActorS
It is also possible to amend the `ClusterShardingSettings` or create it from another config section
with the same layout as below.
One important configuration property is `number-of-shards` as described in @ref:[Shard allocation](#shard-allocation)
One important configuration property is `number-of-shards` as described in @ref:[Shard allocation](#shard-allocation).
You may also need to tune the configuration properties is `rebalance-absolute-limit` and `rebalance-relative-limit`
as described in @ref:[Shard allocation](#shard-allocation).
@@snip [reference.conf](/akka-cluster-sharding/src/main/resources/reference.conf) { #sharding-ext-config }