diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index efe7b65dd6..887e502ec2 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -485,7 +485,10 @@ private[akka] class ShardRegion( override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) if (settings.passivateIdleEntityAfter > Duration.Zero) - log.info("Idle entities will be passivated after [{}]", PrettyDuration.format(settings.passivateIdleEntityAfter)) + log.info( + "{}: Idle entities will be passivated after [{}]", + typeName, + PrettyDuration.format(settings.passivateIdleEntityAfter)) } override def postStop(): Unit = { @@ -524,7 +527,8 @@ private[akka] class ShardRegion( if (before != after) { if (log.isDebugEnabled) log.debug( - "Coordinator moved from [{}] to [{}]", + "{}: Coordinator moved from [{}] to [{}]", + typeName, before.map(_.address).getOrElse(""), after.map(_.address).getOrElse("")) coordinator = None @@ -544,10 +548,7 @@ private[akka] class ShardRegion( case msg: StartEntity => deliverStartEntity(msg, sender()) case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender()) case unknownMsg => - log.warning( - "Message does not have an extractor defined in shard [{}] so it was ignored: {}", - typeName, - unknownMsg) + log.warning("{}: Message does not have an extractor defined in shard so it was ignored: {}", typeName, unknownMsg) } def receiveClusterState(state: CurrentClusterState): Unit = { @@ -571,7 +572,7 @@ private[akka] class ShardRegion( case MemberDowned(m) => if (m.uniqueAddress == cluster.selfUniqueAddress) { - log.info("Self downed, stopping ShardRegion [{}]", self.path) + log.info("{}: Self downed, stopping ShardRegion [{}]", typeName, self.path) context.stop(self) } @@ -582,7 +583,7 @@ private[akka] class ShardRegion( def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match { case HostShard(shard) => - log.debug("Host Shard [{}] ", shard) + log.debug("{}: Host Shard [{}] ", typeName, shard) regionByShard = regionByShard.updated(shard, self) regions = regions.updated(self, regions.getOrElse(self, Set.empty) + shard) @@ -592,11 +593,11 @@ private[akka] class ShardRegion( sender() ! ShardStarted(shard) case ShardHome(shard, ref) => - log.debug("Shard [{}] located at [{}]", shard, ref) + log.debug("{}: Shard [{}] located at [{}]", typeName, shard, ref) regionByShard.get(shard) match { case Some(r) if r == self && ref != self => // should not happen, inconsistency between ShardRegion and ShardCoordinator - throw new IllegalStateException(s"Unexpected change of shard [$shard] from self to [$ref]") + throw new IllegalStateException(s"$typeName: Unexpected change of shard [$shard] from self to [$ref]") case _ => } regionByShard = regionByShard.updated(shard, ref) @@ -616,7 +617,7 @@ private[akka] class ShardRegion( requestShardBufferHomes() case BeginHandOff(shard) => - log.debug("BeginHandOff shard [{}]", shard) + log.debug("{}: BeginHandOff shard [{}]", typeName, shard) if (regionByShard.contains(shard)) { val regionRef = regionByShard(shard) val updatedShards = regions(regionRef) - shard @@ -627,7 +628,7 @@ private[akka] class ShardRegion( sender() ! BeginHandOffAck(shard) case msg @ HandOff(shard) => - log.debug("HandOff shard [{}]", shard) + log.debug("{}: HandOff shard [{}]", typeName, shard) // must drop requests that came in between the BeginHandOff and now, // because they might be forwarded from other regions and there @@ -662,7 +663,7 @@ private[akka] class ShardRegion( tryCompleteGracefulShutdown() case GracefulShutdown => - log.debug("Starting graceful shutdown of region and all its shards") + log.debug("{}: Starting graceful shutdown of region and all its shards", typeName) gracefulShutdownInProgress = true sendGracefulShutdownToCoordinator() tryCompleteGracefulShutdown() @@ -697,7 +698,7 @@ private[akka] class ShardRegion( regionByShard --= shards regions -= ref if (log.isDebugEnabled) - log.debug("Region [{}] with shards [{}] terminated", ref, shards.mkString(", ")) + log.debug("{}: Region [{}] with shards [{}] terminated", typeName, ref, shards.mkString(", ")) } else if (shardsByRef.contains(ref)) { val shardId: ShardId = shardsByRef(ref) @@ -706,10 +707,10 @@ private[akka] class ShardRegion( startingShards -= shardId if (handingOff.contains(ref)) { handingOff = handingOff - ref - log.debug("Shard [{}] handoff complete", shardId) + log.debug("{}: Shard [{}] handoff complete", typeName, shardId) } else { // if persist fails it will stop - log.debug("Shard [{}] terminated while not being handed off", shardId) + log.debug("{}: Shard [{}] terminated while not being handed off", typeName, shardId) if (rememberEntities) { context.system.scheduler.scheduleOnce(shardFailureBackoff, self, RestartShard(shardId)) } @@ -765,13 +766,15 @@ private[akka] class ShardRegion( if (cluster.state.unreachable(membersByAge.head)) s"Coordinator [${membersByAge.head}] is unreachable." else s"Coordinator [${membersByAge.head}] is reachable." log.warning( - "Trying to register to coordinator at [{}], but no acknowledgement. Total [{}] buffered messages. [{}]", + "{}: Trying to register to coordinator at [{}], but no acknowledgement. Total [{}] buffered messages. [{}]", + typeName, actorSelection, shardBuffers.totalSize, coordinatorMessage) case None => log.warning( - "No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed? Total [{}] buffered messages.", + "{}: No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed? Total [{}] buffered messages.", + typeName, shardBuffers.totalSize) } } @@ -783,11 +786,11 @@ private[akka] class ShardRegion( shardBuffers.foreach { case (shard, buf) => coordinator.foreach { c => - val logMsg = "Retry request for shard [{}] homes from coordinator at [{}]. [{}] buffered messages." + val logMsg = "{}: Retry request for shard [{}] homes from coordinator at [{}]. [{}] buffered messages." if (retryCount >= 5) - log.warning(logMsg, shard, c, buf.size) + log.warning(logMsg, typeName, shard, c, buf.size) else - log.debug(logMsg, shard, c, buf.size) + log.debug(logMsg, typeName, shard, c, buf.size) c ! GetShardHome(shard) } @@ -795,7 +798,7 @@ private[akka] class ShardRegion( } def initializeShard(id: ShardId, shard: ActorRef): Unit = { - log.debug("Shard was initialized {}", id) + log.debug("{}: Shard was initialized {}", typeName, id) startingShards -= id deliverBufferedMessages(id, shard) } @@ -804,9 +807,9 @@ private[akka] class ShardRegion( val totBufSize = shardBuffers.totalSize if (totBufSize >= bufferSize) { if (loggedFullBufferWarning) - log.debug("Buffer is full, dropping message for shard [{}]", shardId) + log.debug("{}: Buffer is full, dropping message for shard [{}]", typeName, shardId) else { - log.warning("Buffer is full, dropping message for shard [{}]", shardId) + log.warning("{}: Buffer is full, dropping message for shard [{}]", typeName, shardId) loggedFullBufferWarning = true } context.system.deadLetters ! msg @@ -816,7 +819,7 @@ private[akka] class ShardRegion( // log some insight to how buffers are filled up every 10% of the buffer capacity val tot = totBufSize + 1 if (tot % (bufferSize / 10) == 0) { - val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity." + val logMsg = s"$typeName: ShardRegion is using [${100.0 * tot / bufferSize} %] of its buffer capacity." if (tot <= bufferSize / 2) log.info(logMsg) else @@ -829,7 +832,7 @@ private[akka] class ShardRegion( def deliverBufferedMessages(shardId: ShardId, receiver: ActorRef): Unit = { if (shardBuffers.contains(shardId)) { val buf = shardBuffers.getOrEmpty(shardId) - log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shardId) + log.debug("{}: Deliver [{}] buffered messages for shard [{}]", typeName, buf.size, shardId) buf.foreach { case (msg, snd) => receiver.tell(msg, snd) } shardBuffers.remove(shardId) } @@ -842,7 +845,10 @@ private[akka] class ShardRegion( deliverMessage(msg, snd) } catch { case ex: MatchError => - log.error(ex, "When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).") + log.error( + ex, + "{}: When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).", + typeName) } } @@ -855,11 +861,15 @@ private[akka] class ShardRegion( getShard(shardId) case None => if (!shardBuffers.contains(shardId)) { - log.debug("Request shard [{}] home. Coordinator [{}]", shardId, coordinator) + log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator) coordinator.foreach(_ ! GetShardHome(shardId)) } val buf = shardBuffers.getOrEmpty(shardId) - log.debug("Buffer message for shard [{}]. Total [{}] buffered messages.", shardId, buf.size + 1) + log.debug( + "{}: Buffer message for shard [{}]. Total [{}] buffered messages.", + typeName, + shardId, + buf.size + 1) shardBuffers.append(shardId, msg, snd) } @@ -877,14 +887,14 @@ private[akka] class ShardRegion( case None => bufferMessage(shardId, msg, snd) } case Some(ref) => - log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref) + log.debug("{}: Forwarding request for shard [{}] to [{}]", typeName, shardId, ref) ref.tell(msg, snd) case None if shardId == null || shardId == "" => - log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName) + log.warning("{}: Shard must not be empty, dropping message [{}]", typeName, msg.getClass.getName) context.system.deadLetters ! msg case None => if (!shardBuffers.contains(shardId)) { - log.debug("Request shard [{}] home. Coordinator [{}]", shardId, coordinator) + log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator) coordinator.foreach(_ ! GetShardHome(shardId)) } bufferMessage(shardId, msg, snd) @@ -899,7 +909,7 @@ private[akka] class ShardRegion( .get(id) .orElse(entityProps match { case Some(props) if !shardsByRef.values.exists(_ == id) => - log.debug("Starting shard [{}] in region", id) + log.debug("{}: Starting shard [{}] in region", typeName, id) val name = URLEncoder.encode(id, "utf-8") val shard = context.watch(