From 6946a04e20e22b6887403627b45efb5c8434a9c4 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Tue, 18 Jun 2019 11:28:37 +0100 Subject: [PATCH] Log rebalance progress at INFO per rebalance info For cases where rebalance takes a long time due to a relatively large cluster change e.g. when doubling number of nodes (common for 1-2, 3-6 etc) there is little insignt into what is happening unless DEBUG logging is enabled. This adds an INFO log per rebalance-interval (default is 10s) to show what is in progress and which new shards are starting rebalancing. --- akka-cluster-sharding/src/main/resources/reference.conf | 1 + .../scala/akka/cluster/sharding/ShardCoordinator.scala | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 4ca6b105a6..1b0f0f7375 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -98,6 +98,7 @@ akka.cluster.sharding { # The difference between number of shards in the region with most shards and # the region with least shards must be greater than (>) the `rebalanceThreshold` # for the rebalance to occur. + # It is also the maximum number of shards that will start rebalancing per rebalance-interval # 1 gives the best distribution and therefore typically the best choice. # Increasing the threshold can result in quicker rebalance but has the # drawback of increased difference between number of shards (and therefore load) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index a750473544..f8c19711dd 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -866,7 +866,13 @@ abstract class ShardCoordinator( } } - def continueRebalance(shards: Set[ShardId]): Unit = + def continueRebalance(shards: Set[ShardId]): Unit = { + if (log.isInfoEnabled && (shards.nonEmpty || rebalanceInProgress.nonEmpty)) { + log.info( + "Starting rebalance for shards [{}]. Current shards rebalancing: [{}]", + shards.mkString(","), + rebalanceInProgress.keySet.mkString(",")) + } shards.foreach { shard => if (!rebalanceInProgress.contains(shard)) { state.shards.get(shard) match { @@ -885,6 +891,7 @@ abstract class ShardCoordinator( } } + } }