From 3fd240384c697564969edbb2292cbe9ad739b98d Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Tue, 24 Jun 2014 16:57:33 +0200 Subject: [PATCH] +per #15424 Added PersistentView, deprecated View A PersistentView works the same way as View did previously, except: * it requires an `peristenceId` (no default is provided) * messages given to `receive` are NOT wrapped in Persistent() akka-streams not touched, will update them afterwards on different branch Also solves #15436 by making persistentId in PersistentView abstract. (cherry picked from commit dcafaf788236fe6d018388dd55d5bf9650ded696) Conflicts: akka-docs/rst/java/lambda-persistence.rst akka-docs/rst/java/persistence.rst akka-docs/rst/scala/persistence.rst akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/View.scala --- .../contrib/pattern/ClusterSharding.scala | 2 + .../docs/persistence/PersistenceDocTest.java | 17 +- akka-docs/rst/java/lambda-persistence.rst | 37 +- akka-docs/rst/java/persistence.rst | 41 ++- .../migration-guide-eventsourced-2.3.x.rst | 2 +- ...e-persistence-experimental-2.3.x-2.4.x.rst | 71 +++- .../docs/persistence/PersistenceDocSpec.scala | 20 +- akka-docs/rst/scala/persistence.rst | 38 ++- .../scala/akka/persistence/Eventsourced.scala | 10 +- .../scala/akka/persistence/Persistent.scala | 4 + .../akka/persistence/PersistentView.scala | 238 +++++++++++++ .../scala/akka/persistence/Recovery.scala | 46 +-- .../main/scala/akka/persistence/View.scala | 44 +-- .../akka/persistence/PersistenceSpec.scala | 5 + .../akka/persistence/PersistentViewSpec.scala | 319 ++++++++++++++++++ .../scala/akka/persistence/ViewSpec.scala | 73 ++-- .../java/doc/LambdaPersistenceDocTest.java | 10 +- .../java/sample/persistence/ViewExample.java | 15 +- .../tutorial/index.html | 2 +- ...xample.java => PersistentViewExample.java} | 34 +- .../tutorial/index.html | 2 +- .../sample/persistence/ViewExample.scala | 9 +- 22 files changed, 847 insertions(+), 192 deletions(-) create mode 100644 akka-persistence/src/main/scala/akka/persistence/PersistentView.scala create mode 100644 akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala rename akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/{ViewExample.java => PersistentViewExample.java} (79%) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index fb47bfe818..4347e01c3f 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -1188,6 +1188,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite import ShardCoordinator.Internal._ import ShardRegion.ShardId + override def persistenceId = self.path.toStringWithoutAddress + var persistentState = State.empty var rebalanceInProgress = Set.empty[ShardId] diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 3b8bff72a4..460b6ba041 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -485,18 +485,21 @@ public class PersistenceDocTest { static Object o11 = new Object() { //#view - class MyView extends UntypedView { - @Override - public String persistenceId() { - return "some-persistence-id"; - } + class MyView extends UntypedPersistentView { + @Override public String viewId() { return "some-persistence-id-view"; } + @Override public String persistenceId() { return "some-persistence-id"; } @Override public void onReceive(Object message) throws Exception { - if (message instanceof Persistent) { - // ... + if (isPersistent()) { + // handle message from Journal... + } else if (message instanceof String) { + // handle message from user... + } else { + unhandled(message); } } + } //#view diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 7d7d704319..5f52f84726 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -26,6 +26,17 @@ Akka persistence is inspired by the `eventsourced`_ library. It follows the same .. _eventsourced: https://github.com/eligosource/eventsourced +Changes in Akka 2.3.4 +===================== + +In Akka 2.3.4 several of the concepts of the earlier versions were collapsed and simplified. +In essence; ``Processor`` and ``EventsourcedProcessor`` are replaced by ``PersistentActor``. ``Channel`` +and ``PersistentChannel`` are replaced by ``AtLeastOnceDelivery``. ``View`` is replaced by ``PersistentView``. + +See full details of the changes in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`. +The old classes are still included, and deprecated, for a while to make the transition smooth. +In case you need the old documentation it is located `here `_. + Dependencies ============ @@ -45,7 +56,7 @@ Architecture When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can recover internal state from these messages. -* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another +* *AbstractPersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream. @@ -260,12 +271,12 @@ to Akka persistence will allow to replay messages that have been marked as delet purposes, for example. To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors should call the ``deleteMessages`` method. -.. _views-java-lambda: +.. _persistent-views-java-lambda: Views ===== -Views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method +Persistent views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method and setting the “initial behavior” in the constructor by calling the :meth:`receive` method. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view @@ -275,14 +286,18 @@ the referenced persistent actor is actually running. Views read messages from a persistent actor is started later and begins to write new messages, the corresponding view is updated automatically, by default. +It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent`` +method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases +(skip the ``if isPersistent`` check). + Updates ------- -The default update interval of all views of an actor system is configurable: +The default update interval of all persistent views of an actor system is configurable: .. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval -``View`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update +``AbstractPersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an ``Update`` message. @@ -293,7 +308,7 @@ incremental message replay, triggered by that update request, completed. If set following the update request may interleave with the replayed message stream. Automated updates always run with ``await = false``. -Automated updates of all views of an actor system can be turned off by configuration: +Automated updates of all persistent views of an actor system can be turned off by configuration: .. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update @@ -305,20 +320,20 @@ of replayed messages for manual updates can be limited with the ``replayMax`` pa Recovery -------- -Initial recovery of views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message +Initial recovery of persistent views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``. -Further possibilities to customize initial recovery are explained in section :ref:`recovery-java-lambda`. +Further possibilities to customize initial recovery are explained in section :ref:`recovery-java`. .. _persistence-identifiers-java-lambda: Identifiers ----------- -A view must have an identifier that doesn't change across different actor incarnations. It defaults to the +A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the ``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` method. -Applications can customize a view's id by specifying an actor name during view creation. This changes that view's +Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. @@ -338,7 +353,7 @@ Snapshots ========= Snapshots can dramatically reduce recovery times of persistent actors and views. The following discusses snapshots -in context of persistent actors but this is also applicable to views. +in context of persistent actors but this is also applicable to persistent views. Persistent actor can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot succeeds, the persistent actor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 0ad339252a..5a9609d3b6 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -31,6 +31,17 @@ concepts and architecture of `eventsourced`_ but significantly differs on API an .. _eventsourced: https://github.com/eligosource/eventsourced +Changes in Akka 2.3.4 +===================== + +In Akka 2.3.4 several of the concepts of the earlier versions were collapsed and simplified. +In essence; ``Processor`` and ``EventsourcedProcessor`` are replaced by ``PersistentActor``. ``Channel`` +and ``PersistentChannel`` are replaced by ``AtLeastOnceDelivery``. ``View`` is replaced by ``PersistentView``. + +See full details of the changes in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`. +The old classes are still included, and deprecated, for a while to make the transition smooth. +In case you need the old documentation it is located `here `_. + Dependencies ============ @@ -50,7 +61,7 @@ Architecture When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can recover internal state from these messages. -* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another +* *UntypedPersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream. @@ -265,13 +276,13 @@ to Akka persistence will allow to replay messages that have been marked as delet purposes, for example. To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors should call the ``deleteMessages`` method. -.. _views-java: +.. _persistent-views-java: -Views -===== +Persistent Views +================ -Views can be implemented by extending the ``UntypedView`` trait and implementing the ``onReceive`` and the ``persistenceId`` -methods. +Persistent views can be implemented by extending the ``UntypedPersistentView`` trait and implementing the ``onReceive`` +and the ``persistenceId`` methods. .. includecode:: code/docs/persistence/PersistenceDocTest.java#view @@ -280,14 +291,18 @@ the referenced persistent actor is actually running. Views read messages from a persistent actor is started later and begins to write new messages, the corresponding view is updated automatically, by default. +It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent`` +method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases +(skip the ``if isPersistent`` check). + Updates ------- -The default update interval of all views of an actor system is configurable: +The default update interval of all persistent views of an actor system is configurable: .. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval -``View`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update +``UntypedPersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an ``Update`` message. @@ -298,7 +313,7 @@ incremental message replay, triggered by that update request, completed. If set following the update request may interleave with the replayed message stream. Automated updates always run with ``await = false``. -Automated updates of all views of an actor system can be turned off by configuration: +Automated updates of all persistent views of an actor system can be turned off by configuration: .. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update @@ -310,7 +325,7 @@ of replayed messages for manual updates can be limited with the ``replayMax`` pa Recovery -------- -Initial recovery of views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message +Initial recovery of persistent views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``. Further possibilities to customize initial recovery are explained in section :ref:`recovery-java`. @@ -319,11 +334,11 @@ Further possibilities to customize initial recovery are explained in section :re Identifiers ----------- -A view must have an identifier that doesn't change across different actor incarnations. It defaults to the +A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the ``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` method. -Applications can customize a view's id by specifying an actor name during view creation. This changes that view's +Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. @@ -343,7 +358,7 @@ Snapshots ========= Snapshots can dramatically reduce recovery times of persistent actor and views. The following discusses snapshots -in context of persistent actor but this is also applicable to views. +in context of persistent actor but this is also applicable to persistent views. Persistent actor can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot succeeds, the persistent actor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message diff --git a/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst b/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst index 1778ace7c3..cc78edec88 100644 --- a/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst @@ -137,7 +137,7 @@ Views **Akka Persistence:** ``View`` - Receives the message stream written by a ``PersistentActor`` by reading it directly from the - journal (see :ref:`views`). Alternative to using channels. Useful in situations where actors shall receive a + journal (see :ref:`persistent-views`). Alternative to using channels. Useful in situations where actors shall receive a persistent message stream in correct order without duplicates. - Supports :ref:`snapshots`. diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index ee0b86e1e7..762c76374b 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -15,22 +15,39 @@ Migrating to ``2.4.x`` is as simple as changing all your classes to extending ` Replace all classes like:: - class DeprecatedProcessor extends EventsourcedProcessor { /*...*/ } + class DeprecatedProcessor extends EventsourcedProcessor { + def processorId = "id" + /*...*/ + } To extend ``PersistentActor``:: - class NewPersistentProcessor extends PersistentActor { /*...*/ } + class NewPersistentProcessor extends PersistentActor { + def persistenceId = "id" + /*...*/ + } -Renamed processorId to persistenceId -==================================== +Changed processorId to (abstract) persistenceId +=============================================== In Akka Persistence ``2.3.3`` and previously, the main building block of applications were Processors. Persistent messages, as well as processors implemented the ``processorId`` method to identify which persistent entity a message belonged to. This concept remains the same in Akka ``2.3.4``, yet we rename ``processorId`` to ``persistenceId`` because Processors will be removed, and persistent messages can be used from different classes not only ``PersistentActor`` (Views, directly from Journals etc). -We provided the renamed method also on already deprecated classes (Channels), so you can simply apply a global rename of ``processorId`` to ``persistenceId``. +Please note that ``processorId`` is **abstract** in the new API classes (``PersistentActor`` and ``PersistentView``), +and we do **not** provide a default (actor-path derrived) value for it like we did for ``processorId``. +The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical "which persistent entity this actor represents". +A longer discussion on this subject can be found on `issue #15436 `_ on github. + +In case you want to perserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily +implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows:: + + override def persistenceId = self.path.toStringWithoutAddress + +We provided the renamed method also on already deprecated classes (Channels), +so you can simply apply a global rename of ``processorId`` to ``persistenceId``. Plugin APIs: Renamed PersistentId to PersistenceId ================================================== @@ -76,6 +93,8 @@ the throughput Now deprecated code using Processor:: class OldProcessor extends Processor { + override def processorId = "user-wallet-1337" + def receive = { case Persistent(cmd) => sender() ! cmd } @@ -84,6 +103,8 @@ Now deprecated code using Processor:: Replacement code, with the same semantics, using PersistentActor:: class NewProcessor extends PersistentActor { + override def persistenceId = "user-wallet-1337" + def receiveCommand = { case cmd => persistAsync(cmd) { e => sender() ! e } @@ -100,3 +121,43 @@ any of the problems Futures have when closing over the sender reference. Using the``PersistentActor`` instead of ``Processor`` also shifts the responsibility of deciding if a message should be persisted to the receiver instead of the sender of the message. Previously, using ``Processor``, clients would have to wrap messages as ``Persistent(cmd)`` manually, as well as have to be aware of the receiver being a ``Processor``, which didn't play well with transparency of the ActorRefs in general. + +Renamed View to PersistentView, which receives plain messages (Persistent() wrapper is gone) +============================================================================================ +Views used to receive messages wrapped as ``Persistent(payload, seqNr)``, this is no longer the case and views receive +the ``payload`` as message from the ``Journal`` directly. The rationale here is that the wrapper aproach was inconsistent +with the other Akka Persistence APIs and also is not easily "discoverable" (you have to *know* you will be getting this Persistent wrapper). + +Instead, since ``2.3.4``, views get plain messages, and can use additional methods provided by the ``View`` to identify if a message +was sent from the Journal (had been played back to the view). So if you had code like this:: + + class AverageView extends View { + override def processorId = "average-view" + + def receive = { + case Persistent(msg, seqNr) => + // from Journal + + case msg => + // from user-land + } + +You should update it to extend ``PersistentView`` instead:: + + class AverageView extends PersistentView { + override def persistenceId = "persistence-sample" + override def viewId = "persistence-sample-average" + + def receive = { + case msg if isPersistent => + // from Journal + val seqNr = lastSequenceNr // in case you require the sequence number + + case msg => + // from user-land + } + } + +In case you need to obtain the current sequence number the view is looking at, you can use the ``lastSequenceNr`` method. +It is equivalent to "current sequence number", when ``isPersistent`` returns true, otherwise it yields the sequence number +of the last persistent message that this view was updated with. diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 82baecbf0b..bf4f142336 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -4,12 +4,12 @@ package docs.persistence +import akka.actor.{ Actor, ActorSystem, Props } +import akka.persistence._ + import scala.concurrent.duration._ import scala.language.postfixOps -import akka.actor.{ Props, Actor, ActorSystem } -import akka.persistence._ - trait PersistenceDocSpec { val config = """ @@ -29,7 +29,7 @@ trait PersistenceDocSpec { new AnyRef { //#definition - import akka.persistence.{ Persistent, PersistenceFailure, Processor } + import akka.persistence.{ PersistenceFailure, Persistent, Processor } class MyProcessor extends Processor { def receive = { @@ -208,7 +208,7 @@ trait PersistenceDocSpec { new AnyRef { //#fsm-example import akka.actor.FSM - import akka.persistence.{ Processor, Persistent } + import akka.persistence.{ Persistent, Processor } class PersistentDoor extends Processor with FSM[String, Int] { startWith("closed", 0) @@ -332,7 +332,6 @@ trait PersistenceDocSpec { } new AnyRef { - import akka.actor.ActorRef val processor = system.actorOf(Props[MyPersistentActor]()) @@ -367,7 +366,6 @@ trait PersistenceDocSpec { //#persist-async } new AnyRef { - import akka.actor.ActorRef val processor = system.actorOf(Props[MyPersistentActor]()) @@ -409,11 +407,15 @@ trait PersistenceDocSpec { import akka.actor.Props //#view - class MyView extends View { + class MyView extends PersistentView { override def persistenceId: String = "some-persistence-id" + override def viewId: String = "some-persistence-id-view" def receive: Actor.Receive = { - case Persistent(payload, sequenceNr) => // ... + case payload if isPersistent => + // handle message from journal... + case payload => + // handle message from user-land... } } //#view diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 89d79306f8..73e41fe735 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -26,6 +26,18 @@ concepts and architecture of `eventsourced`_ but significantly differs on API an .. _eventsourced: https://github.com/eligosource/eventsourced +Changes in Akka 2.3.4 +===================== + +In Akka 2.3.4 several of the concepts of the earlier versions were collapsed and simplified. +In essence; ``Processor`` and ``EventsourcedProcessor`` are replaced by ``PersistentActor``. ``Channel`` +and ``PersistentChannel`` are replaced by ``AtLeastOnceDelivery``. ``View`` is replaced by ``PersistentView``. + +See full details of the changes in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`. +The old classes are still included, and deprecated, for a while to make the transition smooth. +In case you need the old documentation it is located `here `_. + + Dependencies ============ @@ -41,7 +53,7 @@ Architecture When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can recover internal state from these messages. -* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another +* *PersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream. @@ -263,12 +275,12 @@ persistent actors should call the ``deleteMessages`` method. -.. _views: +.. _persistent-views: -Views -===== +Persistent Views +================ -Views can be implemented by extending the ``View`` trait and implementing the ``receive`` and the ``persistenceId`` +Persistent views can be implemented by extending the ``PersistentView`` trait and implementing the ``receive`` and the ``persistenceId`` methods. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#view @@ -278,6 +290,10 @@ the referenced persistent actor is actually running. Views read messages from a persistent actor is started later and begins to write new messages, the corresponding view is updated automatically, by default. +It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent`` +method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases +(skip the ``if isPersistent`` check). + Updates ------- @@ -285,7 +301,7 @@ The default update interval of all views of an actor system is configurable: .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval -``View`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update +``PersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an ``Update`` message. @@ -296,7 +312,7 @@ incremental message replay, triggered by that update request, completed. If set following the update request may interleave with the replayed message stream. Automated updates always run with ``await = false``. -Automated updates of all views of an actor system can be turned off by configuration: +Automated updates of all persistent views of an actor system can be turned off by configuration: .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#auto-update @@ -308,7 +324,7 @@ of replayed messages for manual updates can be limited with the ``replayMax`` pa Recovery -------- -Initial recovery of views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message +Initial recovery of persistent views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``. Further possibilities to customize initial recovery are explained in section :ref:`recovery`. @@ -317,11 +333,11 @@ Further possibilities to customize initial recovery are explained in section :re Identifiers ----------- -A view must have an identifier that doesn't change across different actor incarnations. It defaults to the +A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the ``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` method. -Applications can customize a view's id by specifying an actor name during view creation. This changes that view's +Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. @@ -372,7 +388,7 @@ Snapshots ========= Snapshots can dramatically reduce recovery times of persistent actors and views. The following discusses snapshots -in context of persistent actors but this is also applicable to views. +in context of persistent actors but this is also applicable to persistent views. Persistent actors can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot succeeds, the persistent actor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index ffa3fa5ab5..e5ecc9aae7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -6,11 +6,11 @@ package akka.persistence import java.lang.{ Iterable ⇒ JIterable } -import scala.collection.immutable - +import akka.actor.AbstractActor import akka.japi.{ Procedure, Util } import akka.persistence.JournalProtocol._ -import akka.actor.{ ActorRef, AbstractActor } + +import scala.collection.immutable /** * INTERNAL API. @@ -550,7 +550,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. * Commands sent to event sourced processors must not be [[Persistent]] or - * [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is + * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is * thrown by the processor. */ @throws(classOf[Exception]) @@ -563,7 +563,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. * Commands sent to event sourced processors must not be [[Persistent]] or - * [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is + * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is * thrown by the processor. */ @deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index bf08fbb308..91e3124dd5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -85,6 +85,7 @@ object Persistent { /** * [[Persistent]] extractor. */ + @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") def unapply(persistent: Persistent): Option[(Any, Long)] = Some((persistent.payload, persistent.sequenceNr)) } @@ -113,6 +114,7 @@ object ConfirmablePersistent { /** * [[ConfirmablePersistent]] extractor. */ + @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") def unapply(persistent: ConfirmablePersistent): Option[(Any, Long, Int)] = Some((persistent.payload, persistent.sequenceNr, persistent.redeliveries)) } @@ -328,6 +330,7 @@ private[persistence] final case class PersistentImpl( /** * INTERNAL API. */ +@deprecated("ConfirmablePersistent will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4") private[persistence] final case class ConfirmablePersistentImpl( payload: Any, sequenceNr: Long, @@ -357,6 +360,7 @@ private[persistence] final case class ConfirmablePersistentImpl( /** * INTERNAL API. */ +@deprecated("ConfirmablePersistent will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4") private[persistence] object ConfirmablePersistentImpl { def apply(persistent: PersistentRepr, confirmMessage: Delivered, confirmTarget: ActorRef = null): ConfirmablePersistentImpl = ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.persistenceId, persistent.deleted, persistent.redeliveries, persistent.confirms, confirmMessage, confirmTarget, persistent.sender) diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala new file mode 100644 index 0000000000..1ad88fac48 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -0,0 +1,238 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence + +import scala.concurrent.duration._ + +import akka.actor._ +import akka.persistence.JournalProtocol._ + +/** + * Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with + * all messages from the corresponding persistent id's journal that have not yet been consumed by the view. + * To update a view with messages that have been written after handling this request, another `Update` + * request must be sent to the view. + * + * @param await if `true`, processing of further messages sent to the view will be delayed until the + * incremental message replay, triggered by this update request, completes. If `false`, + * any message sent to the view may interleave with replayed [[Persistent]] message + * stream. + * @param replayMax maximum number of messages to replay when handling this update request. Defaults + * to `Long.MaxValue` (i.e. no limit). + */ +@SerialVersionUID(1L) +final case class Update(await: Boolean = false, replayMax: Long = Long.MaxValue) + +object Update { + /** + * Java API. + */ + def create() = + Update() + + /** + * Java API. + */ + def create(await: Boolean) = + Update(await) + + /** + * Java API. + */ + def create(await: Boolean, replayMax: Long) = + Update(await, replayMax) +} + +/** + * A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive + * the message stream directly from the Journal. These messages can be processed to update internal state + * in order to maintain an (eventual consistent) view of the state of the corresponding processor. A + * persistent view can also run on a different node, provided that a replicated journal is used. + * + * Implementation classes refer to a persistent actors' message stream by implementing `persistenceId` + * with the corresponding (shared) identifier value. + * + * Views can also store snapshots of internal state by calling [[autoUpdate]]. The snapshots of a view + * are independent of those of the referenced persistent actor. During recovery, a saved snapshot is offered + * to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger + * than the snapshot. Default is to offer the latest saved snapshot. + * + * By default, a view automatically updates itself with an interval returned by `autoUpdateInterval`. + * This method can be overridden by implementation classes to define a view instance-specific update + * interval. The default update interval for all views of an actor system can be configured with the + * `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional + * view updates by sending the view [[Update]] requests. See also methods + * + * - [[autoUpdate]] for turning automated updates on or off + * - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle + */ +trait PersistentView extends Actor with Recovery { + import context.dispatcher + + /** + * INTERNAL API. + * + * Extends the `replayStarted` state of [[Recovery]] with logic to handle [[Update]] requests + * sent by users. + */ + private[persistence] override def replayStarted(await: Boolean) = new State { + private var delegateAwaiting = await + private var delegate = PersistentView.super.replayStarted(await) + + override def toString: String = delegate.toString + + override def aroundReceive(receive: Receive, message: Any) = message match { + case Update(false, _) ⇒ // ignore + case u @ Update(true, _) if !delegateAwaiting ⇒ + delegateAwaiting = true + delegate = PersistentView.super.replayStarted(await = true) + delegate.aroundReceive(receive, u) + case other ⇒ + delegate.aroundReceive(receive, other) + } + } + + /** + * When receiving an [[Update]] request, switches to `replayStarted` state and triggers + * an incremental message replay. Invokes the actor's current behavior for any other + * received message. + */ + private val idle: State = new State { + override def toString: String = "idle" + + def aroundReceive(receive: Receive, message: Any): Unit = message match { + case r: Recover ⇒ // ignore + case Update(awaitUpdate, replayMax) ⇒ + _currentState = replayStarted(await = awaitUpdate) + journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self) + case other ⇒ process(receive, other) + } + } + + /** + * INTERNAL API. + */ + private[persistence] def onReplaySuccess(receive: Receive, await: Boolean): Unit = + onReplayComplete(await) + + /** + * INTERNAL API. + */ + private[persistence] def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = + onReplayComplete(await) + + /** + * Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`. + */ + private def onReplayComplete(await: Boolean): Unit = { + _currentState = idle + if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax))) + if (await) receiverStash.unstashAll() + } + + /** + * INTERNAL API + * WARNING: This implementation UNWRAPS PERSISTENT() before delivering to the receive block. + */ + override private[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = + super.withCurrentPersistent(persistent) { p ⇒ + receive.applyOrElse(p.payload, unhandled) + } + + private val viewSettings = extension.settings.view + + private var schedule: Option[Cancellable] = None + + /** + * View id is used as identifier for snapshots performed by this [[PersistentView]]. + * This allows the View to keep separate snapshots of data than the [[PersistentActor]] originating the message stream. + * + * + * The usual case is to have a *different* id set as `viewId` than `persistenceId`, + * although it is possible to share the same id with an [[PersistentActor]] - for example to decide about snapshots + * based on some average or sum, calculated by this view. + * + * Example: + * {{{ + * class SummingView extends PersistentView { + * override def persistenceId = "count-123" + * override def viewId = "count-123-sum" // this view is performing summing, + * // so this view's snapshots reside under the "-sum" suffixed id + * + * // ... + * } + * }}} + */ + def viewId: String + + /** + * Returns `viewId`. + */ + def snapshotterId: String = viewId + + /** + * If `true`, the currently processed message was persisted (is sent from the Journal). + * If `false`, the currently processed message comes from another actor (from "user-land"). + */ + def isPersistent: Boolean = + currentPersistentMessage.isDefined + + /** + * If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`. + * If `false`, applications must explicitly update this view by sending [[Update]] requests. The default + * value can be configured with the `akka.persistence.view.auto-update` configuration key. This method + * can be overridden by implementation classes to return non-default values. + */ + def autoUpdate: Boolean = + viewSettings.autoUpdate + + /** + * The interval for automated updates. The default value can be configured with the + * `akka.persistence.view.auto-update-interval` configuration key. This method can be + * overridden by implementation classes to return non-default values. + */ + def autoUpdateInterval: FiniteDuration = + viewSettings.autoUpdateInterval + + /** + * The maximum number of messages to replay per update. The default value can be configured with the + * `akka.persistence.view.auto-update-replay-max` configuration key. This method can be overridden by + * implementation classes to return non-default values. + */ + def autoUpdateReplayMax: Long = + viewSettings.autoUpdateReplayMax + + /** + * Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax` + * messages (following that snapshot). + */ + override def preStart(): Unit = { + super.preStart() + self ! Recover(replayMax = autoUpdateReplayMax) + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + try receiverStash.unstashAll() finally super.preRestart(reason, message) + } + + override def postStop(): Unit = { + schedule.foreach(_.cancel()) + super.postStop() + } +} + +/** + * Java API. + * + * @see [[PersistentView]] + */ +abstract class UntypedPersistentView extends UntypedActor with PersistentView + +/** + * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]) + * + * @see [[PersistentView]] + */ +abstract class AbstractPersistentView extends AbstractActor with PersistentView diff --git a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala index 68152cbbdf..2783cd1a62 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala @@ -9,6 +9,8 @@ import akka.dispatch.Envelope import akka.persistence.JournalProtocol._ import akka.persistence.SnapshotProtocol.LoadSnapshotResult +import scala.util.control.NonFatal + /** * Recovery state machine that loads snapshots and replays messages. * @@ -30,19 +32,6 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { protected def processPersistent(receive: Receive, persistent: Persistent) = withCurrentPersistent(persistent)(receive.applyOrElse(_, unhandled)) - protected def updateLastSequenceNr(persistent: Persistent): Unit = - if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr - - def updateLastSequenceNr(value: Long): Unit = - _lastSequenceNr = value - - /** INTERNAL API */ - private[akka] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = try { - _currentPersistent = persistent - updateLastSequenceNr(persistent) - body(persistent) - } finally _currentPersistent = null - protected def recordFailure(cause: Throwable): Unit = { _recoveryFailureCause = cause _recoveryFailureMessage = context.asInstanceOf[ActorCell].currentMessage @@ -108,11 +97,12 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { def aroundReceive(receive: Receive, message: Any) = message match { case r: Recover ⇒ // ignore - case ReplayedMessage(p) ⇒ try { processPersistent(receive, p) } catch { - case t: Throwable ⇒ - _currentState = replayFailed // delay throwing exception to prepareRestart - recordFailure(t) - } + case ReplayedMessage(p) ⇒ + try processPersistent(receive, p) catch { + case NonFatal(t) ⇒ + _currentState = replayFailed // delay throwing exception to prepareRestart + recordFailure(t) + } case ReplayMessagesSuccess ⇒ onReplaySuccess(receive, await) case ReplayMessagesFailure(cause) ⇒ onReplayFailure(receive, await, cause) case other ⇒ @@ -174,7 +164,25 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { @deprecated("Override `persistenceId` instead. Processor will be removed.", since = "2.3.4") def processorId: String = extension.persistenceId(self) // TODO: remove processorId - def persistenceId: String = processorId + /** + * Id of the persistent entity for which messages should be replayed. + */ + def persistenceId: String + + /** INTERNAL API */ + private[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = try { + _currentPersistent = persistent + updateLastSequenceNr(persistent) + body(persistent) + } finally _currentPersistent = null + + /** INTERNAL API. */ + private[persistence] def updateLastSequenceNr(persistent: Persistent): Unit = + if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr + + /** INTERNAL API. */ + private[persistence] def updateLastSequenceNr(value: Long): Unit = + _lastSequenceNr = value /** * Returns the current persistent message if there is any. diff --git a/akka-persistence/src/main/scala/akka/persistence/View.scala b/akka-persistence/src/main/scala/akka/persistence/View.scala index 26ed3a9208..28048c73b3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/View.scala +++ b/akka-persistence/src/main/scala/akka/persistence/View.scala @@ -9,42 +9,6 @@ import scala.concurrent.duration._ import akka.actor._ import akka.persistence.JournalProtocol._ -/** - * Instructs a [[View]] to update itself. This will run a single incremental message replay with all - * messages from the corresponding processor's journal that have not yet been consumed by the view. - * To update a view with messages that have been written after handling this request, another `Update` - * request must be sent to the view. - * - * @param await if `true`, processing of further messages sent to the view will be delayed until the - * incremental message replay, triggered by this update request, completes. If `false`, - * any message sent to the view may interleave with replayed [[Persistent]] message - * stream. - * @param replayMax maximum number of messages to replay when handling this update request. Defaults - * to `Long.MaxValue` (i.e. no limit). - */ -@SerialVersionUID(1L) -final case class Update(await: Boolean = false, replayMax: Long = Long.MaxValue) - -case object Update { - /** - * Java API. - */ - def create() = - Update() - - /** - * Java API. - */ - def create(await: Boolean) = - Update(await) - - /** - * Java API. - */ - def create(await: Boolean, replayMax: Long) = - Update(await, replayMax) -} - /** * A view replicates the persistent message stream of a processor. Implementation classes receive the * message stream as [[Persistent]] messages. These messages can be processed to update internal state @@ -68,6 +32,7 @@ case object Update { * * Views can also use channels to communicate with destinations in the same way as processors can do. */ +@deprecated("Use `akka.persistence.PersistentView` instead.", since = "2.3.4") trait View extends Actor with Recovery { import context.dispatcher @@ -147,6 +112,11 @@ trait View extends Actor with Recovery { */ def snapshotterId: String = viewId + /** + * Persistence id. Defaults to this persistent-actors's path and can be overridden. + */ + override def persistenceId: String = processorId + /** * If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`. * If `false`, applications must explicitly update this view by sending [[Update]] requests. The default @@ -196,6 +166,7 @@ trait View extends Actor with Recovery { * * @see [[View]] */ +@deprecated("Use `akka.persistence.UntypedPersistentView instead.", since = "2.3.4") abstract class UntypedView extends UntypedActor with View /** @@ -203,4 +174,5 @@ abstract class UntypedView extends UntypedActor with View * * @see [[View]] */ +@deprecated("Use `akka.persistence.AbstractPersistentView` instead.", since = "2.3.4") abstract class AbstractView extends AbstractActor with View diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index c4ec7a2449..ee760b67f7 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -76,10 +76,15 @@ trait Cleanup { this: AkkaSpec ⇒ } } +@deprecated("Use NamedPersistentActor instead.", since = "2.3.4") abstract class NamedProcessor(name: String) extends Processor { override def persistenceId: String = name } +abstract class NamedPersistentActor(name: String) extends PersistentActor { + override def persistenceId: String = name +} + trait TurnOffRecoverOnStart { this: Processor ⇒ override def preStart(): Unit = () } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala new file mode 100644 index 0000000000..c1855bed78 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala @@ -0,0 +1,319 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import akka.actor._ +import akka.persistence.JournalProtocol.ReplayMessages +import akka.testkit._ +import com.typesafe.config.Config + +import scala.concurrent.duration._ + +object PersistentViewSpec { + + private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + def receiveCommand = { + case msg ⇒ + persist(msg) { m ⇒ probe ! s"${m}-${lastSequenceNr}" } + } + + override def receiveRecover: Receive = { + case _ ⇒ // do nothing... + } + } + + private class TestPersistentView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends PersistentView { + def this(name: String, probe: ActorRef, interval: FiniteDuration) = + this(name, probe, interval, None) + + def this(name: String, probe: ActorRef) = + this(name, probe, 100.milliseconds) + + override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system) + override val persistenceId: String = name + override val viewId: String = name + "-view" + + var last: String = _ + + def receive = { + case "get" ⇒ + probe ! last + case "boom" ⇒ + throw new TestException("boom") + + case payload if isPersistent && shouldFailOn(payload) ⇒ + throw new TestException("boom") + + case payload if isPersistent ⇒ + last = s"replicated-${payload}-${lastSequenceNr}" + probe ! last + } + + override def postRestart(reason: Throwable): Unit = { + super.postRestart(reason) + failAt = None + } + + def shouldFailOn(m: Any): Boolean = + failAt.foldLeft(false) { (a, f) ⇒ a || (m == f) } + } + + private class PassiveTestPersistentView(name: String, probe: ActorRef, var failAt: Option[String]) extends PersistentView { + override val persistenceId: String = name + override val viewId: String = name + "-view" + + override def autoUpdate: Boolean = false + override def autoUpdateReplayMax: Long = 0L // no message replay during initial recovery + + var last: String = _ + + def receive = { + case "get" ⇒ + probe ! last + case payload if isPersistent && shouldFailOn(payload) ⇒ + throw new TestException("boom") + case payload ⇒ + last = s"replicated-${payload}-${lastSequenceNr}" + } + + override def postRestart(reason: Throwable): Unit = { + super.postRestart(reason) + failAt = None + } + + def shouldFailOn(m: Any): Boolean = + failAt.foldLeft(false) { (a, f) ⇒ a || (m == f) } + + } + + private class ActiveTestPersistentView(name: String, probe: ActorRef) extends PersistentView { + override val persistenceId: String = name + override val viewId: String = name + "-view" + + override def autoUpdateInterval: FiniteDuration = 50.millis + override def autoUpdateReplayMax: Long = 2 + + def receive = { + case payload ⇒ + probe ! s"replicated-${payload}-${lastSequenceNr}" + } + } + + private class PersistentOrNotTestPersistentView(name: String, probe: ActorRef) extends PersistentView { + override val persistenceId: String = name + override val viewId: String = name + "-view" + + def receive = { + case payload if isPersistent ⇒ probe ! s"replicated-${payload}-${lastSequenceNr}" + case payload ⇒ probe ! s"normal-${payload}-${lastSequenceNr}" + } + } + + private class SnapshottingPersistentView(name: String, probe: ActorRef) extends PersistentView { + override val persistenceId: String = name + override val viewId: String = s"${name}-replicator" + + override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system) + + var last: String = _ + + def receive = { + case "get" ⇒ + probe ! last + case "snap" ⇒ + saveSnapshot(last) + case "restart" ⇒ + throw new TestException("restart requested") + case SaveSnapshotSuccess(_) ⇒ + probe ! "snapped" + case SnapshotOffer(metadata, snapshot: String) ⇒ + last = snapshot + probe ! last + case payload ⇒ + last = s"replicated-${payload}-${lastSequenceNr}" + probe ! last + } + } +} + +abstract class PersistentViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { + import akka.persistence.PersistentViewSpec._ + + var persistentActor: ActorRef = _ + var view: ActorRef = _ + + var persistentActorProbe: TestProbe = _ + var viewProbe: TestProbe = _ + + override protected def beforeEach(): Unit = { + super.beforeEach() + + persistentActorProbe = TestProbe() + viewProbe = TestProbe() + + persistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, persistentActorProbe.ref)) + persistentActor ! "a" + persistentActor ! "b" + + persistentActorProbe.expectMsg("a-1") + persistentActorProbe.expectMsg("b-2") + } + + override protected def afterEach(): Unit = { + system.stop(persistentActor) + system.stop(view) + super.afterEach() + } + + def subscribeToConfirmation(probe: TestProbe): Unit = + system.eventStream.subscribe(probe.ref, classOf[Delivered]) + + def awaitConfirmation(probe: TestProbe): Unit = + probe.expectMsgType[Delivered] + + def subscribeToReplay(probe: TestProbe): Unit = + system.eventStream.subscribe(probe.ref, classOf[ReplayMessages]) + + "A persistent view" must { + "receive past updates from a processor" in { + view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref)) + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + } + "receive live updates from a processor" in { + view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref)) + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + persistentActor ! "c" + viewProbe.expectMsg("replicated-c-3") + } + "run updates at specified interval" in { + view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 2.seconds)) + // initial update is done on start + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + // live updates takes 5 seconds to replicate + persistentActor ! "c" + viewProbe.expectNoMsg(1.second) + viewProbe.expectMsg("replicated-c-3") + } + "run updates on user request" in { + view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds)) + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + persistentActor ! "c" + persistentActorProbe.expectMsg("c-3") + view ! Update(await = false) + viewProbe.expectMsg("replicated-c-3") + } + "run updates on user request and await update" in { + view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds)) + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + persistentActor ! "c" + persistentActorProbe.expectMsg("c-3") + view ! Update(await = true) + view ! "get" + viewProbe.expectMsg("replicated-c-3") + } + "run updates again on failure outside an update cycle" in { + view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds)) + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + view ! "boom" + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + } + "run updates again on failure during an update cycle" in { + persistentActor ! "c" + persistentActorProbe.expectMsg("c-3") + view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds, Some("b"))) + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + viewProbe.expectMsg("replicated-c-3") + } + "run size-limited updates on user request" in { + persistentActor ! "c" + persistentActor ! "d" + persistentActor ! "e" + persistentActor ! "f" + + persistentActorProbe.expectMsg("c-3") + persistentActorProbe.expectMsg("d-4") + persistentActorProbe.expectMsg("e-5") + persistentActorProbe.expectMsg("f-6") + + view = system.actorOf(Props(classOf[PassiveTestPersistentView], name, viewProbe.ref, None)) + + view ! Update(await = true, replayMax = 2) + view ! "get" + viewProbe.expectMsg("replicated-b-2") + + view ! Update(await = true, replayMax = 1) + view ! "get" + viewProbe.expectMsg("replicated-c-3") + + view ! Update(await = true, replayMax = 4) + view ! "get" + viewProbe.expectMsg("replicated-f-6") + } + "run size-limited updates automatically" in { + val replayProbe = TestProbe() + + persistentActor ! "c" + persistentActor ! "d" + + persistentActorProbe.expectMsg("c-3") + persistentActorProbe.expectMsg("d-4") + + subscribeToReplay(replayProbe) + + view = system.actorOf(Props(classOf[ActiveTestPersistentView], name, viewProbe.ref)) + + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + viewProbe.expectMsg("replicated-c-3") + viewProbe.expectMsg("replicated-d-4") + + replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) ⇒ } + replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) ⇒ } + replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) ⇒ } + } + "check if an incoming message is persistent" in { + persistentActor ! "c" + + persistentActorProbe.expectMsg("c-3") + + view = system.actorOf(Props(classOf[PersistentOrNotTestPersistentView], name, viewProbe.ref)) + + view ! "d" + view ! "e" + + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + viewProbe.expectMsg("replicated-c-3") + viewProbe.expectMsg("normal-d-3") + viewProbe.expectMsg("normal-e-3") + + persistentActor ! "f" + viewProbe.expectMsg("replicated-f-4") + } + "take snapshots" in { + view = system.actorOf(Props(classOf[SnapshottingPersistentView], name, viewProbe.ref)) + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + view ! "snap" + viewProbe.expectMsg("snapped") + view ! "restart" + persistentActor ! "c" + viewProbe.expectMsg("replicated-b-2") + viewProbe.expectMsg("replicated-c-3") + } + } +} + +class LeveldbPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentViewSpec")) +class InmemPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("inmem", "InmemPersistentViewSpec")) + diff --git a/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala index 2fc01670ef..bad21e7bc2 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala @@ -3,23 +3,26 @@ */ package akka.persistence -import scala.concurrent.duration._ - +import akka.actor._ +import akka.persistence.JournalProtocol.ReplayMessages +import akka.testkit._ import com.typesafe.config.Config -import akka.actor._ -import akka.testkit._ -import akka.persistence.JournalProtocol.ReplayMessages +import scala.concurrent.duration._ object ViewSpec { - class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { - def receive = { - case Persistent(payload, sequenceNr) ⇒ - probe ! s"${payload}-${sequenceNr}" + private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + def receiveCommand = { + case msg ⇒ + persist(msg) { m ⇒ probe ! s"${m}-${lastSequenceNr}" } + } + + override def receiveRecover: Receive = { + case _ ⇒ } } - class TestView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends View { + private class TestView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends View { def this(name: String, probe: ActorRef, interval: FiniteDuration) = this(name, probe, interval, None) @@ -27,7 +30,7 @@ object ViewSpec { this(name, probe, 100.milliseconds) override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system) - override val persistenceId: String = name + override val processorId: String = name var last: String = _ @@ -49,7 +52,7 @@ object ViewSpec { } } - class PassiveTestView(name: String, probe: ActorRef, var failAt: Option[String]) extends View { + private class PassiveTestView(name: String, probe: ActorRef, var failAt: Option[String]) extends View { override val persistenceId: String = name override def autoUpdate: Boolean = false @@ -73,7 +76,7 @@ object ViewSpec { } - class ActiveTestView(name: String, probe: ActorRef) extends View { + private class ActiveTestView(name: String, probe: ActorRef) extends View { override val persistenceId: String = name override def autoUpdateInterval: FiniteDuration = 50.millis override def autoUpdateReplayMax: Long = 2 @@ -84,7 +87,7 @@ object ViewSpec { } } - class TestDestination(probe: ActorRef) extends Actor { + private class TestDestination(probe: ActorRef) extends Actor { def receive = { case cp @ ConfirmablePersistent(payload, sequenceNr, _) ⇒ cp.confirm() @@ -92,7 +95,7 @@ object ViewSpec { } } - class EmittingView(name: String, destination: ActorRef) extends View { + private class EmittingView(name: String, destination: ActorRef) extends View { override val persistenceId: String = name override def autoUpdateInterval: FiniteDuration = 100.milliseconds.dilated(context.system) @@ -106,7 +109,7 @@ object ViewSpec { } } - class SnapshottingView(name: String, probe: ActorRef) extends View { + private class SnapshottingView(name: String, probe: ActorRef) extends View { override val persistenceId: String = name override val viewId: String = s"${name}-replicator" @@ -134,9 +137,9 @@ object ViewSpec { } abstract class ViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { - import ViewSpec._ + import akka.persistence.ViewSpec._ - var processor: ActorRef = _ + var persistor: ActorRef = _ var view: ActorRef = _ var processorProbe: TestProbe = _ @@ -148,16 +151,16 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc processorProbe = TestProbe() viewProbe = TestProbe() - processor = system.actorOf(Props(classOf[TestProcessor], name, processorProbe.ref)) - processor ! Persistent("a") - processor ! Persistent("b") + persistor = system.actorOf(Props(classOf[TestPersistentActor], name, processorProbe.ref)) + persistor ! "a" + persistor ! "b" processorProbe.expectMsg("a-1") processorProbe.expectMsg("b-2") } override protected def afterEach(): Unit = { - system.stop(processor) + system.stop(persistor) system.stop(view) super.afterEach() } @@ -181,7 +184,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref)) viewProbe.expectMsg("replicated-a-1") viewProbe.expectMsg("replicated-b-2") - processor ! Persistent("c") + persistor ! "c" viewProbe.expectMsg("replicated-c-3") } "run updates at specified interval" in { @@ -190,7 +193,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc viewProbe.expectMsg("replicated-a-1") viewProbe.expectMsg("replicated-b-2") // live updates takes 5 seconds to replicate - processor ! Persistent("c") + persistor ! "c" viewProbe.expectNoMsg(1.second) viewProbe.expectMsg("replicated-c-3") } @@ -198,7 +201,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds)) viewProbe.expectMsg("replicated-a-1") viewProbe.expectMsg("replicated-b-2") - processor ! Persistent("c") + persistor ! "c" processorProbe.expectMsg("c-3") view ! Update(await = false) viewProbe.expectMsg("replicated-c-3") @@ -207,7 +210,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds)) viewProbe.expectMsg("replicated-a-1") viewProbe.expectMsg("replicated-b-2") - processor ! Persistent("c") + persistor ! "c" processorProbe.expectMsg("c-3") view ! Update(await = true) view ! "get" @@ -222,7 +225,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc viewProbe.expectMsg("replicated-b-2") } "run updates again on failure during an update cycle" in { - processor ! Persistent("c") + persistor ! "c" processorProbe.expectMsg("c-3") view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds, Some("b"))) viewProbe.expectMsg("replicated-a-1") @@ -231,10 +234,10 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc viewProbe.expectMsg("replicated-c-3") } "run size-limited updates on user request" in { - processor ! Persistent("c") - processor ! Persistent("d") - processor ! Persistent("e") - processor ! Persistent("f") + persistor ! "c" + persistor ! "d" + persistor ! "e" + persistor ! "f" processorProbe.expectMsg("c-3") processorProbe.expectMsg("d-4") @@ -258,8 +261,8 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc "run size-limited updates automatically" in { val replayProbe = TestProbe() - processor ! Persistent("c") - processor ! Persistent("d") + persistor ! "c" + persistor ! "d" processorProbe.expectMsg("c-3") processorProbe.expectMsg("d-4") @@ -294,7 +297,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc awaitConfirmation(confirmProbe) view ! "restart" - processor ! Persistent("c") + persistor ! "c" destinationProbe.expectMsg("emitted-c-3") awaitConfirmation(confirmProbe) @@ -306,7 +309,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc view ! "snap" viewProbe.expectMsg("snapped") view ! "restart" - processor ! Persistent("c") + persistor ! "c" viewProbe.expectMsg("replicated-b-2") viewProbe.expectMsg("replicated-c-3") } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index 579d859a9d..be5f3744b1 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -496,15 +496,13 @@ public class LambdaPersistenceDocTest { static Object o11 = new Object() { //#view - class MyView extends AbstractView { - @Override - public String persistenceId() { - return "some-persistence-id"; - } + class MyView extends AbstractPersistentView { + @Override public String persistenceId() { return "some-persistence-id"; } + @Override public String viewId() { return "some-persistence-id-view"; } public MyView() { receive(ReceiveBuilder. - match(Persistent.class, persistent -> { + match(Object.class, p -> isPersistent(), persistent -> { // ... }).build() ); diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java index 39f9273c83..7b3659cf99 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java @@ -45,26 +45,21 @@ public class ViewExample { } } - public static class ExampleView extends AbstractView { + public static class ExampleView extends AbstractPersistentView { private int numReplicated = 0; - @Override - public String viewId() { + @Override public String persistenceId() { return "persistentActor-5"; } + @Override public String viewId() { return "view-5"; } - @Override - public String persistenceId() { - return "persistentActor-5"; - } - public ExampleView() { receive(ReceiveBuilder. - match(Persistent.class, p -> { + match(Object.class, m -> isPersistent(), msg -> { numReplicated += 1; System.out.println(String.format("view received %s (num replicated = %d)", - p.payload(), + msg, numReplicated)); }). match(SnapshotOffer.class, so -> { diff --git a/akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html b/akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html index f94463f3da..ba10ebd78d 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html +++ b/akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html @@ -81,7 +81,7 @@ snapshotting to reduce recovery time.

