diff --git a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java index 66f937591c..d5f3207b43 100644 --- a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java @@ -258,6 +258,16 @@ public class LambdaPersistenceDocTest { static Object o5 = new Object() { class MyPersistentActor extends AbstractPersistentActor { + + //#snapshot-criteria + @Override + public Recovery recovery() { + return Recovery.create( + SnapshotSelectionCriteria + .create(457L, System.currentTimeMillis())); + } + //#snapshot-criteria + //#snapshot-offer private Object state; @@ -291,14 +301,6 @@ public class LambdaPersistenceDocTest { match(Object.class, o -> {/* ... */}).build() ); } - - private void recover() { - //#snapshot-criteria - persistentActor.tell(Recovery.create( - SnapshotSelectionCriteria - .create(457L, System.currentTimeMillis())), null); - //#snapshot-criteria - } } }; diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index ce6002f338..e9963615d7 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -240,9 +240,19 @@ public class PersistenceDocTest { static Object o5 = new Object() { class MyPersistentActor extends UntypedPersistentActor { + + //#snapshot-criteria + @Override + public Recovery recovery() { + return Recovery.create( + SnapshotSelectionCriteria + .create(457L, System.currentTimeMillis())); + } + //#snapshot-criteria + @Override public String persistenceId() { return "persistence-id"; } - + //#snapshot-offer private Object state; @@ -258,30 +268,12 @@ public class PersistenceDocTest { } } //#snapshot-offer - + @Override public void onReceiveCommand(Object message) { } } - class MyActor extends UntypedActor { - ActorRef persistentActor; - - public MyActor() { - persistentActor = getContext().actorOf(Props.create(MyPersistentActor.class)); - } - - public void onReceive(Object message) throws Exception { - // ... - } - - private void recover() { - //#snapshot-criteria - persistentActor.tell(Recovery.create(SnapshotSelectionCriteria.create(457L, - System.currentTimeMillis())), null); - //#snapshot-criteria - } - } }; static Object o9 = new Object() { diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index a2003ba439..30aa9ab83f 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -204,7 +204,7 @@ and before any other received messages. .. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-completed -If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. @@ -293,6 +293,8 @@ In this case no stashing is happening, yet the events are still persisted and ca While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics it is not a recommended practice as it may lead to overly complex nesting. +.. _failures-lambda: + Failures -------- @@ -312,10 +314,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ next message. If there is a problem with recovering the state of the actor from the journal when the actor is -started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. - -If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. Atomic writes ------------- @@ -346,11 +345,51 @@ writing the previous batch. Batch writes are never timer-based which keeps laten Message deletion ---------------- -To delete all messages (journaled by a single persistent actor) up to a specified sequence number, +It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, +while still having access to the accumulated state during replays - by loading the snapshot. + +Persistence status handling +--------------------------- +Persisting, deleting and replaying messages can eitehr succeed or fail. + ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | +| | +-------------------------------+-----------------------------------+ +| | | ``onPersistRejected`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``recovery`` | ``RecoverySuccess`` | ``onRecoveryFailure`` | Actor is stopped. | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ + +The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which +the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message +(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about +which message caused the failure. + +For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure +handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most +likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most +likely not help the journal recover – as it would likely cause a `Thundering herd problem`_, as many persistent actors +would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-lambda`) which +implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between +restarts of the persistent actor. + +.. note:: + Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number + of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure, + it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped. + + Check the documentation of the journal implementation you are using for details if/how it is using this technique. + +.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem .. _persistent-views-java-lambda: diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index b8470651e4..1db0257c0b 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -206,7 +206,7 @@ and before any other received messages. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed -If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. .. _persist-async-java: @@ -296,6 +296,8 @@ In this case no stashing is happening, yet the events are still persisted and ca While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics it is not a recommended practice as it may lead to overly complex nesting. +.. _failures-java: + Failures -------- @@ -315,7 +317,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ next message. If there is a problem with recovering the state of the actor from the journal when the actor is -started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. +started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. Atomic writes ------------- @@ -346,14 +348,51 @@ writing the previous batch. Batch writes are never timer-based which keeps laten Message deletion ---------------- -To delete all messages (journaled by a single persistent actor) up to a specified sequence number, +It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, +while still having access to the accumulated state during replays - by loading the snapshot. -If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +Persistence status handling +--------------------------- +Persisting, deleting and replaying messages can eitehr succeed or fail. + ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | +| | +-------------------------------+-----------------------------------+ +| | | ``onPersistRejected`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``recovery`` | ``RecoverySuccess`` | ``onRecoveryFailure`` | Actor is stopped. | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ + +The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which +the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message +(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about +which message caused the failure. + +For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure +handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most +likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most +likely not help the journal recover – as it would likely cause a `Thundering herd problem`_, as many persistent actors +would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-java`) which +implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between +restarts of the persistent actor. + +.. note:: + Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number + of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure, + it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped. + + Check the documentation of the journal implementation you are using for details if/how it is using this technique. + +.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem .. _persistent-views-java: diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 22052a368a..e04211f0ff 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -489,3 +489,12 @@ The ``permanent`` deletion flag was removed. Support for non-permanent deletions removed. Events that were deleted with ``permanent=false`` with older version will still not be replayed in this version. +References to "replay" in names +------------------------------- +Previously a number of classes and methods used the word "replay" interchangeably with the word "recover". +This lead to slight inconsistencies in APIs, where a method would be called ``recovery``, yet the +signal for a completed recovery was named ``ReplayMessagesSuccess``. + +This is now fixed, and all methods use the same "recovery" wording consistently across the entire API. +The old ``ReplayMessagesSuccess`` is now called ``RecoverySuccess``, and an additional method called ``onRecoveryFailure`` +has been introduced. \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 36c621b165..7228b31be3 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -166,6 +166,12 @@ object PersistenceDocSpec { class MyPersistentActor extends PersistentActor { override def persistenceId = "my-stable-persistence-id" + //#snapshot-criteria + override def recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria( + maxSequenceNr = 457L, + maxTimestamp = System.currentTimeMillis)) + //#snapshot-criteria + //#snapshot-offer var state: Any = _ @@ -179,11 +185,6 @@ object PersistenceDocSpec { override def receiveCommand: Receive = ??? } - //#snapshot-criteria - persistentActor ! Recovery(fromSnapshot = SnapshotSelectionCriteria( - maxSequenceNr = 457L, - maxTimestamp = System.currentTimeMillis)) - //#snapshot-criteria } object PersistAsync { diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 0eaad129fd..ed9abd8190 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -190,7 +190,7 @@ and before any other received messages. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed -If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. .. _persist-async-scala: @@ -282,6 +282,8 @@ In this case no stashing is happening, yet the events are still persisted and ca While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics it is not a recommended practice as it may lead to overly complex nesting. +.. _failures-scala: + Failures -------- @@ -301,10 +303,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ next message. If there is a problem with recovering the state of the actor from the journal when the actor is -started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. - -If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. Atomic writes ------------- @@ -334,15 +333,55 @@ the maximum throughput dramatically. A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. - Message deletion ---------------- -To delete all messages (journaled by a single persistent actor) up to a specified sequence number, +It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) -and the actor continues with next message. +Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, +while still having access to the accumulated state during replays - by loading the snapshot. + +Persistence status handling +--------------------------- +Persisting, deleting and replaying messages can eitehr succeed or fail. + ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. | +| | +-------------------------------+-----------------------------------+ +| | | ``onPersistRejected`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``recovery`` | ``RecoveryCompleted`` | ``onRecoveryFailure`` | Actor is stopped. | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ +| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- | ++---------------------------------+-----------------------------+-------------------------------+-----------------------------------+ + +The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which +the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message +(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about +which message caused the failure. + +For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure +handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most +likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most +likely not help the journal recover – as it would likely cause a `Thundering herd problem`_, as many persistent actors +would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-scala`) which +implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between +restarts of the persistent actor. + +.. note:: + Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number + of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure, + it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped. + + Check the documentation of the journal implementation you are using for details if/how it is using this technique. + +.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem + .. _persistent-views: @@ -451,6 +490,24 @@ when the snapshot was taken. To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, persistent actors should use the ``deleteSnapshots`` method. +Snapshot status handling +------------------------ + +Saving or deleting snapshots can either succeed or fail – this information is reported back to the persistent actor via +status messages as illustrated in the following table. + +============================================== ========================== ============================== +**Method** **Success** **Failure message** +============================================== ========================== ============================== +``saveSnapshot`` ``SaveSnapshotSuccess`` ``SaveSnapshotFailure`` +``deleteSnapshot(Long)`` ``DeleteSnapshotSuccess`` ``DeleteSnapshotFailure`` +``deleteSnapshots(SnapshotSelectionCriteria)`` ``DeleteSnapshotsSuccess`` ``DeleteSnapshotsFailure`` +============================================== ========================== ============================== + +If failure messages are left unhandled by the actor, a default warning log message will be logged for each incoming failure message. +No default action is performed on the success messages, however you're free to handle them e.g. in order to delete +an in memory representation of the snapshot, or in the case of failure to attempt save the snapshot aggain. + .. _at-least-once-delivery: At-Least-Once Delivery diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 64dd33bb34..72900b1e7a 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -92,54 +92,54 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { "replay all messages" in { journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) 1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using a lower sequence number bound" in { journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) 3 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using an upper sequence number bound" in { journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref) 1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using a count limit" in { journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref) 1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using a lower and upper sequence number bound" in { journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref) 2 to 4 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay messages using a lower and upper sequence number bound and a count limit" in { journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref) 2 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay a single if lower sequence number bound equals upper sequence number bound" in { journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref) 2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "replay a single message if count limit equals 1" in { journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref) 2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "not replay messages if count limit equals 0" in { journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref) - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "not replay messages if lower sequence number bound is greater than upper sequence number bound" in { journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref) - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L)) } "not replay messages if the persistent actor has not yet written messages" in { journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, "non-existing-pid", receiverProbe.ref) - receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 0L)) + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 0L)) } "not replay permanently deleted messages (range deletion)" in { val receiverProbe2 = TestProbe() diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 181e54117a..23088fd52e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -107,7 +107,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * @param event the event that was processed in `receiveRecover`, if the exception * was thrown there */ - protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = + protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = event match { case Some(evt) ⇒ log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + @@ -148,18 +148,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas event.getClass.getName, seqNr, persistenceId, cause.getMessage) } - /** - * Called when ``deleteMessages`` failed. By default this method logs the problem - * as a warning, and the actor continues. - * - * @param cause failure cause - * @param toSequenceNr the sequence number parameter of the ``deleteMessages`` call - */ - protected def onDeleteMessagesFailure(cause: Throwable, toSequenceNr: Long): Unit = { - log.warning("Failed to deleteMessages toSequenceNr [{}] for persistenceId [{}] due to [{}].", - toSequenceNr, persistenceId, cause.getMessage) - } - private def startRecovery(recovery: Recovery): Unit = { changeState(recoveryStarted(recovery.replayMax)) loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) @@ -216,7 +204,16 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def unhandled(message: Any): Unit = { message match { case RecoveryCompleted ⇒ // mute - case m ⇒ super.unhandled(m) + case SaveSnapshotFailure(m, e) ⇒ + log.warning("Failed to saveSnapshot given metadata [{}] due to: [{}: {}]", m, e.getClass.getCanonicalName, e.getMessage) + case DeleteSnapshotFailure(m, e) ⇒ + log.warning("Failed to deleteSnapshot given metadata [{}] due to: [{}: {}]", m, e.getClass.getCanonicalName, e.getMessage) + case DeleteSnapshotsFailure(c, e) ⇒ + log.warning("Failed to deleteSnapshots given criteria [{}] due to: [{}: {}]", c, e.getClass.getCanonicalName, e.getMessage) + case DeleteMessagesFailure(e, toSequenceNr) ⇒ + log.warning("Failed to deleteMessages toSequenceNr [{}] for persistenceId [{}] due to [{}: {}].", + toSequenceNr, persistenceId, e.getClass.getCanonicalName, e.getMessage) + case m ⇒ super.unhandled(m) } } @@ -393,7 +390,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas /** * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. * - * If the delete fails [[#onDeleteMessagesFailure]] will be called. + * If the delete fails an [[akka.persistence.JournalProtocol.DeleteMessagesFailure]] will be sent to the actor. * * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. */ @@ -425,7 +422,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas /** * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * message to the actor's `receiveRecover`. Then initiates a message replay, either starting - * from the loaded snapshot or from scratch, and switches to `replayStarted` state. + * from the loaded snapshot or from scratch, and switches to `recoveryStarted` state. * All incoming messages are stashed. * * @param replayMax maximum number of messages to replay. @@ -456,7 +453,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas // Since we are recovering we can ignore the receive behavior from the stack Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot)) } - changeState(replayStarted(recoveryBehavior)) + changeState(recovering(recoveryBehavior)) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) case other ⇒ internalStash.stash() } @@ -468,11 +465,11 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * * If replay succeeds it got highest stored sequence number response from the journal and then switches * to `processingCommands` state. Otherwise the actor is stopped. - * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. + * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onRecoveryFailure`. * * All incoming messages are stashed. */ - private def replayStarted(recoveryBehavior: Receive) = new State { + private def recovering(recoveryBehavior: Receive) = new State { override def toString: String = s"replay started" override def recoveryRunning: Boolean = true @@ -483,9 +480,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas Eventsourced.super.aroundReceive(recoveryBehavior, p) } catch { case NonFatal(t) ⇒ - try onReplayFailure(t, Some(p.payload)) finally context.stop(self) + try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self) } - case ReplayMessagesSuccess(highestSeqNr) ⇒ + case RecoverySuccess(highestSeqNr) ⇒ onReplaySuccess() // callback for subclass implementation changeState(processingCommands) sequenceNr = highestSeqNr @@ -493,7 +490,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas internalStash.unstashAll() Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) case ReplayMessagesFailure(cause) ⇒ - try onReplayFailure(cause, event = None) finally context.stop(self) + try onRecoveryFailure(cause, event = None) finally context.stop(self) case other ⇒ internalStash.stash() } @@ -581,10 +578,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas case WriteMessagesFailed(_) ⇒ () // it will be stopped by the first WriteMessageFailure message - - case DeleteMessagesFailure(e, toSequenceNr) ⇒ - onDeleteMessagesFailure(e, toSequenceNr) - } def onWriteMessageComplete(err: Boolean): Unit = @@ -616,7 +609,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas else internalStash.unstash() } - } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 5a3d72cd5a..cd7ca2eaee 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -22,6 +22,12 @@ private[persistence] object JournalProtocol { /** Internal journal acknowledgement. */ sealed trait Response extends Message + /** + * Reply message to a successful [[DeleteMessagesTo]] request. + */ + final case class DeleteMessagesSuccess(toSequenceNr: Long) + extends Response + /** * Reply message to a failed [[DeleteMessagesTo]] request. */ @@ -128,7 +134,7 @@ private[persistence] object JournalProtocol { * * @param highestSequenceNr highest stored sequence number. */ - case class ReplayMessagesSuccess(highestSequenceNr: Long) + case class RecoverySuccess(highestSequenceNr: Long) extends Response with DeadLetterSuppression /** diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index 6e22a7b683..d3585aff02 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -293,7 +293,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory case NonFatal(t) ⇒ changeState(ignoreRemainingReplay(t)) } - case _: ReplayMessagesSuccess ⇒ + case _: RecoverySuccess ⇒ onReplayComplete() case ReplayMessagesFailure(cause) ⇒ try onReplayError(cause) finally onReplayComplete() @@ -339,8 +339,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory // replay must be a full replay (up to the highest stored sequence number) // Recover(lastSequenceNr) is sent by preRestart setLastSequenceNr(Long.MaxValue) - case _: ReplayMessagesSuccess ⇒ replayCompleted(receive) - case _ ⇒ internalStash.stash() + case _: RecoverySuccess ⇒ replayCompleted(receive) + case _ ⇒ internalStash.stash() } def replayCompleted(receive: Receive): Unit = { diff --git a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala index 556d5ea879..042b7be716 100644 --- a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala @@ -12,8 +12,7 @@ package akka.persistence * @param sequenceNr sequence number at which the snapshot was taken. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown. */ -@SerialVersionUID(1L) // -//#snapshot-metadata +@SerialVersionUID(1L) //#snapshot-metadata final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) //#snapshot-metadata diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index 5b3c6f9912..5c7e14877f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -31,13 +31,13 @@ trait AsyncRecovery { * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. - * @param replayCallback called to replay a single message. Can be called from any + * @param recoveryCallback called to replay a single message. Can be called from any * thread. * * @see [[AsyncWriteJournal]] */ def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, - max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] + max: Long)(recoveryCallback: PersistentRepr ⇒ Unit): Future[Unit] /** * Plugin API: asynchronously reads the highest stored sequence number for the diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 3b4ab470a7..3ebda5bf2a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -8,13 +8,11 @@ package akka.persistence.journal import akka.actor._ import akka.pattern.pipe import akka.persistence._ + import scala.collection.immutable import scala.concurrent.Future +import scala.util.{ Failure, Success, Try } import scala.util.control.NonFatal -import scala.util.Try -import scala.util.Success -import scala.util.Failure -import akka.AkkaException /** * Abstract journal, optimized for asynchronous, non-blocking writes. @@ -110,7 +108,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { }.map(_ ⇒ highSeqNr) } }.map { - highSeqNr ⇒ ReplayMessagesSuccess(highSeqNr) + highSeqNr ⇒ RecoverySuccess(highSeqNr) }.recover { case e ⇒ ReplayMessagesFailure(e) }.pipeTo(persistentActor).onSuccess { @@ -118,9 +116,12 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ - asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete { - case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) - case Failure(e) ⇒ persistentActor ! DeleteMessagesFailure(e, toSequenceNr) + asyncDeleteMessagesTo(persistenceId, toSequenceNr) map { + case _ ⇒ DeleteMessagesSuccess(toSequenceNr) + } recover { + case e ⇒ DeleteMessagesFailure(e, toSequenceNr) + } pipeTo persistentActor onComplete { + case _ if publish ⇒ context.system.eventStream.publish(d) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala index b22bfbde7e..b1b2693ab7 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala @@ -113,7 +113,7 @@ object AtLeastOnceDeliveryFailureSpec { private def debugMessage(msg: String): String = s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})" - override protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = { + override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = { // mute logging } diff --git a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala index 868f56f2fe..7b9d0f64e6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala @@ -88,8 +88,6 @@ object EndToEndEventAdapterSpec { } override def receiveCommand = persistIncoming - override def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = - probe.foreach { _ ! cause } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala new file mode 100644 index 0000000000..bc511df98d --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor._ +import akka.event.Logging +import akka.event.Logging.Warning +import akka.persistence.JournalProtocol.DeleteMessagesFailure +import akka.persistence.journal.inmem.InmemJournal +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.control.NoStackTrace + +object PersistentActorDeleteFailureSpec { + + case class DeleteTo(n: Long) + class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace + class SimulatedSerializationException(msg: String) extends RuntimeException(msg) with NoStackTrace + + class DeleteFailingInmemJournal extends InmemJournal { + override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = + Future.failed(new SimulatedException("Boom! Unable to delete events!")) + } + + class DoesNotHandleDeleteFailureActor(name: String, probe: ActorRef) extends PersistentActor { + override def persistenceId = name + override def receiveCommand: Receive = { + case DeleteTo(n) ⇒ deleteMessages(n) + } + override def receiveRecover: Receive = Actor.emptyBehavior + } + + class HandlesDeleteFailureActor(name: String, probe: ActorRef) extends PersistentActor { + override def persistenceId = name + override def receiveCommand: Receive = { + case DeleteTo(n) ⇒ deleteMessages(n) + case f: DeleteMessagesFailure ⇒ probe ! f + } + override def receiveRecover: Receive = Actor.emptyBehavior + } + +} + +class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( + """ + akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorDeleteFailureSpec$DeleteFailingInmemJournal" + """))) with ImplicitSender { + import PersistentActorDeleteFailureSpec._ + + system.eventStream.publish(TestEvent.Mute(EventFilter[akka.pattern.AskTimeoutException]())) + + "A persistent actor" must { + "have default warn logging be triggered, when deletion failed" in { + val persistentActor = system.actorOf(Props(classOf[DoesNotHandleDeleteFailureActor], name, testActor)) + system.eventStream.subscribe(testActor, classOf[Logging.Warning]) + persistentActor ! DeleteTo(100) + val message = expectMsgType[Warning].message.toString + message should include("Failed to deleteMessages") + message should include("Boom! Unable to delete events!") // the `cause` message + } + + "be receive an DeleteMessagesFailure when deletion failed, and the default logging should not be triggered" in { + val persistentActor = system.actorOf(Props(classOf[HandlesDeleteFailureActor], name, testActor)) + system.eventStream.subscribe(testActor, classOf[Logging.Warning]) + persistentActor ! DeleteTo(100) + expectMsgType[DeleteMessagesFailure] + expectNoMsg(100.millis) // since the actor handled the message, we do not issue warn logging automatically + } + + } +} + diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index 06cee55e6f..0eb78a9c46 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -4,20 +4,18 @@ package akka.persistence -import scala.collection.immutable import akka.actor.{ OneForOneStrategy, _ } -import akka.persistence.journal.AsyncWriteProxy import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages } import akka.persistence.journal.inmem.InmemStore -import akka.testkit.{ ImplicitSender, TestProbe, TestEvent, EventFilter } +import akka.persistence.journal.{ AsyncWriteJournal, AsyncWriteProxy } +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } import akka.util.Timeout +import scala.collection.immutable import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.control.NoStackTrace -import scala.util.Try -import akka.persistence.journal.AsyncWriteJournal -import scala.util.Failure +import scala.util.{ Failure, Try } object PersistentActorFailureSpec { import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor } @@ -32,7 +30,7 @@ object PersistentActorFailureSpec { override def preStart(): Unit = { super.preStart() - self ! SetStore(context.actorOf(Props[FailingInmemStore])) + self ! SetStore(context.actorOf(Props[FailingInmemStore]())) } } @@ -46,7 +44,7 @@ object PersistentActorFailureSpec { case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ val highest = highestSequenceNr(pid) val readFromStore = read(pid, fromSnr, toSnr, max) - if (readFromStore.length == 0) + if (readFromStore.isEmpty) sender() ! ReplaySuccess(highest) else if (isCorrupt(readFromStore)) sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr")) @@ -66,9 +64,9 @@ object PersistentActorFailureSpec { def checkSerializable(w: WriteMessages): immutable.Seq[Try[Unit]] = w.messages.collect { case a: AtomicWrite ⇒ - (a.payload.collectFirst { - case PersistentRepr(Evt(s: String), _) if s.contains("not serializable") ⇒ s - }) match { + a.payload.collectFirst { + case PersistentRepr(Evt(s: String), _: Long) if s.contains("not serializable") ⇒ s + } match { case Some(s) ⇒ Failure(new SimulatedSerializationException(s)) case None ⇒ AsyncWriteJournal.successUnit } @@ -80,6 +78,15 @@ object PersistentActorFailureSpec { override def receive = failingReceive.orElse(super.receive) } + class OnRecoveryFailurePersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = commonBehavior orElse { + case c @ Cmd(txt) ⇒ persist(Evt(txt))(updateState) + } + + override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = + probe ! "recovery-failure:" + cause.getMessage + } + class Supervisor(testActor: ActorRef) extends Actor { override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case e ⇒ @@ -174,14 +181,30 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( watch(ref) expectTerminated(ref) } - "stop if persist fails" in { + "call onRecoveryFailure when recovery from persisted events fails" in { + val props = Props(classOf[OnRecoveryFailurePersistentActor], name, testActor) + + val persistentActor = system.actorOf(props) + persistentActor ! Cmd("corrupt") + persistentActor ! GetState + expectMsg(List("corrupt")) + + // recover by creating another with same name + system.actorOf(Props(classOf[Supervisor], testActor)) ! props + val ref = expectMsgType[ActorRef] + expectMsg("recovery-failure:blahonga 1 1") + watch(ref) + expectTerminated(ref) + } + "call onPersistFailure and stop when persist fails" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) val persistentActor = expectMsgType[ActorRef] watch(persistentActor) persistentActor ! Cmd("wrong") + expectMsg("Failure: wrong-1") expectTerminated(persistentActor) } - "stop if persistAsync fails" in { + "call onPersistFailure and stop if persistAsync fails xoxo" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name) val persistentActor = expectMsgType[ActorRef] persistentActor ! Cmd("a") @@ -190,6 +213,7 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( expectMsg("a-1") // reply after successful persistAsync persistentActor ! Cmd("wrong") expectMsg("wrong") // reply before persistAsync + expectMsg("Failure: wrong-2") // onPersistFailure sent message expectTerminated(persistentActor) } "call onPersistRejected and continue if persist rejected" in { diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index cc7f976d24..67dcd7a319 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -7,6 +7,7 @@ package akka.persistence import java.util.concurrent.atomic.AtomicInteger import akka.actor._ +import akka.persistence.JournalProtocol.DeleteMessagesSuccess import akka.testkit.{ ImplicitSender, TestLatch, TestProbe } import com.typesafe.config.Config @@ -24,15 +25,19 @@ object PersistentActorSpec { abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) { var events: List[Any] = Nil + var askedForDelete: Option[ActorRef] = None val updateState: Receive = { - case Evt(data) ⇒ events = data :: events + case Evt(data) ⇒ events = data :: events + case d @ Some(ref: ActorRef) ⇒ askedForDelete = d.asInstanceOf[Some[ActorRef]] } val commonBehavior: Receive = { - case "boom" ⇒ throw new TestException("boom") - case Delete(toSequenceNr) ⇒ deleteMessages(toSequenceNr) - case GetState ⇒ sender() ! events.reverse + case "boom" ⇒ throw new TestException("boom") + case GetState ⇒ sender() ! events.reverse + case Delete(toSequenceNr) ⇒ + persist(Some(sender())) { s ⇒ askedForDelete = s } + deleteMessages(toSequenceNr) } def receiveRecover = updateState @@ -42,6 +47,9 @@ object PersistentActorSpec { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ persistAll(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) + case d: DeleteMessagesSuccess ⇒ + val replyTo = askedForDelete.getOrElse(throw new RuntimeException("Received DeleteMessagesSuccess without anyone asking for delete!")) + replyTo ! d } override protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = @@ -49,6 +57,11 @@ object PersistentActorSpec { case Evt(data) ⇒ sender() ! s"Rejected: $data" case _ ⇒ super.onPersistRejected(cause, event, seqNr) } + override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = + event match { + case Evt(data) ⇒ sender() ! s"Failure: $data" + case _ ⇒ super.onPersistFailure(cause, event, seqNr) + } } class Behavior2PersistentActor(name: String) extends ExamplePersistentActor(name) { @@ -220,6 +233,13 @@ object PersistentActorSpec { counter += 1 counter } + + override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = + event match { + case Evt(data) ⇒ sender() ! s"Failure: $data" + case _ ⇒ super.onPersistFailure(cause, event, seqNr) + } + } class AsyncPersistThreeTimesPersistentActor(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -1107,6 +1127,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("a-1", "a-2", "b-1", "b-2")) persistentActor ! Delete(2L) // delete "a-1" and "a-2" persistentActor ! "boom" // restart, recover + expectMsgType[DeleteMessagesSuccess] persistentActor ! GetState expectMsg(List("b-1", "b-2")) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala index cb34674b4a..bae79abb39 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala @@ -53,7 +53,6 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor)) - recoveringActor ! Recovery() expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) ⇒ } expectMsg(RecoveryCompleted) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala index 08f1c46c64..01d9a644ea 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -87,7 +87,6 @@ class SnapshotSerializationSpec extends PersistenceSpec(PersistenceSpec.config(" sPersistentActor ! "blahonga" expectMsg(0) val lPersistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, testActor)) - lPersistentActor ! Recovery() expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 0, timestamp), state) ⇒ state should ===(new MySnapshot("blahonga")) diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala index cfef8a465a..adbf3728ae 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala @@ -208,8 +208,8 @@ private class PersistentPublisherBuffer(override val processorId: String, publis self ! Filled } - override def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = { - super.onReplayFailure(receive, await, cause) + override def onRecoveryFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = { + super.onRecoveryFailure(receive, await, cause) self ! Filled }