From ea84b4bfdd126e5ed47474baa58ec80098b14893 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 17 Nov 2016 10:39:18 +0100 Subject: [PATCH] add LoadSnapshotFailed in snapshot protocol, #21842 * treat snapshot load failure in same way as other recovery failures * if load of snapshot fails the persistent actor will be stopped, since we can't assume that a consistent state would be recovered just by replaying all events, since events may have been deleted * additional recovery docs * improve log message --- .../persistence/LambdaPersistenceDocTest.java | 9 ++++ .../docs/persistence/PersistenceDocTest.java | 9 ++++ akka-docs/rst/java/lambda-persistence.rst | 19 ++++++++- akka-docs/rst/java/persistence.rst | 19 ++++++++- .../docs/persistence/PersistenceDocSpec.scala | 7 ++++ akka-docs/rst/scala/persistence.rst | 19 ++++++++- .../src/main/resources/reference.conf | 3 +- .../scala/akka/persistence/Eventsourced.scala | 5 +++ .../akka/persistence/SnapshotProtocol.scala | 6 +++ .../journal/PersistencePluginProxy.scala | 2 +- .../persistence/snapshot/SnapshotStore.scala | 8 +++- .../snapshot/local/LocalSnapshotStore.scala | 24 +++++++---- .../SnapshotFailureRobustnessSpec.scala | 42 +++++++++++++++++-- 13 files changed, 153 insertions(+), 19 deletions(-) diff --git a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java index 4c5512cef8..1fc77894b6 100644 --- a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java @@ -108,6 +108,15 @@ public class LambdaPersistenceDocTest { } //#recovery-completed + abstract class MyPersistentActor6 extends AbstractPersistentActor { + //#recovery-no-snap + @Override + public Recovery recovery() { + return Recovery.create(SnapshotSelectionCriteria.none()); + } + //#recovery-no-snap + } + abstract class MyActor extends AbstractPersistentActor { //#backoff @Override diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 764f3c52cf..92f667f02d 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -85,6 +85,15 @@ public class PersistenceDocTest { } //#recovery-completed } + + abstract class MyPersistentActor6 extends UntypedPersistentActor { + //#recovery-no-snap + @Override + public Recovery recovery() { + return Recovery.create(SnapshotSelectionCriteria.none()); + } + //#recovery-no-snap + } abstract class MyActor extends UntypedPersistentActor { //#backoff diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 3cd002c1f8..1c5076d191 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -160,12 +160,25 @@ only be received by a persistent actor after recovery completes. as the original sender is presumed to be long gone. If you indeed have to notify an actor during recovery in the future, store its ``ActorPath`` explicitly in your persisted events. +.. _recovery-custom-java-lambda: + Recovery customization ^^^^^^^^^^^^^^^^^^^^^^ Applications may also customise how recovery is performed by returning a customised ``Recovery`` object -in the ``recovery`` method of a ``AbstractPersistentActor``, for example setting an upper bound to the replay -which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state: +in the ``recovery`` method of a ``AbstractPersistentActor``. + +To skip loading snapshots and replay all events you can use ``SnapshotSelectionCriteria.none()``. +This can be useful if snapshot serialization format has changed in an incompatible way. +It should typically not be used when events have been deleted. + +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-no-snap + +Another example, which can be fun for experiments but probably not in a real application, is setting an +upper bound to the replay which allows the actor to be replayed to a certain point "in the past" +instead to its most up to date state. Note that after that it is a bad idea to persist new +events because a later recovery will probably be confused by the new events that follow the +events that were previously skipped. .. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-custom @@ -339,6 +352,8 @@ next message. If there is a problem with recovering the state of the actor from the journal when the actor is started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped. +Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots +if you for example know that serialization format has changed in an incompatible way, see :ref:`recovery-custom-java-lambda`. Atomic writes ------------- diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 15bd77621d..b9bef5d105 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -168,12 +168,25 @@ They are cached and received by a persistent actor after recovery phase complete as the original sender is presumed to be long gone. If you indeed have to notify an actor during recovery in the future, store its ``ActorPath`` explicitly in your persisted events. +.. _recovery-custom-java: + Recovery customization ^^^^^^^^^^^^^^^^^^^^^^ Applications may also customise how recovery is performed by returning a customised ``Recovery`` object -in the ``recovery`` method of a ``UntypedPersistentActor``, for example setting an upper bound to the replay -which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state: +in the ``recovery`` method of a ``UntypedPersistentActor``. + +To skip loading snapshots and replay all events you can use ``SnapshotSelectionCriteria.none()``. +This can be useful if snapshot serialization format has changed in an incompatible way. +It should typically not be used when events have been deleted. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-no-snap + +Another example, which can be fun for experiments but probably not in a real application, is setting an +upper bound to the replay which allows the actor to be replayed to a certain point "in the past" +instead to its most up to date state. Note that after that it is a bad idea to persist new +events because a later recovery will probably be confused by the new events that follow the +events that were previously skipped. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-custom @@ -359,6 +372,8 @@ next message. If there is a problem with recovering the state of the actor from the journal when the actor is started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped. +Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots +if you for example know that serialization format has changed in an incompatible way, see :ref:`recovery-custom-java`. Atomic writes ------------- diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 19d2c3946c..d9b69fa2ad 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -59,6 +59,13 @@ object PersistenceDocSpec { } //#recovery-completed } + + trait MyPersistentActor5 extends PersistentActor { + //#recovery-no-snap + override def recovery = + Recovery(fromSnapshot = SnapshotSelectionCriteria.None) + //#recovery-no-snap + } } object PersistenceId { diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index afdc2fb86f..5a3ea8acbc 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -153,12 +153,25 @@ They are cached and received by a persistent actor after recovery phase complete as the original sender is presumed to be long gone. If you indeed have to notify an actor during recovery in the future, store its ``ActorPath`` explicitly in your persisted events. +.. _recovery-custom-scala: + Recovery customization ^^^^^^^^^^^^^^^^^^^^^^ Applications may also customise how recovery is performed by returning a customised ``Recovery`` object -in the ``recovery`` method of a ``PersistentActor``, for example setting an upper bound to the replay -which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state: +in the ``recovery`` method of a ``PersistentActor``, + +To skip loading snapshots and replay all events you can use ``SnapshotSelectionCriteria.None``. +This can be useful if snapshot serialization format has changed in an incompatible way. +It should typically not be used when events have been deleted. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-no-snap + +Another example, which can be fun for experiments but probably not in a real application, is setting an +upper bound to the replay which allows the actor to be replayed to a certain point "in the past" +instead to its most up to date state. Note that after that it is a bad idea to persist new +events because a later recovery will probably be confused by the new events that follow the +events that were previously skipped. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-custom @@ -345,6 +358,8 @@ next message. If there is a problem with recovering the state of the actor from the journal when the actor is started, ``onRecoveryFailure`` is called (logging the error by default), and the actor will be stopped. +Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots +if you for example know that serialization format has changed in an incompatible way, see :ref:`recovery-custom-scala`. Atomic writes ------------- diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 4619f42756..57fcaa6490 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -215,7 +215,8 @@ akka.persistence.snapshot-store.local { # Number load attempts when recovering from the latest snapshot fails # yet older snapshot files are available. Each recovery attempt will try # to recover using an older than previously failed-on snapshot file - # (if any are present). + # (if any are present). If all attempts fail the recovery will fail and + # the persistent actor will be stopped. max-load-attempts = 3 } diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index f14a55def5..eae82b1bb3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -45,6 +45,7 @@ private[persistence] object Eventsourced { private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash with PersistenceIdentity with PersistenceRecovery { import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult + import SnapshotProtocol.LoadSnapshotFailed import Eventsourced._ private val extension = Persistence(context.system) @@ -502,6 +503,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas changeState(recovering(recoveryBehavior, timeout)) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) + case LoadSnapshotFailed(cause) ⇒ + timeoutCancellable.cancel() + try onRecoveryFailure(cause, event = None) finally context.stop(self) + case RecoveryTick(true) ⇒ try onRecoveryFailure( new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"), diff --git a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala index 8a489dbd33..82de87ca21 100644 --- a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala @@ -208,6 +208,12 @@ private[persistence] object SnapshotProtocol { final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long) extends Response + /** + * Reply message to a failed [[LoadSnapshot]] request. + * @param cause failure cause. + */ + final case class LoadSnapshotFailed(cause: Throwable) extends Response + /** * Instructs snapshot store to save a snapshot. * diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/PersistencePluginProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/PersistencePluginProxy.scala index 052f04153c..be58e5c40b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/PersistencePluginProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/PersistencePluginProxy.scala @@ -183,7 +183,7 @@ final class PersistencePluginProxy(config: Config) extends Actor with Stash with case req: SnapshotProtocol.Request ⇒ req match { // exhaustive match case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒ - sender() ! LoadSnapshotResult(None, toSequenceNr) + sender() ! LoadSnapshotFailed(timeoutException) case SaveSnapshot(metadata, snapshot) ⇒ sender() ! SaveSnapshotFailure(metadata, timeoutException) case DeleteSnapshot(metadata) ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index 8cce9ad16d..b0deb8ffc3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -37,7 +37,7 @@ trait SnapshotStore extends Actor with ActorLogging { breaker.withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) map { sso ⇒ LoadSnapshotResult(sso, toSequenceNr) } recover { - case e ⇒ LoadSnapshotResult(None, toSequenceNr) + case e ⇒ LoadSnapshotFailed(e) } pipeTo senderPersistentActor() case SaveSnapshot(metadata, snapshot) ⇒ @@ -96,6 +96,12 @@ trait SnapshotStore extends Actor with ActorLogging { /** * Plugin API: asynchronously loads a snapshot. * + * If the future `Option` is `None` then all events will be replayed, + * i.e. there was no snapshot. If snapshot could not be loaded the `Future` + * should be completed with failure. That is important because events may + * have been deleted and just replaying the events might not result in a valid + * state. + * * This call is protected with a circuit-breaker. * * @param persistenceId id of the persistent actor. diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index cc326f3528..030e21c7dd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -48,7 +48,12 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo // Hence, an attempt to load that snapshot will fail but loading an older snapshot may succeed. // val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(maxLoadAttempts) - Future(load(metadata))(streamDispatcher) + Future { + load(metadata) match { + case Success(s) ⇒ s + case Failure(e) ⇒ throw e // all attempts failed, fail the future + } + }(streamDispatcher) } override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { @@ -86,14 +91,19 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo } @scala.annotation.tailrec - private def load(metadata: immutable.Seq[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match { - case None ⇒ None + private def load(metadata: immutable.Seq[SnapshotMetadata]): Try[Option[SelectedSnapshot]] = metadata.lastOption match { + case None ⇒ Success(None) // no snapshots stored case Some(md) ⇒ Try(withInputStream(md)(deserialize)) match { - case Success(s) ⇒ Some(SelectedSnapshot(md, s.data)) + case Success(s) ⇒ + Success(Some(SelectedSnapshot(md, s.data))) case Failure(e) ⇒ - log.error(e, s"Error loading snapshot [${md}]") - load(metadata.init) // try older snapshot + val remaining = metadata.init + log.error(e, s"Error loading snapshot [{}], remaining attempts: [{}]", md, remaining.size) + if (remaining.isEmpty) + Failure(e) // all attempts failed + else + load(remaining) // try older snapshot } } @@ -121,7 +131,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo try { p(stream) } finally { stream.close() } /** Only by persistenceId and sequenceNr, timestamp is informational - accomodates for 2.13.x series files */ - private def snapshotFileForWrite(metadata: SnapshotMetadata, extension: String = ""): File = + protected def snapshotFileForWrite(metadata: SnapshotMetadata, extension: String = ""): File = new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") private def snapshotMetadatas(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = { diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index 7c837558c8..81859da6c9 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -73,9 +73,10 @@ object SnapshotFailureRobustnessSpec { class FailingLocalSnapshotStore extends LocalSnapshotStore { override def save(metadata: SnapshotMetadata, snapshot: Any): Unit = { - if (metadata.sequenceNr == 2) { + if (metadata.sequenceNr == 2 || snapshot == "boom") { val bytes = "b0rk".getBytes("UTF-8") - withOutputStream(metadata)(_.write(bytes)) + val tmpFile = withOutputStream(metadata)(_.write(bytes)) + tmpFile.renameTo(snapshotFileForWrite(metadata)) } else super.save(metadata, snapshot) } } @@ -112,10 +113,11 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf sPersistentActor ! Cmd("kablama") expectMsg(2) system.eventStream.publish(TestEvent.Mute( - EventFilter.error(start = "Error loading snapshot ["))) + EventFilter[java.io.NotSerializableException](start = "Error loading snapshot"))) system.eventStream.subscribe(testActor, classOf[Logging.Error]) try { val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + expectMsgType[Logging.Error].message.toString should startWith("Error loading snapshot") expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 1, timestamp), state) ⇒ state should ===("blahonga") @@ -131,6 +133,40 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf } } + "fail recovery and stop actor when no snapshot could be loaded" in { + val sPersistentActor = system.actorOf(Props(classOf[SaveSnapshotTestPersistentActor], name, testActor)) + val persistenceId = name + + expectMsg(RecoveryCompleted) + sPersistentActor ! Cmd("ok") + expectMsg(1) + // max-attempts = 3 + sPersistentActor ! Cmd("boom") + expectMsg(2) + sPersistentActor ! Cmd("boom") + expectMsg(3) + sPersistentActor ! Cmd("boom") + expectMsg(4) + system.eventStream.publish(TestEvent.Mute( + EventFilter[java.io.NotSerializableException](start = "Error loading snapshot"))) + system.eventStream.publish(TestEvent.Mute( + EventFilter[java.io.NotSerializableException](start = "Persistence failure"))) + system.eventStream.subscribe(testActor, classOf[Logging.Error]) + try { + val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + (1 to 3).foreach { _ ⇒ + expectMsgType[Logging.Error].message.toString should startWith("Error loading snapshot") + } + expectMsgType[Logging.Error].message.toString should startWith("Persistence failure") + watch(lPersistentActor) + expectTerminated(lPersistentActor) + } finally { + system.eventStream.unsubscribe(testActor, classOf[Logging.Error]) + system.eventStream.publish(TestEvent.UnMute( + EventFilter.error(start = "Error loading snapshot ["))) + } + } + "receive failure message when deleting a single snapshot fails" in { val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) val persistenceId = name