diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java similarity index 94% rename from akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java rename to akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java index 3c40fbfa4e..66f937591c 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java @@ -1,8 +1,7 @@ /** * Copyright (C) 2009-2015 Typesafe Inc. */ - -package doc; +package docs.persistence; import akka.actor.AbstractActor; import akka.actor.ActorPath; @@ -33,22 +32,14 @@ public class LambdaPersistenceDocTest { //#recovery-status } - static Object o1 = new Object() { - - private void recover(ActorRef persistentActor) { - //#recover-explicit - persistentActor.tell(Recover.create(), ActorRef.noSender()); - //#recover-explicit - } - - }; - static Object o2 = new Object() { abstract class MyPersistentActor1 extends AbstractPersistentActor { - //#recover-on-start-disabled + //#recovery-disabled @Override - public void preStart() {} - //#recover-on-start-disabled + public Recovery recovery() { + return Recovery.none(); + } + //#recovery-disabled //#recover-on-restart-disabled @Override @@ -57,12 +48,12 @@ public class LambdaPersistenceDocTest { } abstract class MyPersistentActor2 extends AbstractPersistentActor { - //#recover-on-start-custom + //#recovery-custom @Override - public void preStart() { - self().tell(Recover.create(457L), null); + public Recovery recovery() { + return Recovery.create(457L); } - //#recover-on-start-custom + //#recovery-custom } class MyPersistentActor4 extends AbstractPersistentActor implements PersistentActorMethods { @@ -136,15 +127,6 @@ public class LambdaPersistenceDocTest { } }; - static Object fullyDisabledRecoveyExample = new Object() { - abstract class MyPersistentActor1 extends UntypedPersistentActor { - //#recover-fully-disabled - @Override - public void preStart() { getSelf().tell(Recover.create(0L), getSelf()); } - //#recover-fully-disabled - } - }; - static Object atLeastOnceExample = new Object() { //#at-least-once-example @@ -312,7 +294,7 @@ public class LambdaPersistenceDocTest { private void recover() { //#snapshot-criteria - persistentActor.tell(Recover.create( + persistentActor.tell(Recovery.create( SnapshotSelectionCriteria .create(457L, System.currentTimeMillis())), null); //#snapshot-criteria diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/LambdaPersistencePluginDocTest.java similarity index 99% rename from akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java rename to akka-docs/rst/java/code/docs/persistence/LambdaPersistencePluginDocTest.java index 5adf109f99..a485947113 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/LambdaPersistencePluginDocTest.java @@ -2,7 +2,7 @@ * Copyright (C) 2009-2015 Typesafe Inc. */ -package doc; +package docs.persistence; //#plugin-imports import akka.dispatch.Futures; diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 5e458a534f..ce6002f338 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -32,41 +32,23 @@ public class PersistenceDocTest { //#recovery-status } - static Object o1 = new Object() { - class MyActor extends UntypedActor { - ActorRef persistentActor; - - public void onReceive(Object message) throws Exception { - } - - private void recover() { - //#recover-explicit - persistentActor.tell(Recover.create(), self()); - //#recover-explicit - } - } - }; - static Object o2 = new Object() { abstract class MyPersistentActor1 extends UntypedPersistentActor { - //#recover-on-start-disabled + //#recovery-disabled @Override - public void preStart() {} - //#recover-on-start-disabled - - //#recover-on-restart-disabled - @Override - public void preRestart(Throwable reason, Option message) {} - //#recover-on-restart-disabled + public Recovery recovery() { + return Recovery.none(); + } + //#recovery-disabled } abstract class MyPersistentActor2 extends UntypedPersistentActor { - //#recover-on-start-custom + //#recovery-custom @Override - public void preStart() { - self().tell(Recover.create(457L), self()); + public Recovery recovery() { + return Recovery.create(457L); } - //#recover-on-start-custom + //#recovery-custom } class MyPersistentActor4 extends UntypedPersistentActor implements PersistentActorMethods { @@ -125,15 +107,6 @@ public class PersistenceDocTest { } }; - static Object fullyDisabledRecoveryExample = new Object() { - abstract class MyPersistentActor1 extends UntypedPersistentActor { - //#recover-fully-disabled - @Override - public void preStart() { self().tell(Recover.create(0L), self()); } - //#recover-fully-disabled - } - }; - static Object atLeastOnceExample = new Object() { //#at-least-once-example @@ -227,7 +200,7 @@ public class PersistenceDocTest { if (message instanceof Msg) { Msg msg = (Msg) message; // ... - getSender().tell(new Confirm(msg.deliveryId), self()); + getSender().tell(new Confirm(msg.deliveryId), getSelf()); } else { unhandled(message); } @@ -304,7 +277,7 @@ public class PersistenceDocTest { private void recover() { //#snapshot-criteria - persistentActor.tell(Recover.create(SnapshotSelectionCriteria.create(457L, + persistentActor.tell(Recovery.create(SnapshotSelectionCriteria.create(457L, System.currentTimeMillis())), null); //#snapshot-criteria } @@ -541,7 +514,7 @@ public class PersistenceDocTest { } }; - static Object o12 = new Object() { + static Object o13 = new Object() { //#view class MyView extends UntypedPersistentView { @Override diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index adfa0d3e5f..fa8bba733f 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -161,7 +161,7 @@ Identifiers A persistent actor must have an identifier that doesn't change across different actor incarnations. The identifier must be defined with the ``persistenceId`` method. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id-override +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#persistence-id-override .. _recovery-java-lambda: @@ -175,46 +175,29 @@ only be received by a persistent actor after recovery completes. Recovery customization ^^^^^^^^^^^^^^^^^^^^^^ -Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation. +Applications may also customise how recovery is performed by returning a customised ``Recovery`` object +in the ``recovery`` method of a ``AbstractPersistentActor``, for example setting an upper bound to the replay, +which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-disabled +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-custom -In this case, a persistent actor must be recovered explicitly by sending it a ``Recover`` message. +Recovery can be disabled by returning ``Recovery.none`` in the ``recovery`` method of a ``PersistentActor``: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-explicit - -.. warning:: - - If ``preStart`` is overridden by an empty implementation, incoming commands will not be processed by the - ``PersistentActor`` until it receives a ``Recover`` and finishes recovery. - -In order to completely skip recovery, you can signal it with ``Recover.create(0L)`` - -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-fully-disabled - -If not overridden, ``preStart`` sends a ``Recover`` message to ``self()``. Applications may also override -``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example. - -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-custom - -Upper sequence number bounds can be used to recover a persistent actor to past state instead of current state. Automated -recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. - -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-restart-disabled +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-disabled Recovery status ^^^^^^^^^^^^^^^ A persistent actor can query its own recovery status via the methods -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-status Sometimes there is a need for performing additional initialization when the recovery has completed, before processing any other message sent to the persistent actor. The persistent actor will receive a special :class:`RecoveryCompleted` message right after recovery and before any other received messages. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#recovery-completed If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. @@ -236,7 +219,7 @@ stash incoming Commands while the Journal is still working on persisting and/or In the below example, the event callbacks may be called "at any time", even after the next Command has been processed. The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.). -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persist-async +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#persist-async .. note:: In order to implement the pattern known as "*command sourcing*" simply call ``persistAsync`` on all incoming messages right away, @@ -259,12 +242,12 @@ use it for *read* operations, and actions which do not have corresponding events Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event. It will be kept in memory and used when invoking the handler. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#defer Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender of the command for which this ``defer`` handler was called. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer-caller +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#defer-caller .. warning:: The callback will not be invoked if the actor is restarted (or stopped) in between the call to @@ -282,11 +265,11 @@ however there are situations where it may be useful. It is important to understa those situations, as well as their implication on the stashing behaviour (that ``persist()`` enforces). In the following example two persist calls are issued, and each of them issues another persist inside its callback: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persist-persist +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persist-persist When sending two commands to this ``PersistentActor``, the persist handlers will be executed in the following order: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persist-persist-caller +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persist-persist-caller First the "outer layer" of persist calls is issued and their callbacks applied, after these have successfully completed the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). @@ -296,11 +279,11 @@ is extended until all nested ``persist`` callbacks have been handled. It is also possible to nest ``persistAsync`` calls, using the same pattern: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync-caller +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync-caller While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics it is not a recommended practice as it may lead to overly complex nesting. @@ -317,7 +300,7 @@ will most likely fail anyway, since the journal is probably unavailable. It is b actor and after a back-off timeout start it again. The ``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#backoff +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#backoff If persistence of an event is rejected before it is stored, e.g. due to serialization error, ``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with @@ -372,7 +355,7 @@ Views Persistent views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method and setting the “initial behavior” in the constructor by calling the :meth:`receive` method. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#view The ``persistenceId`` identifies the persistent actor from which the view receives journaled messages. It is not necessary the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a @@ -394,7 +377,7 @@ The default update interval of all persistent views of an actor system is config interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an ``Update`` message. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view-update +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#view-update If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages @@ -439,12 +422,12 @@ in context of persistent actors but this is also applicable to persistent views. Persistent actor can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot succeeds, the persistent actor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#save-snapshot +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#save-snapshot During recovery, the persistent actor is offered a previously saved snapshot via a ``SnapshotOffer`` message from which it can initialize internal state. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-offer +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#snapshot-offer The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot. They finally recover the persistent actor to its current (i.e. latest) state. @@ -452,7 +435,7 @@ They finally recover the persistent actor to its current (i.e. latest) state. In general, a persistent actor is only offered a snapshot if that persistent actor has previously saved one or more snapshots and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-criteria +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#snapshot-criteria If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which selects the latest (= youngest) snapshot. To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no @@ -509,7 +492,7 @@ between ``deliver`` and ``confirmDelivery`` is possible. The ``deliveryId`` must of the message, destination actor will send the same``deliveryId`` wrapped in a confirmation message back to the sender. The sender will then use it to call ``confirmDelivery`` method to complete delivery routine. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#at-least-once-example +.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#at-least-once-example The ``deliveryId`` generated by the persistence module is a strictly monotonically increasing sequence number without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple @@ -631,7 +614,7 @@ For an example of snapshot store plugin which writes snapshots as individual fil Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following imports: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#plugin-imports +.. includecode:: code/docs/persistence/LambdaPersistencePluginDocTest.java#plugin-imports Journal plugin API ------------------ @@ -644,7 +627,7 @@ A journal plugin extends ``AsyncWriteJournal``. If the storage backend API only supports synchronous, blocking writes, the methods should be implemented as: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#sync-journal-plugin-api +.. includecode:: code/docs/persistence/LambdaPersistencePluginDocTest.java#sync-journal-plugin-api A journal plugin must also implement the methods defined in ``AsyncRecovery`` for replays and sequence number recovery: @@ -719,7 +702,7 @@ plugin. This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#shared-store-usage +.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#shared-store-usage Internal journal commands (sent by persistent actors) are buffered until injection completes. Injection is idempotent i.e. only the first injection is used. diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 3b8442bd97..65bc812108 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -177,32 +177,15 @@ They are cached and received by a persistent actor after recovery phase complete Recovery customization ^^^^^^^^^^^^^^^^^^^^^^ -Automated recovery on start can be disabled by overriding ``preStart`` with an empty or custom implementation. +Applications may also customise how recovery is performed by returning a customised ``Recovery`` object +in the ``recovery`` method of a ``UntypedPersistentActor``, for example setting an upper bound to the replay, +which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state: -.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-disabled +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-custom -In this case, a persistent actor must be recovered explicitly by sending it a ``Recover`` message. +Recovery can be disabled by returning ``Recovery.none()`` in the ``recovery`` method of a ``PersistentActor``: -.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-explicit - -.. warning:: - - If ``preStart`` is overridden by an empty implementation, incoming commands will not be processed by the - ``PersistentActor`` until it receives a ``Recover`` and finishes recovery. - -In order to completely skip recovery, you can signal it with ``Recover.create(0L)`` - -.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-fully-disabled - -If not overridden, ``preStart`` sends a ``Recover`` message to ``getSelf()``. Applications may also override -``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example. - -.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-custom - -Upper sequence number bounds can be used to recover a persistent actor to past state instead of current state. Automated -recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. - -.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-restart-disabled +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-disabled Recovery status ^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 003a2bbbe9..30ea3516f6 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -329,6 +329,30 @@ The ``permanent`` flag in ``deleteMessages`` was removed. non-permanent deletes any more. Events that were deleted with ``permanent=false`` with older version will still not be replayed in this version. +Recover message is gone, replaced by Recovery config +---------------------------------------------------- +Previously the way to cause recover in PersistentActors was sending them a ``Recover()`` message. +Most of the time it was the actor itself sending such message to ``self`` in its ``preStart`` method, +however it was possible to send this message from an external source to any ``PersistentActor`` or ``PresistentView`` +to make it start recovering. + +This style of starting recovery does not fit well with usual Actor best practices: an Actor should be independent +and know about its internal state, and also about its recovery or lack thereof. In order to guide users towards +more independent Actors, the ``Recovery()`` object is now not used as a message, but as configuration option +used by the Actor when it starts. In order to migrate previous code which customised its recovery mode use this example +as reference:: + + // previously + class OldCookieMonster extends PersistentActor { + def preStart() = self ! Recover(toSequenceNr = 42L) + // ... + } + // now: + class NewCookieMonster extends PersistentActor { + override def recovery = Recovery(toSequenceNr = 42L) + // ... + } + Persistence Plugin APIs ======================= diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index aac0379da3..36c621b165 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -6,7 +6,6 @@ package docs.persistence import akka.actor.{ Actor, ActorRef, ActorSystem, Props } import akka.persistence._ -import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.language.postfixOps @@ -27,28 +26,19 @@ object PersistenceDocSpec { //#auto-update """ - object Recovery { + object RecoverySample { trait MyPersistentActor1 extends PersistentActor { - //#recover-on-start-disabled - override def preStart() = () - //#recover-on-start-disabled - //#recover-on-restart-disabled - override def preRestart(reason: Throwable, message: Option[Any]) = () - //#recover-on-restart-disabled + //#recovery-disabled + override def recovery = Recovery.none + //#recovery-disabled } trait MyPersistentActor2 extends PersistentActor { - //#recover-on-start-custom - override def preStart() { - self ! Recover(toSequenceNr = 457L) - } - //#recover-on-start-custom + //#recovery-custom + override def recovery = Recovery(toSequenceNr = 457L) + //#recovery-custom } - //#recover-explicit - persistentActor ! Recover() - //#recover-explicit - class MyPersistentActor4 extends PersistentActor { override def persistenceId = "my-stable-persistence-id" @@ -68,14 +58,6 @@ object PersistenceDocSpec { } } - object NoRecovery { - trait MyPersistentActor1 extends PersistentActor { - //#recover-fully-disabled - override def preStart() = self ! Recover(toSequenceNr = 0L) - //#recover-fully-disabled - } - } - object PersistenceId { trait PersistentActorMethods { //#persistence-id @@ -197,10 +179,8 @@ object PersistenceDocSpec { override def receiveCommand: Receive = ??? } - import akka.actor.Props - //#snapshot-criteria - persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria( + persistentActor ! Recovery(fromSnapshot = SnapshotSelectionCriteria( maxSequenceNr = 457L, maxTimestamp = System.currentTimeMillis)) //#snapshot-criteria @@ -330,7 +310,6 @@ object PersistenceDocSpec { //#nested-persist-persist-caller - class MyPersistAsyncActor extends PersistentActor { override def persistenceId = "my-stable-persistence-id" @@ -351,7 +330,7 @@ object PersistenceDocSpec { persistAsync(c + "-inner-2") { inner ⇒ sender() ! inner } } } - //#nested-persistAsync-persistAsync + //#nested-persistAsync-persistAsync } //#nested-persistAsync-persistAsync-caller diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index fd943bc3b0..fa630c813f 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -161,32 +161,15 @@ They are cached and received by a persistent actor after recovery phase complete Recovery customization ^^^^^^^^^^^^^^^^^^^^^^ -Automated recovery on start can be disabled by overriding ``preStart`` with an empty or custom implementation. +Applications may also customise how recovery is performed by returning a customised ``Recovery`` object +in the ``recovery`` method of a ``PersistentActor``, for example setting an upper bound to the replay, +which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state: -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-disabled +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-custom -In this case, a persistent actor must be recovered explicitly by sending it a ``Recover()`` message. +Recovery can be disabled by returning ``Recovery.none()`` in the ``recovery`` method of a ``PersistentActor``: -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-explicit - -.. warning:: - - If ``preStart`` is overridden by an empty implementation, incoming commands will not be processed by the - ``PersistentActor`` until it receives a ``Recover`` and finishes recovery. - -In order to completely skip recovery, you can signal it with ``Recover(toSequenceNr = OL)`` - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-fully-disabled - -If not overridden, ``preStart`` sends a ``Recover()`` message to ``self``. Applications may also override -``preStart`` to define further ``Recover()`` parameters such as an upper sequence number bound, for example. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-custom - -Upper sequence number bounds can be used to recover a persistent actor to past state instead of current state. Automated -recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-restart-disabled +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-disabled Recovery status ^^^^^^^^^^^^^^^ diff --git a/akka-persistence/src/main/scala/akka/persistence/BackoffSupervisor.scala b/akka-persistence/src/main/scala/akka/persistence/BackoffSupervisor.scala index 0388bfc906..47e2bde299 100644 --- a/akka-persistence/src/main/scala/akka/persistence/BackoffSupervisor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/BackoffSupervisor.scala @@ -103,13 +103,11 @@ final class BackoffSupervisor( private var child: Option[ActorRef] = None private var restartCount = 0 - override def preStart(): Unit = { + override def preStart(): Unit = startChild() - super.preStart() - } def startChild(): Unit = - if (child == None) { + if (child.isEmpty) { child = Some(context.watch(context.actorOf(childProps, childName))) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 654cb7ea71..675bc2e398 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -37,7 +37,7 @@ private[persistence] object Eventsourced { * Scala API and implementation details of [[PersistentActor]], [[AbstractPersistentActor]] and * [[UntypedPersistentActor]]. */ -private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory with PersistenceIdentity { +private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory with PersistenceIdentity with PersistenceRecovery { import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult import Eventsourced._ @@ -56,7 +56,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas private var sequenceNr: Long = 0L private var _lastSequenceNr: Long = 0L - private var currentState: State = recoveryPending + // safely null because we initialize it with a proper `recoveryStarted` state in aroundPreStart before any real action happens + private var currentState: State = null // Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands private var pendingStashingPersistInvocations: Long = 0 @@ -159,15 +160,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas toSequenceNr, persistenceId, cause.getMessage) } - /** - * User-overridable callback. Called when a persistent actor is started or restarted. - * Default implementation sends a `Recover()` to `self`. Note that if you override - * `preStart` (or `preRestart`) and not call `super.preStart` you must send - * a `Recover()` message to `self` to activate the persistent actor. - */ - @throws(classOf[Exception]) - override def preStart(): Unit = - self ! Recover() + private def startRecovery(recovery: Recovery): Unit = { + changeState(recoveryStarted(recovery.replayMax)) + loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) + } /** INTERNAL API. */ override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = @@ -177,6 +173,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override protected[akka] def aroundPreStart(): Unit = { // Fail fast on missing plugins. val j = journal; val s = snapshotStore + startRecovery(recovery) super.aroundPreStart() } @@ -203,6 +200,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } } + /** INTERNAL API. */ + override protected[akka] def aroundPostRestart(reason: Throwable): Unit = { + startRecovery(recovery) + super.aroundPostRestart(reason) + } + /** INTERNAL API. */ override protected[akka] def aroundPostStop(): Unit = try { @@ -252,7 +255,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * If there is a problem with recovering the state of the actor from the journal, the error * will be logged and the actor will be stopped. * - * @see [[Recover]] + * @see [[Recovery]] */ def receiveRecover: Receive @@ -419,22 +422,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas def recoveryRunning: Boolean } - /** - * Initial state, waits for `Recover` request, and then submits a `LoadSnapshot` request to the snapshot - * store and changes to `recoveryStarted` state. All incoming messages except `Recover` are stashed. - */ - private def recoveryPending = new State { - override def toString: String = "recovery pending" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any): Unit = message match { - case Recover(fromSnap, toSnr, replayMax) ⇒ - changeState(recoveryStarted(replayMax)) - loadSnapshot(snapshotterId, fromSnap, toSnr) - case _ ⇒ internalStash.stash() - } - } - /** * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * message to the actor's `receiveRecover`. Then initiates a message replay, either starting @@ -458,11 +445,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } } - override def toString: String = s"recovery started (replayMax = [${replayMax}])" + override def toString: String = s"recovery started (replayMax = [$replayMax])" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { - case r: Recover ⇒ // ignore case LoadSnapshotResult(sso, toSnr) ⇒ sso.foreach { case SelectedSnapshot(metadata, snapshot) ⇒ @@ -491,7 +477,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { - case r: Recover ⇒ // ignore case ReplayedMessage(p) ⇒ try { updateLastSequenceNr(p) @@ -654,17 +639,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas internalStash.unstash() } - private def addToBatch(p: PersistentEnvelope): Unit = p match { - case a: AtomicWrite ⇒ - journalBatch :+= a.copy(payload = - a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid))) - case r: PersistentEnvelope ⇒ - journalBatch :+= r - } - - private def maxBatchSizeReached: Boolean = - journalBatch.size >= maxMessageBatchSize - } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 44f641a2d3..41b2724fa3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -102,6 +102,19 @@ trait PersistenceIdentity { } //#persistence-identity +trait PersistenceRecovery { + //#persistence-recovery + /** + * Called when the persistent actor is started for the first time. + * The returned [[Recovery]] object defines how the Actor will recover its persistent state before + * handling the first incoming message. + * + * To skip recovery completely return `Recovery.none`. + */ + def recovery: Recovery = Recovery() + //#persistence-recovery +} + /** * Persistence extension provider. */ diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index e9f75f30ec..80cf783248 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -22,10 +22,13 @@ case object RecoveryCompleted extends RecoveryCompleted { } /** - * Instructs a persistent actor to recover itself. Recovery will start from a snapshot if the persistent actor has - * previously saved one or more snapshots and at least one of these snapshots matches the specified - * `fromSnapshot` criteria. Otherwise, recovery will start from scratch by replaying all journaled - * messages. + * Recovery mode configuration object to be returned in [[PersistentActor#recovery]]. + * + * By default recovers from latest snapshot replays through to the last available event (last sequenceId). + * + * Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots + * and at least one of these snapshots matches the specified `fromSnapshot` criteria. + * Otherwise, recovery will start from scratch by replaying all stored events. * * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]] * message, followed by replayed messages, if any, that are younger than the snapshot, up to the @@ -37,47 +40,52 @@ case object RecoveryCompleted extends RecoveryCompleted { * @param replayMax maximum number of messages to replay. Default is no limit. */ @SerialVersionUID(1L) -final case class Recover(fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest, toSequenceNr: Long = Long.MaxValue, replayMax: Long = Long.MaxValue) +final case class Recovery( + fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest, + toSequenceNr: Long = Long.MaxValue, + replayMax: Long = Long.MaxValue) + +object Recovery { -object Recover { /** - * Java API. - * - * @see [[Recover]] + * Java API + * @see [[Recovery]] */ - def create() = Recover() + def create() = Recovery() /** - * Java API. - * - * @see [[Recover]] + * Java API + * @see [[Recovery]] */ def create(toSequenceNr: Long) = - Recover(toSequenceNr = toSequenceNr) + Recovery(toSequenceNr = toSequenceNr) /** - * Java API. - * - * @see [[Recover]] + * Java API + * @see [[Recovery]] */ def create(fromSnapshot: SnapshotSelectionCriteria) = - Recover(fromSnapshot = fromSnapshot) + Recovery(fromSnapshot = fromSnapshot) /** - * Java API. - * - * @see [[Recover]] + * Java API + * @see [[Recovery]] */ def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long) = - Recover(fromSnapshot, toSequenceNr) + Recovery(fromSnapshot, toSequenceNr) /** - * Java API. - * - * @see [[Recover]] + * Java API + * @see [[Recovery]] */ def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long, replayMax: Long) = - Recover(fromSnapshot, toSequenceNr, replayMax) + Recovery(fromSnapshot, toSequenceNr, replayMax) + + /** + * Convenience method for skipping recovery in [[PersistentActor]]. + * @see [[Recovery]] + */ + val none: Recovery = Recovery(toSequenceNr = 0L) } /** @@ -214,7 +222,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * If there is a problem with recovering the state of the actor from the journal, the error * will be logged and the actor will be stopped. * - * @see [[Recover]] + * @see [[Recovery]] */ @throws(classOf[Exception]) def onReceiveRecover(msg: Any): Unit diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index 1c02334e45..e21370f211 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -80,7 +80,8 @@ private[akka] object PersistentView { * - [[autoUpdate]] for turning automated updates on or off * - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle */ -trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity +trait PersistentView extends Actor with Snapshotter with Stash with StashFactory + with PersistenceIdentity with PersistenceRecovery with ActorLogging { import PersistentView._ import JournalProtocol._ @@ -97,7 +98,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory private var _lastSequenceNr: Long = 0L private val internalStash = createStash() - private var currentState: State = recoveryPending + private var currentState: State = recoveryStarted(Long.MaxValue) /** * View id is used as identifier for snapshots performed by this [[PersistentView]]. @@ -155,7 +156,10 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory * implementation classes to return non-default values. */ def autoUpdateReplayMax: Long = - viewSettings.autoUpdateReplayMax + viewSettings.autoUpdateReplayMax match { + case -1 ⇒ Long.MaxValue + case value ⇒ value + } /** * Highest received sequence number so far or `0L` if this actor hasn't replayed @@ -174,21 +178,26 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory private def updateLastSequenceNr(persistent: PersistentRepr): Unit = if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr + override def recovery = Recovery(replayMax = autoUpdateReplayMax) + /** * Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax` * messages (following that snapshot). */ override def preStart(): Unit = { - super.preStart() - self ! Recover(replayMax = autoUpdateReplayMax) - if (autoUpdate) schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, - self, ScheduledUpdate(autoUpdateReplayMax))) + startRecovery(recovery) + if (autoUpdate) + schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, self, ScheduledUpdate(autoUpdateReplayMax))) + } + + private def startRecovery(recovery: Recovery): Unit = { + changeState(recoveryStarted(recovery.replayMax)) + loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) } /** INTERNAL API. */ - override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = { + override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = currentState.stateReceive(receive, message) - } /** INTERNAL API. */ override protected[akka] def aroundPreStart(): Unit = { @@ -230,22 +239,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory def recoveryRunning: Boolean } - /** - * Initial state, waits for `Recover` request, and then submits a `LoadSnapshot` request to the snapshot - * store and changes to `recoveryStarted` state. All incoming messages except `Recover` are stashed. - */ - private def recoveryPending = new State { - override def toString: String = "recovery pending" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any): Unit = message match { - case Recover(fromSnap, toSnr, replayMax) ⇒ - changeState(recoveryStarted(replayMax)) - loadSnapshot(snapshotterId, fromSnap, toSnr) - case _ ⇒ internalStash.stash() - } - } - /** * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * message to the actor's `receive`. Then initiates a message replay, either starting @@ -260,7 +253,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { - case r: Recover ⇒ // ignore case LoadSnapshotResult(sso, toSnr) ⇒ sso.foreach { case SelectedSnapshot(metadata, snapshot) ⇒ @@ -306,7 +298,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory case ReplayMessagesFailure(cause) ⇒ try onReplayError(cause) finally onReplayComplete() case ScheduledUpdate(_) ⇒ // ignore - case r: Recover ⇒ // ignore case Update(a, _) ⇒ if (a) internalStash.stash() @@ -349,7 +340,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory // Recover(lastSequenceNr) is sent by preRestart setLastSequenceNr(Long.MaxValue) case ReplayMessagesSuccess ⇒ replayCompleted(receive) - case r: Recover ⇒ // ignore case _ ⇒ internalStash.stash() } @@ -371,7 +361,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory override def recoveryRunning: Boolean = false override def stateReceive(receive: Receive, message: Any): Unit = message match { - case r: Recover ⇒ // ignore case ScheduledUpdate(replayMax) ⇒ changeStateToReplayStarted(await = false, replayMax) case Update(awaitUpdate, replayMax) ⇒ changeStateToReplayStarted(awaitUpdate, replayMax) case other ⇒ PersistentView.super.aroundReceive(receive, other) diff --git a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala index 526e2b1750..556d5ea879 100644 --- a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala @@ -87,7 +87,7 @@ final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any) * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound. * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound. * - * @see [[Recover]] + * @see [[Recovery]] */ @SerialVersionUID(1L) final case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) { diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index e10d4ddf40..57afa899a6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -85,7 +85,7 @@ abstract class NamedPersistentActor(name: String) extends PersistentActor { } trait TurnOffRecoverOnStart { this: Eventsourced ⇒ - override def preStart(): Unit = () + override def recovery = Recovery.none } class TestException(msg: String) extends Exception(msg) with NoStackTrace diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 0cc89b1a3a..cc7f976d24 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -7,15 +7,13 @@ package akka.persistence import java.util.concurrent.atomic.AtomicInteger import akka.actor._ -import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch, TestProbe } +import akka.testkit.{ ImplicitSender, TestLatch, TestProbe } import com.typesafe.config.Config -import org.scalatest.matchers.{ MatchResult, Matcher } import scala.collection.immutable.Seq -import scala.collection.immutable -import scala.concurrent.{ Future, Await } +import scala.concurrent.Await import scala.concurrent.duration._ -import scala.util.{ Try, Random } +import scala.util.Random import scala.util.control.NoStackTrace object PersistentActorSpec { @@ -24,7 +22,7 @@ object PersistentActorSpec { final case class LatchCmd(latch: TestLatch, data: Any) extends NoSerializationVerificationNeeded final case class Delete(toSequenceNr: Long) - abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) with PersistentActor { + abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) { var events: List[Any] = Nil val updateState: Receive = { @@ -680,7 +678,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi persistentActor ! GetState expectMsg(List("a-1", "a-2", "b-10", "b-11", "b-12", "c-10", "c-11", "c-12")) } - "recover on command failure" in { + "recover on command failure xoxo" in { val persistentActor = namedPersistentActor[Behavior3PersistentActor] persistentActor ! Cmd("b") persistentActor ! "boom" @@ -1089,7 +1087,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("a") receiveN(4) should equal(List("a-outer-async-1", "a-outer-async-2", "a-inner-1", "a-inner-2")) } - "make sure persist retains promised semantics when nested in persistAsync callback xoxo" in { + "make sure persist retains promised semantics when nested in persistAsync callback" in { val persistentActor = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashing], name, testActor)) persistentActor ! "a" diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala index a16b26680b..02c0d6b677 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala @@ -14,8 +14,7 @@ object PersistentViewSpec { private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { def receiveCommand = { - case msg ⇒ - persist(msg) { m ⇒ probe ! s"${m}-${lastSequenceNr}" } + case msg ⇒ persist(msg) { m ⇒ probe ! s"${m}-${lastSequenceNr}" } } override def receiveRecover: Receive = { diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala index 4d604431cd..acc9f07899 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala @@ -4,19 +4,19 @@ package akka.persistence -import akka.testkit.{ ImplicitSender, EventFilter, TestEvent, AkkaSpec } -import java.io.{ IOException, File } -import akka.actor.{ ActorInitializationException, Props, ActorRef } +import java.io.{ File, IOException } + +import akka.actor.{ ActorInitializationException, ActorRef, Props } +import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender } object SnapshotDirectoryFailureSpec { val inUseSnapshotPath = "target/inUseSnapshotPath" - class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor { + class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor + with TurnOffRecoverOnStart { override def persistenceId: String = name - override def preStart(): Unit = () - override def receiveRecover: Receive = { case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index a1ee7b3d1c..4856e8de0b 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -69,7 +69,6 @@ object SnapshotFailureRobustnessSpec { case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) case other ⇒ probe ! other } - override def preStart() = () } class FailingLocalSnapshotStore extends LocalSnapshotStore { @@ -117,7 +116,6 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf system.eventStream.subscribe(testActor, classOf[Logging.Error]) try { val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) - lPersistentActor ! Recover() expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 1, timestamp), state) ⇒ state should ===("blahonga") diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala index 776adf4d5d..cb34674b4a 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala @@ -22,6 +22,7 @@ object SnapshotRecoveryLocalStoreSpec { } class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) + with TurnOffRecoverOnStart with ActorLogging { def receiveCommand = { @@ -30,7 +31,6 @@ object SnapshotRecoveryLocalStoreSpec { def receiveRecover = { case other ⇒ probe ! other } - override def preStart() = () } } @@ -53,7 +53,7 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor)) - recoveringActor ! Recover() + recoveringActor ! Recovery() expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) ⇒ } expectMsg(RecoveryCompleted) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala index 8fd8da18b7..08f1c46c64 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -87,7 +87,7 @@ class SnapshotSerializationSpec extends PersistenceSpec(PersistenceSpec.config(" sPersistentActor ! "blahonga" expectMsg(0) val lPersistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, testActor)) - lPersistentActor ! Recover() + lPersistentActor ! Recovery() expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 0, timestamp), state) ⇒ state should ===(new MySnapshot("blahonga")) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index 350e45d160..b6b07c0574 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -29,7 +29,9 @@ object SnapshotSpec { } } - class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + class LoadSnapshotTestPersistentActor(name: String, _recovery: Recovery, probe: ActorRef) extends NamedPersistentActor(name) { + override def recovery: Recovery = _recovery + override def receiveRecover: Receive = { case payload: String ⇒ probe ! s"${payload}-${lastSequenceNr}" case offer @ SnapshotOffer(md, s) ⇒ probe ! offer @@ -45,13 +47,12 @@ object SnapshotSpec { case offer @ SnapshotOffer(md, s) ⇒ probe ! offer case other ⇒ probe ! other } - override def preStart() = () } final case class Delete1(metadata: SnapshotMetadata) final case class DeleteN(criteria: SnapshotSelectionCriteria) - class DeleteSnapshotTestPersistentActor(name: String, probe: ActorRef) extends LoadSnapshotTestPersistentActor(name, probe) { + class DeleteSnapshotTestPersistentActor(name: String, _recovery: Recovery, probe: ActorRef) extends LoadSnapshotTestPersistentActor(name, _recovery, probe) { override def receiveCommand = receiveDelete orElse super.receiveCommand def receiveDelete: Receive = { case Delete1(metadata) ⇒ deleteSnapshot(metadata.sequenceNr) @@ -82,11 +83,9 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn "A persistentActor" must { "recover state starting from the most recent snapshot" in { - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(), testActor)) val persistenceId = name - persistentActor ! Recover() - expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) @@ -97,11 +96,9 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching an upper sequence number bound" in { - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(toSequenceNr = 3), testActor)) val persistenceId = name - persistentActor ! Recover(toSequenceNr = 3) - expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) @@ -111,10 +108,9 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in { - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(toSequenceNr = 4), testActor)) val persistenceId = name - persistentActor ! Recover(toSequenceNr = 4) persistentActor ! "done" expectMsgPF() { @@ -126,11 +122,10 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg("done") } "recover state starting from the most recent snapshot matching criteria" in { - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) + val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) val persistenceId = name - persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) - expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) @@ -143,11 +138,10 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in { - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) + val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) val persistenceId = name - persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) - expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) @@ -157,9 +151,8 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg(RecoveryCompleted) } "recover state from scratch if snapshot based recovery is disabled" in { - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) - - persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria.None, toSequenceNr = 3) + val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None, toSequenceNr = 3) + val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) expectMsg("a-1") expectMsg("b-2") @@ -169,13 +162,13 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn "support single snapshot deletions" in { val deleteProbe = TestProbe() - val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) + // recover persistentActor from 3rd snapshot and then delete snapshot + val recovery = Recovery(toSequenceNr = 4) + val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor)) val persistenceId = name system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshot]) - // recover persistentActor from 3rd snapshot and then delete snapshot - persistentActor1 ! Recover(toSequenceNr = 4) persistentActor1 ! "done" val metadata = expectMsgPF() { @@ -191,9 +184,8 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsgPF() { case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) ⇒ } // recover persistentActor from 2nd snapshot (3rd was deleted) plus replayed messages - val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) + val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor)) - persistentActor2 ! Recover(toSequenceNr = 4) expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, 0), null)) { case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 2, _), state) ⇒ state should ===(List("a-1", "b-2").reverse) @@ -206,13 +198,13 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn "support bulk snapshot deletions" in { val deleteProbe = TestProbe() - val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) + val recovery = Recovery(toSequenceNr = 4) + val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor)) val persistenceId = name system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshots]) // recover persistentActor and the delete first three (= all) snapshots - persistentActor1 ! Recover(toSequenceNr = 4) val criteria = SnapshotSelectionCriteria(maxSequenceNr = 4) persistentActor1 ! DeleteN(criteria) expectMsgPF() { @@ -224,9 +216,8 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsgPF() { case DeleteSnapshotsSuccess(`criteria`) ⇒ } // recover persistentActor from replayed messages (all snapshots deleted) - val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) + val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor)) - persistentActor2 ! Recover(toSequenceNr = 4) expectMsg("a-1") expectMsg("b-2") expectMsg("c-3") diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala index 1340beee9d..1abd66d037 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala @@ -300,6 +300,7 @@ object PersistentFSMActorSpec { case Event(Buy, _) ⇒ goto(Paid) applying OrderExecuted andThen { case NonEmptyShoppingCart(items) ⇒ reportActor ! PurchaseWasMade(items) + case EmptyShoppingCart ⇒ // do nothing... } case Event(Leave, _) ⇒ stop applying OrderDiscarded andThen {