Merge pull request #17867 from ktoso/wip-recover-method-ktoso

Make recovery a method
This commit is contained in:
Konrad Malawski 2015-06-30 17:23:28 +02:00
commit afa9549307
23 changed files with 218 additions and 342 deletions

View file

@ -1,8 +1,7 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package doc;
package docs.persistence;
import akka.actor.AbstractActor;
import akka.actor.ActorPath;
@ -33,22 +32,14 @@ public class LambdaPersistenceDocTest {
//#recovery-status
}
static Object o1 = new Object() {
private void recover(ActorRef persistentActor) {
//#recover-explicit
persistentActor.tell(Recover.create(), ActorRef.noSender());
//#recover-explicit
}
};
static Object o2 = new Object() {
abstract class MyPersistentActor1 extends AbstractPersistentActor {
//#recover-on-start-disabled
//#recovery-disabled
@Override
public void preStart() {}
//#recover-on-start-disabled
public Recovery recovery() {
return Recovery.none();
}
//#recovery-disabled
//#recover-on-restart-disabled
@Override
@ -57,12 +48,12 @@ public class LambdaPersistenceDocTest {
}
abstract class MyPersistentActor2 extends AbstractPersistentActor {
//#recover-on-start-custom
//#recovery-custom
@Override
public void preStart() {
self().tell(Recover.create(457L), null);
public Recovery recovery() {
return Recovery.create(457L);
}
//#recover-on-start-custom
//#recovery-custom
}
class MyPersistentActor4 extends AbstractPersistentActor implements PersistentActorMethods {
@ -136,15 +127,6 @@ public class LambdaPersistenceDocTest {
}
};
static Object fullyDisabledRecoveyExample = new Object() {
abstract class MyPersistentActor1 extends UntypedPersistentActor {
//#recover-fully-disabled
@Override
public void preStart() { getSelf().tell(Recover.create(0L), getSelf()); }
//#recover-fully-disabled
}
};
static Object atLeastOnceExample = new Object() {
//#at-least-once-example
@ -312,7 +294,7 @@ public class LambdaPersistenceDocTest {
private void recover() {
//#snapshot-criteria
persistentActor.tell(Recover.create(
persistentActor.tell(Recovery.create(
SnapshotSelectionCriteria
.create(457L, System.currentTimeMillis())), null);
//#snapshot-criteria

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package doc;
package docs.persistence;
//#plugin-imports
import akka.dispatch.Futures;

View file

@ -32,41 +32,23 @@ public class PersistenceDocTest {
//#recovery-status
}
static Object o1 = new Object() {
class MyActor extends UntypedActor {
ActorRef persistentActor;
public void onReceive(Object message) throws Exception {
}
private void recover() {
//#recover-explicit
persistentActor.tell(Recover.create(), self());
//#recover-explicit
}
}
};
static Object o2 = new Object() {
abstract class MyPersistentActor1 extends UntypedPersistentActor {
//#recover-on-start-disabled
//#recovery-disabled
@Override
public void preStart() {}
//#recover-on-start-disabled
//#recover-on-restart-disabled
@Override
public void preRestart(Throwable reason, Option<Object> message) {}
//#recover-on-restart-disabled
public Recovery recovery() {
return Recovery.none();
}
//#recovery-disabled
}
abstract class MyPersistentActor2 extends UntypedPersistentActor {
//#recover-on-start-custom
//#recovery-custom
@Override
public void preStart() {
self().tell(Recover.create(457L), self());
public Recovery recovery() {
return Recovery.create(457L);
}
//#recover-on-start-custom
//#recovery-custom
}
class MyPersistentActor4 extends UntypedPersistentActor implements PersistentActorMethods {
@ -125,15 +107,6 @@ public class PersistenceDocTest {
}
};
static Object fullyDisabledRecoveryExample = new Object() {
abstract class MyPersistentActor1 extends UntypedPersistentActor {
//#recover-fully-disabled
@Override
public void preStart() { self().tell(Recover.create(0L), self()); }
//#recover-fully-disabled
}
};
static Object atLeastOnceExample = new Object() {
//#at-least-once-example
@ -227,7 +200,7 @@ public class PersistenceDocTest {
if (message instanceof Msg) {
Msg msg = (Msg) message;
// ...
getSender().tell(new Confirm(msg.deliveryId), self());
getSender().tell(new Confirm(msg.deliveryId), getSelf());
} else {
unhandled(message);
}
@ -304,7 +277,7 @@ public class PersistenceDocTest {
private void recover() {
//#snapshot-criteria
persistentActor.tell(Recover.create(SnapshotSelectionCriteria.create(457L,
persistentActor.tell(Recovery.create(SnapshotSelectionCriteria.create(457L,
System.currentTimeMillis())), null);
//#snapshot-criteria
}
@ -541,7 +514,7 @@ public class PersistenceDocTest {
}
};
static Object o12 = new Object() {
static Object o13 = new Object() {
//#view
class MyView extends UntypedPersistentView {
@Override

View file

@ -161,7 +161,7 @@ Identifiers
A persistent actor must have an identifier that doesn't change across different actor incarnations.
The identifier must be defined with the ``persistenceId`` method.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id-override
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#persistence-id-override
.. _recovery-java-lambda:
@ -175,46 +175,29 @@ only be received by a persistent actor after recovery completes.
Recovery customization
^^^^^^^^^^^^^^^^^^^^^^
Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation.
Applications may also customise how recovery is performed by returning a customised ``Recovery`` object
in the ``recovery`` method of a ``AbstractPersistentActor``, for example setting an upper bound to the replay,
which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-disabled
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-custom
In this case, a persistent actor must be recovered explicitly by sending it a ``Recover`` message.
Recovery can be disabled by returning ``Recovery.none`` in the ``recovery`` method of a ``PersistentActor``:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-explicit
.. warning::
If ``preStart`` is overridden by an empty implementation, incoming commands will not be processed by the
``PersistentActor`` until it receives a ``Recover`` and finishes recovery.
In order to completely skip recovery, you can signal it with ``Recover.create(0L)``
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-fully-disabled
If not overridden, ``preStart`` sends a ``Recover`` message to ``self()``. Applications may also override
``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-custom
Upper sequence number bounds can be used to recover a persistent actor to past state instead of current state. Automated
recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-restart-disabled
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-disabled
Recovery status
^^^^^^^^^^^^^^^
A persistent actor can query its own recovery status via the methods
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-status
Sometimes there is a need for performing additional initialization when the
recovery has completed, before processing any other message sent to the persistent actor.
The persistent actor will receive a special :class:`RecoveryCompleted` message right after recovery
and before any other received messages.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-completed
If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure``
is called (logging the error by default) and the actor will be stopped.
@ -236,7 +219,7 @@ stash incoming Commands while the Journal is still working on persisting and/or
In the below example, the event callbacks may be called "at any time", even after the next Command has been processed.
The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.).
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persist-async
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#persist-async
.. note::
In order to implement the pattern known as "*command sourcing*" simply call ``persistAsync`` on all incoming messages right away,
@ -259,12 +242,12 @@ use it for *read* operations, and actions which do not have corresponding events
Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event.
It will be kept in memory and used when invoking the handler.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#defer
Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender
of the command for which this ``defer`` handler was called.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer-caller
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#defer-caller
.. warning::
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
@ -282,11 +265,11 @@ however there are situations where it may be useful. It is important to understa
those situations, as well as their implication on the stashing behaviour (that ``persist()`` enforces). In the following
example two persist calls are issued, and each of them issues another persist inside its callback:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persist-persist
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persist-persist
When sending two commands to this ``PersistentActor``, the persist handlers will be executed in the following order:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persist-persist-caller
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persist-persist-caller
First the "outer layer" of persist calls is issued and their callbacks applied, after these have successfully completed
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
@ -296,11 +279,11 @@ is extended until all nested ``persist`` callbacks have been handled.
It is also possible to nest ``persistAsync`` calls, using the same pattern:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync
In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync-caller
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync-caller
While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics
it is not a recommended practice as it may lead to overly complex nesting.
@ -317,7 +300,7 @@ will most likely fail anyway, since the journal is probably unavailable. It is b
actor and after a back-off timeout start it again. The ``akka.persistence.BackoffSupervisor`` actor
is provided to support such restarts.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#backoff
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#backoff
If persistence of an event is rejected before it is stored, e.g. due to serialization error,
``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with
@ -372,7 +355,7 @@ Views
Persistent views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method
and setting the “initial behavior” in the constructor by calling the :meth:`receive` method.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#view
The ``persistenceId`` identifies the persistent actor from which the view receives journaled messages. It is not necessary
the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a
@ -394,7 +377,7 @@ The default update interval of all persistent views of an actor system is config
interval for a specific view class or view instance. Applications may also trigger additional updates at
any time by sending a view an ``Update`` message.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view-update
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#view-update
If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the
incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages
@ -439,12 +422,12 @@ in context of persistent actors but this is also applicable to persistent views.
Persistent actor can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot
succeeds, the persistent actor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#save-snapshot
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#save-snapshot
During recovery, the persistent actor is offered a previously saved snapshot via a ``SnapshotOffer`` message from
which it can initialize internal state.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-offer
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#snapshot-offer
The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot.
They finally recover the persistent actor to its current (i.e. latest) state.
@ -452,7 +435,7 @@ They finally recover the persistent actor to its current (i.e. latest) state.
In general, a persistent actor is only offered a snapshot if that persistent actor has previously saved one or more snapshots
and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-criteria
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#snapshot-criteria
If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which selects the latest (= youngest) snapshot.
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
@ -509,7 +492,7 @@ between ``deliver`` and ``confirmDelivery`` is possible. The ``deliveryId`` must
of the message, destination actor will send the same``deliveryId`` wrapped in a confirmation message back to the sender.
The sender will then use it to call ``confirmDelivery`` method to complete delivery routine.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#at-least-once-example
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#at-least-once-example
The ``deliveryId`` generated by the persistence module is a strictly monotonically increasing sequence number
without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple
@ -631,7 +614,7 @@ For an example of snapshot store plugin which writes snapshots as individual fil
Applications can provide their own plugins by implementing a plugin API and activate them by configuration.
Plugin development requires the following imports:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#plugin-imports
.. includecode:: code/docs/persistence/LambdaPersistencePluginDocTest.java#plugin-imports
Journal plugin API
------------------
@ -644,7 +627,7 @@ A journal plugin extends ``AsyncWriteJournal``.
If the storage backend API only supports synchronous, blocking writes, the methods should be implemented as:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#sync-journal-plugin-api
.. includecode:: code/docs/persistence/LambdaPersistencePluginDocTest.java#sync-journal-plugin-api
A journal plugin must also implement the methods defined in ``AsyncRecovery`` for replays and sequence number recovery:
@ -719,7 +702,7 @@ plugin.
This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is
done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#shared-store-usage
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#shared-store-usage
Internal journal commands (sent by persistent actors) are buffered until injection completes. Injection is idempotent
i.e. only the first injection is used.

View file

@ -177,32 +177,15 @@ They are cached and received by a persistent actor after recovery phase complete
Recovery customization
^^^^^^^^^^^^^^^^^^^^^^
Automated recovery on start can be disabled by overriding ``preStart`` with an empty or custom implementation.
Applications may also customise how recovery is performed by returning a customised ``Recovery`` object
in the ``recovery`` method of a ``UntypedPersistentActor``, for example setting an upper bound to the replay,
which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state:
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-disabled
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-custom
In this case, a persistent actor must be recovered explicitly by sending it a ``Recover`` message.
Recovery can be disabled by returning ``Recovery.none()`` in the ``recovery`` method of a ``PersistentActor``:
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-explicit
.. warning::
If ``preStart`` is overridden by an empty implementation, incoming commands will not be processed by the
``PersistentActor`` until it receives a ``Recover`` and finishes recovery.
In order to completely skip recovery, you can signal it with ``Recover.create(0L)``
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-fully-disabled
If not overridden, ``preStart`` sends a ``Recover`` message to ``getSelf()``. Applications may also override
``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-custom
Upper sequence number bounds can be used to recover a persistent actor to past state instead of current state. Automated
recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-restart-disabled
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-disabled
Recovery status
^^^^^^^^^^^^^^^

View file

@ -329,6 +329,30 @@ The ``permanent`` flag in ``deleteMessages`` was removed. non-permanent deletes
any more. Events that were deleted with ``permanent=false`` with older version will
still not be replayed in this version.
Recover message is gone, replaced by Recovery config
----------------------------------------------------
Previously the way to cause recover in PersistentActors was sending them a ``Recover()`` message.
Most of the time it was the actor itself sending such message to ``self`` in its ``preStart`` method,
however it was possible to send this message from an external source to any ``PersistentActor`` or ``PresistentView``
to make it start recovering.
This style of starting recovery does not fit well with usual Actor best practices: an Actor should be independent
and know about its internal state, and also about its recovery or lack thereof. In order to guide users towards
more independent Actors, the ``Recovery()`` object is now not used as a message, but as configuration option
used by the Actor when it starts. In order to migrate previous code which customised its recovery mode use this example
as reference::
// previously
class OldCookieMonster extends PersistentActor {
def preStart() = self ! Recover(toSequenceNr = 42L)
// ...
}
// now:
class NewCookieMonster extends PersistentActor {
override def recovery = Recovery(toSequenceNr = 42L)
// ...
}
Persistence Plugin APIs
=======================

View file

@ -6,7 +6,6 @@ package docs.persistence
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.persistence._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.language.postfixOps
@ -27,28 +26,19 @@ object PersistenceDocSpec {
//#auto-update
"""
object Recovery {
object RecoverySample {
trait MyPersistentActor1 extends PersistentActor {
//#recover-on-start-disabled
override def preStart() = ()
//#recover-on-start-disabled
//#recover-on-restart-disabled
override def preRestart(reason: Throwable, message: Option[Any]) = ()
//#recover-on-restart-disabled
//#recovery-disabled
override def recovery = Recovery.none
//#recovery-disabled
}
trait MyPersistentActor2 extends PersistentActor {
//#recover-on-start-custom
override def preStart() {
self ! Recover(toSequenceNr = 457L)
}
//#recover-on-start-custom
//#recovery-custom
override def recovery = Recovery(toSequenceNr = 457L)
//#recovery-custom
}
//#recover-explicit
persistentActor ! Recover()
//#recover-explicit
class MyPersistentActor4 extends PersistentActor {
override def persistenceId = "my-stable-persistence-id"
@ -68,14 +58,6 @@ object PersistenceDocSpec {
}
}
object NoRecovery {
trait MyPersistentActor1 extends PersistentActor {
//#recover-fully-disabled
override def preStart() = self ! Recover(toSequenceNr = 0L)
//#recover-fully-disabled
}
}
object PersistenceId {
trait PersistentActorMethods {
//#persistence-id
@ -197,10 +179,8 @@ object PersistenceDocSpec {
override def receiveCommand: Receive = ???
}
import akka.actor.Props
//#snapshot-criteria
persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(
persistentActor ! Recovery(fromSnapshot = SnapshotSelectionCriteria(
maxSequenceNr = 457L,
maxTimestamp = System.currentTimeMillis))
//#snapshot-criteria
@ -330,7 +310,6 @@ object PersistenceDocSpec {
//#nested-persist-persist-caller
class MyPersistAsyncActor extends PersistentActor {
override def persistenceId = "my-stable-persistence-id"
@ -351,7 +330,7 @@ object PersistenceDocSpec {
persistAsync(c + "-inner-2") { inner sender() ! inner }
}
}
//#nested-persistAsync-persistAsync
//#nested-persistAsync-persistAsync
}
//#nested-persistAsync-persistAsync-caller

View file

@ -161,32 +161,15 @@ They are cached and received by a persistent actor after recovery phase complete
Recovery customization
^^^^^^^^^^^^^^^^^^^^^^
Automated recovery on start can be disabled by overriding ``preStart`` with an empty or custom implementation.
Applications may also customise how recovery is performed by returning a customised ``Recovery`` object
in the ``recovery`` method of a ``PersistentActor``, for example setting an upper bound to the replay,
which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-disabled
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-custom
In this case, a persistent actor must be recovered explicitly by sending it a ``Recover()`` message.
Recovery can be disabled by returning ``Recovery.none()`` in the ``recovery`` method of a ``PersistentActor``:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-explicit
.. warning::
If ``preStart`` is overridden by an empty implementation, incoming commands will not be processed by the
``PersistentActor`` until it receives a ``Recover`` and finishes recovery.
In order to completely skip recovery, you can signal it with ``Recover(toSequenceNr = OL)``
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-fully-disabled
If not overridden, ``preStart`` sends a ``Recover()`` message to ``self``. Applications may also override
``preStart`` to define further ``Recover()`` parameters such as an upper sequence number bound, for example.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-custom
Upper sequence number bounds can be used to recover a persistent actor to past state instead of current state. Automated
recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-restart-disabled
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-disabled
Recovery status
^^^^^^^^^^^^^^^

View file

@ -103,13 +103,11 @@ final class BackoffSupervisor(
private var child: Option[ActorRef] = None
private var restartCount = 0
override def preStart(): Unit = {
override def preStart(): Unit =
startChild()
super.preStart()
}
def startChild(): Unit =
if (child == None) {
if (child.isEmpty) {
child = Some(context.watch(context.actorOf(childProps, childName)))
}

View file

@ -37,7 +37,7 @@ private[persistence] object Eventsourced {
* Scala API and implementation details of [[PersistentActor]], [[AbstractPersistentActor]] and
* [[UntypedPersistentActor]].
*/
private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory with PersistenceIdentity {
private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory with PersistenceIdentity with PersistenceRecovery {
import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult
import Eventsourced._
@ -56,7 +56,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
private var sequenceNr: Long = 0L
private var _lastSequenceNr: Long = 0L
private var currentState: State = recoveryPending
// safely null because we initialize it with a proper `recoveryStarted` state in aroundPreStart before any real action happens
private var currentState: State = null
// Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands
private var pendingStashingPersistInvocations: Long = 0
@ -159,15 +160,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
toSequenceNr, persistenceId, cause.getMessage)
}
/**
* User-overridable callback. Called when a persistent actor is started or restarted.
* Default implementation sends a `Recover()` to `self`. Note that if you override
* `preStart` (or `preRestart`) and not call `super.preStart` you must send
* a `Recover()` message to `self` to activate the persistent actor.
*/
@throws(classOf[Exception])
override def preStart(): Unit =
self ! Recover()
private def startRecovery(recovery: Recovery): Unit = {
changeState(recoveryStarted(recovery.replayMax))
loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr)
}
/** INTERNAL API. */
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
@ -177,6 +173,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override protected[akka] def aroundPreStart(): Unit = {
// Fail fast on missing plugins.
val j = journal; val s = snapshotStore
startRecovery(recovery)
super.aroundPreStart()
}
@ -203,6 +200,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
}
}
/** INTERNAL API. */
override protected[akka] def aroundPostRestart(reason: Throwable): Unit = {
startRecovery(recovery)
super.aroundPostRestart(reason)
}
/** INTERNAL API. */
override protected[akka] def aroundPostStop(): Unit =
try {
@ -252,7 +255,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* If there is a problem with recovering the state of the actor from the journal, the error
* will be logged and the actor will be stopped.
*
* @see [[Recover]]
* @see [[Recovery]]
*/
def receiveRecover: Receive
@ -419,22 +422,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
def recoveryRunning: Boolean
}
/**
* Initial state, waits for `Recover` request, and then submits a `LoadSnapshot` request to the snapshot
* store and changes to `recoveryStarted` state. All incoming messages except `Recover` are stashed.
*/
private def recoveryPending = new State {
override def toString: String = "recovery pending"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any): Unit = message match {
case Recover(fromSnap, toSnr, replayMax)
changeState(recoveryStarted(replayMax))
loadSnapshot(snapshotterId, fromSnap, toSnr)
case _ internalStash.stash()
}
}
/**
* Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer`
* message to the actor's `receiveRecover`. Then initiates a message replay, either starting
@ -458,11 +445,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
}
}
override def toString: String = s"recovery started (replayMax = [${replayMax}])"
override def toString: String = s"recovery started (replayMax = [$replayMax])"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case r: Recover // ignore
case LoadSnapshotResult(sso, toSnr)
sso.foreach {
case SelectedSnapshot(metadata, snapshot)
@ -491,7 +477,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case r: Recover // ignore
case ReplayedMessage(p)
try {
updateLastSequenceNr(p)
@ -654,17 +639,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
internalStash.unstash()
}
private def addToBatch(p: PersistentEnvelope): Unit = p match {
case a: AtomicWrite
journalBatch :+= a.copy(payload =
a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid)))
case r: PersistentEnvelope
journalBatch :+= r
}
private def maxBatchSizeReached: Boolean =
journalBatch.size >= maxMessageBatchSize
}
/**

View file

@ -102,6 +102,19 @@ trait PersistenceIdentity {
}
//#persistence-identity
trait PersistenceRecovery {
//#persistence-recovery
/**
* Called when the persistent actor is started for the first time.
* The returned [[Recovery]] object defines how the Actor will recover its persistent state before
* handling the first incoming message.
*
* To skip recovery completely return `Recovery.none`.
*/
def recovery: Recovery = Recovery()
//#persistence-recovery
}
/**
* Persistence extension provider.
*/

View file

@ -22,10 +22,13 @@ case object RecoveryCompleted extends RecoveryCompleted {
}
/**
* Instructs a persistent actor to recover itself. Recovery will start from a snapshot if the persistent actor has
* previously saved one or more snapshots and at least one of these snapshots matches the specified
* `fromSnapshot` criteria. Otherwise, recovery will start from scratch by replaying all journaled
* messages.
* Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
*
* By default recovers from latest snapshot replays through to the last available event (last sequenceId).
*
* Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots
* and at least one of these snapshots matches the specified `fromSnapshot` criteria.
* Otherwise, recovery will start from scratch by replaying all stored events.
*
* If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
* message, followed by replayed messages, if any, that are younger than the snapshot, up to the
@ -37,47 +40,52 @@ case object RecoveryCompleted extends RecoveryCompleted {
* @param replayMax maximum number of messages to replay. Default is no limit.
*/
@SerialVersionUID(1L)
final case class Recover(fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest, toSequenceNr: Long = Long.MaxValue, replayMax: Long = Long.MaxValue)
final case class Recovery(
fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
toSequenceNr: Long = Long.MaxValue,
replayMax: Long = Long.MaxValue)
object Recovery {
object Recover {
/**
* Java API.
*
* @see [[Recover]]
* Java API
* @see [[Recovery]]
*/
def create() = Recover()
def create() = Recovery()
/**
* Java API.
*
* @see [[Recover]]
* Java API
* @see [[Recovery]]
*/
def create(toSequenceNr: Long) =
Recover(toSequenceNr = toSequenceNr)
Recovery(toSequenceNr = toSequenceNr)
/**
* Java API.
*
* @see [[Recover]]
* Java API
* @see [[Recovery]]
*/
def create(fromSnapshot: SnapshotSelectionCriteria) =
Recover(fromSnapshot = fromSnapshot)
Recovery(fromSnapshot = fromSnapshot)
/**
* Java API.
*
* @see [[Recover]]
* Java API
* @see [[Recovery]]
*/
def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long) =
Recover(fromSnapshot, toSequenceNr)
Recovery(fromSnapshot, toSequenceNr)
/**
* Java API.
*
* @see [[Recover]]
* Java API
* @see [[Recovery]]
*/
def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long, replayMax: Long) =
Recover(fromSnapshot, toSequenceNr, replayMax)
Recovery(fromSnapshot, toSequenceNr, replayMax)
/**
* Convenience method for skipping recovery in [[PersistentActor]].
* @see [[Recovery]]
*/
val none: Recovery = Recovery(toSequenceNr = 0L)
}
/**
@ -214,7 +222,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
* If there is a problem with recovering the state of the actor from the journal, the error
* will be logged and the actor will be stopped.
*
* @see [[Recover]]
* @see [[Recovery]]
*/
@throws(classOf[Exception])
def onReceiveRecover(msg: Any): Unit

View file

@ -80,7 +80,8 @@ private[akka] object PersistentView {
* - [[autoUpdate]] for turning automated updates on or off
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
*/
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
with PersistenceIdentity with PersistenceRecovery
with ActorLogging {
import PersistentView._
import JournalProtocol._
@ -97,7 +98,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
private var _lastSequenceNr: Long = 0L
private val internalStash = createStash()
private var currentState: State = recoveryPending
private var currentState: State = recoveryStarted(Long.MaxValue)
/**
* View id is used as identifier for snapshots performed by this [[PersistentView]].
@ -155,7 +156,10 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
* implementation classes to return non-default values.
*/
def autoUpdateReplayMax: Long =
viewSettings.autoUpdateReplayMax
viewSettings.autoUpdateReplayMax match {
case -1 Long.MaxValue
case value value
}
/**
* Highest received sequence number so far or `0L` if this actor hasn't replayed
@ -174,21 +178,26 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
override def recovery = Recovery(replayMax = autoUpdateReplayMax)
/**
* Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax`
* messages (following that snapshot).
*/
override def preStart(): Unit = {
super.preStart()
self ! Recover(replayMax = autoUpdateReplayMax)
if (autoUpdate) schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval,
self, ScheduledUpdate(autoUpdateReplayMax)))
startRecovery(recovery)
if (autoUpdate)
schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, self, ScheduledUpdate(autoUpdateReplayMax)))
}
private def startRecovery(recovery: Recovery): Unit = {
changeState(recoveryStarted(recovery.replayMax))
loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr)
}
/** INTERNAL API. */
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = {
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
currentState.stateReceive(receive, message)
}
/** INTERNAL API. */
override protected[akka] def aroundPreStart(): Unit = {
@ -230,22 +239,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
def recoveryRunning: Boolean
}
/**
* Initial state, waits for `Recover` request, and then submits a `LoadSnapshot` request to the snapshot
* store and changes to `recoveryStarted` state. All incoming messages except `Recover` are stashed.
*/
private def recoveryPending = new State {
override def toString: String = "recovery pending"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any): Unit = message match {
case Recover(fromSnap, toSnr, replayMax)
changeState(recoveryStarted(replayMax))
loadSnapshot(snapshotterId, fromSnap, toSnr)
case _ internalStash.stash()
}
}
/**
* Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer`
* message to the actor's `receive`. Then initiates a message replay, either starting
@ -260,7 +253,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case r: Recover // ignore
case LoadSnapshotResult(sso, toSnr)
sso.foreach {
case SelectedSnapshot(metadata, snapshot)
@ -306,7 +298,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
case ReplayMessagesFailure(cause)
try onReplayError(cause) finally onReplayComplete()
case ScheduledUpdate(_) // ignore
case r: Recover // ignore
case Update(a, _)
if (a)
internalStash.stash()
@ -349,7 +340,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
// Recover(lastSequenceNr) is sent by preRestart
setLastSequenceNr(Long.MaxValue)
case ReplayMessagesSuccess replayCompleted(receive)
case r: Recover // ignore
case _ internalStash.stash()
}
@ -371,7 +361,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
override def recoveryRunning: Boolean = false
override def stateReceive(receive: Receive, message: Any): Unit = message match {
case r: Recover // ignore
case ScheduledUpdate(replayMax) changeStateToReplayStarted(await = false, replayMax)
case Update(awaitUpdate, replayMax) changeStateToReplayStarted(awaitUpdate, replayMax)
case other PersistentView.super.aroundReceive(receive, other)

View file

@ -87,7 +87,7 @@ final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
* @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound.
* @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound.
*
* @see [[Recover]]
* @see [[Recovery]]
*/
@SerialVersionUID(1L)
final case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) {

View file

@ -85,7 +85,7 @@ abstract class NamedPersistentActor(name: String) extends PersistentActor {
}
trait TurnOffRecoverOnStart { this: Eventsourced
override def preStart(): Unit = ()
override def recovery = Recovery.none
}
class TestException(msg: String) extends Exception(msg) with NoStackTrace

View file

@ -7,15 +7,13 @@ package akka.persistence
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch, TestProbe }
import akka.testkit.{ ImplicitSender, TestLatch, TestProbe }
import com.typesafe.config.Config
import org.scalatest.matchers.{ MatchResult, Matcher }
import scala.collection.immutable.Seq
import scala.collection.immutable
import scala.concurrent.{ Future, Await }
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{ Try, Random }
import scala.util.Random
import scala.util.control.NoStackTrace
object PersistentActorSpec {
@ -24,7 +22,7 @@ object PersistentActorSpec {
final case class LatchCmd(latch: TestLatch, data: Any) extends NoSerializationVerificationNeeded
final case class Delete(toSequenceNr: Long)
abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) with PersistentActor {
abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) {
var events: List[Any] = Nil
val updateState: Receive = {
@ -680,7 +678,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "b-10", "b-11", "b-12", "c-10", "c-11", "c-12"))
}
"recover on command failure" in {
"recover on command failure xoxo" in {
val persistentActor = namedPersistentActor[Behavior3PersistentActor]
persistentActor ! Cmd("b")
persistentActor ! "boom"
@ -1089,7 +1087,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
expectMsg("a")
receiveN(4) should equal(List("a-outer-async-1", "a-outer-async-2", "a-inner-1", "a-inner-2"))
}
"make sure persist retains promised semantics when nested in persistAsync callback xoxo" in {
"make sure persist retains promised semantics when nested in persistAsync callback" in {
val persistentActor = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashing], name, testActor))
persistentActor ! "a"

View file

@ -14,8 +14,7 @@ object PersistentViewSpec {
private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
def receiveCommand = {
case msg
persist(msg) { m probe ! s"${m}-${lastSequenceNr}" }
case msg persist(msg) { m probe ! s"${m}-${lastSequenceNr}" }
}
override def receiveRecover: Receive = {

View file

@ -4,19 +4,19 @@
package akka.persistence
import akka.testkit.{ ImplicitSender, EventFilter, TestEvent, AkkaSpec }
import java.io.{ IOException, File }
import akka.actor.{ ActorInitializationException, Props, ActorRef }
import java.io.{ File, IOException }
import akka.actor.{ ActorInitializationException, ActorRef, Props }
import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender }
object SnapshotDirectoryFailureSpec {
val inUseSnapshotPath = "target/inUseSnapshotPath"
class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor {
class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor
with TurnOffRecoverOnStart {
override def persistenceId: String = name
override def preStart(): Unit = ()
override def receiveRecover: Receive = {
case SnapshotOffer(md, s) probe ! ((md, s))
}

View file

@ -69,7 +69,6 @@ object SnapshotFailureRobustnessSpec {
case SnapshotOffer(md, s) probe ! ((md, s))
case other probe ! other
}
override def preStart() = ()
}
class FailingLocalSnapshotStore extends LocalSnapshotStore {
@ -117,7 +116,6 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf
system.eventStream.subscribe(testActor, classOf[Logging.Error])
try {
val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
lPersistentActor ! Recover()
expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 1, timestamp), state)
state should ===("blahonga")

View file

@ -22,6 +22,7 @@ object SnapshotRecoveryLocalStoreSpec {
}
class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name)
with TurnOffRecoverOnStart
with ActorLogging {
def receiveCommand = {
@ -30,7 +31,6 @@ object SnapshotRecoveryLocalStoreSpec {
def receiveRecover = {
case other probe ! other
}
override def preStart() = ()
}
}
@ -53,7 +53,7 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con
val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor))
recoveringActor ! Recover()
recoveringActor ! Recovery()
expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) }
expectMsg(RecoveryCompleted)
}

View file

@ -87,7 +87,7 @@ class SnapshotSerializationSpec extends PersistenceSpec(PersistenceSpec.config("
sPersistentActor ! "blahonga"
expectMsg(0)
val lPersistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, testActor))
lPersistentActor ! Recover()
lPersistentActor ! Recovery()
expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 0, timestamp), state)
state should ===(new MySnapshot("blahonga"))

View file

@ -29,7 +29,9 @@ object SnapshotSpec {
}
}
class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
class LoadSnapshotTestPersistentActor(name: String, _recovery: Recovery, probe: ActorRef) extends NamedPersistentActor(name) {
override def recovery: Recovery = _recovery
override def receiveRecover: Receive = {
case payload: String probe ! s"${payload}-${lastSequenceNr}"
case offer @ SnapshotOffer(md, s) probe ! offer
@ -45,13 +47,12 @@ object SnapshotSpec {
case offer @ SnapshotOffer(md, s) probe ! offer
case other probe ! other
}
override def preStart() = ()
}
final case class Delete1(metadata: SnapshotMetadata)
final case class DeleteN(criteria: SnapshotSelectionCriteria)
class DeleteSnapshotTestPersistentActor(name: String, probe: ActorRef) extends LoadSnapshotTestPersistentActor(name, probe) {
class DeleteSnapshotTestPersistentActor(name: String, _recovery: Recovery, probe: ActorRef) extends LoadSnapshotTestPersistentActor(name, _recovery, probe) {
override def receiveCommand = receiveDelete orElse super.receiveCommand
def receiveDelete: Receive = {
case Delete1(metadata) deleteSnapshot(metadata.sequenceNr)
@ -82,11 +83,9 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
"A persistentActor" must {
"recover state starting from the most recent snapshot" in {
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(), testActor))
val persistenceId = name
persistentActor ! Recover()
expectMsgPF() {
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, timestamp), state)
state should ===(List("a-1", "b-2", "c-3", "d-4").reverse)
@ -97,11 +96,9 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsg(RecoveryCompleted)
}
"recover state starting from the most recent snapshot matching an upper sequence number bound" in {
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(toSequenceNr = 3), testActor))
val persistenceId = name
persistentActor ! Recover(toSequenceNr = 3)
expectMsgPF() {
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state)
state should ===(List("a-1", "b-2").reverse)
@ -111,10 +108,9 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsg(RecoveryCompleted)
}
"recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in {
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(toSequenceNr = 4), testActor))
val persistenceId = name
persistentActor ! Recover(toSequenceNr = 4)
persistentActor ! "done"
expectMsgPF() {
@ -126,11 +122,10 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsg("done")
}
"recover state starting from the most recent snapshot matching criteria" in {
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2))
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor))
val persistenceId = name
persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2))
expectMsgPF() {
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state)
state should ===(List("a-1", "b-2").reverse)
@ -143,11 +138,10 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsg(RecoveryCompleted)
}
"recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in {
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3)
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor))
val persistenceId = name
persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3)
expectMsgPF() {
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state)
state should ===(List("a-1", "b-2").reverse)
@ -157,9 +151,8 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsg(RecoveryCompleted)
}
"recover state from scratch if snapshot based recovery is disabled" in {
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria.None, toSequenceNr = 3)
val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None, toSequenceNr = 3)
val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor))
expectMsg("a-1")
expectMsg("b-2")
@ -169,13 +162,13 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
"support single snapshot deletions" in {
val deleteProbe = TestProbe()
val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
// recover persistentActor from 3rd snapshot and then delete snapshot
val recovery = Recovery(toSequenceNr = 4)
val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor))
val persistenceId = name
system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshot])
// recover persistentActor from 3rd snapshot and then delete snapshot
persistentActor1 ! Recover(toSequenceNr = 4)
persistentActor1 ! "done"
val metadata = expectMsgPF() {
@ -191,9 +184,8 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsgPF() { case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) }
// recover persistentActor from 2nd snapshot (3rd was deleted) plus replayed messages
val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor))
persistentActor2 ! Recover(toSequenceNr = 4)
expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, 0), null)) {
case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 2, _), state)
state should ===(List("a-1", "b-2").reverse)
@ -206,13 +198,13 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
"support bulk snapshot deletions" in {
val deleteProbe = TestProbe()
val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
val recovery = Recovery(toSequenceNr = 4)
val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor))
val persistenceId = name
system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshots])
// recover persistentActor and the delete first three (= all) snapshots
persistentActor1 ! Recover(toSequenceNr = 4)
val criteria = SnapshotSelectionCriteria(maxSequenceNr = 4)
persistentActor1 ! DeleteN(criteria)
expectMsgPF() {
@ -224,9 +216,8 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsgPF() { case DeleteSnapshotsSuccess(`criteria`) }
// recover persistentActor from replayed messages (all snapshots deleted)
val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor))
persistentActor2 ! Recover(toSequenceNr = 4)
expectMsg("a-1")
expectMsg("b-2")
expectMsg("c-3")

View file

@ -300,6 +300,7 @@ object PersistentFSMActorSpec {
case Event(Buy, _)
goto(Paid) applying OrderExecuted andThen {
case NonEmptyShoppingCart(items) reportActor ! PurchaseWasMade(items)
case EmptyShoppingCart // do nothing...
}
case Event(Leave, _)
stop applying OrderDiscarded andThen {