diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index 0ad8718cf7..ae2e5a0c36 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -255,6 +255,7 @@ class ShardRegion(
   var regions = Map.empty[ActorRef, Set[ShardId]]
   var regionByShard = Map.empty[ShardId, ActorRef]
   var shardBuffers = Map.empty[ShardId, Vector[(Msg, ActorRef)]]
+  var loggedFullBufferWarning = false
   var shards = Map.empty[ShardId, ActorRef]
   var shardsByRef = Map.empty[ActorRef, ShardId]
   var handingOff = Set.empty[ActorRef]
@@ -264,6 +265,7 @@ class ShardRegion(
 
   import context.dispatcher
   val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry)
+  var retryCount = 0
 
   // subscribe to MemberEvent, re-subscribe when restart
   override def preStart(): Unit = {
@@ -376,8 +378,10 @@ class ShardRegion(
       // must drop requests that came in between the BeginHandOff and now,
       // because they might be forwarded from other regions and there
       // is a risk of message re-ordering otherwise
-      if (shardBuffers.contains(shard))
+      if (shardBuffers.contains(shard)) {
         shardBuffers -= shard
+        loggedFullBufferWarning = false
+      }
 
       if (shards.contains(shard)) {
         handingOff += shards(shard)
@@ -391,6 +395,8 @@ class ShardRegion(
 
   def receiveCommand(cmd: ShardRegionCommand): Unit = cmd match {
     case Retry ⇒
+      if (shardBuffers.nonEmpty)
+        retryCount += 1
       if (coordinator.isEmpty)
         register()
       else {
@@ -444,6 +450,9 @@ class ShardRegion(
 
   def register(): Unit = {
     coordinatorSelection.foreach(_ ! registrationMessage)
+    if (shardBuffers.nonEmpty && retryCount >= 5)
+      log.warning("Trying to register to coordinator at [{}], but no acknowledgment. Total [{}] buffered messages.",
+        coordinatorSelection, totalBufferSize)
   }
 
   def registrationMessage: Any =
@@ -451,8 +460,13 @@ class ShardRegion(
 
   def requestShardBufferHomes(): Unit = {
     shardBuffers.foreach {
-      case (shard, _) ⇒ coordinator.foreach { c ⇒
-        log.debug("Retry request for shard [{}] homes", shard)
+      case (shard, buf) ⇒ coordinator.foreach { c ⇒
+        val logMsg = "Retry request for shard [{}] homes from coordinator at [{}]. [{}] buffered messages."
+        if (retryCount >= 5)
+          log.warning(logMsg, shard, c, buf.size)
+        else
+          log.debug(logMsg, shard, c, buf.size)
+
         c ! GetShardHome(shard)
       }
     }
@@ -461,10 +475,13 @@ class ShardRegion(
   def deliverBufferedMessages(shard: String): Unit = {
     shardBuffers.get(shard) match {
       case Some(buf) ⇒
+        log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shard)
         buf.foreach { case (msg, snd) ⇒ deliverMessage(msg, snd) }
         shardBuffers -= shard
       case None ⇒
     }
+    loggedFullBufferWarning = false
+    retryCount = 0
   }
 
   def deliverMessage(msg: Any, snd: ActorRef): Unit =
@@ -480,6 +497,7 @@ class ShardRegion(
         coordinator.foreach(_ ! GetShardHome(shardId))
       }
       val buf = shardBuffers.getOrElse(shardId, Vector.empty)
+      log.debug("Buffer message for shard [{}]. Total [{}] buffered messages.", shardId, buf.size + 1)
       shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
     }
 
@@ -499,12 +517,28 @@ class ShardRegion(
         log.debug("Request shard [{}] home", shardId)
         coordinator.foreach(_ ! GetShardHome(shardId))
       }
-      if (totalBufferSize >= bufferSize) {
-        log.debug("Buffer is full, dropping message for shard [{}]", shardId)
+      val totBufSize = totalBufferSize
+      if (totBufSize >= bufferSize) {
+        if (loggedFullBufferWarning)
+          log.debug("Buffer is full, dropping message for shard [{}]", shardId)
+        else {
+          log.warning("Buffer is full, dropping message for shard [{}]", shardId)
+          loggedFullBufferWarning = true
+        }
         context.system.deadLetters ! msg
       } else {
         val buf = shardBuffers.getOrElse(shardId, Vector.empty)
         shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
+
+        // log some insight into how buffers fill up, at every 10% of the buffer capacity
+        val tot = totBufSize + 1
+        if (tot % (bufferSize / 10) == 0) {
+          val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity."
+          if (tot <= bufferSize / 2)
+            log.info(logMsg)
+          else
+            log.warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.")
+        }
       }
     }
   }
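
To make the new thresholds in deliverMessage concrete, here is a minimal standalone sketch, not part of the patch, assuming a hypothetical bufferSize of 1000 (the real value comes from the sharding configuration). It reproduces the every-10% capacity logging: info up to half capacity, warning beyond. Note the modulo assumes bufferSize >= 10; for smaller values bufferSize / 10 is 0 and tot % 0 would throw an ArithmeticException.

// Standalone sketch of the capacity-logging thresholds added above.
object BufferLogSketch extends App {
  val bufferSize = 1000 // hypothetical capacity; the real value comes from configuration

  // Log level the patched deliverMessage would emit after buffering the tot-th
  // message, or None when no 10%-step threshold is crossed.
  def logLevelFor(tot: Int): Option[String] =
    if (tot % (bufferSize / 10) == 0)
      Some(if (tot <= bufferSize / 2) "INFO" else "WARNING")
    else None

  (1 to bufferSize).foreach { tot ⇒
    logLevelFor(tot).foreach { lvl ⇒
      println(f"$lvl%s at ${100.0 * tot / bufferSize}%.1f %% ($tot%d of $bufferSize%d buffered)")
    }
  }
  // Prints INFO at 10..50 % and WARNING at 60..100 %: info while the buffer is
  // at most half full, warning beyond that, matching the thresholds in the patch.
}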