From e09de4fc846c63d30bd7042963d20df40fcb3f27 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 31 May 2021 21:50:28 +0200 Subject: [PATCH] Possibility to make snapshot load failures non-fatal, #30234 (#30238) Co-authored-by: Renato Cavalcanti --- akka-docs/src/main/paradox/persistence.md | 13 +++ .../paradox/typed/persistence-snapshot.md | 14 +++ .../testkit/PersistenceTestKitPlugin.scala | 4 +- .../typed/internal/BehaviorSetup.scala | 12 +++ .../typed/internal/ExternalInteractions.scala | 8 +- .../typed/internal/ReplayingSnapshot.scala | 75 ++++++++------ .../scaladsl/SnapshotIsOptionalSpec.scala | 98 +++++++++++++++++++ .../src/main/resources/reference.conf | 9 ++ .../scala/akka/persistence/Eventsourced.scala | 76 +++++++++----- .../SnapshotFailureRobustnessSpec.scala | 46 ++++++++- 10 files changed, 294 insertions(+), 61 deletions(-) create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotIsOptionalSpec.scala diff --git a/akka-docs/src/main/paradox/persistence.md b/akka-docs/src/main/paradox/persistence.md index 2d95deed87..73a36eccf8 100644 --- a/akka-docs/src/main/paradox/persistence.md +++ b/akka-docs/src/main/paradox/persistence.md @@ -640,6 +640,19 @@ If failure messages are left unhandled by the actor, a default warning log messa No default action is performed on the success messages, however you're free to handle them e.g. in order to delete an in memory representation of the snapshot, or in the case of failure to attempt save the snapshot again. +### Optional snapshots + +By default, the persistent actor will unconditionally be stopped if the snapshot can't be loaded in the recovery. +It is possible to make snapshot loading optional. This can be useful when it is alright to ignore snapshot in case +of for example deserialization errors. When snapshot loading fails it will instead recover by replaying all events. + +Enable this feature by setting `snapshot-is-optional = true` in the snapshot store configuration. + +@@@ warning + +Don't set `snapshot-is-optional = true` if events have been deleted because that would result in wrong recovered state if snapshot load fails. + +@@@ ## Scaling out diff --git a/akka-docs/src/main/paradox/typed/persistence-snapshot.md b/akka-docs/src/main/paradox/typed/persistence-snapshot.md index 22dfb1982f..7fd4130189 100644 --- a/akka-docs/src/main/paradox/typed/persistence-snapshot.md +++ b/akka-docs/src/main/paradox/typed/persistence-snapshot.md @@ -71,6 +71,20 @@ started, `RecoveryFailed` signal is emitted (logging the error by default), and Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots if you for example know that serialization format has changed in an incompatible way. +### Optional snapshots + +By default, the persistent actor will unconditionally be stopped if the snapshot can't be loaded in the recovery. +It is possible to make snapshot loading optional. This can be useful when it is alright to ignore snapshot in case +of for example deserialization errors. When snapshot loading fails it will instead recover by replaying all events. + +Enable this feature by setting `snapshot-is-optional = true` in the snapshot store configuration. + +@@@ warning + +Don't set `snapshot-is-optional = true` if events have been deleted because that would result in wrong recovered state if snapshot load fails. + +@@@ + ## Snapshot deletion To free up space, an event sourced actor can automatically delete older snapshots based on the given `RetentionCriteria`. diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala index dc8d5e7aa6..aaccd927e2 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala @@ -128,6 +128,8 @@ object PersistenceTestKitSnapshotPlugin { val config: Config = ConfigFactory.parseMap( Map( "akka.persistence.snapshot-store.plugin" -> PluginId, - s"$PluginId.class" -> classOf[PersistenceTestKitSnapshotPlugin].getName).asJava) + s"$PluginId.class" -> classOf[PersistenceTestKitSnapshotPlugin].getName, + s"$PluginId.snapshot-is-optional" -> false // fallback isn't used by the testkit + ).asJava) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index ae54fb0524..64d731aa77 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -67,6 +67,18 @@ private[akka] final class BehaviorSetup[C, E, S]( val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId) val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId) + val isSnapshotOptional: Boolean = + Persistence(context.system.classicSystem).configFor(snapshotStore).getBoolean("snapshot-is-optional") + + if (isSnapshotOptional && (retention match { + case SnapshotCountRetentionCriteriaImpl(_, _, true) => true + case _ => false + })) { + throw new IllegalArgumentException( + "Retention criteria with delete events can't be used together with snapshot-is-optional=false. " + + "That can result in wrong recovered state if snapshot load fails.") + } + val replicaId: Option[ReplicaId] = replication.map(_.replicaId) def selfClassic: ClassicActorRef = context.self.toClassic diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 75c37d93c1..5789eed429 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -157,7 +157,12 @@ private[akka] trait JournalInteractions[C, E, S] { * is enabled, old messages are deleted based on `SnapshotCountRetentionCriteria.snapshotEveryNEvents` * before old snapshots are deleted. */ - protected def internalDeleteEvents(lastSequenceNr: Long, toSequenceNr: Long): Unit = + protected def internalDeleteEvents(lastSequenceNr: Long, toSequenceNr: Long): Unit = { + if (setup.isSnapshotOptional) { + setup.internalLogger.warn( + "Delete events shouldn't be used together with snapshot-is-optional=false. " + + "That can result in wrong recovered state if snapshot load fails.") + } if (toSequenceNr > 0) { val self = setup.selfClassic @@ -170,6 +175,7 @@ private[akka] trait JournalInteractions[C, E, S] { s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), toSequenceNr) } + } } /** INTERNAL API */ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 9316972d39..ddfc3bed44 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -144,40 +144,53 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup def onSnapshotterResponse( response: SnapshotProtocol.Response, receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { + + def loadSnapshotResult(snapshot: Option[SelectedSnapshot], toSnr: Long): Behavior[InternalProtocol] = { + var state: S = setup.emptyState + + val (seqNr: Long, seenPerReplica, version) = snapshot match { + case Some(SelectedSnapshot(metadata, snapshot)) => + state = setup.snapshotAdapter.fromJournal(snapshot) + setup.context.log.debug("Loaded snapshot with metadata [{}]", metadata) + metadata.metadata match { + case Some(rm: ReplicatedSnapshotMetadata) => (metadata.sequenceNr, rm.seenPerReplica, rm.version) + case _ => (metadata.sequenceNr, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty) + } + case None => (0L, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty) + } + + setup.context.log.debugN("Snapshot recovered from {} {} {}", seqNr, seenPerReplica, version) + + setup.cancelRecoveryTimer() + + ReplayingEvents[C, E, S]( + setup, + ReplayingEvents.ReplayingState( + seqNr, + state, + eventSeenInInterval = false, + toSnr, + receivedPoisonPill, + System.nanoTime(), + version, + seenPerReplica, + eventsReplayed = 0)) + } + response match { - case LoadSnapshotResult(sso, toSnr) => - var state: S = setup.emptyState - - val (seqNr: Long, seenPerReplica, version) = sso match { - case Some(SelectedSnapshot(metadata, snapshot)) => - state = setup.snapshotAdapter.fromJournal(snapshot) - setup.context.log.debug("Loaded snapshot with metadata [{}]", metadata) - metadata.metadata match { - case Some(rm: ReplicatedSnapshotMetadata) => (metadata.sequenceNr, rm.seenPerReplica, rm.version) - case _ => (metadata.sequenceNr, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty) - } - case None => (0L, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty) - } - - setup.context.log.debugN("Snapshot recovered from {} {} {}", seqNr, seenPerReplica, version) - - setup.cancelRecoveryTimer() - - ReplayingEvents[C, E, S]( - setup, - ReplayingEvents.ReplayingState( - seqNr, - state, - eventSeenInInterval = false, - toSnr, - receivedPoisonPill, - System.nanoTime(), - version, - seenPerReplica, - eventsReplayed = 0)) + case LoadSnapshotResult(snapshot, toSnr) => + loadSnapshotResult(snapshot, toSnr) case LoadSnapshotFailed(cause) => - onRecoveryFailure(cause) + if (setup.isSnapshotOptional) { + setup.internalLogger.info( + "Snapshot load error for persistenceId [{}]. Replaying all events since snapshot-is-optional=true", + setup.persistenceId) + + loadSnapshotResult(snapshot = None, setup.recovery.toSequenceNr) + } else { + onRecoveryFailure(cause) + } case _ => Behaviors.unhandled diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotIsOptionalSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotIsOptionalSpec.scala new file mode 100644 index 0000000000..8ce919e2d5 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotIsOptionalSpec.scala @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger + +import com.fasterxml.jackson.annotation.JsonCreator +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.persistence.typed.PersistenceId +import akka.serialization.jackson.CborSerializable + +object SnapshotIsOptionalSpec { + private val conf: Config = ConfigFactory.parseString(s""" + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + akka.persistence.snapshot-store.local.snapshot-is-optional = true + """) + case class State1(field1: String) extends CborSerializable { + @JsonCreator + def this() = this(null) + + if (field1 == null) + throw new RuntimeException("Deserialization error") + } + case class Command(c: String) extends CborSerializable + case class Event(e: String) extends CborSerializable +} + +class SnapshotIsOptionalSpec + extends ScalaTestWithActorTestKit(SnapshotIsOptionalSpec.conf) + with AnyWordSpecLike + with LogCapturing { + import SnapshotIsOptionalSpec._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})") + + private def behavior(pid: PersistenceId, probe: ActorRef[State1]): EventSourcedBehavior[Command, Event, State1] = + EventSourcedBehavior[Command, Event, State1]( + pid, + State1(""), + commandHandler = { (state, command) => + command match { + case Command("get") => + probe.tell(state) + Effect.none + case _ => + Effect.persist(Event(command.c)).thenRun(newState => probe ! newState) + } + }, + eventHandler = { (state, evt) => + state.copy(field1 = state.field1 + "|" + evt.e) + }) + + "Snapshot recovery with snapshot-is-optional=true" must { + + "fall back to events when deserialization error" in { + val pid = nextPid() + + val stateProbe1 = createTestProbe[State1]() + val b1 = behavior(pid, stateProbe1.ref).snapshotWhen { (_, event, _) => + event.e.contains("snapshot") + } + val ref1 = spawn(b1) + ref1.tell(Command("one")) + stateProbe1.expectMessage(State1("|one")) + ref1.tell(Command("snapshot now")) + stateProbe1.expectMessage(State1("|one|snapshot now")) + testKit.stop(ref1) + + val stateProbe2 = createTestProbe[State1]() + val ref2 = spawn(behavior(pid, stateProbe2.ref)) + ref2.tell(Command("get")) + stateProbe2.expectMessage(State1("|one|snapshot now")) + testKit.stop(ref2) + } + + "fail fast if used with retention criteria with delete events" in { + val pid = nextPid() + + val stateProbe1 = createTestProbe[State1]() + val ref = spawn( + behavior(pid, stateProbe1.ref).withRetention(RetentionCriteria.snapshotEvery(10, 3).withDeleteEventsOnSnapshot)) + createTestProbe().expectTerminated(ref) + } + + } +} diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 2e4c22d4de..1e75bdc1a5 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -168,6 +168,15 @@ akka.persistence { call-timeout = 20s reset-timeout = 60s } + + # Set this to true if successful loading of snapshot is not necessary. + # This can be useful when it is alright to ignore snapshot in case of + # for example deserialization errors. When snapshot loading fails it will instead + # recover by replaying all events. + # Don't set to true if events are deleted because that would + # result in wrong recovered state if snapshot load fails. + snapshot-is-optional = false + } fsm { diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 86d79d229a..632d49c834 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -655,36 +655,57 @@ private[persistence] trait Eventsourced override def recoveryRunning: Boolean = true - override def stateReceive(receive: Receive, message: Any) = - try message match { - case LoadSnapshotResult(sso, toSnr) => - timeoutCancellable.cancel() - sso.foreach { - case SelectedSnapshot(metadata, snapshot) => - val offer = SnapshotOffer(metadata, snapshot) - if (recoveryBehavior.isDefinedAt(offer)) { - try { - setLastSequenceNr(metadata.sequenceNr) - // Since we are recovering we can ignore the receive behavior from the stack - Eventsourced.super.aroundReceive(recoveryBehavior, offer) - } catch { - case NonFatal(t) => - try onRecoveryFailure(t, None) - finally context.stop(self) - returnRecoveryPermit() - } - } else { - unhandled(offer) + override def stateReceive(receive: Receive, message: Any): Unit = { + def loadSnapshotResult(snapshot: Option[SelectedSnapshot], toSnr: Long): Unit = { + timeoutCancellable.cancel() + snapshot.foreach { + case SelectedSnapshot(metadata, snapshot) => + val offer = SnapshotOffer(metadata, snapshot) + if (recoveryBehavior.isDefinedAt(offer)) { + try { + setLastSequenceNr(metadata.sequenceNr) + // Since we are recovering we can ignore the receive behavior from the stack + Eventsourced.super.aroundReceive(recoveryBehavior, offer) + } catch { + case NonFatal(t) => + try onRecoveryFailure(t, None) + finally context.stop(self) + returnRecoveryPermit() } - } - changeState(recovering(recoveryBehavior, timeout)) - journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) + } else { + unhandled(offer) + } + } + changeState(recovering(recoveryBehavior, timeout)) + journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) + } + + def isSnapshotOptional: Boolean = { + try { + Persistence(context.system).configFor(snapshotStore).getBoolean("snapshot-is-optional") + } catch { + case NonFatal(exc) => + log.error(exc, "Invalid snapshot-is-optional configuration.") + false // fail recovery + } + } + + try message match { + case LoadSnapshotResult(snapshot, toSnr) => + loadSnapshotResult(snapshot, toSnr) case LoadSnapshotFailed(cause) => - timeoutCancellable.cancel() - try onRecoveryFailure(cause, event = None) - finally context.stop(self) - returnRecoveryPermit() + if (isSnapshotOptional) { + log.info( + "Snapshot load error for persistenceId [{}]. Replaying all events since snapshot-is-optional=true", + persistenceId) + loadSnapshotResult(snapshot = None, recovery.toSequenceNr) + } else { + timeoutCancellable.cancel() + try onRecoveryFailure(cause, event = None) + finally context.stop(self) + returnRecoveryPermit() + } case RecoveryTick(true) => try onRecoveryFailure( @@ -700,6 +721,7 @@ private[persistence] trait Eventsourced returnRecoveryPermit() throw e } + } private def returnRecoveryPermit(): Unit = extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index a8c25b8c6b..1623cb99d4 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -5,6 +5,7 @@ package akka.persistence import java.io.IOException +import java.util.UUID import scala.concurrent.Future import scala.concurrent.duration._ @@ -75,7 +76,7 @@ object SnapshotFailureRobustnessSpec { class FailingLocalSnapshotStore(config: Config) extends LocalSnapshotStore(config) { override def save(metadata: SnapshotMetadata, snapshot: Any): Unit = { - if (metadata.sequenceNr == 2 || snapshot == "boom") { + if (metadata.sequenceNr == 2 || snapshot.toString.startsWith("boom")) { val bytes = "b0rkb0rk".getBytes("UTF-8") // length >= 8 to prevent EOF exception val tmpFile = withOutputStream(metadata)(_.write(bytes)) tmpFile.renameTo(snapshotFileForWrite(metadata)) @@ -205,3 +206,46 @@ class SnapshotFailureRobustnessSpec } } } + +class SnapshotIsOptionalSpec + extends PersistenceSpec( + PersistenceSpec.config( + "inmem", + "SnapshotFailureReplayEventsSpec", + serialization = "off", + extraConfig = Some(s""" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$$FailingLocalSnapshotStore" + akka.persistence.snapshot-store.local.dir = "target/persistence-${UUID.randomUUID().toString}" + akka.persistence.snapshot-store.local.snapshot-is-optional = true + """))) + with ImplicitSender { + + import SnapshotFailureRobustnessSpec._ + + "A persistentActor with a failing snapshot with snapshot-is-optional=true" must { + "fall back to events" in { + val sPersistentActor = system.actorOf(Props(classOf[SaveSnapshotTestPersistentActor], name, testActor)) + + expectMsg(RecoveryCompleted) + sPersistentActor ! Cmd("boom1") + expectMsg(1) + sPersistentActor ! Cmd("boom2") + expectMsg(2) + + system.eventStream.publish( + TestEvent.Mute(EventFilter[java.io.NotSerializableException](start = "Error loading snapshot"))) + try { + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + + expectMsg("boom1-1") // from event replay + expectMsg("boom2-2") // from event replay + expectMsg(RecoveryCompleted) + expectNoMessage() + } finally { + system.eventStream.publish(TestEvent.UnMute(EventFilter.error(start = "Error loading snapshot ["))) + } + } + + } +}