From 9c558016a2be9dabe70df5adfabb92124947bf8a Mon Sep 17 00:00:00 2001 From: Abe Sanderson Date: Wed, 10 Aug 2016 12:54:08 -0600 Subject: [PATCH] optimization to improve the time to rebalance shards --- .../akka/cluster/sharding/ShardCoordinator.scala | 2 +- .../sharding/LeastShardAllocationStrategySpec.scala | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index e980f7ab9c..3ba933ee7d 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -153,7 +153,7 @@ object ShardCoordinator { case (_, v) ⇒ v.filterNot(s ⇒ rebalanceInProgress(s)) }.maxBy(_.size) if (mostShards.size - leastShards.size >= rebalanceThreshold) - Future.successful(Set(mostShards.head)) + Future.successful(mostShards.take(maxSimultaneousRebalance - rebalanceInProgress.size).toSet) else emptyRebalanceResult } else emptyRebalanceResult diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala index 8717e9eb22..54fd1ef339 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala @@ -31,14 +31,23 @@ class LeastShardAllocationStrategySpec extends AkkaSpec { Await.result(allocationStrategy.rebalance(allocations, Set.empty), 3.seconds) should ===(Set.empty[String]) val allocations2 = allocations.updated(regionB, Vector("shard2", "shard3", "shard4")) - Await.result(allocationStrategy.rebalance(allocations2, Set.empty), 3.seconds) should ===(Set("shard2")) + Await.result(allocationStrategy.rebalance(allocations2, Set.empty), 3.seconds) should ===(Set("shard2", "shard3")) Await.result(allocationStrategy.rebalance(allocations2, Set("shard4")), 3.seconds) should ===(Set.empty[String]) val allocations3 = allocations2.updated(regionA, Vector("shard1", "shard5", "shard6")) Await.result(allocationStrategy.rebalance(allocations3, Set("shard1")), 3.seconds) should ===(Set("shard2")) } - "must limit number of simultanious rebalance" in { + "rebalance multiple shards if max simultaneous rebalances is not exceeded" in { + val allocations = Map( + regionA → Vector("shard1"), + regionB → Vector("shard2", "shard3", "shard4", "shard5", "shard6"), + regionC → Vector.empty) + + Await.result(allocationStrategy.rebalance(allocations, Set.empty), 3.seconds) should ===(Set("shard2", "shard3")) + Await.result(allocationStrategy.rebalance(allocations, Set("shard2", "shard3")), 3.seconds) should ===(Set.empty[String]) + } + "limit number of simultaneous rebalance" in { val allocations = Map( regionA → Vector("shard1"), regionB → Vector("shard2", "shard3", "shard4", "shard5", "shard6"), regionC → Vector.empty)