=21423 remove deprecated PersistentView

This commit is contained in:
Konrad Malawski 2016-12-02 14:43:25 +01:00 committed by Konrad `ktoso` Malawski
parent d48ea5cacc
commit 2ab8ab2840
17 changed files with 697 additions and 1321 deletions

View file

@ -610,7 +610,7 @@ Router Example with Pool of Remote Deployed Routees
Let's take a look at how to use a cluster aware router on single master node that creates
and deploys workers. To keep track of a single master we use the :ref:`cluster-singleton-java`
in the contrib module. The ``ClusterSingletonManager`` is started on each node.
in the cluster-tools module. The ``ClusterSingletonManager`` is started on each node.
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java#create-singleton-manager

View file

@ -527,31 +527,6 @@ public class LambdaPersistenceDocTest {
}
};
static Object o12 = new Object() {
//#view
class MyView extends AbstractPersistentView {
@Override public String persistenceId() { return "some-persistence-id"; }
@Override public String viewId() { return "some-persistence-id-view"; }
public MyView() {
receive(ReceiveBuilder.
match(Object.class, p -> isPersistent(), persistent -> {
// ...
}).build()
);
}
}
//#view
public void usage() {
final ActorSystem system = ActorSystem.create("example");
//#view-update
final ActorRef view = system.actorOf(Props.create(MyView.class));
view.tell(Update.create(true), null);
//#view-update
}
};
static Object o14 = new Object() {
//#safe-shutdown
final class Shutdown {

View file

@ -512,37 +512,6 @@ public class PersistenceDocTest {
}
};
static Object o14 = new Object() {
//#view
class MyView extends UntypedPersistentView {
@Override
public String persistenceId() { return "some-persistence-id"; }
@Override
public String viewId() { return "my-stable-persistence-view-id"; }
@Override
public void onReceive(Object message) throws Exception {
if (isPersistent()) {
// handle message from Journal...
} else if (message instanceof String) {
// handle message from user...
} else {
unhandled(message);
}
}
}
//#view
public void usage() {
final ActorSystem system = ActorSystem.create("example");
//#view-update
final ActorRef view = system.actorOf(Props.create(MyView.class));
view.tell(Update.create(true), null);
//#view-update
}
};
static Object o13 = new Object() {
//#safe-shutdown
final class Shutdown {}

View file

@ -54,10 +54,6 @@ Architecture
When a persistent actor is started or restarted, journaled messages are replayed to that actor so that it can
recover internal state from these messages.
* *AbstractPersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's
replicated message stream.
* *AbstractPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in
case of sender and receiver JVM crashes.
@ -457,90 +453,6 @@ mechanism when ``persist()`` is used. Notice the early stop behaviour that occur
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#safe-shutdown-example-bad
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#safe-shutdown-example-good
.. _persistent-views-java-lambda:
Persistent Views
================
.. warning::
``AbstractPersistentView`` is deprecated. Use :ref:`persistence-query-java` instead. The corresponding
query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
to an actor corresponding to a previous ``UntypedPersistentView`` actor:
* `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the
destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow
* `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure
being propagated all the way
* `ActorSubscriber`_ in case you need more fine grained control
The consuming actor may be a plain ``AbstractActor`` or an ``AbstractPersistentActor`` if it needs to store its
own state (e.g. fromSequenceNr offset).
.. _Sink.actorRef: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#Sink_actorRef
.. _mapAsync: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/stages-overview.html#Asynchronous_processing_stages
.. _ActorSubscriber: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#ActorSubscriber
Persistent views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method
and setting the “initial behavior” in the constructor by calling the :meth:`receive` method.
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#view
The ``persistenceId`` identifies the persistent actor from which the view receives journaled messages. It is not necessary that
the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a
persistent actor is started later and begins to write new messages, by default the corresponding view is updated automatically.
It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent``
method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases
(skip the ``if isPersistent`` check).
Updates
-------
The default update interval of all persistent views of an actor system is configurable:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval
``AbstractPersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
interval for a specific view class or view instance. Applications may also trigger additional updates at
any time by sending a view an ``Update`` message.
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#view-update
If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the
incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages
following the update request may interleave with the replayed message stream. Automated updates always run with
``await = false``.
Automated updates of all persistent views of an actor system can be turned off by configuration:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update
Implementation classes may override the configured default value by overriding the ``autoUpdate`` method. To
limit the number of replayed messages per update request, applications can configure a custom
``akka.persistence.view.auto-update-replay-max`` value or override the ``autoUpdateReplayMax`` method. The number
of replayed messages for manual updates can be limited with the ``replayMax`` parameter of the ``Update`` message.
Recovery
--------
Initial recovery of persistent views works the very same way as for persistent actors (i.e. by sending a ``Recover`` message
to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``.
Further possibilities to customize initial recovery are explained in section :ref:`recovery-java-lambda`.
.. _persistence-identifiers-java-lambda:
Identifiers
-----------
A persistent view must have an identifier that doesn't change across different actor incarnations.
The identifier must be defined with the ``viewId`` method.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java-lambda` of a view and its
persistent actor should be shared (which is what applications usually do not want).
.. _snapshots-java-lambda:
Snapshots
=========

View file

@ -58,10 +58,6 @@ Architecture
When a persistent actor is started or restarted, journaled messages are replayed to that actor so that it can
recover internal state from these messages.
* *UntypedPersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's
replicated message stream.
* *UntypedPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in
case of sender and receiver JVM crashes.
@ -518,91 +514,6 @@ For example, if you configure the replay filter for leveldb plugin, it looks lik
}
.. _persistent-views-java:
Persistent Views
================
.. warning::
``UntypedPersistentView`` is deprecated. Use :ref:`persistence-query-java` instead. The corresponding
query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
to an actor corresponding to a previous ``UntypedPersistentView`` actor:
* `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the
destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow
* `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure
being propagated all the way
* `ActorSubscriber`_ in case you need more fine grained control
The consuming actor may be a plain ``UntypedActor`` or an ``UntypedPersistentActor`` if it needs to store its
own state (e.g. fromSequenceNr offset).
.. _Sink.actorRef: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#Sink_actorRef
.. _mapAsync: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/stages-overview.html#Asynchronous_processing_stages
.. _ActorSubscriber: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#ActorSubscriber
Persistent views can be implemented by extending the ``UntypedPersistentView`` trait and implementing the ``onReceive``
and the ``persistenceId`` methods.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#view
The ``persistenceId`` identifies the persistent actor from which the view receives journaled messages. It is not necessary that
the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a
persistent actor is started later and begins to write new messages, by
default the corresponding view is updated automatically.
It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent``
method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases
(skip the ``if isPersistent`` check).
Updates
-------
The default update interval of all persistent views of an actor system is configurable:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval
``UntypedPersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
interval for a specific view class or view instance. Applications may also trigger additional updates at
any time by sending a view an ``Update`` message.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#view-update
If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the
incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages
following the update request may interleave with the replayed message stream. Automated updates always run with
``await = false``.
Automated updates of all persistent views of an actor system can be turned off by configuration:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update
Implementation classes may override the configured default value by overriding the ``autoUpdate`` method. To
limit the number of replayed messages per update request, applications can configure a custom
``akka.persistence.view.auto-update-replay-max`` value or override the ``autoUpdateReplayMax`` method. The number
of replayed messages for manual updates can be limited with the ``replayMax`` parameter of the ``Update`` message.
Recovery
--------
Initial recovery of persistent views works the very same way as for persistent actors (i.e. by sending a ``Recover`` message
to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``.
Further possibilities to customize initial recovery are explained in section :ref:`recovery-java`.
.. _persistence-identifiers-java:
Identifiers
-----------
A persistent view must have an identifier that doesn't change across different actor incarnations.
The identifier must be defined with the ``viewId`` method.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its
persistent actor should be shared (which is what applications usually do not want).
.. _snapshots-java:
Snapshots
=========

View file

@ -6,3 +6,676 @@ Migration Guide 2.3.x to 2.4.x
Migration from 2.3.x to 2.4.x is described in the
`documentation of 2.4 <http://doc.akka.io/docs/akka/2.4/project/migration-guide-2.3.x-2.4.x.html>`_.
The 2.4 release contains some structural changes that require some
simple, mechanical source-level changes in client code.
When migrating from earlier versions you should first follow the instructions for
migrating :ref:`1.3.x to 2.0.x <migration-2.0>` and then :ref:`2.0.x to 2.1.x <migration-2.1>`
and then :ref:`2.1.x to 2.2.x <migration-2.2>` and then :ref:`2.2.x to 2.3.x <migration-2.3>`.
Binary Compatibility
====================
Akka 2.4.x is backwards binary compatible with previous 2.3.x versions apart from the following
exceptions. This means that the new JARs are a drop-in replacement for the old one
(but not the other way around) as long as your build does not enable the inliner (Scala-only restriction).
The following parts are not binary compatible with 2.3.x:
* akka-testkit and akka-remote-testkit
* experimental modules, such as akka-persistence and akka-contrib
* features, classes, methods that were deprecated in 2.3.x and removed in 2.4.x
The dependency to **Netty** has been updated from version 3.8.0.Final to 3.10.3.Final. The changes in
those versions might not be fully binary compatible, but we believe that it will not be a problem
in practice. No changes were needed to the Akka source code for this update. Users of libraries that
depend on 3.8.0.Final that break with 3.10.3.Final should be able to manually downgrade the dependency
to 3.8.0.Final and Akka will still work with that version.
Advanced Notice: TypedActors will go away
=========================================
While technically not yet deprecated, the current ``akka.actor.TypedActor`` support will be superseded by
the :ref:`typed-scala` project that is currently being developed in open preview mode. If you are using TypedActors
in your projects you are advised to look into this, as it is superior to the Active Object pattern expressed
in TypedActors. The generic ActorRefs in Akka Typed allow the same type-safety that is afforded by
TypedActors while retaining all the other benefits of an explicit actor model (including the ability to
change behaviors etc.).
It is likely that TypedActors will be officially deprecated in the next major update of Akka and subsequently removed.
Removed Deprecated Features
===========================
The following, previously deprecated, features have been removed:
* akka-dataflow
* akka-transactor
* durable mailboxes (akka-mailboxes-common, akka-file-mailbox)
* Cluster.publishCurrentClusterState
* akka.cluster.auto-down, replaced by akka.cluster.auto-down-unreachable-after in Akka 2.3
* Old routers and configuration.
Note that in router configuration you must now specify if it is a ``pool`` or a ``group``
in the way that was introduced in Akka 2.3.
* Timeout constructor without unit
* JavaLoggingEventHandler, replaced by JavaLogger
* UntypedActorFactory
* Java API TestKit.dilated, moved to JavaTestKit.dilated
Protobuf Dependency
===================
The transitive dependency to Protobuf has been removed to make it possible to use any version
of Protobuf for the application messages. If you use Protobuf in your application you need
to add the following dependency with desired version number::
"com.google.protobuf" % "protobuf-java" % "2.5.0"
Internally Akka is using an embedded version of protobuf that corresponds to ``com.google.protobuf/protobuf-java``
version 2.5.0. The package name of the embedded classes has been changed to ``akka.protobuf``.
Added parameter validation to RootActorPath
===========================================
Previously ``akka.actor.RootActorPath`` allowed passing in arbitrary strings into its name parameter,
which is meant to be the *name* of the root Actor. Subsequently, if constructed with an invalid name
such as a full path for example (``/user/Full/Path``) some features using this path may transparently fail -
such as using ``actorSelection`` on such invalid path.
In Akka 2.4.x the ``RootActorPath`` validates the input and may throw an ``IllegalArgumentException`` if
the passed in name string is illegal (contains ``/`` elsewhere than in the begining of the string or contains ``#``).
TestKit.remaining throws AssertionError
=======================================
In earlier versions of Akka `TestKit.remaining` returned the default timeout configurable under
"akka.test.single-expect-default". This was a bit confusing and thus it has been changed to throw an
AssertionError if called outside of within. The old behavior however can still be achieved by
calling `TestKit.remainingOrDefault` instead.
EventStream and ManagedActorClassification EventBus now require an ActorSystem
==============================================================================
Both the ``EventStream`` (:ref:`Scala <event-stream-scala>`, :ref:`Java <event-stream-java>`) and the
``ManagedActorClassification``, ``ManagedActorEventBus`` (:ref:`Scala <actor-classification-scala>`, :ref:`Java <actor-classification-java>`) now
require an ``ActorSystem`` to properly operate. The reason for that is moving away from stateful internal lifecycle checks
to a fully reactive model for unsubscribing actors that have ``Terminated``. Therefore the ``ActorClassification``
and ``ActorEventBus`` was deprecated and replaced by ``ManagedActorClassification`` and ``ManagedActorEventBus``
If you have implemented a custom event bus, you will need to pass in the actor system through the constructor now:
.. includecode:: ../scala/code/docs/event/EventBusDocSpec.scala#actor-bus
If you have been creating EventStreams manually, you now have to provide an actor system and *start the unsubscriber*:
.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala#event-bus-start-unsubscriber-scala
Please note that this change affects you only if you have implemented your own buses, Akka's own ``context.eventStream``
is still there and does not require any attention from you concerning this change.
FSM notifies on same state transitions
======================================
When changing states in an Finite-State-Machine Actor (``FSM``), state transition events are emitted and can be handled by the user
either by registering ``onTransition`` handlers or by subscribing to these events by sending it an ``SubscribeTransitionCallBack`` message.
Previously in ``2.3.x`` when an ``FSM`` was in state ``A`` and performed a ``goto(A)`` transition, no state transition notification would be sent.
This is because it would effectively stay in the same state, and was deemed to be semantically equivalent to calling ``stay()``.
In ``2.4.x`` when an ``FSM`` performs an any ``goto(X)`` transition, it will always trigger state transition events.
Which turns out to be useful in many systems where same-state transitions actually should have an effect.
In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead.
Circuit Breaker Timeout Change
==============================
In ``2.3.x`` calls protected by the ``CircuitBreaker`` were allowed to run indefinitely and the check to see if the timeout had been exceeded was done after the call had returned.
In ``2.4.x`` the failureCount of the Breaker will be increased as soon as the timeout is reached and a ``Failure[TimeoutException]`` will be returned immediately for asynchronous calls. Synchronous calls will now throw a ``TimeoutException`` after the call is finished.
Slf4j logging filter
====================
If you use ``Slf4jLogger`` you should add the following configuration::
akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
It will filter the log events using the backend configuration (e.g. logback.xml) before
they are published to the event bus.
Inbox.receive Java API
======================
``Inbox.receive`` now throws a checked ``java.util.concurrent.TimeoutException`` exception if the receive timeout
is reached.
Pool routers nrOfInstances method now takes ActorSystem
=======================================================
In order to make cluster routers smarter about when they can start local routees,
``nrOfInstances`` defined on ``Pool`` now takes ``ActorSystem`` as an argument.
In case you have implemented a custom Pool you will have to update the method's signature,
however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic.
Group routers paths method now takes ActorSystem
================================================
In order to make cluster routers smarter about when they can start local routees,
``paths`` defined on ``Group`` now takes ``ActorSystem`` as an argument.
In case you have implemented a custom Group you will have to update the method's signature,
however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic.
Cluster aware router max-total-nr-of-instances
==============================================
In 2.3.x the deployment configuration property ``nr-of-instances`` was used for
cluster aware routers to specify total number of routees in the cluster.
This was confusing, especially since the default value is 1.
In 2.4.x there is a new deployement property ``cluster.max-total-nr-of-instances`` that
defines total number of routees in the cluster. By default ``max-total-nr-of-instances``
is set to a high value (10000) that will result in new routees added to the router when nodes join the cluster.
Set it to a lower value if you want to limit total number of routees.
For backwards compatibility reasons ``nr-of-instances`` is still used if defined by user,
i.e. if defined it takes precedence over ``max-total-nr-of-instances``.
Logger names use full class name
================================
Previously, few places in Akka used "simple" logger names, such as ``Cluster`` or ``Remoting``.
Now they use full class names, such as ``akka.cluster.Cluster`` or ``akka.remote.Remoting``,
in order to allow package level log level definitions and ease source code lookup.
In case you used specific "simple" logger name based rules in your ``logback.xml`` configurations,
please change them to reflect appropriate package name, such as
``<logger name='akka.cluster' level='warn' />`` or ``<logger name='akka.remote' level='error' />``
Default interval for TestKit.awaitAssert changed to 100 ms
==========================================================
Default check interval changed from 800 ms to 100 ms. You can define the interval explicitly if you need a
longer interval.
Secure Cookies
==============
`Secure cookies` feature was deprecated.
AES128CounterInetRNG and AES256CounterInetRNG are Deprecated
============================================================
Use ``AES128CounterSecureRNG`` or ``AES256CounterSecureRNG`` as
``akka.remote.netty.ssl.security.random-number-generator``.
Microkernel is Deprecated
=========================
Akka Microkernel is deprecated and will be removed. It is replaced by using an ordinary
user defined main class and packaging with `sbt-native-packager <https://github.com/sbt/sbt-native-packager>`_
or `Lightbend ConductR <http://www.lightbend.com/products/conductr>`_.
Please see :ref:`deployment-scenarios` for more information.
New Cluster Metrics Extension
=============================
Previously, cluster metrics functionality was located in the ``akka-cluster`` jar.
Now it is split out and moved into a separate Akka module: ``akka-cluster-metrics`` jar.
The module comes with few enhancements, such as use of Kamon sigar-loader
for native library provisioning as well as use of statistical averaging of metrics data.
Note that both old and new metrics configuration entries in the ``reference.conf``
are still in the same name space ``akka.cluster.metrics`` but are not compatible.
Make sure to disable legacy metrics in akka-cluster: ``akka.cluster.metrics.enabled=off``,
since it is still enabled in akka-cluster by default (for compatibility with past releases).
Router configuration entries have also changed for the module, they use prefix ``cluster-metrics-``:
``cluster-metrics-adaptive-pool`` and ``cluster-metrics-adaptive-group``
Metrics extension classes and objects are located in the new package ``akka.cluster.metrics``.
Please see :ref:`Scala <cluster_metrics_scala>`, :ref:`Java <cluster_metrics_java>` for more information.
Cluster tools moved to separate module
======================================
The Cluster Singleton, Distributed Pub-Sub, and Cluster Client previously located in the ``akka-contrib``
jar is now moved to a separate module named ``akka-cluster-tools``. You need to replace this dependency
if you use any of these tools.
The classes changed package name from ``akka.contrib.pattern`` to ``akka.cluster.singleton``, ``akka.cluster.pubsub``
and ``akka.cluster.client``.
The configuration properties changed name to ``akka.cluster.pub-sub`` and ``akka.cluster.client``.
Cluster sharding moved to separate module
=========================================
The Cluster Sharding previously located in the ``akka-contrib`` jar is now moved to a separate module
named ``akka-cluster-sharding``. You need to replace this dependency if you use Cluster Sharding.
The classes changed package name from ``akka.contrib.pattern`` to ``akka.cluster.sharding``.
The configuration properties changed name to ``akka.cluster.sharding``.
ClusterSharding construction
============================
Several parameters of the ``start`` method of the ``ClusterSharding`` extension are now defined
in a settings object ``ClusterShardingSettings``.
It can be created from system configuration properties and also amended with API.
These settings can be defined differently per entry type if needed.
Starting the ``ShardRegion`` in proxy mode is now done with the ``startProxy`` method
of the ``ClusterSharding`` extension instead of the optional ``entryProps`` parameter.
Entry was renamed to Entity, for example in the ``MessagesExtractor`` in the Java API
and the ``EntityId`` type in the Scala API.
``idExtractor`` function was renamed to ``extractEntityId``. ``shardResolver`` function
was renamed to ``extractShardId``.
Cluster Sharding Entry Path Change
==================================
Previously in ``2.3.x`` entries were direct children of the local ``ShardRegion``. In examples the ``persistenceId`` of entries
included ``self.path.parent.name`` to include the cluster type name.
In ``2.4.x`` entries are now children of a ``Shard``, which in turn is a child of the local ``ShardRegion``. To include the shard
type in the ``persistenceId`` it is now accessed by ``self.path.parent.parent.name`` from each entry.
Asynchronous ShardAllocationStrategy
====================================
The methods of the ``ShardAllocationStrategy`` and ``AbstractShardAllocationStrategy`` in Cluster Sharding
have changed return type to a ``Future`` to support asynchronous decision. For example you can ask an
actor external actor of how to allocate shards or rebalance shards.
For the synchronous case you can return the result via ``scala.concurrent.Future.successful`` in Scala or
``akka.dispatch.Futures.successful`` in Java.
Cluster Sharding internal data
==============================
The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence.
This data can safely be removed when restarting the whole Akka Cluster.
The serialization format of the internal persistent events stored by the Cluster Sharding coordinator
has been changed and it cannot load old data from 2.3.x or some 2.4 milestone.
The ``persistenceId`` of the Cluster Sharding coordinator has been changed since 2.3.x so
it should not load such old data, but it can be a problem if you have used a 2.4
milestone release. In that case you should remove the persistent data that the
Cluster Sharding coordinator stored. Note that this is not application data.
You can use the :ref:`RemoveInternalClusterShardingData <RemoveInternalClusterShardingData-scala>`
utility program to remove this data.
The new ``persistenceId`` is ``s"/sharding/${typeName}Coordinator"``.
The old ``persistenceId`` is ``s"/user/sharding/${typeName}Coordinator/singleton/coordinator"``.
ClusterSingletonManager and ClusterSingletonProxy construction
==============================================================
Parameters to the ``Props`` factory methods have been moved to settings object ``ClusterSingletonManagerSettings``
and ``ClusterSingletonProxySettings``. These can be created from system configuration properties and also
amended with API as needed.
The buffer size of the ``ClusterSingletonProxy`` can be defined in the ``ClusterSingletonProxySettings``
instead of defining ``stash-capacity`` of the mailbox. Buffering can be disabled by using a
buffer size of 0.
The ``singletonPath`` parameter of ``ClusterSingletonProxy.props`` has changed. It is now named
``singletonManagerPath`` and is the logical path of the singleton manager, e.g. ``/user/singletonManager``,
which ends with the name you defined in ``actorOf`` when creating the ``ClusterSingletonManager``.
In 2.3.x it was the path to singleton instance, which was error-prone because one had to provide both
the name of the singleton manager and the singleton actor.
DistributedPubSub construction
==============================
Normally, the ``DistributedPubSubMediator`` actor is started by the ``DistributedPubSubExtension``.
This extension has been renamed to ``DistributedPubSub``. It is also possible to start
it as an ordinary actor if you need multiple instances of it with different settings.
The parameters of the ``Props`` factory methods in the ``DistributedPubSubMediator`` companion
has been moved to settings object ``DistributedPubSubSettings``. This can be created from
system configuration properties and also amended with API as needed.
ClusterClient construction
==========================
The parameters of the ``Props`` factory methods in the ``ClusterClient`` companion
has been moved to settings object ``ClusterClientSettings``. This can be created from
system configuration properties and also amended with API as needed.
The buffer size of the ``ClusterClient`` can be defined in the ``ClusterClientSettings``
instead of defining ``stash-capacity`` of the mailbox. Buffering can be disabled by using a
buffer size of 0.
Normally, the ``ClusterReceptionist`` actor is started by the ``ClusterReceptionistExtension``.
This extension has been renamed to ``ClusterClientReceptionist``. It is also possible to start
it as an ordinary actor if you need multiple instances of it with different settings.
The parameters of the ``Props`` factory methods in the ``ClusterReceptionist`` companion
has been moved to settings object ``ClusterReceptionistSettings``. This can be created from
system configuration properties and also amended with API as needed.
The ``ClusterReceptionist`` actor that is started by the ``ClusterReceptionistExtension``
is now started as a ``system`` actor instead of a ``user`` actor, i.e. the default path for
the ``ClusterClient`` initial contacts has changed to
``"akka.tcp://system@hostname:port/system/receptionist"``.
ClusterClient sender
====================
In 2.3 the ``sender()`` of the response messages, as seen by the client, was the
actor in cluster.
In 2.4 the ``sender()`` of the response messages, as seen by the client, is ``deadLetters``
since the client should normally send subsequent messages via the ``ClusterClient``.
It is possible to pass the original sender inside the reply messages if
the client is supposed to communicate directly to the actor in the cluster.
Akka Persistence
================
Experimental removed
--------------------
The artifact name has changed from ``akka-persistence-experimental`` to ``akka-persistence``.
New sbt dependency::
"com.typesafe.akka" %% "akka-persistence" % "@version@" @crossString@
New Maven dependency::
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_@binVersion@</artifactId>
<version>@version@</version>
</dependency>
The artefact name of the Persistent TCK has changed from ``akka-persistence-tck-experimental`` (``akka-persistence-experimental-tck``) to
``akka-persistence-tck``.
Mandatory persistenceId
-----------------------
It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor``
and ``AbstractPersistentId``.
The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical
"which persistent entity this actor represents".
In case you want to preserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily
implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows::
override def persistenceId = self.path.toStringWithoutAddress
Failures
--------
Backend journal failures during recovery and persist are treated differently than in 2.3.x. The ``PersistenceFailure``
message is removed and the actor is unconditionally stopped. The new behavior and reasons for it is explained in
:ref:`failures-scala`.
Persist sequence of events
--------------------------
The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of events parameter was deprecated and
renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling
the overloaded ``persist(event)`` method.
non-permanent deletion
----------------------
The ``permanent`` flag in ``deleteMessages`` was removed. non-permanent deletes are not supported
any more. Events that were deleted with ``permanent=false`` with older version will
still not be replayed in this version.
Recover message is gone, replaced by Recovery config
----------------------------------------------------
Previously the way to cause recover in PersistentActors was sending them a ``Recover()`` message.
Most of the time it was the actor itself sending such message to ``self`` in its ``preStart`` method,
however it was possible to send this message from an external source to any ``PersistentActor`` or ``PresistentView``
to make it start recovering.
This style of starting recovery does not fit well with usual Actor best practices: an Actor should be independent
and know about its internal state, and also about its recovery or lack thereof. In order to guide users towards
more independent Actors, the ``Recovery()`` object is now not used as a message, but as configuration option
used by the Actor when it starts. In order to migrate previous code which customised its recovery mode use this example
as reference::
// previously
class OldCookieMonster extends PersistentActor {
def preStart() = self ! Recover(toSequenceNr = 42L)
// ...
}
// now:
class NewCookieMonster extends PersistentActor {
override def recovery = Recovery(toSequenceNr = 42L)
// ...
}
Sender reference of replayed events is deadLetters
--------------------------------------------------
While undocumented, previously the ``sender()`` of the replayed messages would be the same sender that originally had
sent the message. Since sender is an ``ActorRef`` and those events are often replayed in different incarnations of
actor systems and during the entire lifetime of the app, relying on the existence of this reference is most likely
not going to succeed. In order to avoid bugs in the style of "it worked last week", the ``sender()`` reference is now not
stored, in order to avoid potential bugs which this could have provoked.
The previous behaviour was never documented explicitly (nor was it a design goal), so it is unlikely that applications
have explicitly relied on this behaviour, however if you find yourself with an application that did exploit this you
should rewrite it to explicitly store the ``ActorPath`` of where such replies during replay may have to be sent to,
instead of relying on the sender reference during replay.
max-message-batch-size config
-----------------------------
Configuration property ``akka.persistence.journal.max-message-batch-size`` has been moved into the plugin configuration
section, to allow different values for different journal plugins. See ``reference.conf``.
akka.persistence.snapshot-store.plugin config
---------------------------------------------
The configuration property ``akka.persistence.snapshot-store.plugin`` now by default is empty. To restore the previous
setting add ``akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"`` to your application.conf.
See ``reference.conf``.
PersistentView is deprecated
----------------------------
``PersistentView`` is deprecated. Use :ref:`persistence-query-scala` instead. The corresponding
query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
to an actor corresponding to a previous ``PersistentView`` actor which are documented in :ref:`stream-integrations-scala`
for Scala and :ref:`Java <stream-integrations-java>`.
The consuming actor may be a plain ``Actor`` or a ``PersistentActor`` if it needs to store its
own state (e.g. fromSequenceNr offset).
.. _Sink.actorRef: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#Sink_actorRef
.. _mapAsync: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/stages-overview.html#Asynchronous_processing_stages
.. _ActorSubscriber: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#ActorSubscriber
Persistence Plugin APIs
=======================
SyncWriteJournal removed
------------------------
``SyncWriteJournal`` removed in favor of using ``AsyncWriteJournal``.
If the storage backend API only supports synchronous, blocking writes,
the methods can still be implemented in terms of the asynchronous API.
Example of how to do that is in included in the
See :ref:`Journal plugin API for Scala <journal-plugin-api>`
or :ref:`Journal plugin API for Java <journal-plugin-api-java>`.
SnapshotStore: Snapshots can now be deleted asynchronously (and report failures)
--------------------------------------------------------------------------------
Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots,
and failures of deleting a snapshot may have been even silently ignored.
Now ``SnapshotStore`` must return a ``Future`` representing the deletion of the snapshot.
If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will
be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure``
will be sent to the actor instead.
For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent
``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria,
instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages.
SnapshotStore: Removed 'saved' callback
---------------------------------------
Snapshot Stores previously were required to implement a ``def saved(meta: SnapshotMetadata): Unit`` method which
would be called upon successful completion of a ``saveAsync`` (``doSaveAsync`` in Java API) snapshot write.
Currently all journals and snapshot stores perform asynchronous writes and deletes, thus all could potentially benefit
from such callback methods. The only gain these callback give over composing an ``onComplete`` over ``Future`` returned
by the journal or snapshot store is that it is executed in the Actors context, thus it can safely (without additional
synchronization modify its internal state - for example a "pending writes" counter).
However, this feature was not used by many plugins, and expanding the API to accomodate all callbacks would have grown
the API a lot. Instead, Akka Persistence 2.4.x introduces an additional (optionally overrideable)
``receivePluginInternal:Actor.Receive`` method in the plugin API, which can be used for handling those as well as any custom messages
that are sent to the plugin Actor (imagine use cases like "wake up and continue reading" or custom protocols which your
specialised journal can implement).
Implementations using the previous feature should adjust their code as follows::
// previously
class MySnapshots extends SnapshotStore {
// old API:
// def saved(meta: SnapshotMetadata): Unit = doThings()
// new API:
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
// completion or failure of the returned future triggers internal messages in receivePluginInternal
val f: Future[Unit] = ???
// custom messages can be piped to self in order to be received in receivePluginInternal
f.map(MyCustomMessage(_)) pipeTo self
f
}
def receivePluginInternal = {
case SaveSnapshotSuccess(metadata) => doThings()
case MyCustomMessage(data) => doOtherThings()
}
// ...
}
SnapshotStore: Java 8 Optional used in Java plugin APIs
-------------------------------------------------------
In places where previously ``akka.japi.Option`` was used in Java APIs, including the return type of ``doLoadAsync``,
the Java 8 provided ``Optional`` type is used now.
Please remember that when creating an ``java.util.Optional`` instance from a (possibly) ``null`` value you will want to
use the non-throwing ``Optional.fromNullable`` method, which converts a ``null`` into a ``None`` value - which is
slightly different than its Scala counterpart (where ``Option.apply(null)`` returns ``None``).
Atomic writes
-------------
``asyncWriteMessages`` takes a ``immutable.Seq[AtomicWrite]`` parameter instead of
``immutable.Seq[PersistentRepr]``.
Each `AtomicWrite` message contains the single ``PersistentRepr`` that corresponds to the event that was
passed to the ``persist`` method of the ``PersistentActor``, or it contains several ``PersistentRepr``
that corresponds to the events that were passed to the ``persistAll`` method of the ``PersistentActor``.
All ``PersistentRepr`` of the `AtomicWrite` must be written to the data store atomically, i.e. all or
none must be stored.
If the journal (data store) cannot support atomic writes of multiple events it should
reject such writes with a ``Try`` ``Failure`` with an ``UnsupportedOperationException``
describing the issue. This limitation should also be documented by the journal plugin.
Rejecting writes
----------------
``asyncWriteMessages`` returns a ``Future[immutable.Seq[Try[Unit]]]``.
The journal can signal that it rejects individual messages (``AtomicWrite``) by the returned
`immutable.Seq[Try[Unit]]`. The returned ``Seq`` must have as many elements as the input
``messages`` ``Seq``. Each ``Try`` element signals if the corresponding ``AtomicWrite``
is rejected or not, with an exception describing the problem. Rejecting a message means it
was not stored, i.e. it must not be included in a later replay. Rejecting a message is
typically done before attempting to store it, e.g. because of serialization error.
Read the :ref:`API documentation <journal-plugin-api>` of this method for more
information about the semantics of rejections and failures.
asyncReplayMessages Java API
----------------------------
The signature of `asyncReplayMessages` in the Java API changed from ``akka.japi.Procedure``
to ``java.util.function.Consumer``.
asyncDeleteMessagesTo
---------------------
The ``permanent`` deletion flag was removed. Support for non-permanent deletions was
removed. Events that were deleted with ``permanent=false`` with older version will
still not be replayed in this version.
References to "replay" in names
-------------------------------
Previously a number of classes and methods used the word "replay" interchangeably with the word "recover".
This lead to slight inconsistencies in APIs, where a method would be called ``recovery``, yet the
signal for a completed recovery was named ``ReplayMessagesSuccess``.
This is now fixed, and all methods use the same "recovery" wording consistently across the entire API.
The old ``ReplayMessagesSuccess`` is now called ``RecoverySuccess``, and an additional method called ``onRecoveryFailure``
has been introduced.
AtLeastOnceDelivery deliver signature
-------------------------------------
The signature of ``deliver`` changed slightly in order to allow both ``ActorSelection`` and ``ActorPath`` to be
used with it.
Previously:
def deliver(destination: ActorPath, deliveryIdToMessage: Long ⇒ Any): Unit
Now:
def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit
def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit
The Java API remains unchanged and has simply gained the 2nd overload which allows ``ActorSelection`` to be
passed in directly (without converting to ``ActorPath``).
Actor system shutdown
---------------------
``ActorSystem.shutdown``, ``ActorSystem.awaitTermination`` and ``ActorSystem.isTerminated`` has been
deprecated in favor of ``ActorSystem.terminate`` and ``ActorSystem.whenTerminated```. Both returns a
``Future[Terminated]`` value that will complete when the actor system has terminated.
To get the same behavior as ``ActorSystem.awaitTermination`` block and wait for ``Future[Terminated]`` value
with ``Await.result`` from the Scala standard library.
To trigger a termination and wait for it to complete:
import scala.concurrent.duration._
Await.result(system.terminate(), 10.seconds)
Be careful to not do any operations on the ``Future[Terminated]`` using the ``system.dispatcher``
as ``ExecutionContext`` as it will be shut down with the ``ActorSystem``, instead use for example
the Scala standard library context from ``scala.concurrent.ExecutionContext.global``.
::
// import system.dispatcher <- this would not work
import scala.concurrent.ExecutionContext.Implicits.global
system.terminate().foreach { _ =>
println("Actor system was shut down")
}

View file

@ -4,6 +4,24 @@
Migration Guide 2.4.x to 2.5.x
##############################
Akka Persistence
================
Removal of PersistentView
-------------------------
After being deprecated for a long time, and replaced by :ref:`Persistence Query Java <persistence-query-java>`
(:ref:`Persistence Query Scala <persistence-query-scala>`) ``PersistentView`` has been removed now removed.
The corresponding query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
to an actor corresponding to a previous ``PersistentView``. There are several alternatives for connecting the ``Source``
to an actor corresponding to a previous ``PersistentView`` actor which are documented in :ref:`stream-integrations-scala`
for Scala and :ref:`Java <stream-integrations-java>`.
The consuming actor may be a plain ``Actor`` or an ``PersistentActor`` if it needs to store its own state (e.g. ``fromSequenceNr`` offset).
Please note that Persistence Query is not experimental anymore in Akka ``2.5.0``, so you can safely upgrade to it.
Akka Streams
============

View file

@ -607,7 +607,7 @@ Router Example with Pool of Remote Deployed Routees
Let's take a look at how to use a cluster aware router on single master node that creates
and deploys workers. To keep track of a single master we use the :ref:`cluster-singleton-scala`
in the contrib module. The ``ClusterSingletonManager`` is started on each node.
in the cluster-tools module. The ``ClusterSingletonManager`` is started on each node.
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala#create-singleton-manager

View file

@ -423,29 +423,4 @@ object PersistenceDocSpec {
//#safe-shutdown-example-good
}
object View {
import akka.actor.Props
val system: ActorSystem = ???
//#view
class MyView extends PersistentView {
override def persistenceId: String = "some-persistence-id"
override def viewId: String = "some-persistence-id-view"
def receive: Receive = {
case payload if isPersistent =>
// handle message from journal...
case payload =>
// handle message from user-land...
}
}
//#view
//#view-update
val view = system.actorOf(Props[MyView])
view ! Update(await = true)
//#view-update
}
}

View file

@ -504,88 +504,6 @@ For example, if you configure the replay filter for leveldb plugin, it looks lik
mode = repair-by-discard-old
}
.. _persistent-views:
Persistent Views
================
.. warning::
``PersistentView`` is deprecated. Use :ref:`persistence-query-scala` instead. The corresponding
query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
to an actor corresponding to a previous ``PersistentView`` actor:
* `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the
destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow
* `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure
being propagated all the way
* `ActorSubscriber`_ in case you need more fine grained control
The consuming actor may be a plain ``Actor`` or a ``PersistentActor`` if it needs to store its
own state (e.g. fromSequenceNr offset).
.. _Sink.actorRef: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#Sink_actorRef
.. _mapAsync: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/stages-overview.html#Asynchronous_processing_stages
.. _ActorSubscriber: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#ActorSubscriber
Persistent views can be implemented by extending the ``PersistentView`` trait and implementing the ``receive`` and the ``persistenceId``
methods.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#view
The ``persistenceId`` identifies the persistent actor from which the view receives journaled messages. It is not necessary that
the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a
persistent actor is started later and begins to write new messages, by default the corresponding view is updated automatically.
It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent``
method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases
(skip the ``if isPersistent`` check).
Updates
-------
The default update interval of all views of an actor system is configurable:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval
``PersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
interval for a specific view class or view instance. Applications may also trigger additional updates at
any time by sending a view an ``Update`` message.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#view-update
If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the
incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages
following the update request may interleave with the replayed message stream. Automated updates always run with
``await = false``.
Automated updates of all persistent views of an actor system can be turned off by configuration:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#auto-update
Implementation classes may override the configured default value by overriding the ``autoUpdate`` method. To
limit the number of replayed messages per update request, applications can configure a custom
``akka.persistence.view.auto-update-replay-max`` value or override the ``autoUpdateReplayMax`` method. The number
of replayed messages for manual updates can be limited with the ``replayMax`` parameter of the ``Update`` message.
Recovery
--------
Initial recovery of persistent views works the very same way as for persistent actors (i.e. by sending a ``Recover`` message
to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``.
Further possibilities to customize initial recovery are explained in section :ref:`recovery-scala`.
.. _persistence-identifiers:
Identifiers
-----------
A persistent view must have an identifier that doesn't change across different actor incarnations.
The identifier must be defined with the ``viewId`` method.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its
persistent actor should be shared (which is what applications usually do not want).
.. _snapshots:
Snapshots

View file

@ -1,396 +0,0 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.actor.AbstractActor
import akka.actor.Actor
import akka.actor.Cancellable
import akka.actor.Stash
import akka.actor.StashFactory
import akka.actor.UntypedActor
import akka.actor.ActorLogging
/**
* Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with
* all messages from the corresponding persistent id's journal that have not yet been consumed by the view.
* To update a view with messages that have been written after handling this request, another `Update`
* request must be sent to the view.
*
* @param await if `true`, processing of further messages sent to the view will be delayed until the
* incremental message replay, triggered by this update request, completes. If `false`,
* any message sent to the view may interleave with replayed persistent event stream.
* @param replayMax maximum number of messages to replay when handling this update request. Defaults
* to `Long.MaxValue` (i.e. no limit).
*/
@SerialVersionUID(1L)
final case class Update(await: Boolean = false, replayMax: Long = Long.MaxValue)
object Update {
/**
* Java API.
*/
def create() =
Update()
/**
* Java API.
*/
def create(await: Boolean) =
Update(await)
/**
* Java API.
*/
def create(await: Boolean, replayMax: Long) =
Update(await, replayMax)
}
/**
* INTERNAL API
*/
private[akka] object PersistentView {
private final case class ScheduledUpdate(replayMax: Long)
}
/**
* A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive
* the message stream directly from the Journal. These messages can be processed to update internal state
* in order to maintain an (eventual consistent) view of the state of the corresponding persistent actor. A
* persistent view can also run on a different node, provided that a replicated journal is used.
*
* Implementation classes refer to a persistent actors' message stream by implementing `persistenceId`
* with the corresponding (shared) identifier value.
*
* Views can also store snapshots of internal state by calling [[PersistentView#autoUpdate]]. The snapshots of a view
* are independent of those of the referenced persistent actor. During recovery, a saved snapshot is offered
* to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger
* than the snapshot. Default is to offer the latest saved snapshot.
*
* By default, a view automatically updates itself with an interval returned by `autoUpdateInterval`.
* This method can be overridden by implementation classes to define a view instance-specific update
* interval. The default update interval for all views of an actor system can be configured with the
* `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional
* view updates by sending the view [[Update]] requests. See also methods
*
* - [[PersistentView#autoUpdate]] for turning automated updates on or off
* - [[PersistentView#autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
*
*/
@deprecated("use Persistence Query instead", "2.4")
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
with PersistenceIdentity with PersistenceRecovery
with ActorLogging {
import PersistentView._
import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult
import context.dispatcher
private val extension = Persistence(context.system)
private val viewSettings = extension.settings.view
private[persistence] lazy val journal = extension.journalFor(journalPluginId)
private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId)
private var schedule: Option[Cancellable] = None
private var _lastSequenceNr: Long = 0L
private val internalStash = createStash()
private var currentState: State = recoveryStarted(Long.MaxValue)
/**
* View id is used as identifier for snapshots performed by this [[PersistentView]].
* This allows the View to keep separate snapshots of data than the [[PersistentActor]] originating the message stream.
*
*
* The usual case is to have a *different* id set as `viewId` than `persistenceId`,
* although it is possible to share the same id with an [[PersistentActor]] - for example to decide about snapshots
* based on some average or sum, calculated by this view.
*
* Example:
* {{{
* class SummingView extends PersistentView {
* override def persistenceId = "count-123"
* override def viewId = "count-123-sum" // this view is performing summing,
* // so this view's snapshots reside under the "-sum" suffixed id
*
* // ...
* }
* }}}
*/
def viewId: String
/**
* Returns `viewId`.
*/
def snapshotterId: String = viewId
/**
* If `true`, the currently processed message was persisted (is sent from the Journal).
* If `false`, the currently processed message comes from another actor (from "user-land").
*/
def isPersistent: Boolean = currentState.recoveryRunning
/**
* If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`.
* If `false`, applications must explicitly update this view by sending [[Update]] requests. The default
* value can be configured with the `akka.persistence.view.auto-update` configuration key. This method
* can be overridden by implementation classes to return non-default values.
*/
def autoUpdate: Boolean =
viewSettings.autoUpdate
/**
* The interval for automated updates. The default value can be configured with the
* `akka.persistence.view.auto-update-interval` configuration key. This method can be
* overridden by implementation classes to return non-default values.
*/
def autoUpdateInterval: FiniteDuration =
viewSettings.autoUpdateInterval
/**
* The maximum number of messages to replay per update. The default value can be configured with the
* `akka.persistence.view.auto-update-replay-max` configuration key. This method can be overridden by
* implementation classes to return non-default values.
*/
def autoUpdateReplayMax: Long =
viewSettings.autoUpdateReplayMax match {
case -1 Long.MaxValue
case value value
}
/**
* Highest received sequence number so far or `0L` if this actor hasn't replayed
* any persistent events yet.
*/
def lastSequenceNr: Long = _lastSequenceNr
/**
* Returns `lastSequenceNr`.
*/
def snapshotSequenceNr: Long = lastSequenceNr
private def setLastSequenceNr(value: Long): Unit =
_lastSequenceNr = value
private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
override def recovery = Recovery(replayMax = autoUpdateReplayMax)
/**
* Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax`
* messages (following that snapshot).
*/
override def preStart(): Unit = {
startRecovery(recovery)
if (autoUpdate)
schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, self, ScheduledUpdate(autoUpdateReplayMax)))
}
private def startRecovery(recovery: Recovery): Unit = {
changeState(recoveryStarted(recovery.replayMax))
loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr)
}
/** INTERNAL API. */
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
currentState.stateReceive(receive, message)
/** INTERNAL API. */
override protected[akka] def aroundPreStart(): Unit = {
// Fail fast on missing plugins.
val j = journal; val s = snapshotStore
super.aroundPreStart()
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try internalStash.unstashAll() finally super.preRestart(reason, message)
}
override def postStop(): Unit = {
schedule.foreach(_.cancel())
super.postStop()
}
/**
* Called whenever a message replay fails. By default it logs the error.
* Subclass may override to customize logging.
* The `PersistentView` will not stop or throw exception due to this.
* It will try again on next update.
*/
protected def onReplayError(cause: Throwable): Unit = {
log.error(cause, "Persistence view failure when replaying events for persistenceId [{}]. " +
"Last known sequence number [{}]", persistenceId, lastSequenceNr)
}
private def changeState(state: State): Unit = {
currentState = state
}
// TODO There are some duplication of the recovery state management here and in Eventsourced.scala,
// but the enhanced PersistentView will not be based on recovery infrastructure, and
// therefore this code will be replaced anyway
private trait State {
def stateReceive(receive: Receive, message: Any): Unit
def recoveryRunning: Boolean
}
/**
* Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer`
* message to the actor's `receive`. Then initiates a message replay, either starting
* from the loaded snapshot or from scratch, and switches to `replayStarted` state.
* All incoming messages are stashed.
*
* @param replayMax maximum number of messages to replay.
*/
private def recoveryStarted(replayMax: Long) = new State {
override def toString: String = s"recovery started (replayMax = [${replayMax}])"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case LoadSnapshotResult(sso, toSnr)
sso.foreach {
case SelectedSnapshot(metadata, snapshot)
setLastSequenceNr(metadata.sequenceNr)
PersistentView.super.aroundReceive(receive, SnapshotOffer(metadata, snapshot))
}
changeState(replayStarted(await = true))
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
case other internalStash.stash()
}
}
/**
* Processes replayed messages, if any. The actor's `receive` is invoked with the replayed
* events.
*
* If replay succeeds it switches to `initializing` state and requests the highest stored sequence
* number from the journal.
*
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayError` is called and
* remaining replay events are consumed (ignored).
*
* If processing of a replayed event fails, the exception is caught and
* stored for later and state is changed to `recoveryFailed`.
*
* All incoming messages are stashed when `await` is true.
*/
private def replayStarted(await: Boolean) = new State {
override def toString: String = s"replay started"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p)
try {
updateLastSequenceNr(p)
PersistentView.super.aroundReceive(receive, p.payload)
} catch {
case NonFatal(t)
changeState(ignoreRemainingReplay(t))
}
case _: RecoverySuccess
onReplayComplete()
case ReplayMessagesFailure(cause)
try onReplayError(cause) finally onReplayComplete()
case ScheduledUpdate(_) // ignore
case Update(a, _)
if (a)
internalStash.stash()
case other
if (await)
internalStash.stash()
else {
try {
PersistentView.super.aroundReceive(receive, other)
} catch {
case NonFatal(t)
changeState(ignoreRemainingReplay(t))
}
}
}
/**
* Switches to `idle`
*/
private def onReplayComplete(): Unit = {
changeState(idle)
internalStash.unstashAll()
}
}
/**
* Consumes remaining replayed messages and then throw the exception.
*/
private def ignoreRemainingReplay(cause: Throwable) = new State {
override def toString: String = "replay failed"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p)
case ReplayMessagesFailure(_)
replayCompleted(receive)
// journal couldn't tell the maximum stored sequence number, hence the next
// replay must be a full replay (up to the highest stored sequence number)
// Recover(lastSequenceNr) is sent by preRestart
setLastSequenceNr(Long.MaxValue)
case _: RecoverySuccess replayCompleted(receive)
case _ internalStash.stash()
}
def replayCompleted(receive: Receive): Unit = {
// in case the actor resumes the state must be `idle`
changeState(idle)
internalStash.unstashAll()
throw cause
}
}
/**
* When receiving an [[Update]] request, switches to `replayStarted` state and triggers
* an incremental message replay. Invokes the actor's current behavior for any other
* received message.
*/
private val idle: State = new State {
override def toString: String = "idle"
override def recoveryRunning: Boolean = false
override def stateReceive(receive: Receive, message: Any): Unit = message match {
case ReplayedMessage(p)
// we can get ReplayedMessage here if it was stashed by user during replay
// unwrap the payload
PersistentView.super.aroundReceive(receive, p.payload)
case ScheduledUpdate(replayMax) changeStateToReplayStarted(await = false, replayMax)
case Update(awaitUpdate, replayMax) changeStateToReplayStarted(awaitUpdate, replayMax)
case other PersistentView.super.aroundReceive(receive, other)
}
def changeStateToReplayStarted(await: Boolean, replayMax: Long): Unit = {
changeState(replayStarted(await))
journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
}
}
}
/**
* Java API.
*
* @see [[PersistentView]]
*/
@deprecated("use Persistence Query instead", "2.4")
abstract class UntypedPersistentView extends UntypedActor with PersistentView
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]])
*
* @see [[PersistentView]]
*/
@deprecated("use Persistence Query instead", "2.4")
abstract class AbstractPersistentView extends AbstractActor with PersistentView

