add LoadSnapshotFailed in snapshot protocol, #21842

* treat snapshot load failure in same way as other recovery failures
* if load of snapshot fails the persistent actor will be stopped, since
  we can't assume that a consistent state would be recovered just by
  replaying all events, since events may have been  deleted
* additional recovery docs
* improve log message
This commit is contained in:
Patrik Nordwall 2016-11-17 10:39:18 +01:00
parent a5e94dd3ed
commit ea84b4bfdd
13 changed files with 153 additions and 19 deletions

View file

@ -108,6 +108,15 @@ public class LambdaPersistenceDocTest {
} }
//#recovery-completed //#recovery-completed
abstract class MyPersistentActor6 extends AbstractPersistentActor {
//#recovery-no-snap
@Override
public Recovery recovery() {
return Recovery.create(SnapshotSelectionCriteria.none());
}
//#recovery-no-snap
}
abstract class MyActor extends AbstractPersistentActor { abstract class MyActor extends AbstractPersistentActor {
//#backoff //#backoff
@Override @Override

View file

@ -85,6 +85,15 @@ public class PersistenceDocTest {
} }
//#recovery-completed //#recovery-completed
} }
abstract class MyPersistentActor6 extends UntypedPersistentActor {
//#recovery-no-snap
@Override
public Recovery recovery() {
return Recovery.create(SnapshotSelectionCriteria.none());
}
//#recovery-no-snap
}
abstract class MyActor extends UntypedPersistentActor { abstract class MyActor extends UntypedPersistentActor {
//#backoff //#backoff

View file

@ -160,12 +160,25 @@ only be received by a persistent actor after recovery completes.
as the original sender is presumed to be long gone. If you indeed have to notify an actor during as the original sender is presumed to be long gone. If you indeed have to notify an actor during
recovery in the future, store its ``ActorPath`` explicitly in your persisted events. recovery in the future, store its ``ActorPath`` explicitly in your persisted events.
.. _recovery-custom-java-lambda:
Recovery customization Recovery customization
^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^
Applications may also customise how recovery is performed by returning a customised ``Recovery`` object Applications may also customise how recovery is performed by returning a customised ``Recovery`` object
in the ``recovery`` method of a ``AbstractPersistentActor``, for example setting an upper bound to the replay in the ``recovery`` method of a ``AbstractPersistentActor``.
which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state:
To skip loading snapshots and replay all events you can use ``SnapshotSelectionCriteria.none()``.
This can be useful if snapshot serialization format has changed in an incompatible way.
It should typically not be used when events have been deleted.
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-no-snap
Another example, which can be fun for experiments but probably not in a real application, is setting an
upper bound to the replay which allows the actor to be replayed to a certain point "in the past"
instead to its most up to date state. Note that after that it is a bad idea to persist new
events because a later recovery will probably be confused by the new events that follow the
events that were previously skipped.
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-custom .. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-custom
@ -339,6 +352,8 @@ next message.
If there is a problem with recovering the state of the actor from the journal when the actor is If there is a problem with recovering the state of the actor from the journal when the actor is
started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped. started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped.
Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots
if you for example know that serialization format has changed in an incompatible way, see :ref:`recovery-custom-java-lambda`.
Atomic writes Atomic writes
------------- -------------

View file

@ -168,12 +168,25 @@ They are cached and received by a persistent actor after recovery phase complete
as the original sender is presumed to be long gone. If you indeed have to notify an actor during as the original sender is presumed to be long gone. If you indeed have to notify an actor during
recovery in the future, store its ``ActorPath`` explicitly in your persisted events. recovery in the future, store its ``ActorPath`` explicitly in your persisted events.
.. _recovery-custom-java:
Recovery customization Recovery customization
^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^
Applications may also customise how recovery is performed by returning a customised ``Recovery`` object Applications may also customise how recovery is performed by returning a customised ``Recovery`` object
in the ``recovery`` method of a ``UntypedPersistentActor``, for example setting an upper bound to the replay in the ``recovery`` method of a ``UntypedPersistentActor``.
which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state:
To skip loading snapshots and replay all events you can use ``SnapshotSelectionCriteria.none()``.
This can be useful if snapshot serialization format has changed in an incompatible way.
It should typically not be used when events have been deleted.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-no-snap
Another example, which can be fun for experiments but probably not in a real application, is setting an
upper bound to the replay which allows the actor to be replayed to a certain point "in the past"
instead to its most up to date state. Note that after that it is a bad idea to persist new
events because a later recovery will probably be confused by the new events that follow the
events that were previously skipped.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-custom .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-custom
@ -359,6 +372,8 @@ next message.
If there is a problem with recovering the state of the actor from the journal when the actor is If there is a problem with recovering the state of the actor from the journal when the actor is
started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped. started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped.
Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots
if you for example know that serialization format has changed in an incompatible way, see :ref:`recovery-custom-java`.
Atomic writes Atomic writes
------------- -------------

View file

@ -59,6 +59,13 @@ object PersistenceDocSpec {
} }
//#recovery-completed //#recovery-completed
} }
trait MyPersistentActor5 extends PersistentActor {
//#recovery-no-snap
override def recovery =
Recovery(fromSnapshot = SnapshotSelectionCriteria.None)
//#recovery-no-snap
}
} }
object PersistenceId { object PersistenceId {

View file

@ -153,12 +153,25 @@ They are cached and received by a persistent actor after recovery phase complete
as the original sender is presumed to be long gone. If you indeed have to notify an actor during as the original sender is presumed to be long gone. If you indeed have to notify an actor during
recovery in the future, store its ``ActorPath`` explicitly in your persisted events. recovery in the future, store its ``ActorPath`` explicitly in your persisted events.
.. _recovery-custom-scala:
Recovery customization Recovery customization
^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^
Applications may also customise how recovery is performed by returning a customised ``Recovery`` object Applications may also customise how recovery is performed by returning a customised ``Recovery`` object
in the ``recovery`` method of a ``PersistentActor``, for example setting an upper bound to the replay in the ``recovery`` method of a ``PersistentActor``,
which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state:
To skip loading snapshots and replay all events you can use ``SnapshotSelectionCriteria.None``.
This can be useful if snapshot serialization format has changed in an incompatible way.
It should typically not be used when events have been deleted.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-no-snap
Another example, which can be fun for experiments but probably not in a real application, is setting an
upper bound to the replay which allows the actor to be replayed to a certain point "in the past"
instead to its most up to date state. Note that after that it is a bad idea to persist new
events because a later recovery will probably be confused by the new events that follow the
events that were previously skipped.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-custom .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-custom
@ -345,6 +358,8 @@ next message.
If there is a problem with recovering the state of the actor from the journal when the actor is If there is a problem with recovering the state of the actor from the journal when the actor is
started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped. started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped.
Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots
if you for example know that serialization format has changed in an incompatible way, see :ref:`recovery-custom-scala`.
Atomic writes Atomic writes
------------- -------------

View file

@ -215,7 +215,8 @@ akka.persistence.snapshot-store.local {
# Number load attempts when recovering from the latest snapshot fails # Number load attempts when recovering from the latest snapshot fails
# yet older snapshot files are available. Each recovery attempt will try # yet older snapshot files are available. Each recovery attempt will try
# to recover using an older than previously failed-on snapshot file # to recover using an older than previously failed-on snapshot file
# (if any are present). # (if any are present). If all attempts fail the recovery will fail and
# the persistent actor will be stopped.
max-load-attempts = 3 max-load-attempts = 3
} }

View file

@ -45,6 +45,7 @@ private[persistence] object Eventsourced {
private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash with PersistenceIdentity with PersistenceRecovery { private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash with PersistenceIdentity with PersistenceRecovery {
import JournalProtocol._ import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult import SnapshotProtocol.LoadSnapshotResult
import SnapshotProtocol.LoadSnapshotFailed
import Eventsourced._ import Eventsourced._
private val extension = Persistence(context.system) private val extension = Persistence(context.system)
@ -502,6 +503,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
changeState(recovering(recoveryBehavior, timeout)) changeState(recovering(recoveryBehavior, timeout))
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
case LoadSnapshotFailed(cause)
timeoutCancellable.cancel()
try onRecoveryFailure(cause, event = None) finally context.stop(self)
case RecoveryTick(true) case RecoveryTick(true)
try onRecoveryFailure( try onRecoveryFailure(
new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"), new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"),

View file

@ -208,6 +208,12 @@ private[persistence] object SnapshotProtocol {
final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long) final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
extends Response extends Response
/**
* Reply message to a failed [[LoadSnapshot]] request.
* @param cause failure cause.
*/
final case class LoadSnapshotFailed(cause: Throwable) extends Response
/** /**
* Instructs snapshot store to save a snapshot. * Instructs snapshot store to save a snapshot.
* *

View file

@ -183,7 +183,7 @@ final class PersistencePluginProxy(config: Config) extends Actor with Stash with
case req: SnapshotProtocol.Request req match { // exhaustive match case req: SnapshotProtocol.Request req match { // exhaustive match
case LoadSnapshot(persistenceId, criteria, toSequenceNr) case LoadSnapshot(persistenceId, criteria, toSequenceNr)
sender() ! LoadSnapshotResult(None, toSequenceNr) sender() ! LoadSnapshotFailed(timeoutException)
case SaveSnapshot(metadata, snapshot) case SaveSnapshot(metadata, snapshot)
sender() ! SaveSnapshotFailure(metadata, timeoutException) sender() ! SaveSnapshotFailure(metadata, timeoutException)
case DeleteSnapshot(metadata) case DeleteSnapshot(metadata)

View file

@ -37,7 +37,7 @@ trait SnapshotStore extends Actor with ActorLogging {
breaker.withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) map { breaker.withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) map {
sso LoadSnapshotResult(sso, toSequenceNr) sso LoadSnapshotResult(sso, toSequenceNr)
} recover { } recover {
case e LoadSnapshotResult(None, toSequenceNr) case e LoadSnapshotFailed(e)
} pipeTo senderPersistentActor() } pipeTo senderPersistentActor()
case SaveSnapshot(metadata, snapshot) case SaveSnapshot(metadata, snapshot)
@ -96,6 +96,12 @@ trait SnapshotStore extends Actor with ActorLogging {
/** /**
* Plugin API: asynchronously loads a snapshot. * Plugin API: asynchronously loads a snapshot.
* *
* If the future `Option` is `None` then all events will be replayed,
* i.e. there was no snapshot. If snapshot could not be loaded the `Future`
* should be completed with failure. That is important because events may
* have been deleted and just replaying the events might not result in a valid
* state.
*
* This call is protected with a circuit-breaker. * This call is protected with a circuit-breaker.
* *
* @param persistenceId id of the persistent actor. * @param persistenceId id of the persistent actor.

View file

@ -48,7 +48,12 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
// Hence, an attempt to load that snapshot will fail but loading an older snapshot may succeed. // Hence, an attempt to load that snapshot will fail but loading an older snapshot may succeed.
// //
val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(maxLoadAttempts) val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(maxLoadAttempts)
Future(load(metadata))(streamDispatcher) Future {
load(metadata) match {
case Success(s) s
case Failure(e) throw e // all attempts failed, fail the future
}
}(streamDispatcher)
} }
override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
@ -86,14 +91,19 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
} }
@scala.annotation.tailrec @scala.annotation.tailrec
private def load(metadata: immutable.Seq[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match { private def load(metadata: immutable.Seq[SnapshotMetadata]): Try[Option[SelectedSnapshot]] = metadata.lastOption match {
case None None case None Success(None) // no snapshots stored
case Some(md) case Some(md)
Try(withInputStream(md)(deserialize)) match { Try(withInputStream(md)(deserialize)) match {
case Success(s) Some(SelectedSnapshot(md, s.data)) case Success(s)
Success(Some(SelectedSnapshot(md, s.data)))
case Failure(e) case Failure(e)
log.error(e, s"Error loading snapshot [${md}]") val remaining = metadata.init
load(metadata.init) // try older snapshot log.error(e, s"Error loading snapshot [{}], remaining attempts: [{}]", md, remaining.size)
if (remaining.isEmpty)
Failure(e) // all attempts failed
else
load(remaining) // try older snapshot
} }
} }
@ -121,7 +131,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
try { p(stream) } finally { stream.close() } try { p(stream) } finally { stream.close() }
/** Only by persistenceId and sequenceNr, timestamp is informational - accomodates for 2.13.x series files */ /** Only by persistenceId and sequenceNr, timestamp is informational - accomodates for 2.13.x series files */
private def snapshotFileForWrite(metadata: SnapshotMetadata, extension: String = ""): File = protected def snapshotFileForWrite(metadata: SnapshotMetadata, extension: String = ""): File =
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}")
private def snapshotMetadatas(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = { private def snapshotMetadatas(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = {

View file

@ -73,9 +73,10 @@ object SnapshotFailureRobustnessSpec {
class FailingLocalSnapshotStore extends LocalSnapshotStore { class FailingLocalSnapshotStore extends LocalSnapshotStore {
override def save(metadata: SnapshotMetadata, snapshot: Any): Unit = { override def save(metadata: SnapshotMetadata, snapshot: Any): Unit = {
if (metadata.sequenceNr == 2) { if (metadata.sequenceNr == 2 || snapshot == "boom") {
val bytes = "b0rk".getBytes("UTF-8") val bytes = "b0rk".getBytes("UTF-8")
withOutputStream(metadata)(_.write(bytes)) val tmpFile = withOutputStream(metadata)(_.write(bytes))
tmpFile.renameTo(snapshotFileForWrite(metadata))
} else super.save(metadata, snapshot) } else super.save(metadata, snapshot)
} }
} }
@ -112,10 +113,11 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf
sPersistentActor ! Cmd("kablama") sPersistentActor ! Cmd("kablama")
expectMsg(2) expectMsg(2)
system.eventStream.publish(TestEvent.Mute( system.eventStream.publish(TestEvent.Mute(
EventFilter.error(start = "Error loading snapshot ["))) EventFilter[java.io.NotSerializableException](start = "Error loading snapshot")))
system.eventStream.subscribe(testActor, classOf[Logging.Error]) system.eventStream.subscribe(testActor, classOf[Logging.Error])
try { try {
val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
expectMsgType[Logging.Error].message.toString should startWith("Error loading snapshot")
expectMsgPF() { expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 1, timestamp), state) case (SnapshotMetadata(`persistenceId`, 1, timestamp), state)
state should ===("blahonga") state should ===("blahonga")
@ -131,6 +133,40 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf
} }
} }
"fail recovery and stop actor when no snapshot could be loaded" in {
val sPersistentActor = system.actorOf(Props(classOf[SaveSnapshotTestPersistentActor], name, testActor))
val persistenceId = name
expectMsg(RecoveryCompleted)
sPersistentActor ! Cmd("ok")
expectMsg(1)
// max-attempts = 3
sPersistentActor ! Cmd("boom")
expectMsg(2)
sPersistentActor ! Cmd("boom")
expectMsg(3)
sPersistentActor ! Cmd("boom")
expectMsg(4)
system.eventStream.publish(TestEvent.Mute(
EventFilter[java.io.NotSerializableException](start = "Error loading snapshot")))
system.eventStream.publish(TestEvent.Mute(
EventFilter[java.io.NotSerializableException](start = "Persistence failure")))
system.eventStream.subscribe(testActor, classOf[Logging.Error])
try {
val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
(1 to 3).foreach { _
expectMsgType[Logging.Error].message.toString should startWith("Error loading snapshot")
}
expectMsgType[Logging.Error].message.toString should startWith("Persistence failure")
watch(lPersistentActor)
expectTerminated(lPersistentActor)
} finally {
system.eventStream.unsubscribe(testActor, classOf[Logging.Error])
system.eventStream.publish(TestEvent.UnMute(
EventFilter.error(start = "Error loading snapshot [")))
}
}
"receive failure message when deleting a single snapshot fails" in { "receive failure message when deleting a single snapshot fails" in {
val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
val persistenceId = name val persistenceId = name