To run this example, go to the Run tab, and run the application main class -sample.persistence.ViewExample. +sample.persistence.PersistentViewExample.

diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java similarity index 79% rename from akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java rename to akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java index d22b6634d3..dc2c713581 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java @@ -1,13 +1,17 @@ package sample.persistence; -import java.util.concurrent.TimeUnit; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.Procedure; +import akka.persistence.SnapshotOffer; +import akka.persistence.UntypedPersistentActor; +import akka.persistence.UntypedPersistentView; import scala.concurrent.duration.Duration; -import akka.actor.*; -import akka.persistence.*; -import akka.japi.Procedure; +import java.util.concurrent.TimeUnit; -public class ViewExample { +public class PersistentViewExample { public static class ExamplePersistentActor extends UntypedPersistentActor { private int count = 1; @@ -41,32 +45,26 @@ public class ViewExample { } } - public static class ExampleView extends UntypedView { + public static class ExampleView extends UntypedPersistentView { private int numReplicated = 0; - @Override - public String viewId() { - return "view-5"; - } - - @Override - public String persistenceId() { - return "persistentActor-5"; - } + @Override public String persistenceId() { return "persistentActor-5"; } + @Override public String viewId() { return "view-5"; } @Override public void onReceive(Object message) throws Exception { - if (message instanceof Persistent) { - Persistent p = (Persistent)message; + if (isPersistent()) { numReplicated += 1; - System.out.println(String.format("view received %s (num replicated = %d)", p.payload(), numReplicated)); + System.out.println(String.format("view received %s (num replicated = %d)", message, numReplicated)); } else if (message instanceof SnapshotOffer) { SnapshotOffer so = (SnapshotOffer)message; numReplicated = (Integer)so.snapshot(); System.out.println(String.format("view received snapshot offer %s (metadata = %s)", numReplicated, so.metadata())); } else if (message.equals("snap")) { saveSnapshot(numReplicated); + } else { + unhandled(message); } } } diff --git a/akka-samples/akka-sample-persistence-java/tutorial/index.html b/akka-samples/akka-sample-persistence-java/tutorial/index.html index 8431c352df..640669c5ff 100644 --- a/akka-samples/akka-sample-persistence-java/tutorial/index.html +++ b/akka-samples/akka-sample-persistence-java/tutorial/index.html @@ -106,7 +106,7 @@ snapshotting to reduce recovery time.

To run this example, go to the Run tab, and run the application main class -sample.persistence.ViewExample. +sample.persistence.PersistentViewExample.

diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala index 53059cecfb..dbfaa88f93 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala @@ -24,11 +24,10 @@ object ViewExample extends App { } } - class ExampleView extends View { + class ExampleView extends PersistentView { private var numReplicated = 0 override def persistenceId: String = "persistentActor-5" - override def viewId = "view-5" def receive = { @@ -37,9 +36,11 @@ object ViewExample extends App { case SnapshotOffer(metadata, snapshot: Int) => numReplicated = snapshot println(s"view received snapshot offer ${snapshot} (metadata = ${metadata})") - case Persistent(payload, _) => + case payload if isPersistent => numReplicated += 1 - println(s"view received ${payload} (num replicated = ${numReplicated})") + println(s"view received persistent ${payload} (num replicated = ${numReplicated})") + case payload => + println(s"view received not persitent ${payload}") } }