View file

@ -1,352 +0,0 @@
/**
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence
import akka.actor._
import akka.persistence.JournalProtocol.ReplayMessages
import akka.testkit._
import com.typesafe.config.Config
import scala.concurrent.duration._
object PersistentViewSpec {
private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
def receiveCommand = {
case msg persist(msg) { m probe ! s"${m}-${lastSequenceNr}" }
}
override def receiveRecover: Receive = {
case _ // do nothing...
}
}
private class TestPersistentView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends PersistentView {
def this(name: String, probe: ActorRef, interval: FiniteDuration) =
this(name, probe, interval, None)
def this(name: String, probe: ActorRef) =
this(name, probe, 100.milliseconds)
override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system)
override val persistenceId: String = name
override val viewId: String = name + "-view"
var last: String = _
def receive = {
case "get"
probe ! last
case "boom"
throw new TestException("boom")
case payload if isPersistent && shouldFailOn(payload)
throw new TestException("boom")
case payload if isPersistent
last = s"replicated-${payload}-${lastSequenceNr}"
probe ! last
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
failAt = None
}
def shouldFailOn(m: Any): Boolean =
failAt.foldLeft(false) { (a, f) a || (m == f) }
}
private class PassiveTestPersistentView(name: String, probe: ActorRef, var failAt: Option[String]) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
override def autoUpdate: Boolean = false
override def autoUpdateReplayMax: Long = 0L // no message replay during initial recovery
var last: String = _
def receive = {
case "get"
probe ! last
case payload if isPersistent && shouldFailOn(payload)
throw new TestException("boom")
case payload
last = s"replicated-${payload}-${lastSequenceNr}"
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
failAt = None
}
def shouldFailOn(m: Any): Boolean =
failAt.foldLeft(false) { (a, f) a || (m == f) }
}
private class ActiveTestPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
override def autoUpdateInterval: FiniteDuration = 50.millis
override def autoUpdateReplayMax: Long = 2
def receive = {
case payload
probe ! s"replicated-${payload}-${lastSequenceNr}"
}
}
private class BecomingPersistentView(name: String, probe: ActorRef) extends PersistentView {
override def persistenceId = name
override def viewId = name + "-view"
def receive = Actor.emptyBehavior
context.become {
case payload probe ! s"replicated-${payload}-${lastSequenceNr}"
}
}
private class StashingPersistentView(name: String, probe: ActorRef) extends PersistentView {
override def persistenceId = name
override def viewId = name + "-view"
def receive = {
case "other" stash()
case "unstash"
unstashAll()
context.become {
case msg probe ! s"$msg-${lastSequenceNr}"
}
case msg stash()
}
}
private class PersistentOrNotTestPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
def receive = {
case payload if isPersistent probe ! s"replicated-${payload}-${lastSequenceNr}"
case payload probe ! s"normal-${payload}-${lastSequenceNr}"
}
}
private class SnapshottingPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = s"${name}-replicator"
override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system)
var last: String = _
def receive = {
case "get"
probe ! last
case "snap"
saveSnapshot(last)
case "restart"
throw new TestException("restart requested")
case SaveSnapshotSuccess(_)
probe ! "snapped"
case SnapshotOffer(metadata, snapshot: String)
last = snapshot
probe ! last
case payload
last = s"replicated-${payload}-${lastSequenceNr}"
probe ! last
}
}
}
abstract class PersistentViewSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
import akka.persistence.PersistentViewSpec._
var persistentActor: ActorRef = _
var view: ActorRef = _
var persistentActorProbe: TestProbe = _
var viewProbe: TestProbe = _
override protected def beforeEach(): Unit = {
super.beforeEach()
persistentActorProbe = TestProbe()
viewProbe = TestProbe()
persistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, persistentActorProbe.ref))
persistentActor ! "a"
persistentActor ! "b"
persistentActorProbe.expectMsg("a-1")
persistentActorProbe.expectMsg("b-2")
}
override protected def afterEach(): Unit = {
system.stop(persistentActor)
system.stop(view)
super.afterEach()
}
def subscribeToReplay(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[ReplayMessages])
"A persistent view" must {
"receive past updates from a persistent actor" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"receive live updates from a persistent actor" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
viewProbe.expectMsg("replicated-c-3")
}
"run updates at specified interval" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 2.seconds))
// initial update is done on start
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
// live updates takes 5 seconds to replicate
persistentActor ! "c"
viewProbe.expectNoMsg(1.second)
viewProbe.expectMsg("replicated-c-3")
}
"run updates on user request" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view ! Update(await = false)
viewProbe.expectMsg("replicated-c-3")
}
"run updates on user request and await update" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view ! Update(await = true)
view ! "get"
viewProbe.expectMsg("replicated-c-3")
}
"run updates again on failure outside an update cycle" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
view ! "boom"
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"run updates again on failure during an update cycle" in {
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds, Some("b")))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
"run size-limited updates on user request" in {
persistentActor ! "c"
persistentActor ! "d"
persistentActor ! "e"
persistentActor ! "f"
persistentActorProbe.expectMsg("c-3")
persistentActorProbe.expectMsg("d-4")
persistentActorProbe.expectMsg("e-5")
persistentActorProbe.expectMsg("f-6")
view = system.actorOf(Props(classOf[PassiveTestPersistentView], name, viewProbe.ref, None))
view ! Update(await = true, replayMax = 2)
view ! "get"
viewProbe.expectMsg("replicated-b-2")
view ! Update(await = true, replayMax = 1)
view ! "get"
viewProbe.expectMsg("replicated-c-3")
view ! Update(await = true, replayMax = 4)
view ! "get"
viewProbe.expectMsg("replicated-f-6")
}
"run size-limited updates automatically" in {
val replayProbe = TestProbe()
persistentActor ! "c"
persistentActor ! "d"
persistentActorProbe.expectMsg("c-3")
persistentActorProbe.expectMsg("d-4")
subscribeToReplay(replayProbe)
view = system.actorOf(Props(classOf[ActiveTestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("replicated-d-4")
replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _) }
}
"support context.become" in {
view = system.actorOf(Props(classOf[BecomingPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"check if an incoming message is persistent" in {
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view = system.actorOf(Props(classOf[PersistentOrNotTestPersistentView], name, viewProbe.ref))
view ! "d"
view ! "e"
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("normal-d-3")
viewProbe.expectMsg("normal-e-3")
persistentActor ! "f"
viewProbe.expectMsg("replicated-f-4")
}
"take snapshots" in {
view = system.actorOf(Props(classOf[SnapshottingPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
view ! "snap"
viewProbe.expectMsg("snapped")
view ! "restart"
persistentActor ! "c"
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
"support stash" in {
view = system.actorOf(Props(classOf[StashingPersistentView], name, viewProbe.ref))
view ! "other"
view ! "unstash"
viewProbe.expectMsg("a-2") // note that the lastSequenceNumber is 2, since we have replayed b-2
viewProbe.expectMsg("b-2")
viewProbe.expectMsg("other-2")
}
}
}
class LeveldbPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentViewSpec"))
class InmemPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("inmem", "InmemPersistentViewSpec"))

View file

@ -572,7 +572,7 @@ akka {
}
# DEPRECATED
# DEPRECATED, since 2.5.0
# The netty.udp transport is deprecated, please use Artery instead.
# See: http://doc.akka.io/docs/akka/2.4/scala/remoting-artery.html
netty.udp = ${akka.remote.netty.tcp}

View file

@ -1,87 +0,0 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package sample.persistence;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import java.util.concurrent.TimeUnit;
public class ViewExample {
public static class ExamplePersistentActor extends AbstractPersistentActor {
private int count = 1;
@Override
public String persistenceId() { return "sample-id-4"; }
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, s -> {
System.out.println(String.format("persistentActor received %s (nr = %d)", s, count));
persist(s + count, evt -> {
count += 1;
});
}).
build();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, s -> count += 1).
build();
}
}
public static class ExampleView extends AbstractPersistentView {
private int numReplicated = 0;
@Override public String persistenceId() { return "sample-id-4"; }
@Override public String viewId() { return "sample-view-id-4"; }
public ExampleView() {
receive(ReceiveBuilder.
match(Object.class, m -> isPersistent(), msg -> {
numReplicated += 1;
System.out.println(String.format("view received %s (num replicated = %d)",
msg,
numReplicated));
}).
match(SnapshotOffer.class, so -> {
numReplicated = (Integer) so.snapshot();
System.out.println(String.format("view received snapshot offer %s (metadata = %s)",
numReplicated,
so.metadata()));
}).
match(String.class, s -> s.equals("snap"), s -> saveSnapshot(numReplicated)).build()
);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef persistentActor = system.actorOf(Props.create(ExamplePersistentActor.class));
final ActorRef view = system.actorOf(Props.create(ExampleView.class));
system.scheduler()
.schedule(Duration.Zero(),
Duration.create(2, TimeUnit.SECONDS),
persistentActor,
"scheduled",
system.dispatcher(),
null);
system.scheduler()
.schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null);
}
}

View file

@ -1,78 +0,0 @@
package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Procedure;
import akka.persistence.SnapshotOffer;
import akka.persistence.UntypedPersistentActor;
import akka.persistence.UntypedPersistentView;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class PersistentViewExample {
public static class ExamplePersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "sample-id-4"; }
private int count = 1;
@Override
public void onReceiveRecover(Object message) {
if (message instanceof String) {
count += 1;
} else {
unhandled(message);
}
}
@Override
public void onReceiveCommand(Object message) {
if (message instanceof String) {
String s = (String) message;
System.out.println(String.format("persistentActor received %s (nr = %d)", s, count));
persist(s + count, new Procedure<String>() {
public void apply(String evt) {
count += 1;
}
});
} else {
unhandled(message);
}
}
}
public static class ExampleView extends UntypedPersistentView {
private int numReplicated = 0;
@Override public String persistenceId() { return "sample-id-4"; }
@Override public String viewId() { return "sample-view-id-4"; }
@Override
public void onReceive(Object message) throws Exception {
if (isPersistent()) {
numReplicated += 1;
System.out.println(String.format("view received %s (num replicated = %d)", message, numReplicated));
} else if (message instanceof SnapshotOffer) {
SnapshotOffer so = (SnapshotOffer)message;
numReplicated = (Integer)so.snapshot();
System.out.println(String.format("view received snapshot offer %s (metadata = %s)", numReplicated, so.metadata()));
} else if (message.equals("snap")) {
saveSnapshot(numReplicated);
} else {
unhandled(message);
}
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef persistentActor = system.actorOf(Props.create(ExamplePersistentActor.class));
final ActorRef view = system.actorOf(Props.create(ExampleView.class));
system.scheduler().schedule(Duration.Zero(), Duration.create(2, TimeUnit.SECONDS), persistentActor, "scheduled", system.dispatcher(), null);
system.scheduler().schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null);
}
}

View file

@ -1,62 +0,0 @@
package sample.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.persistence._
object ViewExample extends App {
class ExamplePersistentActor extends PersistentActor {
override def persistenceId = "sample-id-4"
var count = 1
def receiveCommand: Receive = {
case payload: String =>
println(s"persistentActor received ${payload} (nr = ${count})")
persist(payload + count) { evt =>
count += 1
}
}
def receiveRecover: Receive = {
case _: String => count += 1
}
}
class ExampleView extends PersistentView {
private var numReplicated = 0
override def persistenceId: String = "sample-id-4"
override def viewId = "sample-view-id-4"
def receive = {
case "snap" =>
println(s"view saving snapshot")
saveSnapshot(numReplicated)
case SnapshotOffer(metadata, snapshot: Int) =>
numReplicated = snapshot
println(s"view received snapshot offer ${snapshot} (metadata = ${metadata})")
case payload if isPersistent =>
numReplicated += 1
println(s"view replayed event ${payload} (num replicated = ${numReplicated})")
case SaveSnapshotSuccess(metadata) =>
println(s"view saved snapshot (metadata = ${metadata})")
case SaveSnapshotFailure(metadata, reason) =>
println(s"view snapshot failure (metadata = ${metadata}), caused by ${reason}")
case payload =>
println(s"view received other message ${payload}")
}
}
val system = ActorSystem("example")
val persistentActor = system.actorOf(Props(classOf[ExamplePersistentActor]))
val view = system.actorOf(Props(classOf[ExampleView]))
import system.dispatcher
system.scheduler.schedule(Duration.Zero, 2.seconds, persistentActor, "scheduled")
system.scheduler.schedule(Duration.Zero, 5.seconds, view, "snap")
}