From 2ab8ab2840c441e1f44832046697e10a9d723a80 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Fri, 2 Dec 2016 14:43:25 +0100 Subject: [PATCH] =21423 remove deprecated PersistentView --- .../src/main/scala/akka/actor/ActorDSL.scala | 2 +- akka-docs/rst/java/cluster-usage.rst | 4 +- .../persistence/LambdaPersistenceDocTest.java | 25 - .../docs/persistence/PersistenceDocTest.java | 31 - akka-docs/rst/java/lambda-persistence.rst | 88 --- akka-docs/rst/java/persistence.rst | 89 --- .../project/migration-guide-2.3.x-2.4.x.rst | 675 +++++++++++++++++- .../project/migration-guide-2.4.x-2.5.x.rst | 18 + akka-docs/rst/scala/cluster-usage.rst | 2 +- .../docs/persistence/PersistenceDocSpec.scala | 25 - akka-docs/rst/scala/persistence.rst | 82 --- .../akka/persistence/PersistentView.scala | 396 ---------- .../akka/persistence/PersistentViewSpec.scala | 352 --------- akka-remote/src/main/resources/reference.conf | 2 +- .../java/sample/persistence/ViewExample.java | 87 --- .../persistence/PersistentViewExample.java | 78 -- .../sample/persistence/ViewExample.scala | 62 -- 17 files changed, 697 insertions(+), 1321 deletions(-) delete mode 100644 akka-persistence/src/main/scala/akka/persistence/PersistentView.scala delete mode 100644 akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala delete mode 100644 akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java delete mode 100644 akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java delete mode 100644 akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 61633c3b94..c913a8bda4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -65,7 +65,7 @@ import akka.util.Helpers.ConfigOps * Note: If you want to use an `Act with Stash`, you should use the * `ActWithStash` trait in order to have the actor get the necessary deque-based * mailbox setting. - * + * * @deprecated Use the normal `actorOf` methods defined on `ActorSystem` and `ActorContext` to create Actors instead. */ @deprecated("deprecated Use the normal `actorOf` methods defined on `ActorSystem` and `ActorContext` to create Actors instead.", since = "2.5.0") diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index ec0bb057ab..ef4dac22bd 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -610,7 +610,7 @@ Router Example with Pool of Remote Deployed Routees Let's take a look at how to use a cluster aware router on single master node that creates and deploys workers. To keep track of a single master we use the :ref:`cluster-singleton-java` -in the contrib module. The ``ClusterSingletonManager`` is started on each node. +in the cluster-tools module. The ``ClusterSingletonManager`` is started on each node. .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java#create-singleton-manager @@ -754,4 +754,4 @@ For this purpose you can define a separate dispatcher to be used for the cluster Use dedicated dispatchers for such actors/tasks instead of running them on the default-dispatcher, because that may starve system internal tasks. Related config properties: ``akka.cluster.use-dispatcher = akka.cluster.cluster-dispatcher``. - Corresponding default values: ``akka.cluster.use-dispatcher =``. \ No newline at end of file + Corresponding default values: ``akka.cluster.use-dispatcher =``. diff --git a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java index 1fc77894b6..d497c710f2 100644 --- a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java @@ -527,31 +527,6 @@ public class LambdaPersistenceDocTest { } }; - static Object o12 = new Object() { - //#view - class MyView extends AbstractPersistentView { - @Override public String persistenceId() { return "some-persistence-id"; } - @Override public String viewId() { return "some-persistence-id-view"; } - - public MyView() { - receive(ReceiveBuilder. - match(Object.class, p -> isPersistent(), persistent -> { - // ... - }).build() - ); - } - } - //#view - - public void usage() { - final ActorSystem system = ActorSystem.create("example"); - //#view-update - final ActorRef view = system.actorOf(Props.create(MyView.class)); - view.tell(Update.create(true), null); - //#view-update - } - }; - static Object o14 = new Object() { //#safe-shutdown final class Shutdown { diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 92f667f02d..380d69d4d3 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -512,37 +512,6 @@ public class PersistenceDocTest { } }; - static Object o14 = new Object() { - //#view - class MyView extends UntypedPersistentView { - @Override - public String persistenceId() { return "some-persistence-id"; } - - @Override - public String viewId() { return "my-stable-persistence-view-id"; } - - @Override - public void onReceive(Object message) throws Exception { - if (isPersistent()) { - // handle message from Journal... - } else if (message instanceof String) { - // handle message from user... - } else { - unhandled(message); - } - } - } - //#view - - public void usage() { - final ActorSystem system = ActorSystem.create("example"); - //#view-update - final ActorRef view = system.actorOf(Props.create(MyView.class)); - view.tell(Update.create(true), null); - //#view-update - } - }; - static Object o13 = new Object() { //#safe-shutdown final class Shutdown {} diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 1c5076d191..a1f22ecd5f 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -54,10 +54,6 @@ Architecture When a persistent actor is started or restarted, journaled messages are replayed to that actor so that it can recover internal state from these messages. -* *AbstractPersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another - persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's - replicated message stream. - * *AbstractPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. @@ -457,90 +453,6 @@ mechanism when ``persist()`` is used. Notice the early stop behaviour that occur .. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#safe-shutdown-example-bad .. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#safe-shutdown-example-good -.. _persistent-views-java-lambda: - -Persistent Views -================ - -.. warning:: - - ``AbstractPersistentView`` is deprecated. Use :ref:`persistence-query-java` instead. The corresponding - query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source`` - to an actor corresponding to a previous ``UntypedPersistentView`` actor: - - * `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the - destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow - * `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure - being propagated all the way - * `ActorSubscriber`_ in case you need more fine grained control - - The consuming actor may be a plain ``AbstractActor`` or an ``AbstractPersistentActor`` if it needs to store its - own state (e.g. fromSequenceNr offset). - -.. _Sink.actorRef: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#Sink_actorRef -.. _mapAsync: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/stages-overview.html#Asynchronous_processing_stages -.. _ActorSubscriber: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#ActorSubscriber - -Persistent views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method -and setting the “initial behavior” in the constructor by calling the :meth:`receive` method. - -.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#view - -The ``persistenceId`` identifies the persistent actor from which the view receives journaled messages. It is not necessary that -the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a -persistent actor is started later and begins to write new messages, by default the corresponding view is updated automatically. - -It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent`` -method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases -(skip the ``if isPersistent`` check). - -Updates -------- - -The default update interval of all persistent views of an actor system is configurable: - -.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval - -``AbstractPersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update -interval for a specific view class or view instance. Applications may also trigger additional updates at -any time by sending a view an ``Update`` message. - -.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#view-update - -If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the -incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages -following the update request may interleave with the replayed message stream. Automated updates always run with -``await = false``. - -Automated updates of all persistent views of an actor system can be turned off by configuration: - -.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update - -Implementation classes may override the configured default value by overriding the ``autoUpdate`` method. To -limit the number of replayed messages per update request, applications can configure a custom -``akka.persistence.view.auto-update-replay-max`` value or override the ``autoUpdateReplayMax`` method. The number -of replayed messages for manual updates can be limited with the ``replayMax`` parameter of the ``Update`` message. - -Recovery --------- - -Initial recovery of persistent views works the very same way as for persistent actors (i.e. by sending a ``Recover`` message -to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``. -Further possibilities to customize initial recovery are explained in section :ref:`recovery-java-lambda`. - -.. _persistence-identifiers-java-lambda: - -Identifiers ------------ - -A persistent view must have an identifier that doesn't change across different actor incarnations. -The identifier must be defined with the ``viewId`` method. - -The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java-lambda` of a view and its -persistent actor should be shared (which is what applications usually do not want). - -.. _snapshots-java-lambda: - Snapshots ========= diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index b9bef5d105..ff11d47452 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -58,10 +58,6 @@ Architecture When a persistent actor is started or restarted, journaled messages are replayed to that actor so that it can recover internal state from these messages. -* *UntypedPersistentView*: A view is a persistent, stateful actor that receives journaled messages that have been written by another - persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's - replicated message stream. - * *UntypedPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. @@ -518,91 +514,6 @@ For example, if you configure the replay filter for leveldb plugin, it looks lik } -.. _persistent-views-java: - -Persistent Views -================ - -.. warning:: - - ``UntypedPersistentView`` is deprecated. Use :ref:`persistence-query-java` instead. The corresponding - query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source`` - to an actor corresponding to a previous ``UntypedPersistentView`` actor: - - * `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the - destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow - * `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure - being propagated all the way - * `ActorSubscriber`_ in case you need more fine grained control - - The consuming actor may be a plain ``UntypedActor`` or an ``UntypedPersistentActor`` if it needs to store its - own state (e.g. fromSequenceNr offset). - -.. _Sink.actorRef: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#Sink_actorRef -.. _mapAsync: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/stages-overview.html#Asynchronous_processing_stages -.. _ActorSubscriber: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#ActorSubscriber - -Persistent views can be implemented by extending the ``UntypedPersistentView`` trait and implementing the ``onReceive`` -and the ``persistenceId`` methods. - -.. includecode:: code/docs/persistence/PersistenceDocTest.java#view - -The ``persistenceId`` identifies the persistent actor from which the view receives journaled messages. It is not necessary that -the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a -persistent actor is started later and begins to write new messages, by -default the corresponding view is updated automatically. - -It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent`` -method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases -(skip the ``if isPersistent`` check). - -Updates -------- - -The default update interval of all persistent views of an actor system is configurable: - -.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval - -``UntypedPersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update -interval for a specific view class or view instance. Applications may also trigger additional updates at -any time by sending a view an ``Update`` message. - -.. includecode:: code/docs/persistence/PersistenceDocTest.java#view-update - -If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the -incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages -following the update request may interleave with the replayed message stream. Automated updates always run with -``await = false``. - -Automated updates of all persistent views of an actor system can be turned off by configuration: - -.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update - -Implementation classes may override the configured default value by overriding the ``autoUpdate`` method. To -limit the number of replayed messages per update request, applications can configure a custom -``akka.persistence.view.auto-update-replay-max`` value or override the ``autoUpdateReplayMax`` method. The number -of replayed messages for manual updates can be limited with the ``replayMax`` parameter of the ``Update`` message. - -Recovery --------- - -Initial recovery of persistent views works the very same way as for persistent actors (i.e. by sending a ``Recover`` message -to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``. -Further possibilities to customize initial recovery are explained in section :ref:`recovery-java`. - -.. _persistence-identifiers-java: - -Identifiers ------------ - -A persistent view must have an identifier that doesn't change across different actor incarnations. -The identifier must be defined with the ``viewId`` method. - -The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its -persistent actor should be shared (which is what applications usually do not want). - -.. _snapshots-java: - Snapshots ========= diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index e5fb497cf9..05814782d9 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -5,4 +5,677 @@ Migration Guide 2.3.x to 2.4.x ############################## Migration from 2.3.x to 2.4.x is described in the -`documentation of 2.4 `_. \ No newline at end of file +`documentation of 2.4 `_. +The 2.4 release contains some structural changes that require some +simple, mechanical source-level changes in client code. + +When migrating from earlier versions you should first follow the instructions for +migrating :ref:`1.3.x to 2.0.x ` and then :ref:`2.0.x to 2.1.x ` +and then :ref:`2.1.x to 2.2.x ` and then :ref:`2.2.x to 2.3.x `. + +Binary Compatibility +==================== + +Akka 2.4.x is backwards binary compatible with previous 2.3.x versions apart from the following +exceptions. This means that the new JARs are a drop-in replacement for the old one +(but not the other way around) as long as your build does not enable the inliner (Scala-only restriction). + +The following parts are not binary compatible with 2.3.x: + +* akka-testkit and akka-remote-testkit +* experimental modules, such as akka-persistence and akka-contrib +* features, classes, methods that were deprecated in 2.3.x and removed in 2.4.x + +The dependency to **Netty** has been updated from version 3.8.0.Final to 3.10.3.Final. The changes in +those versions might not be fully binary compatible, but we believe that it will not be a problem +in practice. No changes were needed to the Akka source code for this update. Users of libraries that +depend on 3.8.0.Final that break with 3.10.3.Final should be able to manually downgrade the dependency +to 3.8.0.Final and Akka will still work with that version. + +Advanced Notice: TypedActors will go away +========================================= + +While technically not yet deprecated, the current ``akka.actor.TypedActor`` support will be superseded by +the :ref:`typed-scala` project that is currently being developed in open preview mode. If you are using TypedActors +in your projects you are advised to look into this, as it is superior to the Active Object pattern expressed +in TypedActors. The generic ActorRefs in Akka Typed allow the same type-safety that is afforded by +TypedActors while retaining all the other benefits of an explicit actor model (including the ability to +change behaviors etc.). + +It is likely that TypedActors will be officially deprecated in the next major update of Akka and subsequently removed. + +Removed Deprecated Features +=========================== + +The following, previously deprecated, features have been removed: + +* akka-dataflow + +* akka-transactor + +* durable mailboxes (akka-mailboxes-common, akka-file-mailbox) + +* Cluster.publishCurrentClusterState + +* akka.cluster.auto-down, replaced by akka.cluster.auto-down-unreachable-after in Akka 2.3 + +* Old routers and configuration. + + Note that in router configuration you must now specify if it is a ``pool`` or a ``group`` + in the way that was introduced in Akka 2.3. + +* Timeout constructor without unit + +* JavaLoggingEventHandler, replaced by JavaLogger + +* UntypedActorFactory + +* Java API TestKit.dilated, moved to JavaTestKit.dilated + +Protobuf Dependency +=================== + +The transitive dependency to Protobuf has been removed to make it possible to use any version +of Protobuf for the application messages. If you use Protobuf in your application you need +to add the following dependency with desired version number:: + + "com.google.protobuf" % "protobuf-java" % "2.5.0" + +Internally Akka is using an embedded version of protobuf that corresponds to ``com.google.protobuf/protobuf-java`` +version 2.5.0. The package name of the embedded classes has been changed to ``akka.protobuf``. + +Added parameter validation to RootActorPath +=========================================== +Previously ``akka.actor.RootActorPath`` allowed passing in arbitrary strings into its name parameter, +which is meant to be the *name* of the root Actor. Subsequently, if constructed with an invalid name +such as a full path for example (``/user/Full/Path``) some features using this path may transparently fail - +such as using ``actorSelection`` on such invalid path. + +In Akka 2.4.x the ``RootActorPath`` validates the input and may throw an ``IllegalArgumentException`` if +the passed in name string is illegal (contains ``/`` elsewhere than in the begining of the string or contains ``#``). + +TestKit.remaining throws AssertionError +======================================= + +In earlier versions of Akka `TestKit.remaining` returned the default timeout configurable under +"akka.test.single-expect-default". This was a bit confusing and thus it has been changed to throw an +AssertionError if called outside of within. The old behavior however can still be achieved by +calling `TestKit.remainingOrDefault` instead. + +EventStream and ManagedActorClassification EventBus now require an ActorSystem +============================================================================== + +Both the ``EventStream`` (:ref:`Scala `, :ref:`Java `) and the +``ManagedActorClassification``, ``ManagedActorEventBus`` (:ref:`Scala `, :ref:`Java `) now +require an ``ActorSystem`` to properly operate. The reason for that is moving away from stateful internal lifecycle checks +to a fully reactive model for unsubscribing actors that have ``Terminated``. Therefore the ``ActorClassification`` +and ``ActorEventBus`` was deprecated and replaced by ``ManagedActorClassification`` and ``ManagedActorEventBus`` + +If you have implemented a custom event bus, you will need to pass in the actor system through the constructor now: + +.. includecode:: ../scala/code/docs/event/EventBusDocSpec.scala#actor-bus + +If you have been creating EventStreams manually, you now have to provide an actor system and *start the unsubscriber*: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala#event-bus-start-unsubscriber-scala + +Please note that this change affects you only if you have implemented your own buses, Akka's own ``context.eventStream`` +is still there and does not require any attention from you concerning this change. + +FSM notifies on same state transitions +====================================== +When changing states in an Finite-State-Machine Actor (``FSM``), state transition events are emitted and can be handled by the user +either by registering ``onTransition`` handlers or by subscribing to these events by sending it an ``SubscribeTransitionCallBack`` message. + +Previously in ``2.3.x`` when an ``FSM`` was in state ``A`` and performed a ``goto(A)`` transition, no state transition notification would be sent. +This is because it would effectively stay in the same state, and was deemed to be semantically equivalent to calling ``stay()``. + +In ``2.4.x`` when an ``FSM`` performs an any ``goto(X)`` transition, it will always trigger state transition events. +Which turns out to be useful in many systems where same-state transitions actually should have an effect. + +In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead. + + +Circuit Breaker Timeout Change +============================== +In ``2.3.x`` calls protected by the ``CircuitBreaker`` were allowed to run indefinitely and the check to see if the timeout had been exceeded was done after the call had returned. + +In ``2.4.x`` the failureCount of the Breaker will be increased as soon as the timeout is reached and a ``Failure[TimeoutException]`` will be returned immediately for asynchronous calls. Synchronous calls will now throw a ``TimeoutException`` after the call is finished. + + +Slf4j logging filter +==================== + +If you use ``Slf4jLogger`` you should add the following configuration:: + + akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + +It will filter the log events using the backend configuration (e.g. logback.xml) before +they are published to the event bus. + +Inbox.receive Java API +====================== + +``Inbox.receive`` now throws a checked ``java.util.concurrent.TimeoutException`` exception if the receive timeout +is reached. + + +Pool routers nrOfInstances method now takes ActorSystem +======================================================= + +In order to make cluster routers smarter about when they can start local routees, +``nrOfInstances`` defined on ``Pool`` now takes ``ActorSystem`` as an argument. +In case you have implemented a custom Pool you will have to update the method's signature, +however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. + +Group routers paths method now takes ActorSystem +================================================ + +In order to make cluster routers smarter about when they can start local routees, +``paths`` defined on ``Group`` now takes ``ActorSystem`` as an argument. +In case you have implemented a custom Group you will have to update the method's signature, +however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. + +Cluster aware router max-total-nr-of-instances +============================================== + +In 2.3.x the deployment configuration property ``nr-of-instances`` was used for +cluster aware routers to specify total number of routees in the cluster. +This was confusing, especially since the default value is 1. + +In 2.4.x there is a new deployement property ``cluster.max-total-nr-of-instances`` that +defines total number of routees in the cluster. By default ``max-total-nr-of-instances`` +is set to a high value (10000) that will result in new routees added to the router when nodes join the cluster. +Set it to a lower value if you want to limit total number of routees. + +For backwards compatibility reasons ``nr-of-instances`` is still used if defined by user, +i.e. if defined it takes precedence over ``max-total-nr-of-instances``. + +Logger names use full class name +================================ +Previously, few places in Akka used "simple" logger names, such as ``Cluster`` or ``Remoting``. +Now they use full class names, such as ``akka.cluster.Cluster`` or ``akka.remote.Remoting``, +in order to allow package level log level definitions and ease source code lookup. +In case you used specific "simple" logger name based rules in your ``logback.xml`` configurations, +please change them to reflect appropriate package name, such as +```` or ```` + +Default interval for TestKit.awaitAssert changed to 100 ms +========================================================== + +Default check interval changed from 800 ms to 100 ms. You can define the interval explicitly if you need a +longer interval. + +Secure Cookies +============== + +`Secure cookies` feature was deprecated. + +AES128CounterInetRNG and AES256CounterInetRNG are Deprecated +============================================================ + +Use ``AES128CounterSecureRNG`` or ``AES256CounterSecureRNG`` as +``akka.remote.netty.ssl.security.random-number-generator``. + +Microkernel is Deprecated +========================= + +Akka Microkernel is deprecated and will be removed. It is replaced by using an ordinary +user defined main class and packaging with `sbt-native-packager `_ +or `Lightbend ConductR `_. +Please see :ref:`deployment-scenarios` for more information. + +New Cluster Metrics Extension +============================= +Previously, cluster metrics functionality was located in the ``akka-cluster`` jar. +Now it is split out and moved into a separate Akka module: ``akka-cluster-metrics`` jar. +The module comes with few enhancements, such as use of Kamon sigar-loader +for native library provisioning as well as use of statistical averaging of metrics data. +Note that both old and new metrics configuration entries in the ``reference.conf`` +are still in the same name space ``akka.cluster.metrics`` but are not compatible. +Make sure to disable legacy metrics in akka-cluster: ``akka.cluster.metrics.enabled=off``, +since it is still enabled in akka-cluster by default (for compatibility with past releases). +Router configuration entries have also changed for the module, they use prefix ``cluster-metrics-``: +``cluster-metrics-adaptive-pool`` and ``cluster-metrics-adaptive-group`` +Metrics extension classes and objects are located in the new package ``akka.cluster.metrics``. +Please see :ref:`Scala `, :ref:`Java ` for more information. + +Cluster tools moved to separate module +====================================== + +The Cluster Singleton, Distributed Pub-Sub, and Cluster Client previously located in the ``akka-contrib`` +jar is now moved to a separate module named ``akka-cluster-tools``. You need to replace this dependency +if you use any of these tools. + +The classes changed package name from ``akka.contrib.pattern`` to ``akka.cluster.singleton``, ``akka.cluster.pubsub`` +and ``akka.cluster.client``. + +The configuration properties changed name to ``akka.cluster.pub-sub`` and ``akka.cluster.client``. + +Cluster sharding moved to separate module +========================================= + +The Cluster Sharding previously located in the ``akka-contrib`` jar is now moved to a separate module +named ``akka-cluster-sharding``. You need to replace this dependency if you use Cluster Sharding. + +The classes changed package name from ``akka.contrib.pattern`` to ``akka.cluster.sharding``. + +The configuration properties changed name to ``akka.cluster.sharding``. + +ClusterSharding construction +============================ + +Several parameters of the ``start`` method of the ``ClusterSharding`` extension are now defined +in a settings object ``ClusterShardingSettings``. +It can be created from system configuration properties and also amended with API. +These settings can be defined differently per entry type if needed. + +Starting the ``ShardRegion`` in proxy mode is now done with the ``startProxy`` method +of the ``ClusterSharding`` extension instead of the optional ``entryProps`` parameter. + +Entry was renamed to Entity, for example in the ``MessagesExtractor`` in the Java API +and the ``EntityId`` type in the Scala API. + +``idExtractor`` function was renamed to ``extractEntityId``. ``shardResolver`` function +was renamed to ``extractShardId``. + +Cluster Sharding Entry Path Change +================================== +Previously in ``2.3.x`` entries were direct children of the local ``ShardRegion``. In examples the ``persistenceId`` of entries +included ``self.path.parent.name`` to include the cluster type name. + +In ``2.4.x`` entries are now children of a ``Shard``, which in turn is a child of the local ``ShardRegion``. To include the shard +type in the ``persistenceId`` it is now accessed by ``self.path.parent.parent.name`` from each entry. + +Asynchronous ShardAllocationStrategy +==================================== + +The methods of the ``ShardAllocationStrategy`` and ``AbstractShardAllocationStrategy`` in Cluster Sharding +have changed return type to a ``Future`` to support asynchronous decision. For example you can ask an +actor external actor of how to allocate shards or rebalance shards. + +For the synchronous case you can return the result via ``scala.concurrent.Future.successful`` in Scala or +``akka.dispatch.Futures.successful`` in Java. + +Cluster Sharding internal data +============================== + +The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence. +This data can safely be removed when restarting the whole Akka Cluster. + +The serialization format of the internal persistent events stored by the Cluster Sharding coordinator +has been changed and it cannot load old data from 2.3.x or some 2.4 milestone. + +The ``persistenceId`` of the Cluster Sharding coordinator has been changed since 2.3.x so +it should not load such old data, but it can be a problem if you have used a 2.4 +milestone release. In that case you should remove the persistent data that the +Cluster Sharding coordinator stored. Note that this is not application data. + +You can use the :ref:`RemoveInternalClusterShardingData ` +utility program to remove this data. + +The new ``persistenceId`` is ``s"/sharding/${typeName}Coordinator"``. +The old ``persistenceId`` is ``s"/user/sharding/${typeName}Coordinator/singleton/coordinator"``. + +ClusterSingletonManager and ClusterSingletonProxy construction +============================================================== + +Parameters to the ``Props`` factory methods have been moved to settings object ``ClusterSingletonManagerSettings`` +and ``ClusterSingletonProxySettings``. These can be created from system configuration properties and also +amended with API as needed. + +The buffer size of the ``ClusterSingletonProxy`` can be defined in the ``ClusterSingletonProxySettings`` +instead of defining ``stash-capacity`` of the mailbox. Buffering can be disabled by using a +buffer size of 0. + +The ``singletonPath`` parameter of ``ClusterSingletonProxy.props`` has changed. It is now named +``singletonManagerPath`` and is the logical path of the singleton manager, e.g. ``/user/singletonManager``, +which ends with the name you defined in ``actorOf`` when creating the ``ClusterSingletonManager``. +In 2.3.x it was the path to singleton instance, which was error-prone because one had to provide both +the name of the singleton manager and the singleton actor. + +DistributedPubSub construction +============================== + +Normally, the ``DistributedPubSubMediator`` actor is started by the ``DistributedPubSubExtension``. +This extension has been renamed to ``DistributedPubSub``. It is also possible to start +it as an ordinary actor if you need multiple instances of it with different settings. +The parameters of the ``Props`` factory methods in the ``DistributedPubSubMediator`` companion +has been moved to settings object ``DistributedPubSubSettings``. This can be created from +system configuration properties and also amended with API as needed. + +ClusterClient construction +========================== + +The parameters of the ``Props`` factory methods in the ``ClusterClient`` companion +has been moved to settings object ``ClusterClientSettings``. This can be created from +system configuration properties and also amended with API as needed. + +The buffer size of the ``ClusterClient`` can be defined in the ``ClusterClientSettings`` +instead of defining ``stash-capacity`` of the mailbox. Buffering can be disabled by using a +buffer size of 0. + +Normally, the ``ClusterReceptionist`` actor is started by the ``ClusterReceptionistExtension``. +This extension has been renamed to ``ClusterClientReceptionist``. It is also possible to start +it as an ordinary actor if you need multiple instances of it with different settings. +The parameters of the ``Props`` factory methods in the ``ClusterReceptionist`` companion +has been moved to settings object ``ClusterReceptionistSettings``. This can be created from +system configuration properties and also amended with API as needed. + +The ``ClusterReceptionist`` actor that is started by the ``ClusterReceptionistExtension`` +is now started as a ``system`` actor instead of a ``user`` actor, i.e. the default path for +the ``ClusterClient`` initial contacts has changed to +``"akka.tcp://system@hostname:port/system/receptionist"``. + +ClusterClient sender +==================== + +In 2.3 the ``sender()`` of the response messages, as seen by the client, was the +actor in cluster. + +In 2.4 the ``sender()`` of the response messages, as seen by the client, is ``deadLetters`` +since the client should normally send subsequent messages via the ``ClusterClient``. +It is possible to pass the original sender inside the reply messages if +the client is supposed to communicate directly to the actor in the cluster. + +Akka Persistence +================ + +Experimental removed +-------------------- + +The artifact name has changed from ``akka-persistence-experimental`` to ``akka-persistence``. + +New sbt dependency:: + + "com.typesafe.akka" %% "akka-persistence" % "@version@" @crossString@ + +New Maven dependency:: + + + com.typesafe.akka + akka-persistence_@binVersion@ + @version@ + + +The artefact name of the Persistent TCK has changed from ``akka-persistence-tck-experimental`` (``akka-persistence-experimental-tck``) to +``akka-persistence-tck``. + +Mandatory persistenceId +----------------------- + +It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor`` +and ``AbstractPersistentId``. + +The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical +"which persistent entity this actor represents". + +In case you want to preserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily +implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows:: + + override def persistenceId = self.path.toStringWithoutAddress + +Failures +-------- + +Backend journal failures during recovery and persist are treated differently than in 2.3.x. The ``PersistenceFailure`` +message is removed and the actor is unconditionally stopped. The new behavior and reasons for it is explained in +:ref:`failures-scala`. + +Persist sequence of events +-------------------------- + +The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of events parameter was deprecated and +renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling +the overloaded ``persist(event)`` method. + +non-permanent deletion +---------------------- + +The ``permanent`` flag in ``deleteMessages`` was removed. non-permanent deletes are not supported +any more. Events that were deleted with ``permanent=false`` with older version will +still not be replayed in this version. + +Recover message is gone, replaced by Recovery config +---------------------------------------------------- +Previously the way to cause recover in PersistentActors was sending them a ``Recover()`` message. +Most of the time it was the actor itself sending such message to ``self`` in its ``preStart`` method, +however it was possible to send this message from an external source to any ``PersistentActor`` or ``PresistentView`` +to make it start recovering. + +This style of starting recovery does not fit well with usual Actor best practices: an Actor should be independent +and know about its internal state, and also about its recovery or lack thereof. In order to guide users towards +more independent Actors, the ``Recovery()`` object is now not used as a message, but as configuration option +used by the Actor when it starts. In order to migrate previous code which customised its recovery mode use this example +as reference:: + + // previously + class OldCookieMonster extends PersistentActor { + def preStart() = self ! Recover(toSequenceNr = 42L) + // ... + } + // now: + class NewCookieMonster extends PersistentActor { + override def recovery = Recovery(toSequenceNr = 42L) + // ... + } + +Sender reference of replayed events is deadLetters +-------------------------------------------------- +While undocumented, previously the ``sender()`` of the replayed messages would be the same sender that originally had +sent the message. Since sender is an ``ActorRef`` and those events are often replayed in different incarnations of +actor systems and during the entire lifetime of the app, relying on the existence of this reference is most likely +not going to succeed. In order to avoid bugs in the style of "it worked last week", the ``sender()`` reference is now not +stored, in order to avoid potential bugs which this could have provoked. + +The previous behaviour was never documented explicitly (nor was it a design goal), so it is unlikely that applications +have explicitly relied on this behaviour, however if you find yourself with an application that did exploit this you +should rewrite it to explicitly store the ``ActorPath`` of where such replies during replay may have to be sent to, +instead of relying on the sender reference during replay. + +max-message-batch-size config +----------------------------- + +Configuration property ``akka.persistence.journal.max-message-batch-size`` has been moved into the plugin configuration +section, to allow different values for different journal plugins. See ``reference.conf``. + +akka.persistence.snapshot-store.plugin config +--------------------------------------------- + +The configuration property ``akka.persistence.snapshot-store.plugin`` now by default is empty. To restore the previous +setting add ``akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"`` to your application.conf. +See ``reference.conf``. + +PersistentView is deprecated +---------------------------- + +``PersistentView`` is deprecated. Use :ref:`persistence-query-scala` instead. The corresponding +query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source`` +to an actor corresponding to a previous ``PersistentView`` actor which are documented in :ref:`stream-integrations-scala` +for Scala and :ref:`Java `. + +The consuming actor may be a plain ``Actor`` or a ``PersistentActor`` if it needs to store its +own state (e.g. fromSequenceNr offset). + +.. _Sink.actorRef: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#Sink_actorRef +.. _mapAsync: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/stages-overview.html#Asynchronous_processing_stages +.. _ActorSubscriber: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#ActorSubscriber + +Persistence Plugin APIs +======================= + +SyncWriteJournal removed +------------------------ + +``SyncWriteJournal`` removed in favor of using ``AsyncWriteJournal``. + +If the storage backend API only supports synchronous, blocking writes, +the methods can still be implemented in terms of the asynchronous API. +Example of how to do that is in included in the +See :ref:`Journal plugin API for Scala ` +or :ref:`Journal plugin API for Java `. + +SnapshotStore: Snapshots can now be deleted asynchronously (and report failures) +-------------------------------------------------------------------------------- +Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots, +and failures of deleting a snapshot may have been even silently ignored. + +Now ``SnapshotStore`` must return a ``Future`` representing the deletion of the snapshot. +If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will +be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure`` +will be sent to the actor instead. + +For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent +``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria, +instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages. + +SnapshotStore: Removed 'saved' callback +--------------------------------------- +Snapshot Stores previously were required to implement a ``def saved(meta: SnapshotMetadata): Unit`` method which +would be called upon successful completion of a ``saveAsync`` (``doSaveAsync`` in Java API) snapshot write. + +Currently all journals and snapshot stores perform asynchronous writes and deletes, thus all could potentially benefit +from such callback methods. The only gain these callback give over composing an ``onComplete`` over ``Future`` returned +by the journal or snapshot store is that it is executed in the Actors context, thus it can safely (without additional +synchronization modify its internal state - for example a "pending writes" counter). + +However, this feature was not used by many plugins, and expanding the API to accomodate all callbacks would have grown +the API a lot. Instead, Akka Persistence 2.4.x introduces an additional (optionally overrideable) +``receivePluginInternal:Actor.Receive`` method in the plugin API, which can be used for handling those as well as any custom messages +that are sent to the plugin Actor (imagine use cases like "wake up and continue reading" or custom protocols which your +specialised journal can implement). + +Implementations using the previous feature should adjust their code as follows:: + + // previously + class MySnapshots extends SnapshotStore { + // old API: + // def saved(meta: SnapshotMetadata): Unit = doThings() + + // new API: + def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { + // completion or failure of the returned future triggers internal messages in receivePluginInternal + val f: Future[Unit] = ??? + + // custom messages can be piped to self in order to be received in receivePluginInternal + f.map(MyCustomMessage(_)) pipeTo self + + f + } + + def receivePluginInternal = { + case SaveSnapshotSuccess(metadata) => doThings() + case MyCustomMessage(data) => doOtherThings() + } + + // ... + } + +SnapshotStore: Java 8 Optional used in Java plugin APIs +------------------------------------------------------- +In places where previously ``akka.japi.Option`` was used in Java APIs, including the return type of ``doLoadAsync``, +the Java 8 provided ``Optional`` type is used now. + +Please remember that when creating an ``java.util.Optional`` instance from a (possibly) ``null`` value you will want to +use the non-throwing ``Optional.fromNullable`` method, which converts a ``null`` into a ``None`` value - which is +slightly different than its Scala counterpart (where ``Option.apply(null)`` returns ``None``). + +Atomic writes +------------- + +``asyncWriteMessages`` takes a ``immutable.Seq[AtomicWrite]`` parameter instead of +``immutable.Seq[PersistentRepr]``. + +Each `AtomicWrite` message contains the single ``PersistentRepr`` that corresponds to the event that was +passed to the ``persist`` method of the ``PersistentActor``, or it contains several ``PersistentRepr`` +that corresponds to the events that were passed to the ``persistAll`` method of the ``PersistentActor``. +All ``PersistentRepr`` of the `AtomicWrite` must be written to the data store atomically, i.e. all or +none must be stored. + +If the journal (data store) cannot support atomic writes of multiple events it should +reject such writes with a ``Try`` ``Failure`` with an ``UnsupportedOperationException`` +describing the issue. This limitation should also be documented by the journal plugin. + +Rejecting writes +---------------- + +``asyncWriteMessages`` returns a ``Future[immutable.Seq[Try[Unit]]]``. + +The journal can signal that it rejects individual messages (``AtomicWrite``) by the returned +`immutable.Seq[Try[Unit]]`. The returned ``Seq`` must have as many elements as the input +``messages`` ``Seq``. Each ``Try`` element signals if the corresponding ``AtomicWrite`` +is rejected or not, with an exception describing the problem. Rejecting a message means it +was not stored, i.e. it must not be included in a later replay. Rejecting a message is +typically done before attempting to store it, e.g. because of serialization error. + +Read the :ref:`API documentation ` of this method for more +information about the semantics of rejections and failures. + +asyncReplayMessages Java API +---------------------------- + +The signature of `asyncReplayMessages` in the Java API changed from ``akka.japi.Procedure`` +to ``java.util.function.Consumer``. + +asyncDeleteMessagesTo +--------------------- + +The ``permanent`` deletion flag was removed. Support for non-permanent deletions was +removed. Events that were deleted with ``permanent=false`` with older version will +still not be replayed in this version. + +References to "replay" in names +------------------------------- +Previously a number of classes and methods used the word "replay" interchangeably with the word "recover". +This lead to slight inconsistencies in APIs, where a method would be called ``recovery``, yet the +signal for a completed recovery was named ``ReplayMessagesSuccess``. + +This is now fixed, and all methods use the same "recovery" wording consistently across the entire API. +The old ``ReplayMessagesSuccess`` is now called ``RecoverySuccess``, and an additional method called ``onRecoveryFailure`` +has been introduced. + +AtLeastOnceDelivery deliver signature +------------------------------------- +The signature of ``deliver`` changed slightly in order to allow both ``ActorSelection`` and ``ActorPath`` to be +used with it. + +Previously: + + def deliver(destination: ActorPath, deliveryIdToMessage: Long ⇒ Any): Unit + +Now: + + def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit + def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit + +The Java API remains unchanged and has simply gained the 2nd overload which allows ``ActorSelection`` to be +passed in directly (without converting to ``ActorPath``). + + +Actor system shutdown +--------------------- +``ActorSystem.shutdown``, ``ActorSystem.awaitTermination`` and ``ActorSystem.isTerminated`` has been +deprecated in favor of ``ActorSystem.terminate`` and ``ActorSystem.whenTerminated```. Both returns a +``Future[Terminated]`` value that will complete when the actor system has terminated. + +To get the same behavior as ``ActorSystem.awaitTermination`` block and wait for ``Future[Terminated]`` value +with ``Await.result`` from the Scala standard library. + +To trigger a termination and wait for it to complete: + + import scala.concurrent.duration._ + Await.result(system.terminate(), 10.seconds) + +Be careful to not do any operations on the ``Future[Terminated]`` using the ``system.dispatcher`` +as ``ExecutionContext`` as it will be shut down with the ``ActorSystem``, instead use for example +the Scala standard library context from ``scala.concurrent.ExecutionContext.global``. + +:: + + // import system.dispatcher <- this would not work + import scala.concurrent.ExecutionContext.Implicits.global + + system.terminate().foreach { _ => + println("Actor system was shut down") + } + diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 99bac62ba3..2be9a12bf0 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -4,6 +4,24 @@ Migration Guide 2.4.x to 2.5.x ############################## +Akka Persistence +================ + +Removal of PersistentView +------------------------- + +After being deprecated for a long time, and replaced by :ref:`Persistence Query Java ` +(:ref:`Persistence Query Scala `) ``PersistentView`` has been removed now removed. + +The corresponding query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source`` +to an actor corresponding to a previous ``PersistentView``. There are several alternatives for connecting the ``Source`` +to an actor corresponding to a previous ``PersistentView`` actor which are documented in :ref:`stream-integrations-scala` +for Scala and :ref:`Java `. + +The consuming actor may be a plain ``Actor`` or an ``PersistentActor`` if it needs to store its own state (e.g. ``fromSequenceNr`` offset). + +Please note that Persistence Query is not experimental anymore in Akka ``2.5.0``, so you can safely upgrade to it. + Akka Streams ============ diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index ec9aa90d71..b9315efbbc 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -607,7 +607,7 @@ Router Example with Pool of Remote Deployed Routees Let's take a look at how to use a cluster aware router on single master node that creates and deploys workers. To keep track of a single master we use the :ref:`cluster-singleton-scala` -in the contrib module. The ``ClusterSingletonManager`` is started on each node. +in the cluster-tools module. The ``ClusterSingletonManager`` is started on each node. .. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala#create-singleton-manager diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 05103e2967..6290e79912 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -423,29 +423,4 @@ object PersistenceDocSpec { //#safe-shutdown-example-good } - object View { - import akka.actor.Props - - val system: ActorSystem = ??? - - //#view - class MyView extends PersistentView { - override def persistenceId: String = "some-persistence-id" - override def viewId: String = "some-persistence-id-view" - - def receive: Receive = { - case payload if isPersistent => - // handle message from journal... - case payload => - // handle message from user-land... - } - } - //#view - - //#view-update - val view = system.actorOf(Props[MyView]) - view ! Update(await = true) - //#view-update - } - } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 5a3ea8acbc..b72f7b6118 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -504,88 +504,6 @@ For example, if you configure the replay filter for leveldb plugin, it looks lik mode = repair-by-discard-old } -.. _persistent-views: - -Persistent Views -================ - -.. warning:: - - ``PersistentView`` is deprecated. Use :ref:`persistence-query-scala` instead. The corresponding - query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source`` - to an actor corresponding to a previous ``PersistentView`` actor: - - * `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the - destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow - * `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure - being propagated all the way - * `ActorSubscriber`_ in case you need more fine grained control - - The consuming actor may be a plain ``Actor`` or a ``PersistentActor`` if it needs to store its - own state (e.g. fromSequenceNr offset). - -.. _Sink.actorRef: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#Sink_actorRef -.. _mapAsync: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/stages-overview.html#Asynchronous_processing_stages -.. _ActorSubscriber: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#ActorSubscriber - -Persistent views can be implemented by extending the ``PersistentView`` trait and implementing the ``receive`` and the ``persistenceId`` -methods. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#view - -The ``persistenceId`` identifies the persistent actor from which the view receives journaled messages. It is not necessary that -the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a -persistent actor is started later and begins to write new messages, by default the corresponding view is updated automatically. - -It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the ``isPersistent`` -method. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases -(skip the ``if isPersistent`` check). - -Updates -------- - -The default update interval of all views of an actor system is configurable: - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval - -``PersistentView`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update -interval for a specific view class or view instance. Applications may also trigger additional updates at -any time by sending a view an ``Update`` message. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#view-update - -If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the -incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages -following the update request may interleave with the replayed message stream. Automated updates always run with -``await = false``. - -Automated updates of all persistent views of an actor system can be turned off by configuration: - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#auto-update - -Implementation classes may override the configured default value by overriding the ``autoUpdate`` method. To -limit the number of replayed messages per update request, applications can configure a custom -``akka.persistence.view.auto-update-replay-max`` value or override the ``autoUpdateReplayMax`` method. The number -of replayed messages for manual updates can be limited with the ``replayMax`` parameter of the ``Update`` message. - -Recovery --------- - -Initial recovery of persistent views works the very same way as for persistent actors (i.e. by sending a ``Recover`` message -to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``. -Further possibilities to customize initial recovery are explained in section :ref:`recovery-scala`. - -.. _persistence-identifiers: - -Identifiers ------------ - -A persistent view must have an identifier that doesn't change across different actor incarnations. -The identifier must be defined with the ``viewId`` method. - -The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its -persistent actor should be shared (which is what applications usually do not want). - .. _snapshots: Snapshots diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala deleted file mode 100644 index 1c5c10d549..0000000000 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ /dev/null @@ -1,396 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package akka.persistence - -import scala.concurrent.duration._ -import scala.util.control.NonFatal -import akka.actor.AbstractActor -import akka.actor.Actor -import akka.actor.Cancellable -import akka.actor.Stash -import akka.actor.StashFactory -import akka.actor.UntypedActor -import akka.actor.ActorLogging - -/** - * Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with - * all messages from the corresponding persistent id's journal that have not yet been consumed by the view. - * To update a view with messages that have been written after handling this request, another `Update` - * request must be sent to the view. - * - * @param await if `true`, processing of further messages sent to the view will be delayed until the - * incremental message replay, triggered by this update request, completes. If `false`, - * any message sent to the view may interleave with replayed persistent event stream. - * @param replayMax maximum number of messages to replay when handling this update request. Defaults - * to `Long.MaxValue` (i.e. no limit). - */ -@SerialVersionUID(1L) -final case class Update(await: Boolean = false, replayMax: Long = Long.MaxValue) - -object Update { - /** - * Java API. - */ - def create() = - Update() - - /** - * Java API. - */ - def create(await: Boolean) = - Update(await) - - /** - * Java API. - */ - def create(await: Boolean, replayMax: Long) = - Update(await, replayMax) -} - -/** - * INTERNAL API - */ -private[akka] object PersistentView { - private final case class ScheduledUpdate(replayMax: Long) -} - -/** - * A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive - * the message stream directly from the Journal. These messages can be processed to update internal state - * in order to maintain an (eventual consistent) view of the state of the corresponding persistent actor. A - * persistent view can also run on a different node, provided that a replicated journal is used. - * - * Implementation classes refer to a persistent actors' message stream by implementing `persistenceId` - * with the corresponding (shared) identifier value. - * - * Views can also store snapshots of internal state by calling [[PersistentView#autoUpdate]]. The snapshots of a view - * are independent of those of the referenced persistent actor. During recovery, a saved snapshot is offered - * to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger - * than the snapshot. Default is to offer the latest saved snapshot. - * - * By default, a view automatically updates itself with an interval returned by `autoUpdateInterval`. - * This method can be overridden by implementation classes to define a view instance-specific update - * interval. The default update interval for all views of an actor system can be configured with the - * `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional - * view updates by sending the view [[Update]] requests. See also methods - * - * - [[PersistentView#autoUpdate]] for turning automated updates on or off - * - [[PersistentView#autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle - * - */ -@deprecated("use Persistence Query instead", "2.4") -trait PersistentView extends Actor with Snapshotter with Stash with StashFactory - with PersistenceIdentity with PersistenceRecovery - with ActorLogging { - import PersistentView._ - import JournalProtocol._ - import SnapshotProtocol.LoadSnapshotResult - import context.dispatcher - - private val extension = Persistence(context.system) - private val viewSettings = extension.settings.view - - private[persistence] lazy val journal = extension.journalFor(journalPluginId) - private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId) - - private var schedule: Option[Cancellable] = None - - private var _lastSequenceNr: Long = 0L - private val internalStash = createStash() - private var currentState: State = recoveryStarted(Long.MaxValue) - - /** - * View id is used as identifier for snapshots performed by this [[PersistentView]]. - * This allows the View to keep separate snapshots of data than the [[PersistentActor]] originating the message stream. - * - * - * The usual case is to have a *different* id set as `viewId` than `persistenceId`, - * although it is possible to share the same id with an [[PersistentActor]] - for example to decide about snapshots - * based on some average or sum, calculated by this view. - * - * Example: - * {{{ - * class SummingView extends PersistentView { - * override def persistenceId = "count-123" - * override def viewId = "count-123-sum" // this view is performing summing, - * // so this view's snapshots reside under the "-sum" suffixed id - * - * // ... - * } - * }}} - */ - def viewId: String - - /** - * Returns `viewId`. - */ - def snapshotterId: String = viewId - - /** - * If `true`, the currently processed message was persisted (is sent from the Journal). - * If `false`, the currently processed message comes from another actor (from "user-land"). - */ - def isPersistent: Boolean = currentState.recoveryRunning - - /** - * If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`. - * If `false`, applications must explicitly update this view by sending [[Update]] requests. The default - * value can be configured with the `akka.persistence.view.auto-update` configuration key. This method - * can be overridden by implementation classes to return non-default values. - */ - def autoUpdate: Boolean = - viewSettings.autoUpdate - - /** - * The interval for automated updates. The default value can be configured with the - * `akka.persistence.view.auto-update-interval` configuration key. This method can be - * overridden by implementation classes to return non-default values. - */ - def autoUpdateInterval: FiniteDuration = - viewSettings.autoUpdateInterval - - /** - * The maximum number of messages to replay per update. The default value can be configured with the - * `akka.persistence.view.auto-update-replay-max` configuration key. This method can be overridden by - * implementation classes to return non-default values. - */ - def autoUpdateReplayMax: Long = - viewSettings.autoUpdateReplayMax match { - case -1 ⇒ Long.MaxValue - case value ⇒ value - } - - /** - * Highest received sequence number so far or `0L` if this actor hasn't replayed - * any persistent events yet. - */ - def lastSequenceNr: Long = _lastSequenceNr - - /** - * Returns `lastSequenceNr`. - */ - def snapshotSequenceNr: Long = lastSequenceNr - - private def setLastSequenceNr(value: Long): Unit = - _lastSequenceNr = value - - private def updateLastSequenceNr(persistent: PersistentRepr): Unit = - if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr - - override def recovery = Recovery(replayMax = autoUpdateReplayMax) - - /** - * Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax` - * messages (following that snapshot). - */ - override def preStart(): Unit = { - startRecovery(recovery) - if (autoUpdate) - schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, self, ScheduledUpdate(autoUpdateReplayMax))) - } - - private def startRecovery(recovery: Recovery): Unit = { - changeState(recoveryStarted(recovery.replayMax)) - loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) - } - - /** INTERNAL API. */ - override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = - currentState.stateReceive(receive, message) - - /** INTERNAL API. */ - override protected[akka] def aroundPreStart(): Unit = { - // Fail fast on missing plugins. - val j = journal; val s = snapshotStore - super.aroundPreStart() - } - - override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - try internalStash.unstashAll() finally super.preRestart(reason, message) - } - - override def postStop(): Unit = { - schedule.foreach(_.cancel()) - super.postStop() - } - - /** - * Called whenever a message replay fails. By default it logs the error. - * Subclass may override to customize logging. - * The `PersistentView` will not stop or throw exception due to this. - * It will try again on next update. - */ - protected def onReplayError(cause: Throwable): Unit = { - log.error(cause, "Persistence view failure when replaying events for persistenceId [{}]. " + - "Last known sequence number [{}]", persistenceId, lastSequenceNr) - } - - private def changeState(state: State): Unit = { - currentState = state - } - - // TODO There are some duplication of the recovery state management here and in Eventsourced.scala, - // but the enhanced PersistentView will not be based on recovery infrastructure, and - // therefore this code will be replaced anyway - - private trait State { - def stateReceive(receive: Receive, message: Any): Unit - def recoveryRunning: Boolean - } - - /** - * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` - * message to the actor's `receive`. Then initiates a message replay, either starting - * from the loaded snapshot or from scratch, and switches to `replayStarted` state. - * All incoming messages are stashed. - * - * @param replayMax maximum number of messages to replay. - */ - private def recoveryStarted(replayMax: Long) = new State { - - override def toString: String = s"recovery started (replayMax = [${replayMax}])" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any) = message match { - case LoadSnapshotResult(sso, toSnr) ⇒ - sso.foreach { - case SelectedSnapshot(metadata, snapshot) ⇒ - setLastSequenceNr(metadata.sequenceNr) - PersistentView.super.aroundReceive(receive, SnapshotOffer(metadata, snapshot)) - } - changeState(replayStarted(await = true)) - journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) - case other ⇒ internalStash.stash() - } - } - - /** - * Processes replayed messages, if any. The actor's `receive` is invoked with the replayed - * events. - * - * If replay succeeds it switches to `initializing` state and requests the highest stored sequence - * number from the journal. - * - * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayError` is called and - * remaining replay events are consumed (ignored). - * - * If processing of a replayed event fails, the exception is caught and - * stored for later and state is changed to `recoveryFailed`. - * - * All incoming messages are stashed when `await` is true. - */ - private def replayStarted(await: Boolean) = new State { - override def toString: String = s"replay started" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any) = message match { - case ReplayedMessage(p) ⇒ - try { - updateLastSequenceNr(p) - PersistentView.super.aroundReceive(receive, p.payload) - } catch { - case NonFatal(t) ⇒ - changeState(ignoreRemainingReplay(t)) - } - case _: RecoverySuccess ⇒ - onReplayComplete() - case ReplayMessagesFailure(cause) ⇒ - try onReplayError(cause) finally onReplayComplete() - case ScheduledUpdate(_) ⇒ // ignore - case Update(a, _) ⇒ - if (a) - internalStash.stash() - case other ⇒ - if (await) - internalStash.stash() - else { - try { - PersistentView.super.aroundReceive(receive, other) - } catch { - case NonFatal(t) ⇒ - changeState(ignoreRemainingReplay(t)) - } - } - } - - /** - * Switches to `idle` - */ - private def onReplayComplete(): Unit = { - changeState(idle) - internalStash.unstashAll() - } - } - - /** - * Consumes remaining replayed messages and then throw the exception. - */ - private def ignoreRemainingReplay(cause: Throwable) = new State { - - override def toString: String = "replay failed" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any) = message match { - case ReplayedMessage(p) ⇒ - case ReplayMessagesFailure(_) ⇒ - replayCompleted(receive) - // journal couldn't tell the maximum stored sequence number, hence the next - // replay must be a full replay (up to the highest stored sequence number) - // Recover(lastSequenceNr) is sent by preRestart - setLastSequenceNr(Long.MaxValue) - case _: RecoverySuccess ⇒ replayCompleted(receive) - case _ ⇒ internalStash.stash() - } - - def replayCompleted(receive: Receive): Unit = { - // in case the actor resumes the state must be `idle` - changeState(idle) - internalStash.unstashAll() - throw cause - } - } - - /** - * When receiving an [[Update]] request, switches to `replayStarted` state and triggers - * an incremental message replay. Invokes the actor's current behavior for any other - * received message. - */ - private val idle: State = new State { - override def toString: String = "idle" - override def recoveryRunning: Boolean = false - - override def stateReceive(receive: Receive, message: Any): Unit = message match { - case ReplayedMessage(p) ⇒ - // we can get ReplayedMessage here if it was stashed by user during replay - // unwrap the payload - PersistentView.super.aroundReceive(receive, p.payload) - case ScheduledUpdate(replayMax) ⇒ changeStateToReplayStarted(await = false, replayMax) - case Update(awaitUpdate, replayMax) ⇒ changeStateToReplayStarted(awaitUpdate, replayMax) - case other ⇒ PersistentView.super.aroundReceive(receive, other) - } - - def changeStateToReplayStarted(await: Boolean, replayMax: Long): Unit = { - changeState(replayStarted(await)) - journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self) - } - } - -} - -/** - * Java API. - * - * @see [[PersistentView]] - */ -@deprecated("use Persistence Query instead", "2.4") -abstract class UntypedPersistentView extends UntypedActor with PersistentView - -/** - * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]) - * - * @see [[PersistentView]] - */ -@deprecated("use Persistence Query instead", "2.4") -abstract class AbstractPersistentView extends AbstractActor with PersistentView diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala deleted file mode 100644 index d560a2b3d3..0000000000 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala +++ /dev/null @@ -1,352 +0,0 @@ -/** - * Copyright (C) 2014-2016 Lightbend Inc. - */ -package akka.persistence - -import akka.actor._ -import akka.persistence.JournalProtocol.ReplayMessages -import akka.testkit._ -import com.typesafe.config.Config - -import scala.concurrent.duration._ - -object PersistentViewSpec { - - private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { - def receiveCommand = { - case msg ⇒ persist(msg) { m ⇒ probe ! s"${m}-${lastSequenceNr}" } - } - - override def receiveRecover: Receive = { - case _ ⇒ // do nothing... - } - } - - private class TestPersistentView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends PersistentView { - def this(name: String, probe: ActorRef, interval: FiniteDuration) = - this(name, probe, interval, None) - - def this(name: String, probe: ActorRef) = - this(name, probe, 100.milliseconds) - - override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system) - override val persistenceId: String = name - override val viewId: String = name + "-view" - - var last: String = _ - - def receive = { - case "get" ⇒ - probe ! last - case "boom" ⇒ - throw new TestException("boom") - - case payload if isPersistent && shouldFailOn(payload) ⇒ - throw new TestException("boom") - - case payload if isPersistent ⇒ - last = s"replicated-${payload}-${lastSequenceNr}" - probe ! last - - } - - override def postRestart(reason: Throwable): Unit = { - super.postRestart(reason) - failAt = None - } - - def shouldFailOn(m: Any): Boolean = - failAt.foldLeft(false) { (a, f) ⇒ a || (m == f) } - } - - private class PassiveTestPersistentView(name: String, probe: ActorRef, var failAt: Option[String]) extends PersistentView { - override val persistenceId: String = name - override val viewId: String = name + "-view" - - override def autoUpdate: Boolean = false - override def autoUpdateReplayMax: Long = 0L // no message replay during initial recovery - - var last: String = _ - - def receive = { - case "get" ⇒ - probe ! last - case payload if isPersistent && shouldFailOn(payload) ⇒ - throw new TestException("boom") - case payload ⇒ - last = s"replicated-${payload}-${lastSequenceNr}" - } - - override def postRestart(reason: Throwable): Unit = { - super.postRestart(reason) - failAt = None - } - - def shouldFailOn(m: Any): Boolean = - failAt.foldLeft(false) { (a, f) ⇒ a || (m == f) } - - } - - private class ActiveTestPersistentView(name: String, probe: ActorRef) extends PersistentView { - override val persistenceId: String = name - override val viewId: String = name + "-view" - - override def autoUpdateInterval: FiniteDuration = 50.millis - override def autoUpdateReplayMax: Long = 2 - - def receive = { - case payload ⇒ - probe ! s"replicated-${payload}-${lastSequenceNr}" - } - } - - private class BecomingPersistentView(name: String, probe: ActorRef) extends PersistentView { - override def persistenceId = name - override def viewId = name + "-view" - - def receive = Actor.emptyBehavior - - context.become { - case payload ⇒ probe ! s"replicated-${payload}-${lastSequenceNr}" - } - } - - private class StashingPersistentView(name: String, probe: ActorRef) extends PersistentView { - override def persistenceId = name - override def viewId = name + "-view" - - def receive = { - case "other" ⇒ stash() - case "unstash" ⇒ - unstashAll() - context.become { - case msg ⇒ probe ! s"$msg-${lastSequenceNr}" - } - case msg ⇒ stash() - } - } - - private class PersistentOrNotTestPersistentView(name: String, probe: ActorRef) extends PersistentView { - override val persistenceId: String = name - override val viewId: String = name + "-view" - - def receive = { - case payload if isPersistent ⇒ probe ! s"replicated-${payload}-${lastSequenceNr}" - case payload ⇒ probe ! s"normal-${payload}-${lastSequenceNr}" - } - } - - private class SnapshottingPersistentView(name: String, probe: ActorRef) extends PersistentView { - override val persistenceId: String = name - override val viewId: String = s"${name}-replicator" - - override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system) - - var last: String = _ - - def receive = { - case "get" ⇒ - probe ! last - case "snap" ⇒ - saveSnapshot(last) - case "restart" ⇒ - throw new TestException("restart requested") - case SaveSnapshotSuccess(_) ⇒ - probe ! "snapped" - case SnapshotOffer(metadata, snapshot: String) ⇒ - last = snapshot - probe ! last - case payload ⇒ - last = s"replicated-${payload}-${lastSequenceNr}" - probe ! last - } - } -} - -abstract class PersistentViewSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { - import akka.persistence.PersistentViewSpec._ - - var persistentActor: ActorRef = _ - var view: ActorRef = _ - - var persistentActorProbe: TestProbe = _ - var viewProbe: TestProbe = _ - - override protected def beforeEach(): Unit = { - super.beforeEach() - - persistentActorProbe = TestProbe() - viewProbe = TestProbe() - - persistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, persistentActorProbe.ref)) - persistentActor ! "a" - persistentActor ! "b" - - persistentActorProbe.expectMsg("a-1") - persistentActorProbe.expectMsg("b-2") - } - - override protected def afterEach(): Unit = { - system.stop(persistentActor) - system.stop(view) - super.afterEach() - } - - def subscribeToReplay(probe: TestProbe): Unit = - system.eventStream.subscribe(probe.ref, classOf[ReplayMessages]) - - "A persistent view" must { - "receive past updates from a persistent actor" in { - view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref)) - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - } - "receive live updates from a persistent actor" in { - view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref)) - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - persistentActor ! "c" - viewProbe.expectMsg("replicated-c-3") - } - "run updates at specified interval" in { - view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 2.seconds)) - // initial update is done on start - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - // live updates takes 5 seconds to replicate - persistentActor ! "c" - viewProbe.expectNoMsg(1.second) - viewProbe.expectMsg("replicated-c-3") - } - "run updates on user request" in { - view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds)) - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - persistentActor ! "c" - persistentActorProbe.expectMsg("c-3") - view ! Update(await = false) - viewProbe.expectMsg("replicated-c-3") - } - "run updates on user request and await update" in { - view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds)) - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - persistentActor ! "c" - persistentActorProbe.expectMsg("c-3") - view ! Update(await = true) - view ! "get" - viewProbe.expectMsg("replicated-c-3") - } - "run updates again on failure outside an update cycle" in { - view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds)) - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - view ! "boom" - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - } - "run updates again on failure during an update cycle" in { - persistentActor ! "c" - persistentActorProbe.expectMsg("c-3") - view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds, Some("b"))) - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - viewProbe.expectMsg("replicated-c-3") - } - "run size-limited updates on user request" in { - persistentActor ! "c" - persistentActor ! "d" - persistentActor ! "e" - persistentActor ! "f" - - persistentActorProbe.expectMsg("c-3") - persistentActorProbe.expectMsg("d-4") - persistentActorProbe.expectMsg("e-5") - persistentActorProbe.expectMsg("f-6") - - view = system.actorOf(Props(classOf[PassiveTestPersistentView], name, viewProbe.ref, None)) - - view ! Update(await = true, replayMax = 2) - view ! "get" - viewProbe.expectMsg("replicated-b-2") - - view ! Update(await = true, replayMax = 1) - view ! "get" - viewProbe.expectMsg("replicated-c-3") - - view ! Update(await = true, replayMax = 4) - view ! "get" - viewProbe.expectMsg("replicated-f-6") - } - "run size-limited updates automatically" in { - val replayProbe = TestProbe() - - persistentActor ! "c" - persistentActor ! "d" - - persistentActorProbe.expectMsg("c-3") - persistentActorProbe.expectMsg("d-4") - - subscribeToReplay(replayProbe) - - view = system.actorOf(Props(classOf[ActiveTestPersistentView], name, viewProbe.ref)) - - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - viewProbe.expectMsg("replicated-c-3") - viewProbe.expectMsg("replicated-d-4") - - replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _) ⇒ } - replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _) ⇒ } - replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _) ⇒ } - } - "support context.become" in { - view = system.actorOf(Props(classOf[BecomingPersistentView], name, viewProbe.ref)) - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - } - "check if an incoming message is persistent" in { - persistentActor ! "c" - - persistentActorProbe.expectMsg("c-3") - - view = system.actorOf(Props(classOf[PersistentOrNotTestPersistentView], name, viewProbe.ref)) - - view ! "d" - view ! "e" - - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - viewProbe.expectMsg("replicated-c-3") - viewProbe.expectMsg("normal-d-3") - viewProbe.expectMsg("normal-e-3") - - persistentActor ! "f" - viewProbe.expectMsg("replicated-f-4") - } - "take snapshots" in { - view = system.actorOf(Props(classOf[SnapshottingPersistentView], name, viewProbe.ref)) - viewProbe.expectMsg("replicated-a-1") - viewProbe.expectMsg("replicated-b-2") - view ! "snap" - viewProbe.expectMsg("snapped") - view ! "restart" - persistentActor ! "c" - viewProbe.expectMsg("replicated-b-2") - viewProbe.expectMsg("replicated-c-3") - } - "support stash" in { - view = system.actorOf(Props(classOf[StashingPersistentView], name, viewProbe.ref)) - view ! "other" - view ! "unstash" - viewProbe.expectMsg("a-2") // note that the lastSequenceNumber is 2, since we have replayed b-2 - viewProbe.expectMsg("b-2") - viewProbe.expectMsg("other-2") - } - } -} - -class LeveldbPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentViewSpec")) -class InmemPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("inmem", "InmemPersistentViewSpec")) - diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index f5a6aec1d3..d62f59cb2d 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -572,7 +572,7 @@ akka { } - # DEPRECATED + # DEPRECATED, since 2.5.0 # The netty.udp transport is deprecated, please use Artery instead. # See: http://doc.akka.io/docs/akka/2.4/scala/remoting-artery.html netty.udp = ${akka.remote.netty.tcp} diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java deleted file mode 100644 index 5feb973d72..0000000000 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package sample.persistence; - -import akka.actor.AbstractActor; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.japi.pf.ReceiveBuilder; -import akka.persistence.*; -import scala.PartialFunction; -import scala.concurrent.duration.Duration; -import scala.runtime.BoxedUnit; - -import java.util.concurrent.TimeUnit; - -public class ViewExample { - public static class ExamplePersistentActor extends AbstractPersistentActor { - private int count = 1; - - @Override - public String persistenceId() { return "sample-id-4"; } - - @Override - public PartialFunction receiveCommand() { - return ReceiveBuilder. - match(String.class, s -> { - System.out.println(String.format("persistentActor received %s (nr = %d)", s, count)); - persist(s + count, evt -> { - count += 1; - }); - }). - build(); - } - - @Override - public PartialFunction receiveRecover() { - return ReceiveBuilder. - match(String.class, s -> count += 1). - build(); - } - } - - public static class ExampleView extends AbstractPersistentView { - - private int numReplicated = 0; - - @Override public String persistenceId() { return "sample-id-4"; } - @Override public String viewId() { return "sample-view-id-4"; } - - public ExampleView() { - receive(ReceiveBuilder. - match(Object.class, m -> isPersistent(), msg -> { - numReplicated += 1; - System.out.println(String.format("view received %s (num replicated = %d)", - msg, - numReplicated)); - }). - match(SnapshotOffer.class, so -> { - numReplicated = (Integer) so.snapshot(); - System.out.println(String.format("view received snapshot offer %s (metadata = %s)", - numReplicated, - so.metadata())); - }). - match(String.class, s -> s.equals("snap"), s -> saveSnapshot(numReplicated)).build() - ); - } - } - - public static void main(String... args) throws Exception { - final ActorSystem system = ActorSystem.create("example"); - final ActorRef persistentActor = system.actorOf(Props.create(ExamplePersistentActor.class)); - final ActorRef view = system.actorOf(Props.create(ExampleView.class)); - - system.scheduler() - .schedule(Duration.Zero(), - Duration.create(2, TimeUnit.SECONDS), - persistentActor, - "scheduled", - system.dispatcher(), - null); - system.scheduler() - .schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null); - } -} diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java deleted file mode 100644 index b84ad675d5..0000000000 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java +++ /dev/null @@ -1,78 +0,0 @@ -package sample.persistence; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.japi.Procedure; -import akka.persistence.SnapshotOffer; -import akka.persistence.UntypedPersistentActor; -import akka.persistence.UntypedPersistentView; -import scala.concurrent.duration.Duration; - -import java.util.concurrent.TimeUnit; - -public class PersistentViewExample { - public static class ExamplePersistentActor extends UntypedPersistentActor { - @Override - public String persistenceId() { return "sample-id-4"; } - - private int count = 1; - - @Override - public void onReceiveRecover(Object message) { - if (message instanceof String) { - count += 1; - } else { - unhandled(message); - } - } - - @Override - public void onReceiveCommand(Object message) { - if (message instanceof String) { - String s = (String) message; - System.out.println(String.format("persistentActor received %s (nr = %d)", s, count)); - persist(s + count, new Procedure() { - public void apply(String evt) { - count += 1; - } - }); - } else { - unhandled(message); - } - } - } - - public static class ExampleView extends UntypedPersistentView { - - private int numReplicated = 0; - - @Override public String persistenceId() { return "sample-id-4"; } - @Override public String viewId() { return "sample-view-id-4"; } - - @Override - public void onReceive(Object message) throws Exception { - if (isPersistent()) { - numReplicated += 1; - System.out.println(String.format("view received %s (num replicated = %d)", message, numReplicated)); - } else if (message instanceof SnapshotOffer) { - SnapshotOffer so = (SnapshotOffer)message; - numReplicated = (Integer)so.snapshot(); - System.out.println(String.format("view received snapshot offer %s (metadata = %s)", numReplicated, so.metadata())); - } else if (message.equals("snap")) { - saveSnapshot(numReplicated); - } else { - unhandled(message); - } - } - } - - public static void main(String... args) throws Exception { - final ActorSystem system = ActorSystem.create("example"); - final ActorRef persistentActor = system.actorOf(Props.create(ExamplePersistentActor.class)); - final ActorRef view = system.actorOf(Props.create(ExampleView.class)); - - system.scheduler().schedule(Duration.Zero(), Duration.create(2, TimeUnit.SECONDS), persistentActor, "scheduled", system.dispatcher(), null); - system.scheduler().schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null); - } -} diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala deleted file mode 100644 index fb385d9728..0000000000 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala +++ /dev/null @@ -1,62 +0,0 @@ -package sample.persistence - -import scala.concurrent.duration._ - -import akka.actor._ -import akka.persistence._ - -object ViewExample extends App { - class ExamplePersistentActor extends PersistentActor { - override def persistenceId = "sample-id-4" - - var count = 1 - - def receiveCommand: Receive = { - case payload: String => - println(s"persistentActor received ${payload} (nr = ${count})") - persist(payload + count) { evt => - count += 1 - } - } - - def receiveRecover: Receive = { - case _: String => count += 1 - } - } - - class ExampleView extends PersistentView { - private var numReplicated = 0 - - override def persistenceId: String = "sample-id-4" - override def viewId = "sample-view-id-4" - - def receive = { - case "snap" => - println(s"view saving snapshot") - saveSnapshot(numReplicated) - case SnapshotOffer(metadata, snapshot: Int) => - numReplicated = snapshot - println(s"view received snapshot offer ${snapshot} (metadata = ${metadata})") - case payload if isPersistent => - numReplicated += 1 - println(s"view replayed event ${payload} (num replicated = ${numReplicated})") - case SaveSnapshotSuccess(metadata) => - println(s"view saved snapshot (metadata = ${metadata})") - case SaveSnapshotFailure(metadata, reason) => - println(s"view snapshot failure (metadata = ${metadata}), caused by ${reason}") - case payload => - println(s"view received other message ${payload}") - } - - } - - val system = ActorSystem("example") - - val persistentActor = system.actorOf(Props(classOf[ExamplePersistentActor])) - val view = system.actorOf(Props(classOf[ExampleView])) - - import system.dispatcher - - system.scheduler.schedule(Duration.Zero, 2.seconds, persistentActor, "scheduled") - system.scheduler.schedule(Duration.Zero, 5.seconds, view, "snap") -}