log typeName in ShardRegion logs (#26611)
This commit is contained in:
parent
baeaca1ce0
commit
7f5e7ae352
1 changed files with 43 additions and 33 deletions
|
|
@ -485,7 +485,10 @@ private[akka] class ShardRegion(
|
|||
override def preStart(): Unit = {
|
||||
cluster.subscribe(self, classOf[MemberEvent])
|
||||
if (settings.passivateIdleEntityAfter > Duration.Zero)
|
||||
log.info("Idle entities will be passivated after [{}]", PrettyDuration.format(settings.passivateIdleEntityAfter))
|
||||
log.info(
|
||||
"{}: Idle entities will be passivated after [{}]",
|
||||
typeName,
|
||||
PrettyDuration.format(settings.passivateIdleEntityAfter))
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
|
|
@ -524,7 +527,8 @@ private[akka] class ShardRegion(
|
|||
if (before != after) {
|
||||
if (log.isDebugEnabled)
|
||||
log.debug(
|
||||
"Coordinator moved from [{}] to [{}]",
|
||||
"{}: Coordinator moved from [{}] to [{}]",
|
||||
typeName,
|
||||
before.map(_.address).getOrElse(""),
|
||||
after.map(_.address).getOrElse(""))
|
||||
coordinator = None
|
||||
|
|
@ -544,10 +548,7 @@ private[akka] class ShardRegion(
|
|||
case msg: StartEntity => deliverStartEntity(msg, sender())
|
||||
case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender())
|
||||
case unknownMsg =>
|
||||
log.warning(
|
||||
"Message does not have an extractor defined in shard [{}] so it was ignored: {}",
|
||||
typeName,
|
||||
unknownMsg)
|
||||
log.warning("{}: Message does not have an extractor defined in shard so it was ignored: {}", typeName, unknownMsg)
|
||||
}
|
||||
|
||||
def receiveClusterState(state: CurrentClusterState): Unit = {
|
||||
|
|
@ -571,7 +572,7 @@ private[akka] class ShardRegion(
|
|||
|
||||
case MemberDowned(m) =>
|
||||
if (m.uniqueAddress == cluster.selfUniqueAddress) {
|
||||
log.info("Self downed, stopping ShardRegion [{}]", self.path)
|
||||
log.info("{}: Self downed, stopping ShardRegion [{}]", typeName, self.path)
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
|
|
@ -582,7 +583,7 @@ private[akka] class ShardRegion(
|
|||
|
||||
def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match {
|
||||
case HostShard(shard) =>
|
||||
log.debug("Host Shard [{}] ", shard)
|
||||
log.debug("{}: Host Shard [{}] ", typeName, shard)
|
||||
regionByShard = regionByShard.updated(shard, self)
|
||||
regions = regions.updated(self, regions.getOrElse(self, Set.empty) + shard)
|
||||
|
||||
|
|
@ -592,11 +593,11 @@ private[akka] class ShardRegion(
|
|||
sender() ! ShardStarted(shard)
|
||||
|
||||
case ShardHome(shard, ref) =>
|
||||
log.debug("Shard [{}] located at [{}]", shard, ref)
|
||||
log.debug("{}: Shard [{}] located at [{}]", typeName, shard, ref)
|
||||
regionByShard.get(shard) match {
|
||||
case Some(r) if r == self && ref != self =>
|
||||
// should not happen, inconsistency between ShardRegion and ShardCoordinator
|
||||
throw new IllegalStateException(s"Unexpected change of shard [$shard] from self to [$ref]")
|
||||
throw new IllegalStateException(s"$typeName: Unexpected change of shard [$shard] from self to [$ref]")
|
||||
case _ =>
|
||||
}
|
||||
regionByShard = regionByShard.updated(shard, ref)
|
||||
|
|
@ -616,7 +617,7 @@ private[akka] class ShardRegion(
|
|||
requestShardBufferHomes()
|
||||
|
||||
case BeginHandOff(shard) =>
|
||||
log.debug("BeginHandOff shard [{}]", shard)
|
||||
log.debug("{}: BeginHandOff shard [{}]", typeName, shard)
|
||||
if (regionByShard.contains(shard)) {
|
||||
val regionRef = regionByShard(shard)
|
||||
val updatedShards = regions(regionRef) - shard
|
||||
|
|
@ -627,7 +628,7 @@ private[akka] class ShardRegion(
|
|||
sender() ! BeginHandOffAck(shard)
|
||||
|
||||
case msg @ HandOff(shard) =>
|
||||
log.debug("HandOff shard [{}]", shard)
|
||||
log.debug("{}: HandOff shard [{}]", typeName, shard)
|
||||
|
||||
// must drop requests that came in between the BeginHandOff and now,
|
||||
// because they might be forwarded from other regions and there
|
||||
|
|
@ -662,7 +663,7 @@ private[akka] class ShardRegion(
|
|||
tryCompleteGracefulShutdown()
|
||||
|
||||
case GracefulShutdown =>
|
||||
log.debug("Starting graceful shutdown of region and all its shards")
|
||||
log.debug("{}: Starting graceful shutdown of region and all its shards", typeName)
|
||||
gracefulShutdownInProgress = true
|
||||
sendGracefulShutdownToCoordinator()
|
||||
tryCompleteGracefulShutdown()
|
||||
|
|
@ -697,7 +698,7 @@ private[akka] class ShardRegion(
|
|||
regionByShard --= shards
|
||||
regions -= ref
|
||||
if (log.isDebugEnabled)
|
||||
log.debug("Region [{}] with shards [{}] terminated", ref, shards.mkString(", "))
|
||||
log.debug("{}: Region [{}] with shards [{}] terminated", typeName, ref, shards.mkString(", "))
|
||||
} else if (shardsByRef.contains(ref)) {
|
||||
val shardId: ShardId = shardsByRef(ref)
|
||||
|
||||
|
|
@ -706,10 +707,10 @@ private[akka] class ShardRegion(
|
|||
startingShards -= shardId
|
||||
if (handingOff.contains(ref)) {
|
||||
handingOff = handingOff - ref
|
||||
log.debug("Shard [{}] handoff complete", shardId)
|
||||
log.debug("{}: Shard [{}] handoff complete", typeName, shardId)
|
||||
} else {
|
||||
// if persist fails it will stop
|
||||
log.debug("Shard [{}] terminated while not being handed off", shardId)
|
||||
log.debug("{}: Shard [{}] terminated while not being handed off", typeName, shardId)
|
||||
if (rememberEntities) {
|
||||
context.system.scheduler.scheduleOnce(shardFailureBackoff, self, RestartShard(shardId))
|
||||
}
|
||||
|
|
@ -765,13 +766,15 @@ private[akka] class ShardRegion(
|
|||
if (cluster.state.unreachable(membersByAge.head)) s"Coordinator [${membersByAge.head}] is unreachable."
|
||||
else s"Coordinator [${membersByAge.head}] is reachable."
|
||||
log.warning(
|
||||
"Trying to register to coordinator at [{}], but no acknowledgement. Total [{}] buffered messages. [{}]",
|
||||
"{}: Trying to register to coordinator at [{}], but no acknowledgement. Total [{}] buffered messages. [{}]",
|
||||
typeName,
|
||||
actorSelection,
|
||||
shardBuffers.totalSize,
|
||||
coordinatorMessage)
|
||||
case None =>
|
||||
log.warning(
|
||||
"No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed? Total [{}] buffered messages.",
|
||||
"{}: No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed? Total [{}] buffered messages.",
|
||||
typeName,
|
||||
shardBuffers.totalSize)
|
||||
}
|
||||
}
|
||||
|
|
@ -783,11 +786,11 @@ private[akka] class ShardRegion(
|
|||
shardBuffers.foreach {
|
||||
case (shard, buf) =>
|
||||
coordinator.foreach { c =>
|
||||
val logMsg = "Retry request for shard [{}] homes from coordinator at [{}]. [{}] buffered messages."
|
||||
val logMsg = "{}: Retry request for shard [{}] homes from coordinator at [{}]. [{}] buffered messages."
|
||||
if (retryCount >= 5)
|
||||
log.warning(logMsg, shard, c, buf.size)
|
||||
log.warning(logMsg, typeName, shard, c, buf.size)
|
||||
else
|
||||
log.debug(logMsg, shard, c, buf.size)
|
||||
log.debug(logMsg, typeName, shard, c, buf.size)
|
||||
|
||||
c ! GetShardHome(shard)
|
||||
}
|
||||
|
|
@ -795,7 +798,7 @@ private[akka] class ShardRegion(
|
|||
}
|
||||
|
||||
def initializeShard(id: ShardId, shard: ActorRef): Unit = {
|
||||
log.debug("Shard was initialized {}", id)
|
||||
log.debug("{}: Shard was initialized {}", typeName, id)
|
||||
startingShards -= id
|
||||
deliverBufferedMessages(id, shard)
|
||||
}
|
||||
|
|
@ -804,9 +807,9 @@ private[akka] class ShardRegion(
|
|||
val totBufSize = shardBuffers.totalSize
|
||||
if (totBufSize >= bufferSize) {
|
||||
if (loggedFullBufferWarning)
|
||||
log.debug("Buffer is full, dropping message for shard [{}]", shardId)
|
||||
log.debug("{}: Buffer is full, dropping message for shard [{}]", typeName, shardId)
|
||||
else {
|
||||
log.warning("Buffer is full, dropping message for shard [{}]", shardId)
|
||||
log.warning("{}: Buffer is full, dropping message for shard [{}]", typeName, shardId)
|
||||
loggedFullBufferWarning = true
|
||||
}
|
||||
context.system.deadLetters ! msg
|
||||
|
|
@ -816,7 +819,7 @@ private[akka] class ShardRegion(
|
|||
// log some insight to how buffers are filled up every 10% of the buffer capacity
|
||||
val tot = totBufSize + 1
|
||||
if (tot % (bufferSize / 10) == 0) {
|
||||
val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity."
|
||||
val logMsg = s"$typeName: ShardRegion is using [${100.0 * tot / bufferSize} %] of its buffer capacity."
|
||||
if (tot <= bufferSize / 2)
|
||||
log.info(logMsg)
|
||||
else
|
||||
|
|
@ -829,7 +832,7 @@ private[akka] class ShardRegion(
|
|||
def deliverBufferedMessages(shardId: ShardId, receiver: ActorRef): Unit = {
|
||||
if (shardBuffers.contains(shardId)) {
|
||||
val buf = shardBuffers.getOrEmpty(shardId)
|
||||
log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shardId)
|
||||
log.debug("{}: Deliver [{}] buffered messages for shard [{}]", typeName, buf.size, shardId)
|
||||
buf.foreach { case (msg, snd) => receiver.tell(msg, snd) }
|
||||
shardBuffers.remove(shardId)
|
||||
}
|
||||
|
|
@ -842,7 +845,10 @@ private[akka] class ShardRegion(
|
|||
deliverMessage(msg, snd)
|
||||
} catch {
|
||||
case ex: MatchError =>
|
||||
log.error(ex, "When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).")
|
||||
log.error(
|
||||
ex,
|
||||
"{}: When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).",
|
||||
typeName)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -855,11 +861,15 @@ private[akka] class ShardRegion(
|
|||
getShard(shardId)
|
||||
case None =>
|
||||
if (!shardBuffers.contains(shardId)) {
|
||||
log.debug("Request shard [{}] home. Coordinator [{}]", shardId, coordinator)
|
||||
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
|
||||
coordinator.foreach(_ ! GetShardHome(shardId))
|
||||
}
|
||||
val buf = shardBuffers.getOrEmpty(shardId)
|
||||
log.debug("Buffer message for shard [{}]. Total [{}] buffered messages.", shardId, buf.size + 1)
|
||||
log.debug(
|
||||
"{}: Buffer message for shard [{}]. Total [{}] buffered messages.",
|
||||
typeName,
|
||||
shardId,
|
||||
buf.size + 1)
|
||||
shardBuffers.append(shardId, msg, snd)
|
||||
}
|
||||
|
||||
|
|
@ -877,14 +887,14 @@ private[akka] class ShardRegion(
|
|||
case None => bufferMessage(shardId, msg, snd)
|
||||
}
|
||||
case Some(ref) =>
|
||||
log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref)
|
||||
log.debug("{}: Forwarding request for shard [{}] to [{}]", typeName, shardId, ref)
|
||||
ref.tell(msg, snd)
|
||||
case None if shardId == null || shardId == "" =>
|
||||
log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName)
|
||||
log.warning("{}: Shard must not be empty, dropping message [{}]", typeName, msg.getClass.getName)
|
||||
context.system.deadLetters ! msg
|
||||
case None =>
|
||||
if (!shardBuffers.contains(shardId)) {
|
||||
log.debug("Request shard [{}] home. Coordinator [{}]", shardId, coordinator)
|
||||
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
|
||||
coordinator.foreach(_ ! GetShardHome(shardId))
|
||||
}
|
||||
bufferMessage(shardId, msg, snd)
|
||||
|
|
@ -899,7 +909,7 @@ private[akka] class ShardRegion(
|
|||
.get(id)
|
||||
.orElse(entityProps match {
|
||||
case Some(props) if !shardsByRef.values.exists(_ == id) =>
|
||||
log.debug("Starting shard [{}] in region", id)
|
||||
log.debug("{}: Starting shard [{}] in region", typeName, id)
|
||||
|
||||
val name = URLEncoder.encode(id, "utf-8")
|
||||
val shard = context.watch(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue