!per #17518 harmonize and document failure handling in persistence

+ added tests for all failure scenarios (callbacks)
This commit is contained in:
Konrad Malawski 2015-07-02 00:44:10 +02:00
parent 403369a29e
commit 874d07c0ae
22 changed files with 390 additions and 135 deletions

View file

@ -258,6 +258,16 @@ public class LambdaPersistenceDocTest {
static Object o5 = new Object() { static Object o5 = new Object() {
class MyPersistentActor extends AbstractPersistentActor { class MyPersistentActor extends AbstractPersistentActor {
//#snapshot-criteria
@Override
public Recovery recovery() {
return Recovery.create(
SnapshotSelectionCriteria
.create(457L, System.currentTimeMillis()));
}
//#snapshot-criteria
//#snapshot-offer //#snapshot-offer
private Object state; private Object state;
@ -291,14 +301,6 @@ public class LambdaPersistenceDocTest {
match(Object.class, o -> {/* ... */}).build() match(Object.class, o -> {/* ... */}).build()
); );
} }
private void recover() {
//#snapshot-criteria
persistentActor.tell(Recovery.create(
SnapshotSelectionCriteria
.create(457L, System.currentTimeMillis())), null);
//#snapshot-criteria
}
} }
}; };

View file

@ -240,9 +240,19 @@ public class PersistenceDocTest {
static Object o5 = new Object() { static Object o5 = new Object() {
class MyPersistentActor extends UntypedPersistentActor { class MyPersistentActor extends UntypedPersistentActor {
//#snapshot-criteria
@Override
public Recovery recovery() {
return Recovery.create(
SnapshotSelectionCriteria
.create(457L, System.currentTimeMillis()));
}
//#snapshot-criteria
@Override @Override
public String persistenceId() { return "persistence-id"; } public String persistenceId() { return "persistence-id"; }
//#snapshot-offer //#snapshot-offer
private Object state; private Object state;
@ -258,30 +268,12 @@ public class PersistenceDocTest {
} }
} }
//#snapshot-offer //#snapshot-offer
@Override @Override
public void onReceiveCommand(Object message) { public void onReceiveCommand(Object message) {
} }
} }
class MyActor extends UntypedActor {
ActorRef persistentActor;
public MyActor() {
persistentActor = getContext().actorOf(Props.create(MyPersistentActor.class));
}
public void onReceive(Object message) throws Exception {
// ...
}
private void recover() {
//#snapshot-criteria
persistentActor.tell(Recovery.create(SnapshotSelectionCriteria.create(457L,
System.currentTimeMillis())), null);
//#snapshot-criteria
}
}
}; };
static Object o9 = new Object() { static Object o9 = new Object() {

View file

@ -204,7 +204,7 @@ and before any other received messages.
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-completed .. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-completed
If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
is called (logging the error by default) and the actor will be stopped. is called (logging the error by default) and the actor will be stopped.
@ -293,6 +293,8 @@ In this case no stashing is happening, yet the events are still persisted and ca
While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics
it is not a recommended practice as it may lead to overly complex nesting. it is not a recommended practice as it may lead to overly complex nesting.
.. _failures-lambda:
Failures Failures
-------- --------
@ -312,10 +314,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ
next message. next message.
If there is a problem with recovering the state of the actor from the journal when the actor is If there is a problem with recovering the state of the actor from the journal when the actor is
started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped.
If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default)
and the actor continues with next message.
Atomic writes Atomic writes
------------- -------------
@ -346,11 +345,51 @@ writing the previous batch. Batch writes are never timer-based which keeps laten
Message deletion Message deletion
---------------- ----------------
To delete all messages (journaled by a single persistent actor) up to a specified sequence number, It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method. persistent actors may call the ``deleteMessages`` method.
If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with
and the actor continues with next message. :ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)``
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
while still having access to the accumulated state during replays - by loading the snapshot.
Persistence status handling
---------------------------
Persisting, deleting and replaying messages can eitehr succeed or fail.
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. |
| | +-------------------------------+-----------------------------------+
| | | ``onPersistRejected`` | --- |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``recovery`` | ``RecoverySuccess`` | ``onRecoveryFailure`` | Actor is stopped. |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which
the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message
(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about
which message caused the failure.
For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure
handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most
likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most
likely not help the journal recover as it would likely cause a `Thundering herd problem`_, as many persistent actors
would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-lambda`) which
implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between
restarts of the persistent actor.
.. note::
Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number
of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure,
it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped.
Check the documentation of the journal implementation you are using for details if/how it is using this technique.
.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem
.. _persistent-views-java-lambda: .. _persistent-views-java-lambda:

View file

@ -206,7 +206,7 @@ and before any other received messages.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed
If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
is called (logging the error by default) and the actor will be stopped. is called (logging the error by default) and the actor will be stopped.
.. _persist-async-java: .. _persist-async-java:
@ -296,6 +296,8 @@ In this case no stashing is happening, yet the events are still persisted and ca
While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics
it is not a recommended practice as it may lead to overly complex nesting. it is not a recommended practice as it may lead to overly complex nesting.
.. _failures-java:
Failures Failures
-------- --------
@ -315,7 +317,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ
next message. next message.
If there is a problem with recovering the state of the actor from the journal when the actor is If there is a problem with recovering the state of the actor from the journal when the actor is
started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped.
Atomic writes Atomic writes
------------- -------------
@ -346,14 +348,51 @@ writing the previous batch. Batch writes are never timer-based which keeps laten
Message deletion Message deletion
---------------- ----------------
To delete all messages (journaled by a single persistent actor) up to a specified sequence number, It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method. persistent actors may call the ``deleteMessages`` method.
If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with
and the actor continues with next message. :ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)``
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
while still having access to the accumulated state during replays - by loading the snapshot.
If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) Persistence status handling
and the actor continues with next message. ---------------------------
Persisting, deleting and replaying messages can eitehr succeed or fail.
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. |
| | +-------------------------------+-----------------------------------+
| | | ``onPersistRejected`` | --- |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``recovery`` | ``RecoverySuccess`` | ``onRecoveryFailure`` | Actor is stopped. |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which
the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message
(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about
which message caused the failure.
For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure
handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most
likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most
likely not help the journal recover as it would likely cause a `Thundering herd problem`_, as many persistent actors
would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-java`) which
implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between
restarts of the persistent actor.
.. note::
Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number
of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure,
it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped.
Check the documentation of the journal implementation you are using for details if/how it is using this technique.
.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem
.. _persistent-views-java: .. _persistent-views-java:

