From 874d07c0ae30767a276ef768f66ca37f2eb20406 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Thu, 2 Jul 2015 00:44:10 +0200 Subject: [PATCH 1/2] !per #17518 harmonize and document failure handling in persistence + added tests for all failure scenarios (callbacks) --- .../persistence/LambdaPersistenceDocTest.java | 18 +++-- .../docs/persistence/PersistenceDocTest.java | 32 +++----- akka-docs/rst/java/lambda-persistence.rst | 55 +++++++++++-- akka-docs/rst/java/persistence.rst | 53 +++++++++++-- .../project/migration-guide-2.3.x-2.4.x.rst | 9 +++ .../docs/persistence/PersistenceDocSpec.scala | 11 +-- akka-docs/rst/scala/persistence.rst | 75 +++++++++++++++--- .../persistence/journal/JournalSpec.scala | 22 +++--- .../scala/akka/persistence/Eventsourced.scala | 46 +++++------ .../akka/persistence/JournalProtocol.scala | 8 +- .../akka/persistence/PersistentView.scala | 6 +- .../akka/persistence/SnapshotProtocol.scala | 3 +- .../persistence/journal/AsyncRecovery.scala | 4 +- .../journal/AsyncWriteJournal.scala | 17 ++-- .../AtLeastOnceDeliveryFailureSpec.scala | 2 +- .../EndToEndEventAdapterSpec.scala | 2 - .../PersistentActorDeleteFailureSpec.scala | 77 +++++++++++++++++++ .../PersistentActorFailureSpec.scala | 50 ++++++++---- .../persistence/PersistentActorSpec.scala | 29 ++++++- .../SnapshotRecoveryLocalStoreSpec.scala | 1 - .../SnapshotSerializationSpec.scala | 1 - .../stream/PersistentPublisher.scala | 4 +- 22 files changed, 390 insertions(+), 135 deletions(-) create mode 100644 akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala diff --git a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java index 66f937591c..d5f3207b43 100644 --- a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java @@ -258,6 +258,16 @@ public class LambdaPersistenceDocTest { static Object o5 = new Object() { class MyPersistentActor extends AbstractPersistentActor { + + //#snapshot-criteria + @Override + public Recovery recovery() { + return Recovery.create( + SnapshotSelectionCriteria + .create(457L, System.currentTimeMillis())); + } + //#snapshot-criteria + //#snapshot-offer private Object state; @@ -291,14 +301,6 @@ public class LambdaPersistenceDocTest { match(Object.class, o -> {/* ... */}).build() ); } - - private void recover() { - //#snapshot-criteria - persistentActor.tell(Recovery.create( - SnapshotSelectionCriteria - .create(457L, System.currentTimeMillis())), null); - //#snapshot-criteria - } } }; diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index ce6002f338..e9963615d7 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -240,9 +240,19 @@ public class PersistenceDocTest { static Object o5 = new Object() { class MyPersistentActor extends UntypedPersistentActor { + + //#snapshot-criteria + @Override + public Recovery recovery() { + return Recovery.create( + SnapshotSelectionCriteria + .create(457L, System.currentTimeMillis())); + } + //#snapshot-criteria + @Override public String persistenceId() { return "persistence-id"; } - + //#snapshot-offer private Object state; @@ -258,30 +268,12 @@ public class PersistenceDocTest { } } //#snapshot-offer - + @Override public void onReceiveCommand(Object message) { } } - class MyActor extends UntypedActor { - ActorRef persistentActor; - - public MyActor() { - persistentActor = getContext().actorOf(Props.create(MyPersistentActor.class)); - } - - public void onReceive(Object message) throws Exception { - // ... - } - - private void recover() { - //#snapshot-criteria - persistentActor.tell(Recovery.create(SnapshotSelectionCriteria.create(457L, - System.currentTimeMillis())), null); - //#snapshot-criteria - } - } }; static Object o9 = new Object() { diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index a2003ba439..30aa9ab83f 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -204,7 +204,7 @@ and before any other received messages. .. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-completed -If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. @@ -293,6 +293,8 @@ In this case no stashing is happening, yet the events are still persisted and ca While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics it is not a recommended practice as it may lead to overly complex nesting. +.. _failures-lambda: + Failures -------- @@ -312,10 +314,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ next message. If there is a problem with recovering the state of the actor from the journal when the actor is -started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. - -If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. Atomic writes ------------- @@ -346,11 +345,51 @@ writing the previous batch. Batch writes are never timer-based which keeps laten Message deletion ---------------- -To delete all messages (journaled by a single persistent actor) up to a specified sequence number, +It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, +while still having access to the accumulated state during replays - by loading the snapshot. + +Persistence status handling +--------------------------- +Persisting, deleting and replaying messages can eitehr succeed or fail. + ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | +| | +-------------------------------+-----------------------------------+ +| | | ``onPersistRejected`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``recovery`` | ``RecoverySuccess`` | ``onRecoveryFailure`` | Actor is stopped. | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ + +The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which +the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message +(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about +which message caused the failure. + +For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure +handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most +likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most +likely not help the journal recover – as it would likely cause a `Thundering herd problem`_, as many persistent actors +would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-lambda`) which +implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between +restarts of the persistent actor. + +.. note:: + Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number + of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure, + it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped. + + Check the documentation of the journal implementation you are using for details if/how it is using this technique. + +.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem .. _persistent-views-java-lambda: diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index b8470651e4..1db0257c0b 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -206,7 +206,7 @@ and before any other received messages. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed -If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. .. _persist-async-java: @@ -296,6 +296,8 @@ In this case no stashing is happening, yet the events are still persisted and ca While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics it is not a recommended practice as it may lead to overly complex nesting. +.. _failures-java: + Failures -------- @@ -315,7 +317,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ next message. If there is a problem with recovering the state of the actor from the journal when the actor is -started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. +started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. Atomic writes ------------- @@ -346,14 +348,51 @@ writing the previous batch. Batch writes are never timer-based which keeps laten Message deletion ---------------- -To delete all messages (journaled by a single persistent actor) up to a specified sequence number, +It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, +while still having access to the accumulated state during replays - by loading the snapshot. -If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +Persistence status handling +--------------------------- +Persisting, deleting and replaying messages can eitehr succeed or fail. + ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | +| | +-------------------------------+-----------------------------------+ +| | | ``onPersistRejected`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``recovery`` | ``RecoverySuccess`` | ``onRecoveryFailure`` | Actor is stopped. | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ + +The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which +the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message +(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about +which message caused the failure. + +For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure +handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most +likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most +likely not help the journal recover – as it would likely cause a `Thundering herd problem`_, as many persistent actors +would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-java`) which +implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between +restarts of the persistent actor. + +.. note:: + Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number + of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure, + it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped. + + Check the documentation of the journal implementation you are using for details if/how it is using this technique. + +.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem .. _persistent-views-java: diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 22052a368a..e04211f0ff 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -489,3 +489,12 @@ The ``permanent`` deletion flag was removed. Support for non-permanent deletions removed. Events that were deleted with ``permanent=false`` with older version will still not be replayed in this version. +References to "replay" in names +------------------------------- +Previously a number of classes and methods used the word "replay" interchangeably with the word "recover". +This lead to slight inconsistencies in APIs, where a method would be called ``recovery``, yet the +signal for a completed recovery was named ``ReplayMessagesSuccess``. + +This is now fixed, and all methods use the same "recovery" wording consistently across the entire API. +The old ``ReplayMessagesSuccess`` is now called ``RecoverySuccess``, and an additional method called ``onRecoveryFailure`` +has been introduced. \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 36c621b165..7228b31be3 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -166,6 +166,12 @@ object PersistenceDocSpec { class MyPersistentActor extends PersistentActor { override def persistenceId = "my-stable-persistence-id" + //#snapshot-criteria + override def recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria( + maxSequenceNr = 457L, + maxTimestamp = System.currentTimeMillis)) + //#snapshot-criteria + //#snapshot-offer var state: Any = _ @@ -179,11 +185,6 @@ object PersistenceDocSpec { override def receiveCommand: Receive = ??? } - //#snapshot-criteria - persistentActor ! Recovery(fromSnapshot = SnapshotSelectionCriteria( - maxSequenceNr = 457L, - maxTimestamp = System.currentTimeMillis)) - //#snapshot-criteria } object PersistAsync { diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 0eaad129fd..ed9abd8190 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -190,7 +190,7 @@ and before any other received messages. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed -If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. .. _persist-async-scala: @@ -282,6 +282,8 @@ In this case no stashing is happening, yet the events are still persisted and ca While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics it is not a recommended practice as it may lead to overly complex nesting. +.. _failures-scala: + Failures -------- @@ -301,10 +303,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ next message. If there is a problem with recovering the state of the actor from the journal when the actor is -started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. - -If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. Atomic writes ------------- @@ -334,15 +333,55 @@ the maximum throughput dramatically. A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. - Message deletion ---------------- -To delete all messages (journaled by a single persistent actor) up to a specified sequence number, +It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, +while still having access to the accumulated state during replays - by loading the snapshot. + +Persistence status handling +--------------------------- +Persisting, deleting and replaying messages can eitehr succeed or fail. + ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | +| | +-------------------------------+-----------------------------------+ +| | | ``onPersistRejected`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``recovery`` | ``RecoveryCompleted`` | ``onRecoveryFailure`` | Actor is stopped. | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ + +The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which +the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message +(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about +which message caused the failure. + +For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure +handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most +likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most +likely not help the journal recover – as it would likely cause a `Thundering herd problem`_, as many persistent actors +would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-scala`) which +implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between +restarts of the persistent actor. + +.. note:: + Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number + of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure, + it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped. + + Check the documentation of the journal implementation you are using for details if/how it is using this technique. + +.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem + .. _persistent-views: @@ -451,6 +490,24 @@ when the snapshot was taken. To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, persistent actors should use the ``deleteSnapshots`` method. +Snapshot status handling +------------------------ + +Saving or deleting snapshots can either succeed or fail – this information is reported back to the persistent actor via +status messages as illustrated in the following table. + +============================================== ========================== ============================== +**Method** **Success** **Failure message** +============================================== ========================== ============================== +``saveSnapshot`` ``SaveSnapshotSuccess`` ``SaveSnapshotFailure`` +``deleteSnapshot(Long)`` ``DeleteSnapshotSuccess`` ``DeleteSnapshotFailure`` +``deleteSnapshots(SnapshotSelectionCriteria)`` ``DeleteSnapshotsSuccess`` ``DeleteSnapshotsFailure`` +============================================== ========================== ============================== + +If failure messages are left unhandled by the actor, a default warning log message will be logged for each incoming failure message. +No default action is performed on the success messages, however you're free to handle them e.g. in order to delete +an in memory representation of the snapshot, or in the case of failure to attempt save the snapshot aggain. + .. _at-least-once-delivery: At-Least-Once Delivery diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 64dd33bb34..72900b1e7a 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -92,54 +92,54 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { "replay all messages" in { journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) 1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using a lower sequence number bound" in { journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) 3 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using an upper sequence number bound" in { journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref) 1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using a count limit" in { journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref) 1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using a lower and upper sequence number bound" in { journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref) 2 to 4 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using a lower and upper sequence number bound and a count limit" in { journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref) 2 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay a single if lower sequence number bound equals upper sequence number bound" in { journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref) 2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay a single message if count limit equals 1" in { journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref) 2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "not replay messages if count limit equals 0" in { journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref) - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "not replay messages if lower sequence number bound is greater than upper sequence number bound" in { journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref) - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "not replay messages if the persistent actor has not yet written messages" in { journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, "non-existing-pid", receiverProbe.ref) - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 0L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 0L)) } "not replay permanently deleted messages (range deletion)" in { val receiverProbe2 = TestProbe() diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 181e54117a..23088fd52e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -107,7 +107,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * @param event the event that was processed in `receiveRecover`, if the exception * was thrown there */ - protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = + protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = event match { case Some(evt) ⇒ log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + @@ -148,18 +148,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas event.getClass.getName, seqNr, persistenceId, cause.getMessage) } - /** - * Called when ``deleteMessages`` failed. By default this method logs the problem - * as a warning, and the actor continues. - * - * @param cause failure cause - * @param toSequenceNr the sequence number parameter of the ``deleteMessages`` call - */ - protected def onDeleteMessagesFailure(cause: Throwable, toSequenceNr: Long): Unit = { - log.warning("Failed to deleteMessages toSequenceNr [{}] for persistenceId [{}] due to [{}].", - toSequenceNr, persistenceId, cause.getMessage) - } - private def startRecovery(recovery: Recovery): Unit = { changeState(recoveryStarted(recovery.replayMax)) loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) @@ -216,7 +204,16 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def unhandled(message: Any): Unit = { message match { case RecoveryCompleted ⇒ // mute - case m ⇒ super.unhandled(m) + case SaveSnapshotFailure(m, e) ⇒ + log.warning("Failed to saveSnapshot given metadata [{}] due to: [{}: {}]", m, e.getClass.getCanonicalName, e.getMessage) + case DeleteSnapshotFailure(m, e) ⇒ + log.warning("Failed to deleteSnapshot given metadata [{}] due to: [{}: {}]", m, e.getClass.getCanonicalName, e.getMessage) + case DeleteSnapshotsFailure(c, e) ⇒ + log.warning("Failed to deleteSnapshots given criteria [{}] due to: [{}: {}]", c, e.getClass.getCanonicalName, e.getMessage) + case DeleteMessagesFailure(e, toSequenceNr) ⇒ + log.warning("Failed to deleteMessages toSequenceNr [{}] for persistenceId [{}] due to [{}: {}].", + toSequenceNr, persistenceId, e.getClass.getCanonicalName, e.getMessage) + case m ⇒ super.unhandled(m) } } @@ -393,7 +390,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas /** * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. * - * If the delete fails [[#onDeleteMessagesFailure]] will be called. + * If the delete fails an [[akka.persistence.JournalProtocol.DeleteMessagesFailure]] will be sent to the actor. * * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. */ @@ -425,7 +422,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas /** * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * message to the actor's `receiveRecover`. Then initiates a message replay, either starting - * from the loaded snapshot or from scratch, and switches to `replayStarted` state. + * from the loaded snapshot or from scratch, and switches to `recoveryStarted` state. * All incoming messages are stashed. * * @param replayMax maximum number of messages to replay. @@ -456,7 +453,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas // Since we are recovering we can ignore the receive behavior from the stack Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot)) } - changeState(replayStarted(recoveryBehavior)) + changeState(recovering(recoveryBehavior)) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) case other ⇒ internalStash.stash() } @@ -468,11 +465,11 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * * If replay succeeds it got highest stored sequence number response from the journal and then switches * to `processingCommands` state. Otherwise the actor is stopped. - * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. + * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onRecoveryFailure`. * * All incoming messages are stashed. */ - private def replayStarted(recoveryBehavior: Receive) = new State { + private def recovering(recoveryBehavior: Receive) = new State { override def toString: String = s"replay started" override def recoveryRunning: Boolean = true @@ -483,9 +480,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas Eventsourced.super.aroundReceive(recoveryBehavior, p) } catch { case NonFatal(t) ⇒ - try onReplayFailure(t, Some(p.payload)) finally context.stop(self) + try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self) } - case ReplayMessagesSuccess(highestSeqNr) ⇒ + case RecoverySuccess(highestSeqNr) ⇒ onReplaySuccess() // callback for subclass implementation changeState(processingCommands) sequenceNr = highestSeqNr @@ -493,7 +490,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas internalStash.unstashAll() Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) case ReplayMessagesFailure(cause) ⇒ - try onReplayFailure(cause, event = None) finally context.stop(self) + try onRecoveryFailure(cause, event = None) finally context.stop(self) case other ⇒ internalStash.stash() } @@ -581,10 +578,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas case WriteMessagesFailed(_) ⇒ () // it will be stopped by the first WriteMessageFailure message - - case DeleteMessagesFailure(e, toSequenceNr) ⇒ - onDeleteMessagesFailure(e, toSequenceNr) - } def onWriteMessageComplete(err: Boolean): Unit = @@ -616,7 +609,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas else internalStash.unstash() } - } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 5a3d72cd5a..cd7ca2eaee 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -22,6 +22,12 @@ private[persistence] object JournalProtocol { /** Internal journal acknowledgement. */ sealed trait Response extends Message + /** + * Reply message to a successful [[DeleteMessagesTo]] request. + */ + final case class DeleteMessagesSuccess(toSequenceNr: Long) + extends Response + /** * Reply message to a failed [[DeleteMessagesTo]] request. */ @@ -128,7 +134,7 @@ private[persistence] object JournalProtocol { * * @param highestSequenceNr highest stored sequence number. */ - case class ReplayMessagesSuccess(highestSequenceNr: Long) + case class RecoverySuccess(highestSequenceNr: Long) extends Response with DeadLetterSuppression /** diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index 6e22a7b683..d3585aff02 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -293,7 +293,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory case NonFatal(t) ⇒ changeState(ignoreRemainingReplay(t)) } - case _: ReplayMessagesSuccess ⇒ + case _: RecoverySuccess ⇒ onReplayComplete() case ReplayMessagesFailure(cause) ⇒ try onReplayError(cause) finally onReplayComplete() @@ -339,8 +339,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory // replay must be a full replay (up to the highest stored sequence number) // Recover(lastSequenceNr) is sent by preRestart setLastSequenceNr(Long.MaxValue) - case _: ReplayMessagesSuccess ⇒ replayCompleted(receive) - case _ ⇒ internalStash.stash() + case _: RecoverySuccess ⇒ replayCompleted(receive) + case _ ⇒ internalStash.stash() } def replayCompleted(receive: Receive): Unit = { diff --git a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala index 556d5ea879..042b7be716 100644 --- a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala @@ -12,8 +12,7 @@ package akka.persistence * @param sequenceNr sequence number at which the snapshot was taken. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown. */ -@SerialVersionUID(1L) // -//#snapshot-metadata +@SerialVersionUID(1L) //#snapshot-metadata final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) //#snapshot-metadata diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index 5b3c6f9912..5c7e14877f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -31,13 +31,13 @@ trait AsyncRecovery { * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. - * @param replayCallback called to replay a single message. Can be called from any + * @param recoveryCallback called to replay a single message. Can be called from any * thread. * * @see [[AsyncWriteJournal]] */ def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, - max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] + max: Long)(recoveryCallback: PersistentRepr ⇒ Unit): Future[Unit] /** * Plugin API: asynchronously reads the highest stored sequence number for the diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 3b4ab470a7..3ebda5bf2a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -8,13 +8,11 @@ package akka.persistence.journal import akka.actor._ import akka.pattern.pipe import akka.persistence._ + import scala.collection.immutable import scala.concurrent.Future +import scala.util.{ Failure, Success, Try } import scala.util.control.NonFatal -import scala.util.Try -import scala.util.Success -import scala.util.Failure -import akka.AkkaException /** * Abstract journal, optimized for asynchronous, non-blocking writes. @@ -110,7 +108,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { }.map(_ ⇒ highSeqNr) } }.map { - highSeqNr ⇒ ReplayMessagesSuccess(highSeqNr) + highSeqNr ⇒ RecoverySuccess(highSeqNr) }.recover { case e ⇒ ReplayMessagesFailure(e) }.pipeTo(persistentActor).onSuccess { @@ -118,9 +116,12 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ - asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete { - case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) - case Failure(e) ⇒ persistentActor ! DeleteMessagesFailure(e, toSequenceNr) + asyncDeleteMessagesTo(persistenceId, toSequenceNr) map { + case _ ⇒ DeleteMessagesSuccess(toSequenceNr) + } recover { + case e ⇒ DeleteMessagesFailure(e, toSequenceNr) + } pipeTo persistentActor onComplete { + case _ if publish ⇒ context.system.eventStream.publish(d) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala index b22bfbde7e..b1b2693ab7 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala @@ -113,7 +113,7 @@ object AtLeastOnceDeliveryFailureSpec { private def debugMessage(msg: String): String = s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})" - override protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = { + override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = { // mute logging } diff --git a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala index 868f56f2fe..7b9d0f64e6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala @@ -88,8 +88,6 @@ object EndToEndEventAdapterSpec { } override def receiveCommand = persistIncoming - override def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = - probe.foreach { _ ! cause } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala new file mode 100644 index 0000000000..bc511df98d --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor._ +import akka.event.Logging +import akka.event.Logging.Warning +import akka.persistence.JournalProtocol.DeleteMessagesFailure +import akka.persistence.journal.inmem.InmemJournal +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.control.NoStackTrace + +object PersistentActorDeleteFailureSpec { + + case class DeleteTo(n: Long) + class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace + class SimulatedSerializationException(msg: String) extends RuntimeException(msg) with NoStackTrace + + class DeleteFailingInmemJournal extends InmemJournal { + override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = + Future.failed(new SimulatedException("Boom! Unable to delete events!")) + } + + class DoesNotHandleDeleteFailureActor(name: String, probe: ActorRef) extends PersistentActor { + override def persistenceId = name + override def receiveCommand: Receive = { + case DeleteTo(n) ⇒ deleteMessages(n) + } + override def receiveRecover: Receive = Actor.emptyBehavior + } + + class HandlesDeleteFailureActor(name: String, probe: ActorRef) extends PersistentActor { + override def persistenceId = name + override def receiveCommand: Receive = { + case DeleteTo(n) ⇒ deleteMessages(n) + case f: DeleteMessagesFailure ⇒ probe ! f + } + override def receiveRecover: Receive = Actor.emptyBehavior + } + +} + +class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( + """ + akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorDeleteFailureSpec$DeleteFailingInmemJournal" + """))) with ImplicitSender { + import PersistentActorDeleteFailureSpec._ + + system.eventStream.publish(TestEvent.Mute(EventFilter[akka.pattern.AskTimeoutException]())) + + "A persistent actor" must { + "have default warn logging be triggered, when deletion failed" in { + val persistentActor = system.actorOf(Props(classOf[DoesNotHandleDeleteFailureActor], name, testActor)) + system.eventStream.subscribe(testActor, classOf[Logging.Warning]) + persistentActor ! DeleteTo(100) + val message = expectMsgType[Warning].message.toString + message should include("Failed to deleteMessages") + message should include("Boom! Unable to delete events!") // the `cause` message + } + + "be receive an DeleteMessagesFailure when deletion failed, and the default logging should not be triggered" in { + val persistentActor = system.actorOf(Props(classOf[HandlesDeleteFailureActor], name, testActor)) + system.eventStream.subscribe(testActor, classOf[Logging.Warning]) + persistentActor ! DeleteTo(100) + expectMsgType[DeleteMessagesFailure] + expectNoMsg(100.millis) // since the actor handled the message, we do not issue warn logging automatically + } + + } +} + diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index 06cee55e6f..0eb78a9c46 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -4,20 +4,18 @@ package akka.persistence -import scala.collection.immutable import akka.actor.{ OneForOneStrategy, _ } -import akka.persistence.journal.AsyncWriteProxy import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages } import akka.persistence.journal.inmem.InmemStore -import akka.testkit.{ ImplicitSender, TestProbe, TestEvent, EventFilter } +import akka.persistence.journal.{ AsyncWriteJournal, AsyncWriteProxy } +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } import akka.util.Timeout +import scala.collection.immutable import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.control.NoStackTrace -import scala.util.Try -import akka.persistence.journal.AsyncWriteJournal -import scala.util.Failure +import scala.util.{ Failure, Try } object PersistentActorFailureSpec { import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor } @@ -32,7 +30,7 @@ object PersistentActorFailureSpec { override def preStart(): Unit = { super.preStart() - self ! SetStore(context.actorOf(Props[FailingInmemStore])) + self ! SetStore(context.actorOf(Props[FailingInmemStore]())) } } @@ -46,7 +44,7 @@ object PersistentActorFailureSpec { case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ val highest = highestSequenceNr(pid) val readFromStore = read(pid, fromSnr, toSnr, max) - if (readFromStore.length == 0) + if (readFromStore.isEmpty) sender() ! ReplaySuccess(highest) else if (isCorrupt(readFromStore)) sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr")) @@ -66,9 +64,9 @@ object PersistentActorFailureSpec { def checkSerializable(w: WriteMessages): immutable.Seq[Try[Unit]] = w.messages.collect { case a: AtomicWrite ⇒ - (a.payload.collectFirst { - case PersistentRepr(Evt(s: String), _) if s.contains("not serializable") ⇒ s - }) match { + a.payload.collectFirst { + case PersistentRepr(Evt(s: String), _: Long) if s.contains("not serializable") ⇒ s + } match { case Some(s) ⇒ Failure(new SimulatedSerializationException(s)) case None ⇒ AsyncWriteJournal.successUnit } @@ -80,6 +78,15 @@ object PersistentActorFailureSpec { override def receive = failingReceive.orElse(super.receive) } + class OnRecoveryFailurePersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = commonBehavior orElse { + case c @ Cmd(txt) ⇒ persist(Evt(txt))(updateState) + } + + override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = + probe ! "recovery-failure:" + cause.getMessage + } + class Supervisor(testActor: ActorRef) extends Actor { override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case e ⇒ @@ -174,14 +181,30 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( watch(ref) expectTerminated(ref) } - "stop if persist fails" in { + "call onRecoveryFailure when recovery from persisted events fails" in { + val props = Props(classOf[OnRecoveryFailurePersistentActor], name, testActor) + + val persistentActor = system.actorOf(props) + persistentActor ! Cmd("corrupt") + persistentActor ! GetState + expectMsg(List("corrupt")) + + // recover by creating another with same name + system.actorOf(Props(classOf[Supervisor], testActor)) ! props + val ref = expectMsgType[ActorRef] + expectMsg("recovery-failure:blahonga 1 1") + watch(ref) + expectTerminated(ref) + } + "call onPersistFailure and stop when persist fails" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) val persistentActor = expectMsgType[ActorRef] watch(persistentActor) persistentActor ! Cmd("wrong") + expectMsg("Failure: wrong-1") expectTerminated(persistentActor) } - "stop if persistAsync fails" in { + "call onPersistFailure and stop if persistAsync fails xoxo" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name) val persistentActor = expectMsgType[ActorRef] persistentActor ! Cmd("a") @@ -190,6 +213,7 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( expectMsg("a-1") // reply after successful persistAsync persistentActor ! Cmd("wrong") expectMsg("wrong") // reply before persistAsync + expectMsg("Failure: wrong-2") // onPersistFailure sent message expectTerminated(persistentActor) } "call onPersistRejected and continue if persist rejected" in { diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index cc7f976d24..67dcd7a319 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -7,6 +7,7 @@ package akka.persistence import java.util.concurrent.atomic.AtomicInteger import akka.actor._ +import akka.persistence.JournalProtocol.DeleteMessagesSuccess import akka.testkit.{ ImplicitSender, TestLatch, TestProbe } import com.typesafe.config.Config @@ -24,15 +25,19 @@ object PersistentActorSpec { abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) { var events: List[Any] = Nil + var askedForDelete: Option[ActorRef] = None val updateState: Receive = { - case Evt(data) ⇒ events = data :: events + case Evt(data) ⇒ events = data :: events + case d @ Some(ref: ActorRef) ⇒ askedForDelete = d.asInstanceOf[Some[ActorRef]] } val commonBehavior: Receive = { - case "boom" ⇒ throw new TestException("boom") - case Delete(toSequenceNr) ⇒ deleteMessages(toSequenceNr) - case GetState ⇒ sender() ! events.reverse + case "boom" ⇒ throw new TestException("boom") + case GetState ⇒ sender() ! events.reverse + case Delete(toSequenceNr) ⇒ + persist(Some(sender())) { s ⇒ askedForDelete = s } + deleteMessages(toSequenceNr) } def receiveRecover = updateState @@ -42,6 +47,9 @@ object PersistentActorSpec { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ persistAll(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) + case d: DeleteMessagesSuccess ⇒ + val replyTo = askedForDelete.getOrElse(throw new RuntimeException("Received DeleteMessagesSuccess without anyone asking for delete!")) + replyTo ! d } override protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = @@ -49,6 +57,11 @@ object PersistentActorSpec { case Evt(data) ⇒ sender() ! s"Rejected: $data" case _ ⇒ super.onPersistRejected(cause, event, seqNr) } + override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = + event match { + case Evt(data) ⇒ sender() ! s"Failure: $data" + case _ ⇒ super.onPersistFailure(cause, event, seqNr) + } } class Behavior2PersistentActor(name: String) extends ExamplePersistentActor(name) { @@ -220,6 +233,13 @@ object PersistentActorSpec { counter += 1 counter } + + override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = + event match { + case Evt(data) ⇒ sender() ! s"Failure: $data" + case _ ⇒ super.onPersistFailure(cause, event, seqNr) + } + } class AsyncPersistThreeTimesPersistentActor(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -1107,6 +1127,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("a-1", "a-2", "b-1", "b-2")) persistentActor ! Delete(2L) // delete "a-1" and "a-2" persistentActor ! "boom" // restart, recover + expectMsgType[DeleteMessagesSuccess] persistentActor ! GetState expectMsg(List("b-1", "b-2")) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala index cb34674b4a..bae79abb39 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala @@ -53,7 +53,6 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor)) - recoveringActor ! Recovery() expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) ⇒ } expectMsg(RecoveryCompleted) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala index 08f1c46c64..01d9a644ea 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -87,7 +87,6 @@ class SnapshotSerializationSpec extends PersistenceSpec(PersistenceSpec.config(" sPersistentActor ! "blahonga" expectMsg(0) val lPersistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, testActor)) - lPersistentActor ! Recovery() expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 0, timestamp), state) ⇒ state should ===(new MySnapshot("blahonga")) diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala index cfef8a465a..adbf3728ae 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala @@ -208,8 +208,8 @@ private class PersistentPublisherBuffer(override val processorId: String, publis self ! Filled } - override def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = { - super.onReplayFailure(receive, await, cause) + override def onRecoveryFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = { + super.onRecoveryFailure(receive, await, cause) self ! Filled } From 561d46e2dac3a983a134c7bc7ab5fd4d532d274e Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Tue, 7 Jul 2015 16:55:35 +0200 Subject: [PATCH 2/2] =pro fixup in PR validation logging logic --- akka-docs/rst/java/lambda-persistence.rst | 26 +++++++++++++++---- akka-docs/rst/java/persistence.rst | 18 +++++++++++-- akka-docs/rst/scala/persistence.rst | 8 +++--- .../persistence/journal/JournalSpec.scala | 1 + project/ValidatePullRequest.scala | 8 +++--- 5 files changed, 46 insertions(+), 15 deletions(-) diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 30aa9ab83f..346d08736d 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -362,11 +362,11 @@ Persisting, deleting and replaying messages can eitehr succeed or fail. +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ | ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | | | +-------------------------------+-----------------------------------+ -| | | ``onPersistRejected`` | --- | +| | | ``onPersistRejected`` | No automatic actions. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ | ``recovery`` | ``RecoverySuccess`` | ``onRecoveryFailure`` | Actor is stopped. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ -| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | No automatic actions. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which @@ -488,9 +488,25 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a Snapshot deletion ----------------- -A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the -timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should -use the ``deleteSnapshots`` method. +A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number of +when the snapshot was taken. + +To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, +persistent actors should use the ``deleteSnapshots`` method. + +Snapshot status handling +------------------------ + +Saving or deleting snapshots can either succeed or fail – this information is reported back to the persistent actor via +status messages as illustrated in the following table. + +============================================== ========================== ============================== +**Method** **Success** **Failure message** +============================================== ========================== ============================== +``saveSnapshot(Any)`` ``SaveSnapshotSuccess`` ``SaveSnapshotFailure`` +``deleteSnapshot(Long)`` ``DeleteSnapshotSuccess`` ``DeleteSnapshotFailure`` +``deleteSnapshots(SnapshotSelectionCriteria)`` ``DeleteSnapshotsSuccess`` ``DeleteSnapshotsFailure`` +============================================== ========================== ============================== .. _at-least-once-delivery-java-lambda: diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 1db0257c0b..a1e69ee8e4 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -365,11 +365,11 @@ Persisting, deleting and replaying messages can eitehr succeed or fail. +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ | ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | | | +-------------------------------+-----------------------------------+ -| | | ``onPersistRejected`` | --- | +| | | ``onPersistRejected`` | No automatic actions. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ | ``recovery`` | ``RecoverySuccess`` | ``onRecoveryFailure`` | Actor is stopped. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ -| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | No automatic actions. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which @@ -497,6 +497,20 @@ when the snapshot was taken. To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, persistent actors should use the ``deleteSnapshots`` method. +Snapshot status handling +------------------------ + +Saving or deleting snapshots can either succeed or fail – this information is reported back to the persistent actor via +status messages as illustrated in the following table. + +============================================== ========================== ============================== +**Method** **Success** **Failure message** +============================================== ========================== ============================== +``saveSnapshot(Any)`` ``SaveSnapshotSuccess`` ``SaveSnapshotFailure`` +``deleteSnapshot(Long)`` ``DeleteSnapshotSuccess`` ``DeleteSnapshotFailure`` +``deleteSnapshots(SnapshotSelectionCriteria)`` ``DeleteSnapshotsSuccess`` ``DeleteSnapshotsFailure`` +============================================== ========================== ============================== + .. _at-least-once-delivery-java: At-Least-Once Delivery diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index ed9abd8190..34c880e020 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -353,11 +353,11 @@ Persisting, deleting and replaying messages can eitehr succeed or fail. +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ | ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | | | +-------------------------------+-----------------------------------+ -| | | ``onPersistRejected`` | --- | +| | | ``onPersistRejected`` | No automatic actions. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ | ``recovery`` | ``RecoveryCompleted`` | ``onRecoveryFailure`` | Actor is stopped. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ -| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | No automatic actions. | +---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which @@ -499,14 +499,14 @@ status messages as illustrated in the following table. ============================================== ========================== ============================== **Method** **Success** **Failure message** ============================================== ========================== ============================== -``saveSnapshot`` ``SaveSnapshotSuccess`` ``SaveSnapshotFailure`` +``saveSnapshot(Any)`` ``SaveSnapshotSuccess`` ``SaveSnapshotFailure`` ``deleteSnapshot(Long)`` ``DeleteSnapshotSuccess`` ``DeleteSnapshotFailure`` ``deleteSnapshots(SnapshotSelectionCriteria)`` ``DeleteSnapshotsSuccess`` ``DeleteSnapshotsFailure`` ============================================== ========================== ============================== If failure messages are left unhandled by the actor, a default warning log message will be logged for each incoming failure message. No default action is performed on the success messages, however you're free to handle them e.g. in order to delete -an in memory representation of the snapshot, or in the case of failure to attempt save the snapshot aggain. +an in memory representation of the snapshot, or in the case of failure to attempt save the snapshot again. .. _at-least-once-delivery: diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 72900b1e7a..90c3c320c2 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -149,6 +149,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { subscribe[DeleteMessagesTo](sub.ref) journal ! cmd sub.expectMsg(cmd) + receiverProbe2.expectMsg(DeleteMessagesSuccess(cmd.toSequenceNr)) journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) List(4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } diff --git a/project/ValidatePullRequest.scala b/project/ValidatePullRequest.scala index 93b7915012..5066fd1f3e 100644 --- a/project/ValidatePullRequest.scala +++ b/project/ValidatePullRequest.scala @@ -106,10 +106,10 @@ object ValidatePullRequest extends AutoPlugin { // if this project depends on a modified module, we must test it deps.nodes.exists { m => - val depends = modifiedModuleIds exists { - _.name == m.id.name - } // match just by name, we'd rather include too much than too little - if (depends) log.info(s"Project [$name] must be verified, because depends on [${modifiedModuleIds.find(_ == m.id).get}]") + // match just by name, we'd rather include too much than too little + val dependsOnModule = modifiedModuleIds find { _.name == m.id.name } + val depends = dependsOnModule.isDefined + if (depends) log.info(s"Project [$name] must be verified, because depends on [$dependsOnModule]") depends } }