diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index be9314aea2..bd503fbd3e 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -95,7 +95,14 @@ akka.cluster.sharding { least-shard-allocation-strategy { # Threshold of how large the difference between most and least number of # allocated shards must be to begin the rebalancing. - rebalance-threshold = 10 + # The difference between number of shards in the region with most shards and + # the region with least shards must be greater than (>) the `rebalanceThreshold` + # for the rebalance to occur. + # 1 gives the best distribution and therefore typically the best choice. + # Increasing the threshold can result in quicker rebalance but has the + # drawback of increased difference between number of shards (and therefore load) + # on different nodes before rebalance will occur. + rebalance-threshold = 1 # The number of ongoing rebalancing processes is limited to this number. max-simultaneous-rebalance = 3 diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 3584737977..43678a1cf7 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -135,10 +135,26 @@ object ShardCoordinator { /** * The default implementation of [[ShardCoordinator.LeastShardAllocationStrategy]] * allocates new shards to the `ShardRegion` with least number of previously allocated shards. - * It picks shards for rebalancing handoff from the `ShardRegion` with most number of previously allocated shards. + * + * When a node is removed from the cluster the shards on that node will be started on the remaining nodes, + * evenly spread on the remaining nodes (by picking regions with least shards). + * + * When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node. + * It picks shards for rebalancing from the `ShardRegion` with most number of previously allocated shards. * They will then be allocated to the `ShardRegion` with least number of previously allocated shards, * i.e. new members in the cluster. There is a configurable threshold of how large the difference - * must be to begin the rebalancing. The number of ongoing rebalancing processes can be limited. + * must be to begin the rebalancing. The difference between number of shards in the region with most shards and + * the region with least shards must be greater than the `rebalanceThreshold` for the rebalance to occur. + * + * A `rebalanceThreshold` of 1 gives the best distribution and therefore typically the best choice. + * A higher threshold means that more shards can be rebalanced at the same time instead of one-by-one. + * That has the advantage that the rebalance process can be quicker but has the drawback that the + * the number of shards (and therefore load) between different nodes may be significantly different. + * Given the recommendation of using 10x shards than number of nodes and `rebalanceThreshold=10` can result + * in one node hosting ~2 times the number of shards of other nodes. Example: 1000 shards on 100 nodes means + * 10 shards per node. One node may have 19 shards and others 10 without a rebalance occurring. + * + * The number of ongoing rebalancing processes can be limited by `maxSimultaneousRebalance`. */ @SerialVersionUID(1L) class LeastShardAllocationStrategy(rebalanceThreshold: Int, maxSimultaneousRebalance: Int) @@ -159,9 +175,12 @@ object ShardCoordinator { case (_, v) ⇒ v.filterNot(s ⇒ rebalanceInProgress(s)) }.maxBy(_.size) val difference = mostShards.size - leastShards.size - if (difference >= rebalanceThreshold) - Future.successful(mostShards.take(math.min(difference, maxSimultaneousRebalance - rebalanceInProgress.size)).toSet) - else + if (difference > rebalanceThreshold) { + val n = math.min( + math.min(difference - rebalanceThreshold, rebalanceThreshold), + maxSimultaneousRebalance - rebalanceInProgress.size) + Future.successful(mostShards.sorted.take(n).toSet) + } else emptyRebalanceResult } else emptyRebalanceResult } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index d72da3fa0c..8150050ece 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -158,7 +158,7 @@ abstract class ClusterShardingSpecConfig( number-of-entities = 1 } least-shard-allocation-strategy { - rebalance-threshold = 2 + rebalance-threshold = 1 max-simultaneous-rebalance = 1 } distributed-data.durable.lmdb { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala index 5de5cbeb2f..27b897ee54 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala @@ -4,6 +4,7 @@ package akka.cluster.sharding +import akka.actor.ActorRef import akka.actor.Props import akka.testkit.AkkaSpec @@ -14,61 +15,132 @@ class LeastShardAllocationStrategySpec extends AkkaSpec { val regionB = system.actorOf(Props.empty, "regionB") val regionC = system.actorOf(Props.empty, "regionC") - val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 2) + def createAllocations(aCount: Int, bCount: Int = 0, cCount: Int = 0): Map[ActorRef, Vector[String]] = { + val shards = (1 to (aCount + bCount + cCount)).map(n ⇒ ("00" + n.toString).takeRight(3)) + Map( + regionA -> shards.take(aCount).toVector, + regionB -> shards.slice(aCount, aCount + bCount).toVector, + regionC -> shards.takeRight(cCount).toVector) + } "LeastShardAllocationStrategy" must { "allocate to region with least number of shards" in { - val allocations = Map(regionA → Vector("shard1"), regionB → Vector("shard2"), regionC → Vector.empty) - allocationStrategy.allocateShard(regionA, "shard3", allocations).futureValue should ===(regionC) + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 1, bCount = 1) + allocationStrategy.allocateShard(regionA, "003", allocations).futureValue should ===(regionC) } - "rebalance from region with most number of shards" in { - val allocations = Map(regionA → Vector("shard1"), regionB → Vector("shard2", "shard3"), - regionC → Vector.empty) + "rebalance from region with most number of shards [2, 0, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 2) - // so far regionB has 2 shards and regionC has 0 shards, but the diff is less than rebalanceThreshold + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set.empty[String]) + } + + "not rebalance when diff equal to threshold, [1, 1, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 1, bCount = 1) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) + } + + "rebalance from region with most number of shards [1, 2, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 1, bCount = 2) + + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002")) + allocationStrategy.rebalance(allocations, Set("002")).futureValue should ===(Set.empty[String]) + } + + "rebalance from region with most number of shards [3, 0, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 3) + + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("002")) + } + + "rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 4, bCount = 4) + + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005")) + } + + "rebalance from region with most number of shards [4, 4, 2], rebalanceThreshold=1" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 1, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 4, bCount = 4, cCount = 2) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + // not optimal, 005 stopped and started again, but ok + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("005")) + } + + "rebalance from region with most number of shards [1, 3, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 1, bCount = 2) + + // so far regionB has 2 shards and regionC has 0 shards, but the diff is <= rebalanceThreshold allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) - val allocations2 = allocations.updated(regionB, Vector("shard2", "shard3", "shard4")) - allocationStrategy.rebalance(allocations2, Set.empty).futureValue should ===(Set("shard2", "shard3")) - allocationStrategy.rebalance(allocations2, Set("shard4")).futureValue should ===(Set.empty[String]) - - val allocations3 = allocations2.updated(regionA, Vector("shard1", "shard5", "shard6")) - allocationStrategy.rebalance(allocations3, Set("shard1")).futureValue should ===(Set("shard2")) + val allocations2 = createAllocations(aCount = 1, bCount = 3) + allocationStrategy.rebalance(allocations2, Set.empty).futureValue should ===(Set("002")) + allocationStrategy.rebalance(allocations2, Set("002")).futureValue should ===(Set.empty[String]) } - "rebalance multiple shards if max simultaneous rebalances is not exceeded" in { - val allocations = Map( - regionA → Vector("shard1"), - regionB → Vector("shard2", "shard3", "shard4", "shard5", "shard6"), - regionC → Vector.empty) + "not rebalance when diff equal to threshold, [2, 2, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 2, bCount = 2) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String]) + } - allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("shard2", "shard3")) - allocationStrategy.rebalance(allocations, Set("shard2", "shard3")).futureValue should ===(Set.empty[String]) + "rebalance from region with most number of shards [3, 3, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 3, bCount = 3) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001")) + allocationStrategy.rebalance(allocations, Set("001")).futureValue should ===(Set("004")) + allocationStrategy.rebalance(allocations, Set("001", "004")).futureValue should ===(Set.empty) + } + + "rebalance from region with most number of shards [4, 4, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 4, bCount = 4) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) + allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("005", "006")) + allocationStrategy.rebalance(allocations, Set("001", "002", "005", "006")).futureValue should ===(Set.empty) + } + + "rebalance from region with most number of shards [5, 5, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 10) + val allocations = createAllocations(aCount = 5, bCount = 5) + // optimal would => [4, 4, 2] or even => [3, 4, 3] + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) + // if 001 and 002 are not started quickly enough this is stopping more than optimal + allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("006", "007")) + allocationStrategy.rebalance(allocations, Set("001", "002", "006", "007")).futureValue should ===(Set("003")) + } + + "rebalance from region with most number of shards [50, 50, 0], rebalanceThreshold=2" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 100) + val allocations = createAllocations(aCount = 50, cCount = 50) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("001", "002")) + allocationStrategy.rebalance(allocations, Set("001", "002")).futureValue should ===(Set("051", "052")) + allocationStrategy.rebalance(allocations, Set("001", "002", "051", "052")) + .futureValue should ===(Set("003", "004")) } "limit number of simultaneous rebalance" in { - val allocations = Map( - regionA → Vector("shard1"), - regionB → Vector("shard2", "shard3", "shard4", "shard5", "shard6"), - regionC → Vector.empty) - - allocationStrategy.rebalance(allocations, Set("shard2")).futureValue should ===(Set("shard3")) - allocationStrategy.rebalance(allocations, Set("shard2", "shard3")).futureValue should ===(Set.empty[String]) + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 2) + val allocations = createAllocations(aCount = 1, bCount = 10) + allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("002", "003")) + allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set.empty[String]) } - "don't rebalance excessive shards if maxSimultaneousRebalance > rebalanceThreshold" in { - val allocationStrategy = - new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 5) - - val allocations = Map( - regionA → Vector("shard1", "shard2", "shard3", "shard4", "shard5", "shard6", "shard7", "shard8"), - regionB → Vector("shard9", "shard10", "shard11", "shard12")) - - allocationStrategy.rebalance(allocations, Set("shard2")).futureValue should - ===(Set("shard1", "shard3", "shard4")) - allocationStrategy.rebalance(allocations, Set("shard5", "shard6", "shard7", "shard8")).futureValue should - ===(Set.empty[String]) + "not pick shards that are in progress" in { + val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 4) + val allocations = createAllocations(aCount = 10) + allocationStrategy.rebalance(allocations, Set("002", "003")).futureValue should ===(Set("001", "004")) } + } } diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md index 6a390ef5cb..7b98e35e62 100644 --- a/akka-docs/src/main/paradox/cluster-sharding.md +++ b/akka-docs/src/main/paradox/cluster-sharding.md @@ -223,9 +223,17 @@ The logic that decides which shards to rebalance is defined in a pluggable shard allocation strategy. The default implementation `ShardCoordinator.LeastShardAllocationStrategy` picks shards for handoff from the `ShardRegion` with most number of previously allocated shards. They will then be allocated to the `ShardRegion` with least number of previously allocated shards, -i.e. new members in the cluster. There is a configurable threshold of how large the difference -must be to begin the rebalancing. This strategy can be replaced by an application specific -implementation. +i.e. new members in the cluster. + +For the `LeastShardAllocationStrategy` there is a configurable threshold (`rebalance-threshold`) of +how large the difference must be to begin the rebalancing. The difference between number of shards in +the region with most shards and the region with least shards must be greater than the `rebalance-threshold` +for the rebalance to occur. + +A `rebalance-threshold` of 1 gives the best distribution and therefore typically the best choice. +A higher threshold means that more shards can be rebalanced at the same time instead of one-by-one. +That has the advantage that the rebalance process can be quicker but has the drawback that the +the number of shards (and therefore load) between different nodes may be significantly different. ### Shard Coordinator State