diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 9edc428ff6..8c8e19a255 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -1433,6 +1433,7 @@ private[akka] class DDataShardCoordinator( private var terminating = false private var getShardHomeRequests: Set[(ActorRef, GetShardHome)] = Set.empty private var initialStateRetries = 0 + private var updateStateRetries = 0 private val rememberEntitiesStore = rememberEntitiesStoreProvider.map { provider => @@ -1572,6 +1573,7 @@ private[akka] class DDataShardCoordinator( afterUpdateCallback: E => Unit): Receive = { case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) => + updateStateRetries = 0 if (!waitingForRememberShard) { log.debug("{}: The coordinator state was successfully updated with {}", typeName, evt) if (shardId.isDefined) timers.cancel(RememberEntitiesTimeoutKey) @@ -1591,18 +1593,28 @@ private[akka] class DDataShardCoordinator( } case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) => - log.error( - "{}: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': {} millis ({}). " + - "Perhaps the ShardRegion has not started on all active nodes yet? event={}", - typeName, - stateWriteConsistency.timeout.toMillis, - if (terminating) "terminating" else "retrying", - evt) - if (terminating) { - context.stop(self) + updateStateRetries += 1 + + val template = s"$typeName: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': ${stateWriteConsistency.timeout.toMillis} millis (${if (terminating) "terminating" + else "retrying"}). Attempt $updateStateRetries. " + + s"Perhaps the ShardRegion has not started on all active nodes yet? event=$evt" + + if (updateStateRetries < 5) { + log.warning(template) + if (terminating) { + context.stop(self) + } else { + // repeat until UpdateSuccess + sendCoordinatorStateUpdate(evt) + } } else { - // repeat until UpdateSuccess - sendCoordinatorStateUpdate(evt) + log.error(template) + if (terminating) { + context.stop(self) + } else { + // repeat until UpdateSuccess + sendCoordinatorStateUpdate(evt) + } } case ModifyFailure(key, error, cause, _) =>