diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index bb301306d4..b1f2a0a789 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -186,9 +186,10 @@ private[akka] final class ReplayingEvents[C, E, S]( } /** - * Called whenever a message replay fails. By default it logs the error. + * Called whenever a message replay fails. * - * The actor is always stopped after this method has been invoked. + * This method throws `JournalFailureException` which will be caught by the internal + * supervision strategy to stop or restart the actor with backoff. * * @param cause failure cause. * @param event the event that was being processed when the exception was thrown diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index c790efe587..0e27576356 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -11,6 +11,7 @@ import akka.annotation.{ InternalApi, InternalStableApi } import akka.persistence.SnapshotProtocol.LoadSnapshotFailed import akka.persistence.SnapshotProtocol.LoadSnapshotResult import akka.persistence._ +import akka.persistence.typed.RecoveryFailed import akka.util.unused /** @@ -77,17 +78,26 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup } /** - * Called whenever a message replay fails. By default it logs the error. + * Called whenever snapshot recovery fails. * - * The actor is always stopped after this method has been invoked. + * This method throws `JournalFailureException` which will be caught by the internal + * supervision strategy to stop or restart the actor with backoff. * * @param cause failure cause. */ private def onRecoveryFailure(cause: Throwable): Behavior[InternalProtocol] = { onRecoveryFailed(setup.context, cause) + setup.onSignal(setup.emptyState, RecoveryFailed(cause), catchAndLog = true) setup.cancelRecoveryTimer() - setup.log.error(s"Persistence failure when replaying snapshot, due to: ${cause.getMessage}", cause) - Behaviors.stopped + + tryReturnRecoveryPermit("on snapshot recovery failure: " + cause.getMessage) + + if (setup.log.isDebugEnabled) + setup.log.debug("Recovery failure for persistenceId [{}]", setup.persistenceId) + + val msg = s"Exception during recovery from snapshot. " + + s"PersistenceId [${setup.persistenceId.id}]. ${cause.getMessage}" + throw new JournalFailureException(msg, cause) } @InternalStableApi diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index c55d715f68..e1a4f42717 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -496,7 +496,7 @@ class EventSourcedBehaviorSpec } "fail after recovery timeout" in { - LoggingEventFilter.error("Persistence failure when replaying snapshot").intercept { + LoggingEventFilter.error("Exception during recovery from snapshot").intercept { val c = spawn( Behaviors.setup[Command](ctx => counter(ctx, nextPid)