View file

@ -489,3 +489,12 @@ The ``permanent`` deletion flag was removed. Support for non-permanent deletions
removed. Events that were deleted with ``permanent=false`` with older version will removed. Events that were deleted with ``permanent=false`` with older version will
still not be replayed in this version. still not be replayed in this version.
References to "replay" in names
-------------------------------
Previously a number of classes and methods used the word "replay" interchangeably with the word "recover".
This lead to slight inconsistencies in APIs, where a method would be called ``recovery``, yet the
signal for a completed recovery was named ``ReplayMessagesSuccess``.
This is now fixed, and all methods use the same "recovery" wording consistently across the entire API.
The old ``ReplayMessagesSuccess`` is now called ``RecoverySuccess``, and an additional method called ``onRecoveryFailure``
has been introduced.

View file

@ -166,6 +166,12 @@ object PersistenceDocSpec {
class MyPersistentActor extends PersistentActor { class MyPersistentActor extends PersistentActor {
override def persistenceId = "my-stable-persistence-id" override def persistenceId = "my-stable-persistence-id"
//#snapshot-criteria
override def recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria(
maxSequenceNr = 457L,
maxTimestamp = System.currentTimeMillis))
//#snapshot-criteria
//#snapshot-offer //#snapshot-offer
var state: Any = _ var state: Any = _
@ -179,11 +185,6 @@ object PersistenceDocSpec {
override def receiveCommand: Receive = ??? override def receiveCommand: Receive = ???
} }
//#snapshot-criteria
persistentActor ! Recovery(fromSnapshot = SnapshotSelectionCriteria(
maxSequenceNr = 457L,
maxTimestamp = System.currentTimeMillis))
//#snapshot-criteria
} }
object PersistAsync { object PersistAsync {

View file

@ -190,7 +190,7 @@ and before any other received messages.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed
If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
is called (logging the error by default) and the actor will be stopped. is called (logging the error by default) and the actor will be stopped.
.. _persist-async-scala: .. _persist-async-scala:
@ -282,6 +282,8 @@ In this case no stashing is happening, yet the events are still persisted and ca
While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics
it is not a recommended practice as it may lead to overly complex nesting. it is not a recommended practice as it may lead to overly complex nesting.
.. _failures-scala:
Failures Failures
-------- --------
@ -301,10 +303,7 @@ If persistence of an event is rejected before it is stored, e.g. due to serializ
next message. next message.
If there is a problem with recovering the state of the actor from the journal when the actor is If there is a problem with recovering the state of the actor from the journal when the actor is
started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. started, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped.
If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default)
and the actor continues with next message.
Atomic writes Atomic writes
------------- -------------
@ -334,15 +333,55 @@ the maximum throughput dramatically.
A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed
writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.
Message deletion Message deletion
---------------- ----------------
To delete all messages (journaled by a single persistent actor) up to a specified sequence number, It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method. persistent actors may call the ``deleteMessages`` method.
If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with
and the actor continues with next message. :ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)``
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
while still having access to the accumulated state during replays - by loading the snapshot.
Persistence status handling
---------------------------
Persisting, deleting and replaying messages can eitehr succeed or fail.
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| **Method** | **Success** | **Failure / Rejection** | **After failure handler invoked** |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``persist`` / ``persistAsync`` | persist handler invoked | ``onPersistFailure`` | Actor is stopped. |
| | +-------------------------------+-----------------------------------+
| | | ``onPersistRejected`` | --- |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``recovery`` | ``RecoveryCompleted`` | ``onRecoveryFailure`` | Actor is stopped. |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
| ``deleteMessages`` | ``DeleteMessagesSuccess`` | ``DeleteMessagesFailure`` | --- |
+---------------------------------+-----------------------------+-------------------------------+-----------------------------------+
The most important operations (``persist`` and ``recovery``) have failure handlers modelled as explicit callbacks which
the user can override in the ``PersistentActor``. The default implementations of these handlers emit a log message
(``error`` for persist/recovery failures, and ``warning`` for others), logging the failure cause and information about
which message caused the failure.
For critical failures, such as recovery or persisting events failing, the persistent actor will be stopped after the failure
handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most
likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most
likely not help the journal recover as it would likely cause a `Thundering herd problem`_, as many persistent actors
would restart and try to persist their events again. Instead, using a ``BackoffSupervisor`` (as described in :ref:`failures-scala`) which
implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between
restarts of the persistent actor.
.. note::
Journal implementations may choose to implement a retry mechanisms, e.g. such that only after a write fails N number
of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure,
it is considered *fatal* by Akka Persistence, and the persistent actor which caused the failure will be stopped.
Check the documentation of the journal implementation you are using for details if/how it is using this technique.
.. _Thundering herd problem: https://en.wikipedia.org/wiki/Thundering_herd_problem
.. _persistent-views: .. _persistent-views:
@ -451,6 +490,24 @@ when the snapshot was taken.
To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``,
persistent actors should use the ``deleteSnapshots`` method. persistent actors should use the ``deleteSnapshots`` method.
Snapshot status handling
------------------------
Saving or deleting snapshots can either succeed or fail this information is reported back to the persistent actor via
status messages as illustrated in the following table.
============================================== ========================== ==============================
**Method** **Success** **Failure message**
============================================== ========================== ==============================
``saveSnapshot`` ``SaveSnapshotSuccess`` ``SaveSnapshotFailure``
``deleteSnapshot(Long)`` ``DeleteSnapshotSuccess`` ``DeleteSnapshotFailure``
``deleteSnapshots(SnapshotSelectionCriteria)`` ``DeleteSnapshotsSuccess`` ``DeleteSnapshotsFailure``
============================================== ========================== ==============================
If failure messages are left unhandled by the actor, a default warning log message will be logged for each incoming failure message.
No default action is performed on the success messages, however you're free to handle them e.g. in order to delete
an in memory representation of the snapshot, or in the case of failure to attempt save the snapshot aggain.
.. _at-least-once-delivery: .. _at-least-once-delivery:
At-Least-Once Delivery At-Least-Once Delivery

