Co-authored-by: Renato Cavalcanti <renato@cavalcanti.be>
This commit is contained in:
parent
206dafa01d
commit
e09de4fc84
10 changed files with 294 additions and 61 deletions
|
|
@ -640,6 +640,19 @@ If failure messages are left unhandled by the actor, a default warning log messa
|
|||
No default action is performed on the success messages, however you're free to handle them e.g. in order to delete
|
||||
an in memory representation of the snapshot, or in the case of failure to attempt save the snapshot again.
|
||||
|
||||
### Optional snapshots
|
||||
|
||||
By default, the persistent actor will unconditionally be stopped if the snapshot can't be loaded in the recovery.
|
||||
It is possible to make snapshot loading optional. This can be useful when it is alright to ignore snapshot in case
|
||||
of for example deserialization errors. When snapshot loading fails it will instead recover by replaying all events.
|
||||
|
||||
Enable this feature by setting `snapshot-is-optional = true` in the snapshot store configuration.
|
||||
|
||||
@@@ warning
|
||||
|
||||
Don't set `snapshot-is-optional = true` if events have been deleted because that would result in wrong recovered state if snapshot load fails.
|
||||
|
||||
@@@
|
||||
|
||||
## Scaling out
|
||||
|
||||
|
|
|
|||
|
|
@ -71,6 +71,20 @@ started, `RecoveryFailed` signal is emitted (logging the error by default), and
|
|||
Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots
|
||||
if you for example know that serialization format has changed in an incompatible way.
|
||||
|
||||
### Optional snapshots
|
||||
|
||||
By default, the persistent actor will unconditionally be stopped if the snapshot can't be loaded in the recovery.
|
||||
It is possible to make snapshot loading optional. This can be useful when it is alright to ignore snapshot in case
|
||||
of for example deserialization errors. When snapshot loading fails it will instead recover by replaying all events.
|
||||
|
||||
Enable this feature by setting `snapshot-is-optional = true` in the snapshot store configuration.
|
||||
|
||||
@@@ warning
|
||||
|
||||
Don't set `snapshot-is-optional = true` if events have been deleted because that would result in wrong recovered state if snapshot load fails.
|
||||
|
||||
@@@
|
||||
|
||||
## Snapshot deletion
|
||||
|
||||
To free up space, an event sourced actor can automatically delete older snapshots based on the given `RetentionCriteria`.
|
||||
|
|
|
|||
|
|
@ -128,6 +128,8 @@ object PersistenceTestKitSnapshotPlugin {
|
|||
val config: Config = ConfigFactory.parseMap(
|
||||
Map(
|
||||
"akka.persistence.snapshot-store.plugin" -> PluginId,
|
||||
s"$PluginId.class" -> classOf[PersistenceTestKitSnapshotPlugin].getName).asJava)
|
||||
s"$PluginId.class" -> classOf[PersistenceTestKitSnapshotPlugin].getName,
|
||||
s"$PluginId.snapshot-is-optional" -> false // fallback isn't used by the testkit
|
||||
).asJava)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,6 +67,18 @@ private[akka] final class BehaviorSetup[C, E, S](
|
|||
val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
|
||||
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
|
||||
|
||||
val isSnapshotOptional: Boolean =
|
||||
Persistence(context.system.classicSystem).configFor(snapshotStore).getBoolean("snapshot-is-optional")
|
||||
|
||||
if (isSnapshotOptional && (retention match {
|
||||
case SnapshotCountRetentionCriteriaImpl(_, _, true) => true
|
||||
case _ => false
|
||||
})) {
|
||||
throw new IllegalArgumentException(
|
||||
"Retention criteria with delete events can't be used together with snapshot-is-optional=false. " +
|
||||
"That can result in wrong recovered state if snapshot load fails.")
|
||||
}
|
||||
|
||||
val replicaId: Option[ReplicaId] = replication.map(_.replicaId)
|
||||
|
||||
def selfClassic: ClassicActorRef = context.self.toClassic
|
||||
|
|
|
|||
|
|
@ -157,7 +157,12 @@ private[akka] trait JournalInteractions[C, E, S] {
|
|||
* is enabled, old messages are deleted based on `SnapshotCountRetentionCriteria.snapshotEveryNEvents`
|
||||
* before old snapshots are deleted.
|
||||
*/
|
||||
protected def internalDeleteEvents(lastSequenceNr: Long, toSequenceNr: Long): Unit =
|
||||
protected def internalDeleteEvents(lastSequenceNr: Long, toSequenceNr: Long): Unit = {
|
||||
if (setup.isSnapshotOptional) {
|
||||
setup.internalLogger.warn(
|
||||
"Delete events shouldn't be used together with snapshot-is-optional=false. " +
|
||||
"That can result in wrong recovered state if snapshot load fails.")
|
||||
}
|
||||
if (toSequenceNr > 0) {
|
||||
val self = setup.selfClassic
|
||||
|
||||
|
|
@ -170,6 +175,7 @@ private[akka] trait JournalInteractions[C, E, S] {
|
|||
s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"),
|
||||
toSequenceNr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
|
|
|
|||
|
|
@ -144,40 +144,53 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
|
|||
def onSnapshotterResponse(
|
||||
response: SnapshotProtocol.Response,
|
||||
receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
|
||||
|
||||
def loadSnapshotResult(snapshot: Option[SelectedSnapshot], toSnr: Long): Behavior[InternalProtocol] = {
|
||||
var state: S = setup.emptyState
|
||||
|
||||
val (seqNr: Long, seenPerReplica, version) = snapshot match {
|
||||
case Some(SelectedSnapshot(metadata, snapshot)) =>
|
||||
state = setup.snapshotAdapter.fromJournal(snapshot)
|
||||
setup.context.log.debug("Loaded snapshot with metadata [{}]", metadata)
|
||||
metadata.metadata match {
|
||||
case Some(rm: ReplicatedSnapshotMetadata) => (metadata.sequenceNr, rm.seenPerReplica, rm.version)
|
||||
case _ => (metadata.sequenceNr, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty)
|
||||
}
|
||||
case None => (0L, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty)
|
||||
}
|
||||
|
||||
setup.context.log.debugN("Snapshot recovered from {} {} {}", seqNr, seenPerReplica, version)
|
||||
|
||||
setup.cancelRecoveryTimer()
|
||||
|
||||
ReplayingEvents[C, E, S](
|
||||
setup,
|
||||
ReplayingEvents.ReplayingState(
|
||||
seqNr,
|
||||
state,
|
||||
eventSeenInInterval = false,
|
||||
toSnr,
|
||||
receivedPoisonPill,
|
||||
System.nanoTime(),
|
||||
version,
|
||||
seenPerReplica,
|
||||
eventsReplayed = 0))
|
||||
}
|
||||
|
||||
response match {
|
||||
case LoadSnapshotResult(sso, toSnr) =>
|
||||
var state: S = setup.emptyState
|
||||
|
||||
val (seqNr: Long, seenPerReplica, version) = sso match {
|
||||
case Some(SelectedSnapshot(metadata, snapshot)) =>
|
||||
state = setup.snapshotAdapter.fromJournal(snapshot)
|
||||
setup.context.log.debug("Loaded snapshot with metadata [{}]", metadata)
|
||||
metadata.metadata match {
|
||||
case Some(rm: ReplicatedSnapshotMetadata) => (metadata.sequenceNr, rm.seenPerReplica, rm.version)
|
||||
case _ => (metadata.sequenceNr, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty)
|
||||
}
|
||||
case None => (0L, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty)
|
||||
}
|
||||
|
||||
setup.context.log.debugN("Snapshot recovered from {} {} {}", seqNr, seenPerReplica, version)
|
||||
|
||||
setup.cancelRecoveryTimer()
|
||||
|
||||
ReplayingEvents[C, E, S](
|
||||
setup,
|
||||
ReplayingEvents.ReplayingState(
|
||||
seqNr,
|
||||
state,
|
||||
eventSeenInInterval = false,
|
||||
toSnr,
|
||||
receivedPoisonPill,
|
||||
System.nanoTime(),
|
||||
version,
|
||||
seenPerReplica,
|
||||
eventsReplayed = 0))
|
||||
case LoadSnapshotResult(snapshot, toSnr) =>
|
||||
loadSnapshotResult(snapshot, toSnr)
|
||||
|
||||
case LoadSnapshotFailed(cause) =>
|
||||
onRecoveryFailure(cause)
|
||||
if (setup.isSnapshotOptional) {
|
||||
setup.internalLogger.info(
|
||||
"Snapshot load error for persistenceId [{}]. Replaying all events since snapshot-is-optional=true",
|
||||
setup.persistenceId)
|
||||
|
||||
loadSnapshotResult(snapshot = None, setup.recovery.toSequenceNr)
|
||||
} else {
|
||||
onRecoveryFailure(cause)
|
||||
}
|
||||
|
||||
case _ =>
|
||||
Behaviors.unhandled
|
||||
|
|
|
|||
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.typed.scaladsl
|
||||
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
|
||||
object SnapshotIsOptionalSpec {
|
||||
private val conf: Config = ConfigFactory.parseString(s"""
|
||||
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
|
||||
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
|
||||
akka.persistence.snapshot-store.local.snapshot-is-optional = true
|
||||
""")
|
||||
case class State1(field1: String) extends CborSerializable {
|
||||
@JsonCreator
|
||||
def this() = this(null)
|
||||
|
||||
if (field1 == null)
|
||||
throw new RuntimeException("Deserialization error")
|
||||
}
|
||||
case class Command(c: String) extends CborSerializable
|
||||
case class Event(e: String) extends CborSerializable
|
||||
}
|
||||
|
||||
class SnapshotIsOptionalSpec
|
||||
extends ScalaTestWithActorTestKit(SnapshotIsOptionalSpec.conf)
|
||||
with AnyWordSpecLike
|
||||
with LogCapturing {
|
||||
import SnapshotIsOptionalSpec._
|
||||
|
||||
val pidCounter = new AtomicInteger(0)
|
||||
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})")
|
||||
|
||||
private def behavior(pid: PersistenceId, probe: ActorRef[State1]): EventSourcedBehavior[Command, Event, State1] =
|
||||
EventSourcedBehavior[Command, Event, State1](
|
||||
pid,
|
||||
State1(""),
|
||||
commandHandler = { (state, command) =>
|
||||
command match {
|
||||
case Command("get") =>
|
||||
probe.tell(state)
|
||||
Effect.none
|
||||
case _ =>
|
||||
Effect.persist(Event(command.c)).thenRun(newState => probe ! newState)
|
||||
}
|
||||
},
|
||||
eventHandler = { (state, evt) =>
|
||||
state.copy(field1 = state.field1 + "|" + evt.e)
|
||||
})
|
||||
|
||||
"Snapshot recovery with snapshot-is-optional=true" must {
|
||||
|
||||
"fall back to events when deserialization error" in {
|
||||
val pid = nextPid()
|
||||
|
||||
val stateProbe1 = createTestProbe[State1]()
|
||||
val b1 = behavior(pid, stateProbe1.ref).snapshotWhen { (_, event, _) =>
|
||||
event.e.contains("snapshot")
|
||||
}
|
||||
val ref1 = spawn(b1)
|
||||
ref1.tell(Command("one"))
|
||||
stateProbe1.expectMessage(State1("|one"))
|
||||
ref1.tell(Command("snapshot now"))
|
||||
stateProbe1.expectMessage(State1("|one|snapshot now"))
|
||||
testKit.stop(ref1)
|
||||
|
||||
val stateProbe2 = createTestProbe[State1]()
|
||||
val ref2 = spawn(behavior(pid, stateProbe2.ref))
|
||||
ref2.tell(Command("get"))
|
||||
stateProbe2.expectMessage(State1("|one|snapshot now"))
|
||||
testKit.stop(ref2)
|
||||
}
|
||||
|
||||
"fail fast if used with retention criteria with delete events" in {
|
||||
val pid = nextPid()
|
||||
|
||||
val stateProbe1 = createTestProbe[State1]()
|
||||
val ref = spawn(
|
||||
behavior(pid, stateProbe1.ref).withRetention(RetentionCriteria.snapshotEvery(10, 3).withDeleteEventsOnSnapshot))
|
||||
createTestProbe().expectTerminated(ref)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -168,6 +168,15 @@ akka.persistence {
|
|||
call-timeout = 20s
|
||||
reset-timeout = 60s
|
||||
}
|
||||
|
||||
# Set this to true if successful loading of snapshot is not necessary.
|
||||
# This can be useful when it is alright to ignore snapshot in case of
|
||||
# for example deserialization errors. When snapshot loading fails it will instead
|
||||
# recover by replaying all events.
|
||||
# Don't set to true if events are deleted because that would
|
||||
# result in wrong recovered state if snapshot load fails.
|
||||
snapshot-is-optional = false
|
||||
|
||||
}
|
||||
|
||||
fsm {
|
||||
|
|
|
|||
|
|
@ -655,36 +655,57 @@ private[persistence] trait Eventsourced
|
|||
|
||||
override def recoveryRunning: Boolean = true
|
||||
|
||||
override def stateReceive(receive: Receive, message: Any) =
|
||||
try message match {
|
||||
case LoadSnapshotResult(sso, toSnr) =>
|
||||
timeoutCancellable.cancel()
|
||||
sso.foreach {
|
||||
case SelectedSnapshot(metadata, snapshot) =>
|
||||
val offer = SnapshotOffer(metadata, snapshot)
|
||||
if (recoveryBehavior.isDefinedAt(offer)) {
|
||||
try {
|
||||
setLastSequenceNr(metadata.sequenceNr)
|
||||
// Since we are recovering we can ignore the receive behavior from the stack
|
||||
Eventsourced.super.aroundReceive(recoveryBehavior, offer)
|
||||
} catch {
|
||||
case NonFatal(t) =>
|
||||
try onRecoveryFailure(t, None)
|
||||
finally context.stop(self)
|
||||
returnRecoveryPermit()
|
||||
}
|
||||
} else {
|
||||
unhandled(offer)
|
||||
override def stateReceive(receive: Receive, message: Any): Unit = {
|
||||
def loadSnapshotResult(snapshot: Option[SelectedSnapshot], toSnr: Long): Unit = {
|
||||
timeoutCancellable.cancel()
|
||||
snapshot.foreach {
|
||||
case SelectedSnapshot(metadata, snapshot) =>
|
||||
val offer = SnapshotOffer(metadata, snapshot)
|
||||
if (recoveryBehavior.isDefinedAt(offer)) {
|
||||
try {
|
||||
setLastSequenceNr(metadata.sequenceNr)
|
||||
// Since we are recovering we can ignore the receive behavior from the stack
|
||||
Eventsourced.super.aroundReceive(recoveryBehavior, offer)
|
||||
} catch {
|
||||
case NonFatal(t) =>
|
||||
try onRecoveryFailure(t, None)
|
||||
finally context.stop(self)
|
||||
returnRecoveryPermit()
|
||||
}
|
||||
}
|
||||
changeState(recovering(recoveryBehavior, timeout))
|
||||
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
|
||||
} else {
|
||||
unhandled(offer)
|
||||
}
|
||||
}
|
||||
changeState(recovering(recoveryBehavior, timeout))
|
||||
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
|
||||
}
|
||||
|
||||
def isSnapshotOptional: Boolean = {
|
||||
try {
|
||||
Persistence(context.system).configFor(snapshotStore).getBoolean("snapshot-is-optional")
|
||||
} catch {
|
||||
case NonFatal(exc) =>
|
||||
log.error(exc, "Invalid snapshot-is-optional configuration.")
|
||||
false // fail recovery
|
||||
}
|
||||
}
|
||||
|
||||
try message match {
|
||||
case LoadSnapshotResult(snapshot, toSnr) =>
|
||||
loadSnapshotResult(snapshot, toSnr)
|
||||
|
||||
case LoadSnapshotFailed(cause) =>
|
||||
timeoutCancellable.cancel()
|
||||
try onRecoveryFailure(cause, event = None)
|
||||
finally context.stop(self)
|
||||
returnRecoveryPermit()
|
||||
if (isSnapshotOptional) {
|
||||
log.info(
|
||||
"Snapshot load error for persistenceId [{}]. Replaying all events since snapshot-is-optional=true",
|
||||
persistenceId)
|
||||
loadSnapshotResult(snapshot = None, recovery.toSequenceNr)
|
||||
} else {
|
||||
timeoutCancellable.cancel()
|
||||
try onRecoveryFailure(cause, event = None)
|
||||
finally context.stop(self)
|
||||
returnRecoveryPermit()
|
||||
}
|
||||
|
||||
case RecoveryTick(true) =>
|
||||
try onRecoveryFailure(
|
||||
|
|
@ -700,6 +721,7 @@ private[persistence] trait Eventsourced
|
|||
returnRecoveryPermit()
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
private def returnRecoveryPermit(): Unit =
|
||||
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package akka.persistence
|
||||
|
||||
import java.io.IOException
|
||||
import java.util.UUID
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -75,7 +76,7 @@ object SnapshotFailureRobustnessSpec {
|
|||
|
||||
class FailingLocalSnapshotStore(config: Config) extends LocalSnapshotStore(config) {
|
||||
override def save(metadata: SnapshotMetadata, snapshot: Any): Unit = {
|
||||
if (metadata.sequenceNr == 2 || snapshot == "boom") {
|
||||
if (metadata.sequenceNr == 2 || snapshot.toString.startsWith("boom")) {
|
||||
val bytes = "b0rkb0rk".getBytes("UTF-8") // length >= 8 to prevent EOF exception
|
||||
val tmpFile = withOutputStream(metadata)(_.write(bytes))
|
||||
tmpFile.renameTo(snapshotFileForWrite(metadata))
|
||||
|
|
@ -205,3 +206,46 @@ class SnapshotFailureRobustnessSpec
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
class SnapshotIsOptionalSpec
|
||||
extends PersistenceSpec(
|
||||
PersistenceSpec.config(
|
||||
"inmem",
|
||||
"SnapshotFailureReplayEventsSpec",
|
||||
serialization = "off",
|
||||
extraConfig = Some(s"""
|
||||
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||
akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$$FailingLocalSnapshotStore"
|
||||
akka.persistence.snapshot-store.local.dir = "target/persistence-${UUID.randomUUID().toString}"
|
||||
akka.persistence.snapshot-store.local.snapshot-is-optional = true
|
||||
""")))
|
||||
with ImplicitSender {
|
||||
|
||||
import SnapshotFailureRobustnessSpec._
|
||||
|
||||
"A persistentActor with a failing snapshot with snapshot-is-optional=true" must {
|
||||
"fall back to events" in {
|
||||
val sPersistentActor = system.actorOf(Props(classOf[SaveSnapshotTestPersistentActor], name, testActor))
|
||||
|
||||
expectMsg(RecoveryCompleted)
|
||||
sPersistentActor ! Cmd("boom1")
|
||||
expectMsg(1)
|
||||
sPersistentActor ! Cmd("boom2")
|
||||
expectMsg(2)
|
||||
|
||||
system.eventStream.publish(
|
||||
TestEvent.Mute(EventFilter[java.io.NotSerializableException](start = "Error loading snapshot")))
|
||||
try {
|
||||
system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
|
||||
|
||||
expectMsg("boom1-1") // from event replay
|
||||
expectMsg("boom2-2") // from event replay
|
||||
expectMsg(RecoveryCompleted)
|
||||
expectNoMessage()
|
||||
} finally {
|
||||
system.eventStream.publish(TestEvent.UnMute(EventFilter.error(start = "Error loading snapshot [")))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue