Change ShardCoordinator update failure logging #30608
This commit is contained in:
parent
16ed5b4f64
commit
70120060b8
1 changed file with 23 additions and 11 deletions
|
|
@@ -1433,6 +1433,7 @@ private[akka] class DDataShardCoordinator(
|
|||
private var terminating = false
|
||||
private var getShardHomeRequests: Set[(ActorRef, GetShardHome)] = Set.empty
|
||||
private var initialStateRetries = 0
|
||||
private var updateStateRetries = 0
|
||||
|
||||
private val rememberEntitiesStore =
|
||||
rememberEntitiesStoreProvider.map { provider =>
|
||||
|
|
@@ -1572,6 +1573,7 @@ private[akka] class DDataShardCoordinator(
|
|||
afterUpdateCallback: E => Unit): Receive = {
|
||||
|
||||
case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) =>
|
||||
updateStateRetries = 0
|
||||
if (!waitingForRememberShard) {
|
||||
log.debug("{}: The coordinator state was successfully updated with {}", typeName, evt)
|
||||
if (shardId.isDefined) timers.cancel(RememberEntitiesTimeoutKey)
|
||||
|
|
@@ -1591,18 +1593,28 @@ private[akka] class DDataShardCoordinator(
|
|||
}
|
||||
|
||||
case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) =>
|
||||
log.error(
|
||||
"{}: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': {} millis ({}). " +
|
||||
"Perhaps the ShardRegion has not started on all active nodes yet? event={}",
|
||||
typeName,
|
||||
stateWriteConsistency.timeout.toMillis,
|
||||
if (terminating) "terminating" else "retrying",
|
||||
evt)
|
||||
if (terminating) {
|
||||
context.stop(self)
|
||||
updateStateRetries += 1
|
||||
|
||||
val template = s"$typeName: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': ${stateWriteConsistency.timeout.toMillis} millis (${if (terminating) "terminating"
|
||||
else "retrying"}). Attempt $updateStateRetries. " +
|
||||
s"Perhaps the ShardRegion has not started on all active nodes yet? event=$evt"
|
||||
|
||||
if (updateStateRetries < 5) {
|
||||
log.warning(template)
|
||||
if (terminating) {
|
||||
context.stop(self)
|
||||
} else {
|
||||
// repeat until UpdateSuccess
|
||||
sendCoordinatorStateUpdate(evt)
|
||||
}
|
||||
} else {
|
||||
// repeat until UpdateSuccess
|
||||
sendCoordinatorStateUpdate(evt)
|
||||
log.error(template)
|
||||
if (terminating) {
|
||||
context.stop(self)
|
||||
} else {
|
||||
// repeat until UpdateSuccess
|
||||
sendCoordinatorStateUpdate(evt)
|
||||
}
|
||||
}
|
||||
|
||||
case ModifyFailure(key, error, cause, _) =>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue