+per #15424 Added PersistentView, deprecated View

A PersistentView works the same way as View did previously, except:

* it requires an `peristenceId` (no default is provided)
* messages given to `receive` are NOT wrapped in Persistent()

akka-streams not touched, will update them afterwards on different branch

Also solves #15436 by making persistentId in PersistentView abstract.

(cherry picked from commit dcafaf788236fe6d018388dd55d5bf9650ded696)

Conflicts:
	akka-docs/rst/java/lambda-persistence.rst
	akka-docs/rst/java/persistence.rst
	akka-docs/rst/scala/persistence.rst
	akka-persistence/src/main/scala/akka/persistence/Persistent.scala
	akka-persistence/src/main/scala/akka/persistence/View.scala
This commit is contained in:
Konrad 'ktoso' Malawski 2014-06-24 16:57:33 +02:00 committed by Patrik Nordwall
parent 2203968adb
commit 3fd240384c
22 changed files with 847 additions and 192 deletions

View file

@ -1188,6 +1188,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
import ShardCoordinator.Internal._
import ShardRegion.ShardId
override def persistenceId = self.path.toStringWithoutAddress
var persistentState = State.empty
var rebalanceInProgress = Set.empty[ShardId]

View file

@ -485,18 +485,21 @@ public class PersistenceDocTest {
static Object o11 = new Object() {
//#view
class MyView extends UntypedView {
@Override
public String persistenceId() {
return "some-persistence-id";
}
class MyView extends UntypedPersistentView {
@Override public String viewId() { return "some-persistence-id-view"; }
@Override public String persistenceId() { return "some-persistence-id"; }
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Persistent) {
// ...
if (isPersistent()) {
// handle message from Journal...
} else if (message instanceof String) {
// handle message from user...
} else {
unhandled(message);
}
}
}
//#view

View file

@ -26,6 +26,17 @@ Akka persistence is inspired by the `eventsourced`_ library. It follows the same
.. _eventsourced: https://github.com/eligosource/eventsourced
Changes in Akka 2.3.4
=====================
In Akka 2.3.4 several of the concepts of the earlier versions were collapsed and simplified.
In essence; ``Processor`` and ``EventsourcedProcessor`` are replaced by ``PersistentActor``. ``Channel``
and ``PersistentChannel`` are replaced by ``AtLeastOnceDelivery``. ``View`` is replaced by ``PersistentView``.
See full details of the changes in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`.
The old classes are still included, and deprecated, for a while to make the transition smooth.
In case you need the old documentation it is located `here <http://doc.akka.io/docs/akka/2.3.3/java/lambda-persistence.html#persistence-lambda-java>`_.
Dependencies
============
@ -45,7 +56,7 @@ Architecture
When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can
recover internal state from these messages.
* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
* *AbstractPersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's
replicated message stream.
@ -260,12 +271,12 @@ to Akka persistence will allow to replay messages that have been marked as delet
purposes, for example. To delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors should call the ``deleteMessages`` method.
.. _views-java-lambda:
.. _persistent-views-java-lambda:
Views
=====
Views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method
Persistent views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method
and setting the “initial behavior” in the constructor by calling the :meth:`receive` method.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view
@ -275,14 +286,18 @@ the referenced persistent actor is actually running. Views read messages from a
persistent actor is started later and begins to write new messages, the corresponding view is updated automatically, by
default.
It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent``
method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases
(skip the ``if isPersistent`` check).
Updates
-------
The default update interval of all views of an actor system is configurable:
The default update interval of all persistent views of an actor system is configurable:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval
``View`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
``AbstractPersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
interval for a specific view class or view instance. Applications may also trigger additional updates at
any time by sending a view an ``Update`` message.
@ -293,7 +308,7 @@ incremental message replay, triggered by that update request, completed. If set
following the update request may interleave with the replayed message stream. Automated updates always run with
``await = false``.
Automated updates of all views of an actor system can be turned off by configuration:
Automated updates of all persistent views of an actor system can be turned off by configuration:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update
@ -305,20 +320,20 @@ of replayed messages for manual updates can be limited with the ``replayMax`` pa
Recovery
--------
Initial recovery of views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message
Initial recovery of persistent views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message
to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``.
Further possibilities to customize initial recovery are explained in section :ref:`recovery-java-lambda`.
Further possibilities to customize initial recovery are explained in section :ref:`recovery-java`.
.. _persistence-identifiers-java-lambda:
Identifiers
-----------
A view must have an identifier that doesn't change across different actor incarnations. It defaults to the
A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the
``String`` representation of the actor path without the address part and can be obtained via the ``viewId``
method.
Applications can customize a view's id by specifying an actor name during view creation. This changes that view's
Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's
name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the
``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers.
@ -338,7 +353,7 @@ Snapshots
=========
Snapshots can dramatically reduce recovery times of persistent actors and views. The following discusses snapshots
in context of persistent actors but this is also applicable to views.
in context of persistent actors but this is also applicable to persistent views.
Persistent actor can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot
succeeds, the persistent actor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message

View file

@ -31,6 +31,17 @@ concepts and architecture of `eventsourced`_ but significantly differs on API an
.. _eventsourced: https://github.com/eligosource/eventsourced
Changes in Akka 2.3.4
=====================
In Akka 2.3.4 several of the concepts of the earlier versions were collapsed and simplified.
In essence; ``Processor`` and ``EventsourcedProcessor`` are replaced by ``PersistentActor``. ``Channel``
and ``PersistentChannel`` are replaced by ``AtLeastOnceDelivery``. ``View`` is replaced by ``PersistentView``.
See full details of the changes in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`.
The old classes are still included, and deprecated, for a while to make the transition smooth.
In case you need the old documentation it is located `here <http://doc.akka.io/docs/akka/2.3.3/java/persistence.html>`_.
Dependencies
============
@ -50,7 +61,7 @@ Architecture
When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can
recover internal state from these messages.
* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
* *UntypedPersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's
replicated message stream.
@ -265,13 +276,13 @@ to Akka persistence will allow to replay messages that have been marked as delet
purposes, for example. To delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors should call the ``deleteMessages`` method.
.. _views-java:
.. _persistent-views-java:
Views
=====
Persistent Views
================
Views can be implemented by extending the ``UntypedView`` trait and implementing the ``onReceive`` and the ``persistenceId``
methods.
Persistent views can be implemented by extending the ``UntypedPersistentView`` trait and implementing the ``onReceive``
and the ``persistenceId`` methods.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#view
@ -280,14 +291,18 @@ the referenced persistent actor is actually running. Views read messages from a
persistent actor is started later and begins to write new messages, the corresponding view is updated automatically, by
default.
It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent``
method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases
(skip the ``if isPersistent`` check).
Updates
-------
The default update interval of all views of an actor system is configurable:
The default update interval of all persistent views of an actor system is configurable:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval
``View`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
``UntypedPersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
interval for a specific view class or view instance. Applications may also trigger additional updates at
any time by sending a view an ``Update`` message.
@ -298,7 +313,7 @@ incremental message replay, triggered by that update request, completed. If set
following the update request may interleave with the replayed message stream. Automated updates always run with
``await = false``.
Automated updates of all views of an actor system can be turned off by configuration:
Automated updates of all persistent views of an actor system can be turned off by configuration:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update
@ -310,7 +325,7 @@ of replayed messages for manual updates can be limited with the ``replayMax`` pa
Recovery
--------
Initial recovery of views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message
Initial recovery of persistent views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message
to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``.
Further possibilities to customize initial recovery are explained in section :ref:`recovery-java`.
@ -319,11 +334,11 @@ Further possibilities to customize initial recovery are explained in section :re
Identifiers
-----------
A view must have an identifier that doesn't change across different actor incarnations. It defaults to the
A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the
``String`` representation of the actor path without the address part and can be obtained via the ``viewId``
method.
Applications can customize a view's id by specifying an actor name during view creation. This changes that view's
Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's
name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the
``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers.
@ -343,7 +358,7 @@ Snapshots
=========
Snapshots can dramatically reduce recovery times of persistent actor and views. The following discusses snapshots
in context of persistent actor but this is also applicable to views.
in context of persistent actor but this is also applicable to persistent views.
Persistent actor can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot
succeeds, the persistent actor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message

View file

@ -137,7 +137,7 @@ Views
**Akka Persistence:** ``View``
- Receives the message stream written by a ``PersistentActor`` by reading it directly from the
journal (see :ref:`views`). Alternative to using channels. Useful in situations where actors shall receive a
journal (see :ref:`persistent-views`). Alternative to using channels. Useful in situations where actors shall receive a
persistent message stream in correct order without duplicates.
- Supports :ref:`snapshots`.

View file

@ -15,22 +15,39 @@ Migrating to ``2.4.x`` is as simple as changing all your classes to extending `
Replace all classes like::
class DeprecatedProcessor extends EventsourcedProcessor { /*...*/ }
class DeprecatedProcessor extends EventsourcedProcessor {
def processorId = "id"
/*...*/
}
To extend ``PersistentActor``::
class NewPersistentProcessor extends PersistentActor { /*...*/ }
class NewPersistentProcessor extends PersistentActor {
def persistenceId = "id"
/*...*/
}
Renamed processorId to persistenceId
====================================
Changed processorId to (abstract) persistenceId
===============================================
In Akka Persistence ``2.3.3`` and previously, the main building block of applications were Processors.
Persistent messages, as well as processors implemented the ``processorId`` method to identify which persistent entity a message belonged to.
This concept remains the same in Akka ``2.3.4``, yet we rename ``processorId`` to ``persistenceId`` because Processors will be removed,
and persistent messages can be used from different classes not only ``PersistentActor`` (Views, directly from Journals etc).
We provided the renamed method also on already deprecated classes (Channels), so you can simply apply a global rename of ``processorId`` to ``persistenceId``.
Please note that ``processorId`` is **abstract** in the new API classes (``PersistentActor`` and ``PersistentView``),
and we do **not** provide a default (actor-path derrived) value for it like we did for ``processorId``.
The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical "which persistent entity this actor represents".
A longer discussion on this subject can be found on `issue #15436 <https://github.com/akka/akka/issues/15436>`_ on github.
In case you want to perserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily
implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows::
override def persistenceId = self.path.toStringWithoutAddress
We provided the renamed method also on already deprecated classes (Channels),
so you can simply apply a global rename of ``processorId`` to ``persistenceId``.
Plugin APIs: Renamed PersistentId to PersistenceId
==================================================
@ -76,6 +93,8 @@ the throughput
Now deprecated code using Processor::
class OldProcessor extends Processor {
override def processorId = "user-wallet-1337"
def receive = {
case Persistent(cmd) => sender() ! cmd
}
@ -84,6 +103,8 @@ Now deprecated code using Processor::
Replacement code, with the same semantics, using PersistentActor::
class NewProcessor extends PersistentActor {
override def persistenceId = "user-wallet-1337"
def receiveCommand = {
case cmd =>
persistAsync(cmd) { e => sender() ! e }
@ -100,3 +121,43 @@ any of the problems Futures have when closing over the sender reference.
Using the``PersistentActor`` instead of ``Processor`` also shifts the responsibility of deciding if a message should be persisted
to the receiver instead of the sender of the message. Previously, using ``Processor``, clients would have to wrap messages as ``Persistent(cmd)``
manually, as well as have to be aware of the receiver being a ``Processor``, which didn't play well with transparency of the ActorRefs in general.
Renamed View to PersistentView, which receives plain messages (Persistent() wrapper is gone)
============================================================================================
Views used to receive messages wrapped as ``Persistent(payload, seqNr)``, this is no longer the case and views receive
the ``payload`` as message from the ``Journal`` directly. The rationale here is that the wrapper aproach was inconsistent
with the other Akka Persistence APIs and also is not easily "discoverable" (you have to *know* you will be getting this Persistent wrapper).
Instead, since ``2.3.4``, views get plain messages, and can use additional methods provided by the ``View`` to identify if a message
was sent from the Journal (had been played back to the view). So if you had code like this::
class AverageView extends View {
override def processorId = "average-view"
def receive = {
case Persistent(msg, seqNr) =>
// from Journal
case msg =>
// from user-land
}
You should update it to extend ``PersistentView`` instead::
class AverageView extends PersistentView {
override def persistenceId = "persistence-sample"
override def viewId = "persistence-sample-average"
def receive = {
case msg if isPersistent =>
// from Journal
val seqNr = lastSequenceNr // in case you require the sequence number
case msg =>
// from user-land
}
}
In case you need to obtain the current sequence number the view is looking at, you can use the ``lastSequenceNr`` method.
It is equivalent to "current sequence number", when ``isPersistent`` returns true, otherwise it yields the sequence number
of the last persistent message that this view was updated with.

View file

@ -4,12 +4,12 @@
package docs.persistence
import akka.actor.{ Actor, ActorSystem, Props }
import akka.persistence._
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor.{ Props, Actor, ActorSystem }
import akka.persistence._
trait PersistenceDocSpec {
val config =
"""
@ -29,7 +29,7 @@ trait PersistenceDocSpec {
new AnyRef {
//#definition
import akka.persistence.{ Persistent, PersistenceFailure, Processor }
import akka.persistence.{ PersistenceFailure, Persistent, Processor }
class MyProcessor extends Processor {
def receive = {
@ -208,7 +208,7 @@ trait PersistenceDocSpec {
new AnyRef {
//#fsm-example
import akka.actor.FSM
import akka.persistence.{ Processor, Persistent }
import akka.persistence.{ Persistent, Processor }
class PersistentDoor extends Processor with FSM[String, Int] {
startWith("closed", 0)
@ -332,7 +332,6 @@ trait PersistenceDocSpec {
}
new AnyRef {
import akka.actor.ActorRef
val processor = system.actorOf(Props[MyPersistentActor]())
@ -367,7 +366,6 @@ trait PersistenceDocSpec {
//#persist-async
}
new AnyRef {
import akka.actor.ActorRef
val processor = system.actorOf(Props[MyPersistentActor]())
@ -409,11 +407,15 @@ trait PersistenceDocSpec {
import akka.actor.Props
//#view
class MyView extends View {
class MyView extends PersistentView {
override def persistenceId: String = "some-persistence-id"
override def viewId: String = "some-persistence-id-view"
def receive: Actor.Receive = {
case Persistent(payload, sequenceNr) => // ...
case payload if isPersistent =>
// handle message from journal...
case payload =>
// handle message from user-land...
}
}
//#view

View file

@ -26,6 +26,18 @@ concepts and architecture of `eventsourced`_ but significantly differs on API an
.. _eventsourced: https://github.com/eligosource/eventsourced
Changes in Akka 2.3.4
=====================
In Akka 2.3.4 several of the concepts of the earlier versions were collapsed and simplified.
In essence; ``Processor`` and ``EventsourcedProcessor`` are replaced by ``PersistentActor``. ``Channel``
and ``PersistentChannel`` are replaced by ``AtLeastOnceDelivery``. ``View`` is replaced by ``PersistentView``.
See full details of the changes in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`.
The old classes are still included, and deprecated, for a while to make the transition smooth.
In case you need the old documentation it is located `here <http://doc.akka.io/docs/akka/2.3.3/scala/persistence.html>`_.
Dependencies
============
@ -41,7 +53,7 @@ Architecture
When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can
recover internal state from these messages.
* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
* *PersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's
replicated message stream.
@ -263,12 +275,12 @@ persistent actors should call the ``deleteMessages`` method.
.. _views:
.. _persistent-views:
Views
=====
Persistent Views
================
Views can be implemented by extending the ``View`` trait and implementing the ``receive`` and the ``persistenceId``
Persistent views can be implemented by extending the ``PersistentView`` trait and implementing the ``receive`` and the ``persistenceId``
methods.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#view
@ -278,6 +290,10 @@ the referenced persistent actor is actually running. Views read messages from a
persistent actor is started later and begins to write new messages, the corresponding view is updated automatically, by
default.
It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent``
method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases
(skip the ``if isPersistent`` check).
Updates
-------
@ -285,7 +301,7 @@ The default update interval of all views of an actor system is configurable:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval
``View`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
``PersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
interval for a specific view class or view instance. Applications may also trigger additional updates at
any time by sending a view an ``Update`` message.
@ -296,7 +312,7 @@ incremental message replay, triggered by that update request, completed. If set
following the update request may interleave with the replayed message stream. Automated updates always run with
``await = false``.
Automated updates of all views of an actor system can be turned off by configuration:
Automated updates of all persistent views of an actor system can be turned off by configuration:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#auto-update
@ -308,7 +324,7 @@ of replayed messages for manual updates can be limited with the ``replayMax`` pa
Recovery
--------
Initial recovery of views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message
Initial recovery of persistent views works in the very same way as for a persistent actor (i.e. by sending a ``Recover`` message
to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``.
Further possibilities to customize initial recovery are explained in section :ref:`recovery`.
@ -317,11 +333,11 @@ Further possibilities to customize initial recovery are explained in section :re
Identifiers
-----------
A view must have an identifier that doesn't change across different actor incarnations. It defaults to the
A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the
``String`` representation of the actor path without the address part and can be obtained via the ``viewId``
method.
Applications can customize a view's id by specifying an actor name during view creation. This changes that view's
Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's
name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the
``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers.
@ -372,7 +388,7 @@ Snapshots
=========
Snapshots can dramatically reduce recovery times of persistent actors and views. The following discusses snapshots
in context of persistent actors but this is also applicable to views.
in context of persistent actors but this is also applicable to persistent views.
Persistent actors can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot
succeeds, the persistent actor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message

View file

@ -6,11 +6,11 @@ package akka.persistence
import java.lang.{ Iterable JIterable }
import scala.collection.immutable
import akka.actor.AbstractActor
import akka.japi.{ Procedure, Util }
import akka.persistence.JournalProtocol._
import akka.actor.{ ActorRef, AbstractActor }
import scala.collection.immutable
/**
* INTERNAL API.
@ -550,7 +550,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/
@throws(classOf[Exception])
@ -563,7 +563,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/
@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")

View file

@ -85,6 +85,7 @@ object Persistent {
/**
* [[Persistent]] extractor.
*/
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
def unapply(persistent: Persistent): Option[(Any, Long)] =
Some((persistent.payload, persistent.sequenceNr))
}
@ -113,6 +114,7 @@ object ConfirmablePersistent {
/**
* [[ConfirmablePersistent]] extractor.
*/
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
def unapply(persistent: ConfirmablePersistent): Option[(Any, Long, Int)] =
Some((persistent.payload, persistent.sequenceNr, persistent.redeliveries))
}
@ -328,6 +330,7 @@ private[persistence] final case class PersistentImpl(
/**
* INTERNAL API.
*/
@deprecated("ConfirmablePersistent will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4")
private[persistence] final case class ConfirmablePersistentImpl(
payload: Any,
sequenceNr: Long,
@ -357,6 +360,7 @@ private[persistence] final case class ConfirmablePersistentImpl(
/**
* INTERNAL API.
*/
@deprecated("ConfirmablePersistent will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4")
private[persistence] object ConfirmablePersistentImpl {
def apply(persistent: PersistentRepr, confirmMessage: Delivered, confirmTarget: ActorRef = null): ConfirmablePersistentImpl =
ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.persistenceId, persistent.deleted, persistent.redeliveries, persistent.confirms, confirmMessage, confirmTarget, persistent.sender)

View file

@ -0,0 +1,238 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.persistence.JournalProtocol._
/**
* Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with
* all messages from the corresponding persistent id's journal that have not yet been consumed by the view.
* To update a view with messages that have been written after handling this request, another `Update`
* request must be sent to the view.
*
* @param await if `true`, processing of further messages sent to the view will be delayed until the
* incremental message replay, triggered by this update request, completes. If `false`,
* any message sent to the view may interleave with replayed [[Persistent]] message
* stream.
* @param replayMax maximum number of messages to replay when handling this update request. Defaults
* to `Long.MaxValue` (i.e. no limit).
*/
@SerialVersionUID(1L)
final case class Update(await: Boolean = false, replayMax: Long = Long.MaxValue)
object Update {
/**
* Java API.
*/
def create() =
Update()
/**
* Java API.
*/
def create(await: Boolean) =
Update(await)
/**
* Java API.
*/
def create(await: Boolean, replayMax: Long) =
Update(await, replayMax)
}
/**
* A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive
* the message stream directly from the Journal. These messages can be processed to update internal state
* in order to maintain an (eventual consistent) view of the state of the corresponding processor. A
* persistent view can also run on a different node, provided that a replicated journal is used.
*
* Implementation classes refer to a persistent actors' message stream by implementing `persistenceId`
* with the corresponding (shared) identifier value.
*
* Views can also store snapshots of internal state by calling [[autoUpdate]]. The snapshots of a view
* are independent of those of the referenced persistent actor. During recovery, a saved snapshot is offered
* to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger
* than the snapshot. Default is to offer the latest saved snapshot.
*
* By default, a view automatically updates itself with an interval returned by `autoUpdateInterval`.
* This method can be overridden by implementation classes to define a view instance-specific update
* interval. The default update interval for all views of an actor system can be configured with the
* `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional
* view updates by sending the view [[Update]] requests. See also methods
*
* - [[autoUpdate]] for turning automated updates on or off
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
*/
trait PersistentView extends Actor with Recovery {
import context.dispatcher
/**
* INTERNAL API.
*
* Extends the `replayStarted` state of [[Recovery]] with logic to handle [[Update]] requests
* sent by users.
*/
private[persistence] override def replayStarted(await: Boolean) = new State {
private var delegateAwaiting = await
private var delegate = PersistentView.super.replayStarted(await)
override def toString: String = delegate.toString
override def aroundReceive(receive: Receive, message: Any) = message match {
case Update(false, _) // ignore
case u @ Update(true, _) if !delegateAwaiting
delegateAwaiting = true
delegate = PersistentView.super.replayStarted(await = true)
delegate.aroundReceive(receive, u)
case other
delegate.aroundReceive(receive, other)
}
}
/**
* When receiving an [[Update]] request, switches to `replayStarted` state and triggers
* an incremental message replay. Invokes the actor's current behavior for any other
* received message.
*/
private val idle: State = new State {
override def toString: String = "idle"
def aroundReceive(receive: Receive, message: Any): Unit = message match {
case r: Recover // ignore
case Update(awaitUpdate, replayMax)
_currentState = replayStarted(await = awaitUpdate)
journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
case other process(receive, other)
}
}
/**
* INTERNAL API.
*/
private[persistence] def onReplaySuccess(receive: Receive, await: Boolean): Unit =
onReplayComplete(await)
/**
* INTERNAL API.
*/
private[persistence] def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit =
onReplayComplete(await)
/**
* Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`.
*/
private def onReplayComplete(await: Boolean): Unit = {
_currentState = idle
if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax)))
if (await) receiverStash.unstashAll()
}
/**
* INTERNAL API
* WARNING: This implementation UNWRAPS PERSISTENT() before delivering to the receive block.
*/
override private[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent Unit): Unit =
super.withCurrentPersistent(persistent) { p
receive.applyOrElse(p.payload, unhandled)
}
private val viewSettings = extension.settings.view
private var schedule: Option[Cancellable] = None
/**
* View id is used as identifier for snapshots performed by this [[PersistentView]].
* This allows the View to keep separate snapshots of data than the [[PersistentActor]] originating the message stream.
*
*
* The usual case is to have a *different* id set as `viewId` than `persistenceId`,
* although it is possible to share the same id with an [[PersistentActor]] - for example to decide about snapshots
* based on some average or sum, calculated by this view.
*
* Example:
* {{{
* class SummingView extends PersistentView {
* override def persistenceId = "count-123"
* override def viewId = "count-123-sum" // this view is performing summing,
* // so this view's snapshots reside under the "-sum" suffixed id
*
* // ...
* }
* }}}
*/
def viewId: String
/**
* Returns `viewId`.
*/
def snapshotterId: String = viewId
/**
* If `true`, the currently processed message was persisted (is sent from the Journal).
* If `false`, the currently processed message comes from another actor (from "user-land").
*/
def isPersistent: Boolean =
currentPersistentMessage.isDefined
/**
* If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`.
* If `false`, applications must explicitly update this view by sending [[Update]] requests. The default
* value can be configured with the `akka.persistence.view.auto-update` configuration key. This method
* can be overridden by implementation classes to return non-default values.
*/
def autoUpdate: Boolean =
viewSettings.autoUpdate
/**
* The interval for automated updates. The default value can be configured with the
* `akka.persistence.view.auto-update-interval` configuration key. This method can be
* overridden by implementation classes to return non-default values.
*/
def autoUpdateInterval: FiniteDuration =
viewSettings.autoUpdateInterval
/**
* The maximum number of messages to replay per update. The default value can be configured with the
* `akka.persistence.view.auto-update-replay-max` configuration key. This method can be overridden by
* implementation classes to return non-default values.
*/
def autoUpdateReplayMax: Long =
viewSettings.autoUpdateReplayMax
/**
* Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax`
* messages (following that snapshot).
*/
override def preStart(): Unit = {
super.preStart()
self ! Recover(replayMax = autoUpdateReplayMax)
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try receiverStash.unstashAll() finally super.preRestart(reason, message)
}
override def postStop(): Unit = {
schedule.foreach(_.cancel())
super.postStop()
}
}
/**
* Java API.
*
* @see [[PersistentView]]
*/
abstract class UntypedPersistentView extends UntypedActor with PersistentView
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]])
*
* @see [[PersistentView]]
*/
abstract class AbstractPersistentView extends AbstractActor with PersistentView

View file

@ -9,6 +9,8 @@ import akka.dispatch.Envelope
import akka.persistence.JournalProtocol._
import akka.persistence.SnapshotProtocol.LoadSnapshotResult
import scala.util.control.NonFatal
/**
* Recovery state machine that loads snapshots and replays messages.
*
@ -30,19 +32,6 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory {
protected def processPersistent(receive: Receive, persistent: Persistent) =
withCurrentPersistent(persistent)(receive.applyOrElse(_, unhandled))
protected def updateLastSequenceNr(persistent: Persistent): Unit =
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
def updateLastSequenceNr(value: Long): Unit =
_lastSequenceNr = value
/** INTERNAL API */
private[akka] def withCurrentPersistent(persistent: Persistent)(body: Persistent Unit): Unit = try {
_currentPersistent = persistent
updateLastSequenceNr(persistent)
body(persistent)
} finally _currentPersistent = null
protected def recordFailure(cause: Throwable): Unit = {
_recoveryFailureCause = cause
_recoveryFailureMessage = context.asInstanceOf[ActorCell].currentMessage
@ -108,11 +97,12 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory {
def aroundReceive(receive: Receive, message: Any) = message match {
case r: Recover // ignore
case ReplayedMessage(p) try { processPersistent(receive, p) } catch {
case t: Throwable
_currentState = replayFailed // delay throwing exception to prepareRestart
recordFailure(t)
}
case ReplayedMessage(p)
try processPersistent(receive, p) catch {
case NonFatal(t)
_currentState = replayFailed // delay throwing exception to prepareRestart
recordFailure(t)
}
case ReplayMessagesSuccess onReplaySuccess(receive, await)
case ReplayMessagesFailure(cause) onReplayFailure(receive, await, cause)
case other
@ -174,7 +164,25 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory {
@deprecated("Override `persistenceId` instead. Processor will be removed.", since = "2.3.4")
def processorId: String = extension.persistenceId(self) // TODO: remove processorId
def persistenceId: String = processorId
/**
* Id of the persistent entity for which messages should be replayed.
*/
def persistenceId: String
/** INTERNAL API */
private[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent Unit): Unit = try {
_currentPersistent = persistent
updateLastSequenceNr(persistent)
body(persistent)
} finally _currentPersistent = null
/** INTERNAL API. */
private[persistence] def updateLastSequenceNr(persistent: Persistent): Unit =
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
/** INTERNAL API. */
private[persistence] def updateLastSequenceNr(value: Long): Unit =
_lastSequenceNr = value
/**
* Returns the current persistent message if there is any.

View file

@ -9,42 +9,6 @@ import scala.concurrent.duration._
import akka.actor._
import akka.persistence.JournalProtocol._
/**
* Instructs a [[View]] to update itself. This will run a single incremental message replay with all
* messages from the corresponding processor's journal that have not yet been consumed by the view.
* To update a view with messages that have been written after handling this request, another `Update`
* request must be sent to the view.
*
* @param await if `true`, processing of further messages sent to the view will be delayed until the
* incremental message replay, triggered by this update request, completes. If `false`,
* any message sent to the view may interleave with replayed [[Persistent]] message
* stream.
* @param replayMax maximum number of messages to replay when handling this update request. Defaults
* to `Long.MaxValue` (i.e. no limit).
*/
@SerialVersionUID(1L)
final case class Update(await: Boolean = false, replayMax: Long = Long.MaxValue)
case object Update {
/**
* Java API.
*/
def create() =
Update()
/**
* Java API.
*/
def create(await: Boolean) =
Update(await)
/**
* Java API.
*/
def create(await: Boolean, replayMax: Long) =
Update(await, replayMax)
}
/**
* A view replicates the persistent message stream of a processor. Implementation classes receive the
* message stream as [[Persistent]] messages. These messages can be processed to update internal state
@ -68,6 +32,7 @@ case object Update {
*
* Views can also use channels to communicate with destinations in the same way as processors can do.
*/
@deprecated("Use `akka.persistence.PersistentView` instead.", since = "2.3.4")
trait View extends Actor with Recovery {
import context.dispatcher
@ -147,6 +112,11 @@ trait View extends Actor with Recovery {
*/
def snapshotterId: String = viewId
/**
* Persistence id. Defaults to this persistent-actors's path and can be overridden.
*/
override def persistenceId: String = processorId
/**
* If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`.
* If `false`, applications must explicitly update this view by sending [[Update]] requests. The default
@ -196,6 +166,7 @@ trait View extends Actor with Recovery {
*
* @see [[View]]
*/
@deprecated("Use `akka.persistence.UntypedPersistentView instead.", since = "2.3.4")
abstract class UntypedView extends UntypedActor with View
/**
@ -203,4 +174,5 @@ abstract class UntypedView extends UntypedActor with View
*
* @see [[View]]
*/
@deprecated("Use `akka.persistence.AbstractPersistentView` instead.", since = "2.3.4")
abstract class AbstractView extends AbstractActor with View

View file

@ -76,10 +76,15 @@ trait Cleanup { this: AkkaSpec ⇒
}
}
@deprecated("Use NamedPersistentActor instead.", since = "2.3.4")
abstract class NamedProcessor(name: String) extends Processor {
override def persistenceId: String = name
}
abstract class NamedPersistentActor(name: String) extends PersistentActor {
override def persistenceId: String = name
}
trait TurnOffRecoverOnStart { this: Processor
override def preStart(): Unit = ()
}

View file

@ -0,0 +1,319 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor._
import akka.persistence.JournalProtocol.ReplayMessages
import akka.testkit._
import com.typesafe.config.Config
import scala.concurrent.duration._
object PersistentViewSpec {
private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
def receiveCommand = {
case msg
persist(msg) { m probe ! s"${m}-${lastSequenceNr}" }
}
override def receiveRecover: Receive = {
case _ // do nothing...
}
}
private class TestPersistentView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends PersistentView {
def this(name: String, probe: ActorRef, interval: FiniteDuration) =
this(name, probe, interval, None)
def this(name: String, probe: ActorRef) =
this(name, probe, 100.milliseconds)
override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system)
override val persistenceId: String = name
override val viewId: String = name + "-view"
var last: String = _
def receive = {
case "get"
probe ! last
case "boom"
throw new TestException("boom")
case payload if isPersistent && shouldFailOn(payload)
throw new TestException("boom")
case payload if isPersistent
last = s"replicated-${payload}-${lastSequenceNr}"
probe ! last
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
failAt = None
}
def shouldFailOn(m: Any): Boolean =
failAt.foldLeft(false) { (a, f) a || (m == f) }
}
private class PassiveTestPersistentView(name: String, probe: ActorRef, var failAt: Option[String]) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
override def autoUpdate: Boolean = false
override def autoUpdateReplayMax: Long = 0L // no message replay during initial recovery
var last: String = _
def receive = {
case "get"
probe ! last
case payload if isPersistent && shouldFailOn(payload)
throw new TestException("boom")
case payload
last = s"replicated-${payload}-${lastSequenceNr}"
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
failAt = None
}
def shouldFailOn(m: Any): Boolean =
failAt.foldLeft(false) { (a, f) a || (m == f) }
}
private class ActiveTestPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
override def autoUpdateInterval: FiniteDuration = 50.millis
override def autoUpdateReplayMax: Long = 2
def receive = {
case payload
probe ! s"replicated-${payload}-${lastSequenceNr}"
}
}
private class PersistentOrNotTestPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
def receive = {
case payload if isPersistent probe ! s"replicated-${payload}-${lastSequenceNr}"
case payload probe ! s"normal-${payload}-${lastSequenceNr}"
}
}
private class SnapshottingPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = s"${name}-replicator"
override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system)
var last: String = _
def receive = {
case "get"
probe ! last
case "snap"
saveSnapshot(last)
case "restart"
throw new TestException("restart requested")
case SaveSnapshotSuccess(_)
probe ! "snapped"
case SnapshotOffer(metadata, snapshot: String)
last = snapshot
probe ! last
case payload
last = s"replicated-${payload}-${lastSequenceNr}"
probe ! last
}
}
}
abstract class PersistentViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import akka.persistence.PersistentViewSpec._
var persistentActor: ActorRef = _
var view: ActorRef = _
var persistentActorProbe: TestProbe = _
var viewProbe: TestProbe = _
override protected def beforeEach(): Unit = {
super.beforeEach()
persistentActorProbe = TestProbe()
viewProbe = TestProbe()
persistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, persistentActorProbe.ref))
persistentActor ! "a"
persistentActor ! "b"
persistentActorProbe.expectMsg("a-1")
persistentActorProbe.expectMsg("b-2")
}
override protected def afterEach(): Unit = {
system.stop(persistentActor)
system.stop(view)
super.afterEach()
}
def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[Delivered])
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[Delivered]
def subscribeToReplay(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[ReplayMessages])
"A persistent view" must {
"receive past updates from a processor" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"receive live updates from a processor" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
viewProbe.expectMsg("replicated-c-3")
}
"run updates at specified interval" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 2.seconds))
// initial update is done on start
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
// live updates takes 5 seconds to replicate
persistentActor ! "c"
viewProbe.expectNoMsg(1.second)
viewProbe.expectMsg("replicated-c-3")
}
"run updates on user request" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view ! Update(await = false)
viewProbe.expectMsg("replicated-c-3")
}
"run updates on user request and await update" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view ! Update(await = true)
view ! "get"
viewProbe.expectMsg("replicated-c-3")
}
"run updates again on failure outside an update cycle" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
view ! "boom"
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"run updates again on failure during an update cycle" in {
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds, Some("b")))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
"run size-limited updates on user request" in {
persistentActor ! "c"
persistentActor ! "d"
persistentActor ! "e"
persistentActor ! "f"
persistentActorProbe.expectMsg("c-3")
persistentActorProbe.expectMsg("d-4")
persistentActorProbe.expectMsg("e-5")
persistentActorProbe.expectMsg("f-6")
view = system.actorOf(Props(classOf[PassiveTestPersistentView], name, viewProbe.ref, None))
view ! Update(await = true, replayMax = 2)
view ! "get"
viewProbe.expectMsg("replicated-b-2")
view ! Update(await = true, replayMax = 1)
view ! "get"
viewProbe.expectMsg("replicated-c-3")
view ! Update(await = true, replayMax = 4)
view ! "get"
viewProbe.expectMsg("replicated-f-6")
}
"run size-limited updates automatically" in {
val replayProbe = TestProbe()
persistentActor ! "c"
persistentActor ! "d"
persistentActorProbe.expectMsg("c-3")
persistentActorProbe.expectMsg("d-4")
subscribeToReplay(replayProbe)
view = system.actorOf(Props(classOf[ActiveTestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("replicated-d-4")
replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) }
}
"check if an incoming message is persistent" in {
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view = system.actorOf(Props(classOf[PersistentOrNotTestPersistentView], name, viewProbe.ref))
view ! "d"
view ! "e"
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("normal-d-3")
viewProbe.expectMsg("normal-e-3")
persistentActor ! "f"
viewProbe.expectMsg("replicated-f-4")
}
"take snapshots" in {
view = system.actorOf(Props(classOf[SnapshottingPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
view ! "snap"
viewProbe.expectMsg("snapped")
view ! "restart"
persistentActor ! "c"
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
}
}
class LeveldbPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentViewSpec"))
class InmemPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("inmem", "InmemPersistentViewSpec"))

View file

@ -3,23 +3,26 @@
*/
package akka.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.persistence.JournalProtocol.ReplayMessages
import akka.testkit._
import com.typesafe.config.Config
import akka.actor._
import akka.testkit._
import akka.persistence.JournalProtocol.ReplayMessages
import scala.concurrent.duration._
object ViewSpec {
class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
def receive = {
case Persistent(payload, sequenceNr)
probe ! s"${payload}-${sequenceNr}"
private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
def receiveCommand = {
case msg
persist(msg) { m probe ! s"${m}-${lastSequenceNr}" }
}
override def receiveRecover: Receive = {
case _
}
}
class TestView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends View {
private class TestView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends View {
def this(name: String, probe: ActorRef, interval: FiniteDuration) =
this(name, probe, interval, None)
@ -27,7 +30,7 @@ object ViewSpec {
this(name, probe, 100.milliseconds)
override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system)
override val persistenceId: String = name
override val processorId: String = name
var last: String = _
@ -49,7 +52,7 @@ object ViewSpec {
}
}
class PassiveTestView(name: String, probe: ActorRef, var failAt: Option[String]) extends View {
private class PassiveTestView(name: String, probe: ActorRef, var failAt: Option[String]) extends View {
override val persistenceId: String = name
override def autoUpdate: Boolean = false
@ -73,7 +76,7 @@ object ViewSpec {
}
class ActiveTestView(name: String, probe: ActorRef) extends View {
private class ActiveTestView(name: String, probe: ActorRef) extends View {
override val persistenceId: String = name
override def autoUpdateInterval: FiniteDuration = 50.millis
override def autoUpdateReplayMax: Long = 2
@ -84,7 +87,7 @@ object ViewSpec {
}
}
class TestDestination(probe: ActorRef) extends Actor {
private class TestDestination(probe: ActorRef) extends Actor {
def receive = {
case cp @ ConfirmablePersistent(payload, sequenceNr, _)
cp.confirm()
@ -92,7 +95,7 @@ object ViewSpec {
}
}
class EmittingView(name: String, destination: ActorRef) extends View {
private class EmittingView(name: String, destination: ActorRef) extends View {
override val persistenceId: String = name
override def autoUpdateInterval: FiniteDuration = 100.milliseconds.dilated(context.system)
@ -106,7 +109,7 @@ object ViewSpec {
}
}
class SnapshottingView(name: String, probe: ActorRef) extends View {
private class SnapshottingView(name: String, probe: ActorRef) extends View {
override val persistenceId: String = name
override val viewId: String = s"${name}-replicator"
@ -134,9 +137,9 @@ object ViewSpec {
}
abstract class ViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ViewSpec._
import akka.persistence.ViewSpec._
var processor: ActorRef = _
var persistor: ActorRef = _
var view: ActorRef = _
var processorProbe: TestProbe = _
@ -148,16 +151,16 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
processorProbe = TestProbe()
viewProbe = TestProbe()
processor = system.actorOf(Props(classOf[TestProcessor], name, processorProbe.ref))
processor ! Persistent("a")
processor ! Persistent("b")
persistor = system.actorOf(Props(classOf[TestPersistentActor], name, processorProbe.ref))
persistor ! "a"
persistor ! "b"
processorProbe.expectMsg("a-1")
processorProbe.expectMsg("b-2")
}
override protected def afterEach(): Unit = {
system.stop(processor)
system.stop(persistor)
system.stop(view)
super.afterEach()
}
@ -181,7 +184,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
processor ! Persistent("c")
persistor ! "c"
viewProbe.expectMsg("replicated-c-3")
}
"run updates at specified interval" in {
@ -190,7 +193,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
// live updates takes 5 seconds to replicate
processor ! Persistent("c")
persistor ! "c"
viewProbe.expectNoMsg(1.second)
viewProbe.expectMsg("replicated-c-3")
}
@ -198,7 +201,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
processor ! Persistent("c")
persistor ! "c"
processorProbe.expectMsg("c-3")
view ! Update(await = false)
viewProbe.expectMsg("replicated-c-3")
@ -207,7 +210,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
processor ! Persistent("c")
persistor ! "c"
processorProbe.expectMsg("c-3")
view ! Update(await = true)
view ! "get"
@ -222,7 +225,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
viewProbe.expectMsg("replicated-b-2")
}
"run updates again on failure during an update cycle" in {
processor ! Persistent("c")
persistor ! "c"
processorProbe.expectMsg("c-3")
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds, Some("b")))
viewProbe.expectMsg("replicated-a-1")
@ -231,10 +234,10 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
viewProbe.expectMsg("replicated-c-3")
}
"run size-limited updates on user request" in {
processor ! Persistent("c")
processor ! Persistent("d")
processor ! Persistent("e")
processor ! Persistent("f")
persistor ! "c"
persistor ! "d"
persistor ! "e"
persistor ! "f"
processorProbe.expectMsg("c-3")
processorProbe.expectMsg("d-4")
@ -258,8 +261,8 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
"run size-limited updates automatically" in {
val replayProbe = TestProbe()
processor ! Persistent("c")
processor ! Persistent("d")
persistor ! "c"
persistor ! "d"
processorProbe.expectMsg("c-3")
processorProbe.expectMsg("d-4")
@ -294,7 +297,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
awaitConfirmation(confirmProbe)
view ! "restart"
processor ! Persistent("c")
persistor ! "c"
destinationProbe.expectMsg("emitted-c-3")
awaitConfirmation(confirmProbe)
@ -306,7 +309,7 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
view ! "snap"
viewProbe.expectMsg("snapped")
view ! "restart"
processor ! Persistent("c")
persistor ! "c"
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}

View file

@ -496,15 +496,13 @@ public class LambdaPersistenceDocTest {
static Object o11 = new Object() {
//#view
class MyView extends AbstractView {
@Override
public String persistenceId() {
return "some-persistence-id";
}
class MyView extends AbstractPersistentView {
@Override public String persistenceId() { return "some-persistence-id"; }
@Override public String viewId() { return "some-persistence-id-view"; }
public MyView() {
receive(ReceiveBuilder.
match(Persistent.class, persistent -> {
match(Object.class, p -> isPersistent(), persistent -> {
// ...
}).build()
);

View file

@ -45,26 +45,21 @@ public class ViewExample {
}
}
public static class ExampleView extends AbstractView {
public static class ExampleView extends AbstractPersistentView {
private int numReplicated = 0;
@Override
public String viewId() {
@Override public String persistenceId() { return "persistentActor-5"; }
@Override public String viewId() {
return "view-5";
}
@Override
public String persistenceId() {
return "persistentActor-5";
}
public ExampleView() {
receive(ReceiveBuilder.
match(Persistent.class, p -> {
match(Object.class, m -> isPersistent(), msg -> {
numReplicated += 1;
System.out.println(String.format("view received %s (num replicated = %d)",
p.payload(),
msg,
numReplicated));
}).
match(SnapshotOffer.class, so -> {

View file

@ -81,7 +81,7 @@ snapshotting to reduce recovery time.
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ViewExample</code></b>.
<b><code>sample.persistence.PersistentViewExample</code></b>.
</p>
</div>

View file

@ -1,13 +1,17 @@
package sample.persistence;
import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Procedure;
import akka.persistence.SnapshotOffer;
import akka.persistence.UntypedPersistentActor;
import akka.persistence.UntypedPersistentView;
import scala.concurrent.duration.Duration;
import akka.actor.*;
import akka.persistence.*;
import akka.japi.Procedure;
import java.util.concurrent.TimeUnit;
public class ViewExample {
public class PersistentViewExample {
public static class ExamplePersistentActor extends UntypedPersistentActor {
private int count = 1;
@ -41,32 +45,26 @@ public class ViewExample {
}
}
public static class ExampleView extends UntypedView {
public static class ExampleView extends UntypedPersistentView {
private int numReplicated = 0;
@Override
public String viewId() {
return "view-5";
}
@Override
public String persistenceId() {
return "persistentActor-5";
}
@Override public String persistenceId() { return "persistentActor-5"; }
@Override public String viewId() { return "view-5"; }
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Persistent) {
Persistent p = (Persistent)message;
if (isPersistent()) {
numReplicated += 1;
System.out.println(String.format("view received %s (num replicated = %d)", p.payload(), numReplicated));
System.out.println(String.format("view received %s (num replicated = %d)", message, numReplicated));
} else if (message instanceof SnapshotOffer) {
SnapshotOffer so = (SnapshotOffer)message;
numReplicated = (Integer)so.snapshot();
System.out.println(String.format("view received snapshot offer %s (metadata = %s)", numReplicated, so.metadata()));
} else if (message.equals("snap")) {
saveSnapshot(numReplicated);
} else {
unhandled(message);
}
}
}

View file

@ -106,7 +106,7 @@ snapshotting to reduce recovery time.
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ViewExample</code></b>.
<b><code>sample.persistence.PersistentViewExample</code></b>.
</p>
</div>

View file

@ -24,11 +24,10 @@ object ViewExample extends App {
}
}
class ExampleView extends View {
class ExampleView extends PersistentView {
private var numReplicated = 0
override def persistenceId: String = "persistentActor-5"
override def viewId = "view-5"
def receive = {
@ -37,9 +36,11 @@ object ViewExample extends App {
case SnapshotOffer(metadata, snapshot: Int) =>
numReplicated = snapshot
println(s"view received snapshot offer ${snapshot} (metadata = ${metadata})")
case Persistent(payload, _) =>
case payload if isPersistent =>
numReplicated += 1
println(s"view received ${payload} (num replicated = ${numReplicated})")
println(s"view received persistent ${payload} (num replicated = ${numReplicated})")
case payload =>
println(s"view received not persitent ${payload}")
}
}