View file

@ -92,54 +92,54 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
"replay all messages" in { "replay all messages" in {
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
1 to 5 foreach { i receiverProbe.expectMsg(replayedMessage(i)) } 1 to 5 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"replay messages using a lower sequence number bound" in { "replay messages using a lower sequence number bound" in {
journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
3 to 5 foreach { i receiverProbe.expectMsg(replayedMessage(i)) } 3 to 5 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"replay messages using an upper sequence number bound" in { "replay messages using an upper sequence number bound" in {
journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref) journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref)
1 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) } 1 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"replay messages using a count limit" in { "replay messages using a count limit" in {
journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref) journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref)
1 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) } 1 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"replay messages using a lower and upper sequence number bound" in { "replay messages using a lower and upper sequence number bound" in {
journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref) journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref)
2 to 4 foreach { i receiverProbe.expectMsg(replayedMessage(i)) } 2 to 4 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"replay messages using a lower and upper sequence number bound and a count limit" in { "replay messages using a lower and upper sequence number bound and a count limit" in {
journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref) journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref)
2 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) } 2 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"replay a single if lower sequence number bound equals upper sequence number bound" in { "replay a single if lower sequence number bound equals upper sequence number bound" in {
journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref) journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref)
2 to 2 foreach { i receiverProbe.expectMsg(replayedMessage(i)) } 2 to 2 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"replay a single message if count limit equals 1" in { "replay a single message if count limit equals 1" in {
journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref) journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref)
2 to 2 foreach { i receiverProbe.expectMsg(replayedMessage(i)) } 2 to 2 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"not replay messages if count limit equals 0" in { "not replay messages if count limit equals 0" in {
journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref) journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"not replay messages if lower sequence number bound is greater than upper sequence number bound" in { "not replay messages if lower sequence number bound is greater than upper sequence number bound" in {
journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref) journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
} }
"not replay messages if the persistent actor has not yet written messages" in { "not replay messages if the persistent actor has not yet written messages" in {
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, "non-existing-pid", receiverProbe.ref) journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, "non-existing-pid", receiverProbe.ref)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 0L)) receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 0L))
} }
"not replay permanently deleted messages (range deletion)" in { "not replay permanently deleted messages (range deletion)" in {
val receiverProbe2 = TestProbe() val receiverProbe2 = TestProbe()

View file

@ -107,7 +107,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* @param event the event that was processed in `receiveRecover`, if the exception * @param event the event that was processed in `receiveRecover`, if the exception
* was thrown there * was thrown there
*/ */
protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
event match { event match {
case Some(evt) case Some(evt)
log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " +
@ -148,18 +148,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
event.getClass.getName, seqNr, persistenceId, cause.getMessage) event.getClass.getName, seqNr, persistenceId, cause.getMessage)
} }
/**
* Called when ``deleteMessages`` failed. By default this method logs the problem
* as a warning, and the actor continues.
*
* @param cause failure cause
* @param toSequenceNr the sequence number parameter of the ``deleteMessages`` call
*/
protected def onDeleteMessagesFailure(cause: Throwable, toSequenceNr: Long): Unit = {
log.warning("Failed to deleteMessages toSequenceNr [{}] for persistenceId [{}] due to [{}].",
toSequenceNr, persistenceId, cause.getMessage)
}
private def startRecovery(recovery: Recovery): Unit = { private def startRecovery(recovery: Recovery): Unit = {
changeState(recoveryStarted(recovery.replayMax)) changeState(recoveryStarted(recovery.replayMax))
loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr)
@ -216,7 +204,16 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def unhandled(message: Any): Unit = { override def unhandled(message: Any): Unit = {
message match { message match {
case RecoveryCompleted // mute case RecoveryCompleted // mute
case m super.unhandled(m) case SaveSnapshotFailure(m, e)
log.warning("Failed to saveSnapshot given metadata [{}] due to: [{}: {}]", m, e.getClass.getCanonicalName, e.getMessage)
case DeleteSnapshotFailure(m, e)
log.warning("Failed to deleteSnapshot given metadata [{}] due to: [{}: {}]", m, e.getClass.getCanonicalName, e.getMessage)
case DeleteSnapshotsFailure(c, e)
log.warning("Failed to deleteSnapshots given criteria [{}] due to: [{}: {}]", c, e.getClass.getCanonicalName, e.getMessage)
case DeleteMessagesFailure(e, toSequenceNr)
log.warning("Failed to deleteMessages toSequenceNr [{}] for persistenceId [{}] due to [{}: {}].",
toSequenceNr, persistenceId, e.getClass.getCanonicalName, e.getMessage)
case m super.unhandled(m)
} }
} }
@ -393,7 +390,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
/** /**
* Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
* *
* If the delete fails [[#onDeleteMessagesFailure]] will be called. * If the delete fails an [[akka.persistence.JournalProtocol.DeleteMessagesFailure]] will be sent to the actor.
* *
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted. * @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
*/ */
@ -425,7 +422,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
/** /**
* Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer`
* message to the actor's `receiveRecover`. Then initiates a message replay, either starting * message to the actor's `receiveRecover`. Then initiates a message replay, either starting
* from the loaded snapshot or from scratch, and switches to `replayStarted` state. * from the loaded snapshot or from scratch, and switches to `recoveryStarted` state.
* All incoming messages are stashed. * All incoming messages are stashed.
* *
* @param replayMax maximum number of messages to replay. * @param replayMax maximum number of messages to replay.
@ -456,7 +453,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
// Since we are recovering we can ignore the receive behavior from the stack // Since we are recovering we can ignore the receive behavior from the stack
Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot)) Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot))
} }
changeState(replayStarted(recoveryBehavior)) changeState(recovering(recoveryBehavior))
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
case other internalStash.stash() case other internalStash.stash()
} }
@ -468,11 +465,11 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* *
* If replay succeeds it got highest stored sequence number response from the journal and then switches * If replay succeeds it got highest stored sequence number response from the journal and then switches
* to `processingCommands` state. Otherwise the actor is stopped. * to `processingCommands` state. Otherwise the actor is stopped.
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onRecoveryFailure`.
* *
* All incoming messages are stashed. * All incoming messages are stashed.
*/ */
private def replayStarted(recoveryBehavior: Receive) = new State { private def recovering(recoveryBehavior: Receive) = new State {
override def toString: String = s"replay started" override def toString: String = s"replay started"
override def recoveryRunning: Boolean = true override def recoveryRunning: Boolean = true
@ -483,9 +480,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
Eventsourced.super.aroundReceive(recoveryBehavior, p) Eventsourced.super.aroundReceive(recoveryBehavior, p)
} catch { } catch {
case NonFatal(t) case NonFatal(t)
try onReplayFailure(t, Some(p.payload)) finally context.stop(self) try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
} }
case ReplayMessagesSuccess(highestSeqNr) case RecoverySuccess(highestSeqNr)
onReplaySuccess() // callback for subclass implementation onReplaySuccess() // callback for subclass implementation
changeState(processingCommands) changeState(processingCommands)
sequenceNr = highestSeqNr sequenceNr = highestSeqNr
@ -493,7 +490,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
internalStash.unstashAll() internalStash.unstashAll()
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
case ReplayMessagesFailure(cause) case ReplayMessagesFailure(cause)
try onReplayFailure(cause, event = None) finally context.stop(self) try onRecoveryFailure(cause, event = None) finally context.stop(self)
case other case other
internalStash.stash() internalStash.stash()
} }
@ -581,10 +578,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
case WriteMessagesFailed(_) case WriteMessagesFailed(_)
() // it will be stopped by the first WriteMessageFailure message () // it will be stopped by the first WriteMessageFailure message
case DeleteMessagesFailure(e, toSequenceNr)
onDeleteMessagesFailure(e, toSequenceNr)
} }
def onWriteMessageComplete(err: Boolean): Unit = def onWriteMessageComplete(err: Boolean): Unit =
@ -616,7 +609,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
else else
internalStash.unstash() internalStash.unstash()
} }
} }
/** /**

View file

@ -22,6 +22,12 @@ private[persistence] object JournalProtocol {
/** Internal journal acknowledgement. */ /** Internal journal acknowledgement. */
sealed trait Response extends Message sealed trait Response extends Message
/**
* Reply message to a successful [[DeleteMessagesTo]] request.
*/
final case class DeleteMessagesSuccess(toSequenceNr: Long)
extends Response
/** /**
* Reply message to a failed [[DeleteMessagesTo]] request. * Reply message to a failed [[DeleteMessagesTo]] request.
*/ */
@ -128,7 +134,7 @@ private[persistence] object JournalProtocol {
* *
* @param highestSequenceNr highest stored sequence number. * @param highestSequenceNr highest stored sequence number.
*/ */
case class ReplayMessagesSuccess(highestSequenceNr: Long) case class RecoverySuccess(highestSequenceNr: Long)
extends Response with DeadLetterSuppression extends Response with DeadLetterSuppression
/** /**

View file

@ -293,7 +293,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
case NonFatal(t) case NonFatal(t)
changeState(ignoreRemainingReplay(t)) changeState(ignoreRemainingReplay(t))
} }
case _: ReplayMessagesSuccess case _: RecoverySuccess
onReplayComplete() onReplayComplete()
case ReplayMessagesFailure(cause) case ReplayMessagesFailure(cause)
try onReplayError(cause) finally onReplayComplete() try onReplayError(cause) finally onReplayComplete()
@ -339,8 +339,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
// replay must be a full replay (up to the highest stored sequence number) // replay must be a full replay (up to the highest stored sequence number)
// Recover(lastSequenceNr) is sent by preRestart // Recover(lastSequenceNr) is sent by preRestart
setLastSequenceNr(Long.MaxValue) setLastSequenceNr(Long.MaxValue)
case _: ReplayMessagesSuccess replayCompleted(receive) case _: RecoverySuccess replayCompleted(receive)
case _ internalStash.stash() case _ internalStash.stash()
} }
def replayCompleted(receive: Receive): Unit = { def replayCompleted(receive: Receive): Unit = {

View file

@ -12,8 +12,7 @@ package akka.persistence
* @param sequenceNr sequence number at which the snapshot was taken. * @param sequenceNr sequence number at which the snapshot was taken.
* @param timestamp time at which the snapshot was saved, defaults to 0 when unknown. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
*/ */
@SerialVersionUID(1L) // @SerialVersionUID(1L) //#snapshot-metadata
//#snapshot-metadata
final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
//#snapshot-metadata //#snapshot-metadata

View file

@ -31,13 +31,13 @@ trait AsyncRecovery {
* @param fromSequenceNr sequence number where replay should start (inclusive). * @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive).
* @param max maximum number of messages to be replayed. * @param max maximum number of messages to be replayed.
* @param replayCallback called to replay a single message. Can be called from any * @param recoveryCallback called to replay a single message. Can be called from any
* thread. * thread.
* *
* @see [[AsyncWriteJournal]] * @see [[AsyncWriteJournal]]
*/ */
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long,
max: Long)(replayCallback: PersistentRepr Unit): Future[Unit] max: Long)(recoveryCallback: PersistentRepr Unit): Future[Unit]
/** /**
* Plugin API: asynchronously reads the highest stored sequence number for the * Plugin API: asynchronously reads the highest stored sequence number for the

View file

@ -8,13 +8,11 @@ package akka.persistence.journal
import akka.actor._ import akka.actor._
import akka.pattern.pipe import akka.pattern.pipe
import akka.persistence._ import akka.persistence._
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.Try
import scala.util.Success
import scala.util.Failure
import akka.AkkaException
/** /**
* Abstract journal, optimized for asynchronous, non-blocking writes. * Abstract journal, optimized for asynchronous, non-blocking writes.
@ -110,7 +108,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
}.map(_ highSeqNr) }.map(_ highSeqNr)
} }
}.map { }.map {
highSeqNr ReplayMessagesSuccess(highSeqNr) highSeqNr RecoverySuccess(highSeqNr)
}.recover { }.recover {
case e ReplayMessagesFailure(e) case e ReplayMessagesFailure(e)
}.pipeTo(persistentActor).onSuccess { }.pipeTo(persistentActor).onSuccess {
@ -118,9 +116,12 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
} }
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor)
asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete { asyncDeleteMessagesTo(persistenceId, toSequenceNr) map {
case Success(_) if (publish) context.system.eventStream.publish(d) case _ DeleteMessagesSuccess(toSequenceNr)
case Failure(e) persistentActor ! DeleteMessagesFailure(e, toSequenceNr) } recover {
case e DeleteMessagesFailure(e, toSequenceNr)
} pipeTo persistentActor onComplete {
case _ if publish context.system.eventStream.publish(d)
} }
} }

View file

@ -113,7 +113,7 @@ object AtLeastOnceDeliveryFailureSpec {
private def debugMessage(msg: String): String = private def debugMessage(msg: String): String =
s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})" s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})"
override protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = { override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = {
// mute logging // mute logging
} }

View file

@ -88,8 +88,6 @@ object EndToEndEventAdapterSpec {
} }
override def receiveCommand = persistIncoming override def receiveCommand = persistIncoming
override def onReplayFailure(cause: Throwable, event: Option[Any]): Unit =
probe.foreach { _ ! cause }
} }
} }

View file

@ -0,0 +1,77 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor._
import akka.event.Logging
import akka.event.Logging.Warning
import akka.persistence.JournalProtocol.DeleteMessagesFailure
import akka.persistence.journal.inmem.InmemJournal
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NoStackTrace
object PersistentActorDeleteFailureSpec {
case class DeleteTo(n: Long)
class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace
class SimulatedSerializationException(msg: String) extends RuntimeException(msg) with NoStackTrace
class DeleteFailingInmemJournal extends InmemJournal {
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
Future.failed(new SimulatedException("Boom! Unable to delete events!"))
}
class DoesNotHandleDeleteFailureActor(name: String, probe: ActorRef) extends PersistentActor {
override def persistenceId = name
override def receiveCommand: Receive = {
case DeleteTo(n) deleteMessages(n)
}
override def receiveRecover: Receive = Actor.emptyBehavior
}
class HandlesDeleteFailureActor(name: String, probe: ActorRef) extends PersistentActor {
override def persistenceId = name
override def receiveCommand: Receive = {
case DeleteTo(n) deleteMessages(n)
case f: DeleteMessagesFailure probe ! f
}
override def receiveRecover: Receive = Actor.emptyBehavior
}
}
class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
"""
akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorDeleteFailureSpec$DeleteFailingInmemJournal"
"""))) with ImplicitSender {
import PersistentActorDeleteFailureSpec._
system.eventStream.publish(TestEvent.Mute(EventFilter[akka.pattern.AskTimeoutException]()))
"A persistent actor" must {
"have default warn logging be triggered, when deletion failed" in {
val persistentActor = system.actorOf(Props(classOf[DoesNotHandleDeleteFailureActor], name, testActor))
system.eventStream.subscribe(testActor, classOf[Logging.Warning])
persistentActor ! DeleteTo(100)
val message = expectMsgType[Warning].message.toString
message should include("Failed to deleteMessages")
message should include("Boom! Unable to delete events!") // the `cause` message
}
"be receive an DeleteMessagesFailure when deletion failed, and the default logging should not be triggered" in {
val persistentActor = system.actorOf(Props(classOf[HandlesDeleteFailureActor], name, testActor))
system.eventStream.subscribe(testActor, classOf[Logging.Warning])
persistentActor ! DeleteTo(100)
expectMsgType[DeleteMessagesFailure]
expectNoMsg(100.millis) // since the actor handled the message, we do not issue warn logging automatically
}
}
}

View file

@ -4,20 +4,18 @@
package akka.persistence package akka.persistence
import scala.collection.immutable
import akka.actor.{ OneForOneStrategy, _ } import akka.actor.{ OneForOneStrategy, _ }
import akka.persistence.journal.AsyncWriteProxy
import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages } import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages }
import akka.persistence.journal.inmem.InmemStore import akka.persistence.journal.inmem.InmemStore
import akka.testkit.{ ImplicitSender, TestProbe, TestEvent, EventFilter } import akka.persistence.journal.{ AsyncWriteJournal, AsyncWriteProxy }
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent }
import akka.util.Timeout import akka.util.Timeout
import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.language.postfixOps import scala.language.postfixOps
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.util.Try import scala.util.{ Failure, Try }
import akka.persistence.journal.AsyncWriteJournal
import scala.util.Failure
object PersistentActorFailureSpec { object PersistentActorFailureSpec {
import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor } import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor }
@ -32,7 +30,7 @@ object PersistentActorFailureSpec {
override def preStart(): Unit = { override def preStart(): Unit = {
super.preStart() super.preStart()
self ! SetStore(context.actorOf(Props[FailingInmemStore])) self ! SetStore(context.actorOf(Props[FailingInmemStore]()))
} }
} }
@ -46,7 +44,7 @@ object PersistentActorFailureSpec {
case ReplayMessages(pid, fromSnr, toSnr, max) case ReplayMessages(pid, fromSnr, toSnr, max)
val highest = highestSequenceNr(pid) val highest = highestSequenceNr(pid)
val readFromStore = read(pid, fromSnr, toSnr, max) val readFromStore = read(pid, fromSnr, toSnr, max)
if (readFromStore.length == 0) if (readFromStore.isEmpty)
sender() ! ReplaySuccess(highest) sender() ! ReplaySuccess(highest)
else if (isCorrupt(readFromStore)) else if (isCorrupt(readFromStore))
sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr")) sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr"))
@ -66,9 +64,9 @@ object PersistentActorFailureSpec {
def checkSerializable(w: WriteMessages): immutable.Seq[Try[Unit]] = def checkSerializable(w: WriteMessages): immutable.Seq[Try[Unit]] =
w.messages.collect { w.messages.collect {
case a: AtomicWrite case a: AtomicWrite
(a.payload.collectFirst { a.payload.collectFirst {
case PersistentRepr(Evt(s: String), _) if s.contains("not serializable") s case PersistentRepr(Evt(s: String), _: Long) if s.contains("not serializable") s
}) match { } match {
case Some(s) Failure(new SimulatedSerializationException(s)) case Some(s) Failure(new SimulatedSerializationException(s))
case None AsyncWriteJournal.successUnit case None AsyncWriteJournal.successUnit
} }
@ -80,6 +78,15 @@ object PersistentActorFailureSpec {
override def receive = failingReceive.orElse(super.receive) override def receive = failingReceive.orElse(super.receive)
} }
class OnRecoveryFailurePersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case c @ Cmd(txt) persist(Evt(txt))(updateState)
}
override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
probe ! "recovery-failure:" + cause.getMessage
}
class Supervisor(testActor: ActorRef) extends Actor { class Supervisor(testActor: ActorRef) extends Actor {
override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e case e
@ -174,14 +181,30 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config(
watch(ref) watch(ref)
expectTerminated(ref) expectTerminated(ref)
} }
"stop if persist fails" in { "call onRecoveryFailure when recovery from persisted events fails" in {
val props = Props(classOf[OnRecoveryFailurePersistentActor], name, testActor)
val persistentActor = system.actorOf(props)
persistentActor ! Cmd("corrupt")
persistentActor ! GetState
expectMsg(List("corrupt"))
// recover by creating another with same name
system.actorOf(Props(classOf[Supervisor], testActor)) ! props
val ref = expectMsgType[ActorRef]
expectMsg("recovery-failure:blahonga 1 1")
watch(ref)
expectTerminated(ref)
}
"call onPersistFailure and stop when persist fails" in {
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name)
val persistentActor = expectMsgType[ActorRef] val persistentActor = expectMsgType[ActorRef]
watch(persistentActor) watch(persistentActor)
persistentActor ! Cmd("wrong") persistentActor ! Cmd("wrong")
expectMsg("Failure: wrong-1")
expectTerminated(persistentActor) expectTerminated(persistentActor)
} }
"stop if persistAsync fails" in { "call onPersistFailure and stop if persistAsync fails xoxo" in {
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name) system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name)
val persistentActor = expectMsgType[ActorRef] val persistentActor = expectMsgType[ActorRef]
persistentActor ! Cmd("a") persistentActor ! Cmd("a")
@ -190,6 +213,7 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config(
expectMsg("a-1") // reply after successful persistAsync expectMsg("a-1") // reply after successful persistAsync
persistentActor ! Cmd("wrong") persistentActor ! Cmd("wrong")
expectMsg("wrong") // reply before persistAsync expectMsg("wrong") // reply before persistAsync
expectMsg("Failure: wrong-2") // onPersistFailure sent message
expectTerminated(persistentActor) expectTerminated(persistentActor)
} }
"call onPersistRejected and continue if persist rejected" in { "call onPersistRejected and continue if persist rejected" in {

View file

@ -7,6 +7,7 @@ package akka.persistence
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor._ import akka.actor._
import akka.persistence.JournalProtocol.DeleteMessagesSuccess
import akka.testkit.{ ImplicitSender, TestLatch, TestProbe } import akka.testkit.{ ImplicitSender, TestLatch, TestProbe }
import com.typesafe.config.Config import com.typesafe.config.Config
@ -24,15 +25,19 @@ object PersistentActorSpec {
abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) { abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) {
var events: List[Any] = Nil var events: List[Any] = Nil
var askedForDelete: Option[ActorRef] = None
val updateState: Receive = { val updateState: Receive = {
case Evt(data) events = data :: events case Evt(data) events = data :: events
case d @ Some(ref: ActorRef) askedForDelete = d.asInstanceOf[Some[ActorRef]]
} }
val commonBehavior: Receive = { val commonBehavior: Receive = {
case "boom" throw new TestException("boom") case "boom" throw new TestException("boom")
case Delete(toSequenceNr) deleteMessages(toSequenceNr) case GetState sender() ! events.reverse
case GetState sender() ! events.reverse case Delete(toSequenceNr)
persist(Some(sender())) { s askedForDelete = s }
deleteMessages(toSequenceNr)
} }
def receiveRecover = updateState def receiveRecover = updateState
@ -42,6 +47,9 @@ object PersistentActorSpec {
val receiveCommand: Receive = commonBehavior orElse { val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) case Cmd(data)
persistAll(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) persistAll(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState)
case d: DeleteMessagesSuccess
val replyTo = askedForDelete.getOrElse(throw new RuntimeException("Received DeleteMessagesSuccess without anyone asking for delete!"))
replyTo ! d
} }
override protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = override protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit =
@ -49,6 +57,11 @@ object PersistentActorSpec {
case Evt(data) sender() ! s"Rejected: $data" case Evt(data) sender() ! s"Rejected: $data"
case _ super.onPersistRejected(cause, event, seqNr) case _ super.onPersistRejected(cause, event, seqNr)
} }
override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit =
event match {
case Evt(data) sender() ! s"Failure: $data"
case _ super.onPersistFailure(cause, event, seqNr)
}
} }
class Behavior2PersistentActor(name: String) extends ExamplePersistentActor(name) { class Behavior2PersistentActor(name: String) extends ExamplePersistentActor(name) {
@ -220,6 +233,13 @@ object PersistentActorSpec {
counter += 1 counter += 1
counter counter
} }
override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit =
event match {
case Evt(data) sender() ! s"Failure: $data"
case _ super.onPersistFailure(cause, event, seqNr)
}
} }
class AsyncPersistThreeTimesPersistentActor(name: String) extends ExamplePersistentActor(name) { class AsyncPersistThreeTimesPersistentActor(name: String) extends ExamplePersistentActor(name) {
var counter = 0 var counter = 0
@ -1107,6 +1127,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
expectMsg(List("a-1", "a-2", "b-1", "b-2")) expectMsg(List("a-1", "a-2", "b-1", "b-2"))
persistentActor ! Delete(2L) // delete "a-1" and "a-2" persistentActor ! Delete(2L) // delete "a-1" and "a-2"
persistentActor ! "boom" // restart, recover persistentActor ! "boom" // restart, recover
expectMsgType[DeleteMessagesSuccess]
persistentActor ! GetState persistentActor ! GetState
expectMsg(List("b-1", "b-2")) expectMsg(List("b-1", "b-2"))
} }

View file

@ -53,7 +53,6 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con
val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor)) val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor))
recoveringActor ! Recovery()
expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) } expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) }
expectMsg(RecoveryCompleted) expectMsg(RecoveryCompleted)
} }

View file

@ -87,7 +87,6 @@ class SnapshotSerializationSpec extends PersistenceSpec(PersistenceSpec.config("
sPersistentActor ! "blahonga" sPersistentActor ! "blahonga"
expectMsg(0) expectMsg(0)
val lPersistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, testActor)) val lPersistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, testActor))
lPersistentActor ! Recovery()
expectMsgPF() { expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 0, timestamp), state) case (SnapshotMetadata(`persistenceId`, 0, timestamp), state)
state should ===(new MySnapshot("blahonga")) state should ===(new MySnapshot("blahonga"))

View file

@ -208,8 +208,8 @@ private class PersistentPublisherBuffer(override val processorId: String, publis
self ! Filled self ! Filled
} }
override def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = { override def onRecoveryFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = {
super.onReplayFailure(receive, await, cause) super.onRecoveryFailure(receive, await, cause)
self ! Filled self ! Filled
} }