optimization to improve the time to rebalance shards

This commit is contained in:
Abe Sanderson 2016-08-10 12:54:08 -06:00
parent d5f84d4ad8
commit 9c558016a2
2 changed files with 12 additions and 3 deletions

View file

@ -153,7 +153,7 @@ object ShardCoordinator {
case (_, v) v.filterNot(s rebalanceInProgress(s))
}.maxBy(_.size)
if (mostShards.size - leastShards.size >= rebalanceThreshold)
Future.successful(Set(mostShards.head))
Future.successful(mostShards.take(maxSimultaneousRebalance - rebalanceInProgress.size).toSet)
else
emptyRebalanceResult
} else emptyRebalanceResult

View file

@ -31,14 +31,23 @@ class LeastShardAllocationStrategySpec extends AkkaSpec {
Await.result(allocationStrategy.rebalance(allocations, Set.empty), 3.seconds) should ===(Set.empty[String])
val allocations2 = allocations.updated(regionB, Vector("shard2", "shard3", "shard4"))
Await.result(allocationStrategy.rebalance(allocations2, Set.empty), 3.seconds) should ===(Set("shard2"))
Await.result(allocationStrategy.rebalance(allocations2, Set.empty), 3.seconds) should ===(Set("shard2", "shard3"))
Await.result(allocationStrategy.rebalance(allocations2, Set("shard4")), 3.seconds) should ===(Set.empty[String])
val allocations3 = allocations2.updated(regionA, Vector("shard1", "shard5", "shard6"))
Await.result(allocationStrategy.rebalance(allocations3, Set("shard1")), 3.seconds) should ===(Set("shard2"))
}
"must limit number of simultanious rebalance" in {
"rebalance multiple shards if max simultaneous rebalances is not exceeded" in {
val allocations = Map(
regionA Vector("shard1"),
regionB Vector("shard2", "shard3", "shard4", "shard5", "shard6"),
regionC Vector.empty)
Await.result(allocationStrategy.rebalance(allocations, Set.empty), 3.seconds) should ===(Set("shard2", "shard3"))
Await.result(allocationStrategy.rebalance(allocations, Set("shard2", "shard3")), 3.seconds) should ===(Set.empty[String])
}
"limit number of simultaneous rebalance" in {
val allocations = Map(
regionA Vector("shard1"),
regionB Vector("shard2", "shard3", "shard4", "shard5", "shard6"), regionC Vector.empty)