diff --git a/akka-docs/src/main/paradox/general/message-delivery-reliability.md b/akka-docs/src/main/paradox/general/message-delivery-reliability.md index ad50a3029b..f3e9b70ea7 100644 --- a/akka-docs/src/main/paradox/general/message-delivery-reliability.md +++ b/akka-docs/src/main/paradox/general/message-delivery-reliability.md @@ -294,7 +294,7 @@ components may consume the event stream as a means to replicate the component’ state on a different continent or to react to changes). If the component’s state is lost—due to a machine failure or by being pushed out of a cache—it can be reconstructed by replaying the event stream (usually employing -snapshots to speed up the process). @ref:[Event sourcing](../persistence.md#event-sourcing) is supported by +snapshots to speed up the process). @ref:[Event sourcing](../typed/persistence.md#event-sourcing-concepts) is supported by Akka Persistence. ### Mailbox with Explicit Acknowledgement diff --git a/akka-docs/src/main/paradox/persistence-journals.md b/akka-docs/src/main/paradox/persistence-journals.md index d19cebbb95..3baf6120c5 100644 --- a/akka-docs/src/main/paradox/persistence-journals.md +++ b/akka-docs/src/main/paradox/persistence-journals.md @@ -4,7 +4,16 @@ Storage backends for journals and snapshot stores are pluggable in the Akka pers A directory of persistence journal and snapshot store plugins is available at the Akka Community Projects page, see [Community plugins](http://akka.io/community/) This documentation described how to build a new storage backend. -### Journal plugin API +Applications can provide their own plugins by implementing a plugin API and activating them by configuration. +Plugin development requires the following imports: + +Scala +: @@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #plugin-imports } + +Java +: @@snip [LambdaPersistencePluginDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java) { #plugin-imports } + +## Journal plugin API A journal plugin extends `AsyncWriteJournal`. @@ -54,7 +63,7 @@ The `plugin-dispatcher` is the dispatcher used for the plugin actor. If not spec Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. -### Snapshot store plugin API +## Snapshot store plugin API A snapshot store plugin must extend the `SnapshotStore` actor and implement the following methods: @@ -86,7 +95,7 @@ The `plugin-dispatcher` is the dispatcher used for the plugin actor. If not spec Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. -### Plugin TCK +## Plugin TCK In order to help developers build correct and high quality storage plugins, we provide a Technology Compatibility Kit ([TCK](http://en.wikipedia.org/wiki/Technology_Compatibility_Kit) for short). @@ -135,12 +144,12 @@ Java We *highly recommend* including these specifications in your test suite, as they cover a broad range of cases you might have otherwise forgotten to test for when writing a plugin from scratch. -### Corrupt event logs +## Corrupt event logs If a journal can't prevent users from running persistent actors with the same `persistenceId` concurrently it is likely that an event log will be corrupted by having events with the same sequence number. -It is recommended that journals should still delivery these events during recovery so that a `replay-filter` can be used to decide what to do about it +It is recommended that journals should still deliver these events during recovery so that a `replay-filter` can be used to decide what to do about it in a journal agnostic way. diff --git a/akka-docs/src/main/paradox/persistence-plugins.md b/akka-docs/src/main/paradox/persistence-plugins.md new file mode 100644 index 0000000000..d860bdaa09 --- /dev/null +++ b/akka-docs/src/main/paradox/persistence-plugins.md @@ -0,0 +1,194 @@ +# Persistence Plugins + +Storage backends for journals and snapshot stores are pluggable in the Akka persistence extension. + +A directory of persistence journal and snapshot store plugins is available at the Akka Community Projects page, see [Community plugins](http://akka.io/community/) + +Two popular plugins are: + +* [akka-persistence-cassandra](https://doc.akka.io/docs/akka-persistence-cassandra/current/) +* [akka-persistence-jdbc](https://github.com/dnvriend/akka-persistence-jdbc) + +Plugins can be selected either by "default" for all persistent actors, +or "individually", when a persistent actor defines its own set of plugins. + +When a persistent actor does NOT override the `journalPluginId` and `snapshotPluginId` methods, +the persistence extension will use the "default" journal and snapshot-store plugins configured in `reference.conf`: + +``` +akka.persistence.journal.plugin = "" +akka.persistence.snapshot-store.plugin = "" +``` + +However, these entries are provided as empty "", and require explicit user configuration via override in the user `application.conf`. + +* For an example of a journal plugin which writes messages to LevelDB see [Local LevelDB journal](#local-leveldb-journal). +* For an example of a snapshot store plugin which writes snapshots as individual files to the local filesystem see [Local snapshot store](#local-snapshot-store). + +## Eager initialization of persistence plugin + +By default, persistence plugins are started on-demand, as they are used. In some case, however, it might be beneficial +to start a certain plugin eagerly. In order to do that, you should first add `akka.persistence.Persistence` +under the `akka.extensions` key. Then, specify the IDs of plugins you wish to start automatically under +`akka.persistence.journal.auto-start-journals` and `akka.persistence.snapshot-store.auto-start-snapshot-stores`. + +For example, if you want eager initialization for the leveldb journal plugin and the local snapshot store plugin, your configuration should look like this: + +``` +akka { + + extensions = [akka.persistence.Persistence] + + persistence { + + journal { + plugin = "akka.persistence.journal.leveldb" + auto-start-journals = ["akka.persistence.journal.leveldb"] + } + + snapshot-store { + plugin = "akka.persistence.snapshot-store.local" + auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"] + } + + } + +} +``` + +## Pre-packaged plugins + +The Akka Persistence module comes with few built-in persistence plugins, but none of these are suitable +for production usage in an Akka Cluster. + +### Local LevelDB journal + +This plugin writes events to a local LevelDB instance. + +@@@ warning +The LevelDB plugin cannot be used in an Akka Cluster since the storage is in a local file system. +@@@ + +The LevelDB journal plugin config entry is `akka.persistence.journal.leveldb`. Enable this plugin by +defining config property: + +@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #leveldb-plugin-config } + +LevelDB based plugins will also require the following additional dependency declaration: + +@@dependency[sbt,Maven,Gradle] { + group="org.fusesource.leveldbjni" + artifact="leveldbjni-all" + version="1.8" +} + +The default location of LevelDB files is a directory named `journal` in the current working +directory. This location can be changed by configuration where the specified path can be relative or absolute: + +@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #journal-config } + +With this plugin, each actor system runs its own private LevelDB instance. + +One peculiarity of LevelDB is that the deletion operation does not remove messages from the journal, but adds +a "tombstone" for each deleted message instead. In the case of heavy journal usage, especially one including frequent +deletes, this may be an issue as users may find themselves dealing with continuously increasing journal sizes. To +this end, LevelDB offers a special journal compaction function that is exposed via the following configuration: + +@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #compaction-intervals-config } + +### Shared LevelDB journal + +For testing purposes a LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for +example, allows persistent actors to failover to a backup node and continue using the shared journal instance from the +backup node. + +@@@ warning +A shared LevelDB instance is a single point of failure and should therefore only be used for testing +purposes. +@@@ + +@@@ note +This plugin has been supplanted by [Persistence Plugin Proxy](#persistence-plugin-proxy). +@@@ + +A shared LevelDB instance is started by instantiating the `SharedLeveldbStore` actor. + +Scala +: @@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #shared-store-creation } + +Java +: @@snip [LambdaPersistencePluginDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java) { #shared-store-creation } + +By default, the shared instance writes journaled messages to a local directory named `journal` in the current +working directory. The storage location can be changed by configuration: + +@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #shared-store-config } + +Actor systems that use a shared LevelDB store must activate the `akka.persistence.journal.leveldb-shared` +plugin. + +@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #shared-journal-config } + +This plugin must be initialized by injecting the (remote) `SharedLeveldbStore` actor reference. Injection is +done by calling the `SharedLeveldbJournal.setStore` method with the actor reference as argument. + +Scala +: @@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #shared-store-usage } + +Java +: @@snip [LambdaPersistencePluginDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java) { #shared-store-usage } + +Internal journal commands (sent by persistent actors) are buffered until injection completes. Injection is idempotent +i.e. only the first injection is used. + +### Local snapshot store + +This plugin writes snapshot files to the local filesystem. + +@@@ warning +The local snapshot store plugin cannot be used in an Akka Cluster since the storage is in a local file system. +@@@ + +The local snapshot store plugin config entry is `akka.persistence.snapshot-store.local`. +Enable this plugin by defining config property: + +@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #leveldb-snapshot-plugin-config } + +The default storage location is a directory named `snapshots` in the current working +directory. This can be changed by configuration where the specified path can be relative or absolute: + +@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #snapshot-config } + +Note that it is not mandatory to specify a snapshot store plugin. If you don't use snapshots +you don't have to configure it. + +### Persistence Plugin Proxy + +For testing purposes a persistence plugin proxy allows sharing of journals and snapshot stores across multiple actor systems (on the same or +on different nodes). This, for example, allows persistent actors to failover to a backup node and continue using the +shared journal instance from the backup node. The proxy works by forwarding all the journal/snapshot store messages to a +single, shared, persistence plugin instance, and therefore supports any use case supported by the proxied plugin. + +@@@ warning +A shared journal/snapshot store is a single point of failure and should therefore only be used for testing +purposes. +@@@ + +The journal and snapshot store proxies are controlled via the `akka.persistence.journal.proxy` and +`akka.persistence.snapshot-store.proxy` configuration entries, respectively. Set the `target-journal-plugin` or +`target-snapshot-store-plugin` keys to the underlying plugin you wish to use (for example: +`akka.persistence.journal.leveldb`). The `start-target-journal` and `start-target-snapshot-store` keys should be +set to `on` in exactly one actor system - this is the system that will instantiate the shared persistence plugin. +Next, the proxy needs to be told how to find the shared plugin. This can be done by setting the `target-journal-address` +and `target-snapshot-store-address` configuration keys, or programmatically by calling the +`PersistencePluginProxy.setTargetLocation` method. + +@@@ note +Akka starts extensions lazily when they are required, and this includes the proxy. This means that in order for the +proxy to work, the persistence plugin on the target node must be instantiated. This can be done by instantiating the +`PersistencePluginProxyExtension` @ref:[extension](extending-akka.md), or by calling the `PersistencePluginProxy.start` method. +@@@ + +@@@ note +The proxied persistence plugin can (and should) be configured using its original configuration keys. +@@@ diff --git a/akka-docs/src/main/paradox/persistence-query.md b/akka-docs/src/main/paradox/persistence-query.md index 4a0bf02096..c01575507f 100644 --- a/akka-docs/src/main/paradox/persistence-query.md +++ b/akka-docs/src/main/paradox/persistence-query.md @@ -14,7 +14,7 @@ This will also add dependency on the @ref[Akka Persistence](persistence.md) modu ## Introduction -Akka persistence query complements @ref:[Persistence](persistence.md) by providing a universal asynchronous stream based +Akka persistence query complements @ref:[Event Sourcing](typed/persistence.md) by providing a universal asynchronous stream based query interface that various journal plugins can implement in order to expose their query capabilities. The most typical use case of persistence query is implementing the so-called query side (also known as "read side") @@ -93,7 +93,7 @@ Java #### EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery -`eventsByPersistenceId` is a query equivalent to replaying a @ref:[PersistentActor](persistence.md#event-sourcing), +`eventsByPersistenceId` is a query equivalent to replaying an @ref:[event sourced actor](typed/persistence.md#event-sourcing-concepts), however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the persistent actor identified by the given `persistenceId`. @@ -116,15 +116,16 @@ The goal of this query is to allow querying for all events which are "tagged" wi That includes the use case to query all domain events of an Aggregate Root type. Please refer to your read journal plugin's documentation to find out if and how it is supported. -Some journals may support tagging of events via an @ref:[Event Adapters](persistence.md#event-adapters) that wraps the events in a -`akka.persistence.journal.Tagged` with the given `tags`. The journal may support other ways of doing tagging - again, -how exactly this is implemented depends on the used journal. Here is an example of such a tagging event adapter: +Some journals may support @ref:[tagging of events](typed/persistence.md#tagging) or +@ref:[Event Adapters](persistence.md#event-adapters) that wraps the events in a `akka.persistence.journal.Tagged` +with the given `tags`. The journal may support other ways of doing tagging - again, +how exactly this is implemented depends on the used journal. Here is an example of such a tagging with an `EventSourcedBehavior`: Scala -: @@snip [LeveldbPersistenceQueryDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala) { #tagger } +: @@snip [BasicPersistentActorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #tagging-query } Java -: @@snip [LeveldbPersistenceQueryDocTest.java](/akka-docs/src/test/java/jdocs/persistence/query/LeveldbPersistenceQueryDocTest.java) { #tagger } +: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #tagging-query } @@@ note @@ -137,8 +138,9 @@ on relational databases, yet may be hard to implement efficiently on plain key-v @@@ -In the example below we query all events which have been tagged (we assume this was performed by the write-side using an -@ref:[EventAdapter](persistence.md#event-adapters), or that the journal is smart enough that it can figure out what we mean by this +In the example below we query all events which have been tagged (we assume this was performed by the write-side using +@ref:[tagging of events](typed/persistence.md#tagging) or @ref:[Event Adapters](persistence.md#event-adapters), or +that the journal is smart enough that it can figure out what we mean by this tag - for example if the journal stored the events as json it may try to find those with the field `tag` set to this value etc.). Scala @@ -187,7 +189,7 @@ Java ## Performance and denormalization -When building systems using @ref:[Event sourcing](persistence.md#event-sourcing) and CQRS ([Command & Query Responsibility Segregation](https://msdn.microsoft.com/en-us/library/jj554200.aspx)) techniques +When building systems using @ref:[Event sourcing](typed/persistence.md#event-sourcing-concepts) and CQRS ([Command & Query Responsibility Segregation](https://msdn.microsoft.com/en-us/library/jj554200.aspx)) techniques it is tremendously important to realise that the write-side has completely different needs from the read-side, and separating those concerns into datastores that are optimised for either side makes it possible to offer the best experience for the write and read sides independently. @@ -255,14 +257,14 @@ Scala : @@snip [PersistenceQueryDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor-run } Java -: @@snip [PersistenceQueryDocTest.java](/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-actor-run } +: @@snip [ResumableProjectionExample.java](/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java) { #projection-into-different-store-actor-run } Scala : @@snip [PersistenceQueryDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor } Java -: @@snip [PersistenceQueryDocTest.java](/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-actor } +: @@snip [ResumableProjectionExample.java](/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java) { #projection-into-different-store-actor } ## Query plugins diff --git a/akka-docs/src/main/paradox/persistence.md b/akka-docs/src/main/paradox/persistence.md index 52dec1f536..5fd5064f94 100644 --- a/akka-docs/src/main/paradox/persistence.md +++ b/akka-docs/src/main/paradox/persistence.md @@ -13,16 +13,8 @@ To use Akka Persistence, you must add the following dependency in your project: version="$akka.version$" } -The Akka Persistence extension comes with few built-in persistence plugins, including -in-memory heap based journal, local file-system based snapshot-store and LevelDB based journal. - -LevelDB-based plugins will require the following additional dependency: - -@@dependency[sbt,Maven,Gradle] { - group="org.fusesource.leveldbjni" - artifact="leveldbjni-all" - version="1.8" -} +You also have to select journal plugin and optionally snapshot store plugin, see +@ref:[Persistence Plugins](persistence-plugins.md). ## Sample project @@ -33,27 +25,11 @@ to see what this looks like in practice. ## Introduction -Akka persistence enables stateful actors to persist their state so that it can be recovered when an actor -is either restarted, such as after a JVM crash, by a supervisor or a manual stop-start, or migrated within a cluster. The key concept behind Akka -persistence is that only the _events_ received by the actor are persisted, not the actual state of the actor -(though actor state snapshot support is also available). The events are persisted by appending to storage (nothing is ever mutated) which -allows for very high transaction rates and efficient replication. A stateful actor is recovered by replaying the stored -events to the actor, allowing it to rebuild its state. This can be either the full history of changes -or starting from a checkpoint in a snapshot which can dramatically reduce recovery times. Akka persistence also provides point-to-point -communication with at-least-once message delivery semantics. +See introduction in @ref:[Persistence](typed/persistence.md#introduction) -@@@ note +Akka Persistence also provides point-to-point communication with at-least-once message delivery semantics. -The General Data Protection Regulation (GDPR) requires that personal information must be deleted at the request of users. -Deleting or modifying events that carry personal information would be difficult. Data shredding can be used to forget -information instead of deleting or modifying it. This is achieved by encrypting the data with a key for a given data -subject id (person) and deleting the key when that data subject is to be forgotten. Lightbend's -[GDPR for Akka Persistence](https://doc.akka.io/docs/akka-enhancements/current/gdpr/index.html) -provides tools to facilitate in building GDPR capable systems. - -@@@ - -## Architecture +### Architecture * @scala[`PersistentActor`]@java[`AbstractPersistentActor`]: Is a persistent, stateful actor. It is able to persist events to a journal and can react to them in a thread-safe manner. It can be used to implement both *command* as well as *event sourced* actors. @@ -69,24 +45,9 @@ Replicated journals are available as [Community plugins](http://akka.io/communit used for optimizing recovery times. The storage backend of a snapshot store is pluggable. The persistence extension comes with a "local" snapshot storage plugin, which writes to the local filesystem. Replicated snapshot stores are available as [Community plugins](http://akka.io/community/) * *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the -development of event sourced applications (see section [Event sourcing](#event-sourcing)). +development of event sourced applications (see section @ref:[Event sourcing](typed/persistence.md#event-sourcing-concepts)). -## Event sourcing - -See an [introduction to EventSourcing](https://msdn.microsoft.com/en-us/library/jj591559.aspx), what follows is -Akka's implementation via persistent actors. - -A persistent actor receives a (non-persistent) command -which is first validated if it can be applied to the current state. Here validation can mean anything, from simple -inspection of a command message's fields up to a conversation with several external services, for example. -If validation succeeds, events are generated from the command, representing the effect of the command. These events -are then persisted and, after successful persistence, used to change the actor's state. When the persistent actor -needs to be recovered, only the persisted events are replayed of which we know that they can be successfully applied. -In other words, events cannot fail when being replayed to a persistent actor, in contrast to commands. Event sourced -actors may also process commands that do not change application state such as query commands for example. - -Another excellent article about "thinking in Events" is [Events As First-Class Citizens](https://hackernoon.com/events-as-first-class-citizens-8633e8479493) by Randy Shoup. It is a short and recommended read if you're starting -developing Events based applications. +## Example Akka persistence supports event sourcing with the @scala[`PersistentActor` trait]@java[`AbstractPersistentActor` abstract class]. An actor that extends this @scala[trait]@java[class] uses the `persist` method to persist and handle events. The behavior of @scala[a `PersistentActor`]@java[an `AbstractPersistentActor`] @@ -596,34 +557,7 @@ Java ### Replay Filter -There could be cases where event streams are corrupted and multiple writers (i.e. multiple persistent actor instances) -journaled different messages with the same sequence number. -In such a case, you can configure how you filter replayed messages from multiple writers, upon recovery. - -In your configuration, under the `akka.persistence.journal.xxx.replay-filter` section (where `xxx` is your journal plugin id), -you can select the replay filter `mode` from one of the following values: - - * repair-by-discard-old - * fail - * warn - * off - -For example, if you configure the replay filter for leveldb plugin, it looks like this: - -``` -# The replay filter can detect a corrupt event stream by inspecting -# sequence numbers and writerUuid when replaying events. -akka.persistence.journal.leveldb.replay-filter { - # What the filter should do when detecting invalid events. - # Supported values: - # `repair-by-discard-old` : discard events from old writers, - # warning is logged - # `fail` : fail the replay, error is logged - # `warn` : log warning but emit events untouched - # `off` : disable this feature completely - mode = repair-by-discard-old -} -``` +See @ref:[Replay filter](typed/persistence.md#replay-filter) in the documentation of the new API. ## Snapshots @@ -708,23 +642,7 @@ an in memory representation of the snapshot, or in the case of failure to attemp ## Scaling out -In a use case where the number of persistent actors needed are higher than what would fit in the memory of one node or -where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can -resume operations @ref:[Cluster Sharding](cluster-sharding.md) is an excellent fit to spread persistent actors over a -cluster and address them by id. - -Akka Persistence is based on the single-writer principle. For a particular `persistenceId` only one `PersistentActor` -instance should be active at one time. If multiple instances were to persist events at the same time, the events would -be interleaved and might not be interpreted correctly on replay. Cluster Sharding ensures that there is only one -active entity (`PersistentActor`) for each id within a data center. Lightbend's -[Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html) -supports active-active persistent entities across data centers. - -The [Lagom framework](https://www.lagomframework.com), which is built on top of Akka encodes many of the best practices -around this. For more details see @java[[Managing Data Persistence](https://www.lagomframework.com/documentation/current/java/ES_CQRS.html)] -@scala[[Managing Data Persistence](https://www.lagomframework.com/documentation/current/scala/ES_CQRS.html)] and -@java[[Persistent Entity](https://www.lagomframework.com/documentation/current/java/PersistentEntity.html)] -@scala[[Persistent Entity](https://www.lagomframework.com/documentation/current/scala/PersistentEntity.html)] in the Lagom documentation. +See @ref:[Scaling out](typed/persistence.md#scaling-out) in the documentation of the new API. ## At-Least-Once Delivery @@ -878,199 +796,6 @@ For more advanced schema evolution techniques refer to the @ref:[Persistence - S @@@ -## Storage plugins - -Storage backends for journals and snapshot stores are pluggable in the Akka persistence extension. - -A directory of persistence journal and snapshot store plugins is available at the Akka Community Projects page, see [Community plugins](http://akka.io/community/) - -Plugins can be selected either by "default" for all persistent actors, -or "individually", when a persistent actor defines its own set of plugins. - -When a persistent actor does NOT override the `journalPluginId` and `snapshotPluginId` methods, -the persistence extension will use the "default" journal and snapshot-store plugins configured in `reference.conf`: - -``` -akka.persistence.journal.plugin = "" -akka.persistence.snapshot-store.plugin = "" -``` - -However, these entries are provided as empty "", and require explicit user configuration via override in the user `application.conf`. -For an example of a journal plugin which writes messages to LevelDB see [Local LevelDB journal](#local-leveldb-journal). -For an example of a snapshot store plugin which writes snapshots as individual files to the local filesystem see [Local snapshot store](#local-snapshot-store). - -Applications can provide their own plugins by implementing a plugin API and activating them by configuration. -Plugin development requires the following imports: - -Scala -: @@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #plugin-imports } - -Java -: @@snip [LambdaPersistencePluginDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java) { #plugin-imports } - -### Eager initialization of persistence plugin - -By default, persistence plugins are started on-demand, as they are used. In some case, however, it might be beneficial -to start a certain plugin eagerly. In order to do that, you should first add `akka.persistence.Persistence` -under the `akka.extensions` key. Then, specify the IDs of plugins you wish to start automatically under -`akka.persistence.journal.auto-start-journals` and `akka.persistence.snapshot-store.auto-start-snapshot-stores`. - -For example, if you want eager initialization for the leveldb journal plugin and the local snapshot store plugin, your configuration should look like this: - -``` -akka { - - extensions = [akka.persistence.Persistence] - - persistence { - - journal { - plugin = "akka.persistence.journal.leveldb" - auto-start-journals = ["akka.persistence.journal.leveldb"] - } - - snapshot-store { - plugin = "akka.persistence.snapshot-store.local" - auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"] - } - - } - -} -``` - -## Pre-packaged plugins - -### Local LevelDB journal - -The LevelDB journal plugin config entry is `akka.persistence.journal.leveldb`. It writes messages to a local LevelDB -instance. Enable this plugin by defining config property: - -@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #leveldb-plugin-config } - -LevelDB based plugins will also require the following additional dependency declaration: - -@@dependency[sbt,Maven,Gradle] { - group="org.fusesource.leveldbjni" - artifact="leveldbjni-all" - version="1.8" -} - -The default location of LevelDB files is a directory named `journal` in the current working -directory. This location can be changed by configuration where the specified path can be relative or absolute: - -@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #journal-config } - -With this plugin, each actor system runs its own private LevelDB instance. - -One peculiarity of LevelDB is that the deletion operation does not remove messages from the journal, but adds -a "tombstone" for each deleted message instead. In the case of heavy journal usage, especially one including frequent -deletes, this may be an issue as users may find themselves dealing with continuously increasing journal sizes. To -this end, LevelDB offers a special journal compaction function that is exposed via the following configuration: - -@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #compaction-intervals-config } - -### Shared LevelDB journal - -A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for -example, allows persistent actors to failover to a backup node and continue using the shared journal instance from the -backup node. - -@@@ warning - -A shared LevelDB instance is a single point of failure and should therefore only be used for testing -purposes. Highly-available, replicated journals are available as [Community plugins](http://akka.io/community/). - -@@@ - -@@@ note - -This plugin has been supplanted by [Persistence Plugin Proxy](#persistence-plugin-proxy). - -@@@ - -A shared LevelDB instance is started by instantiating the `SharedLeveldbStore` actor. - -Scala -: @@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #shared-store-creation } - -Java -: @@snip [LambdaPersistencePluginDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java) { #shared-store-creation } - -By default, the shared instance writes journaled messages to a local directory named `journal` in the current -working directory. The storage location can be changed by configuration: - -@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #shared-store-config } - -Actor systems that use a shared LevelDB store must activate the `akka.persistence.journal.leveldb-shared` -plugin. - -@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #shared-journal-config } - -This plugin must be initialized by injecting the (remote) `SharedLeveldbStore` actor reference. Injection is -done by calling the `SharedLeveldbJournal.setStore` method with the actor reference as argument. - -Scala -: @@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #shared-store-usage } - -Java -: @@snip [LambdaPersistencePluginDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java) { #shared-store-usage } - -Internal journal commands (sent by persistent actors) are buffered until injection completes. Injection is idempotent -i.e. only the first injection is used. - -### Local snapshot store - -The local snapshot store plugin config entry is `akka.persistence.snapshot-store.local`. It writes snapshot files to -the local filesystem. Enable this plugin by defining config property: - -@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #leveldb-snapshot-plugin-config } - -The default storage location is a directory named `snapshots` in the current working -directory. This can be changed by configuration where the specified path can be relative or absolute: - -@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #snapshot-config } - -Note that it is not mandatory to specify a snapshot store plugin. If you don't use snapshots -you don't have to configure it. - -### Persistence Plugin Proxy - -A persistence plugin proxy allows sharing of journals and snapshot stores across multiple actor systems (on the same or -on different nodes). This, for example, allows persistent actors to failover to a backup node and continue using the -shared journal instance from the backup node. The proxy works by forwarding all the journal/snapshot store messages to a -single, shared, persistence plugin instance, and therefore supports any use case supported by the proxied plugin. - -@@@ warning - -A shared journal/snapshot store is a single point of failure and should therefore only be used for testing -purposes. Highly-available, replicated persistence plugins are available as [Community plugins](http://akka.io/community/). - -@@@ - -The journal and snapshot store proxies are controlled via the `akka.persistence.journal.proxy` and -`akka.persistence.snapshot-store.proxy` configuration entries, respectively. Set the `target-journal-plugin` or -`target-snapshot-store-plugin` keys to the underlying plugin you wish to use (for example: -`akka.persistence.journal.leveldb`). The `start-target-journal` and `start-target-snapshot-store` keys should be -set to `on` in exactly one actor system - this is the system that will instantiate the shared persistence plugin. -Next, the proxy needs to be told how to find the shared plugin. This can be done by setting the `target-journal-address` -and `target-snapshot-store-address` configuration keys, or programmatically by calling the -`PersistencePluginProxy.setTargetLocation` method. - -@@@ note - -Akka starts extensions lazily when they are required, and this includes the proxy. This means that in order for the -proxy to work, the persistence plugin on the target node must be instantiated. This can be done by instantiating the -`PersistencePluginProxyExtension` @ref:[extension](extending-akka.md), or by calling the `PersistencePluginProxy.start` method. - -@@@ - -@@@ note - -The proxied persistence plugin can (and should) be configured using its original configuration keys. - -@@@ - ## Custom serialization Serialization of snapshots and payloads of `Persistent` messages is configurable with Akka's @@ -1109,7 +834,7 @@ Also note that for the LevelDB Java port, you will need the following dependenci @@@ warning -It is not possible to test persistence provided classes (i.e. [PersistentActor](#event-sourcing) +It is not possible to test persistence provided classes (i.e. `PersistentActor` and [AtLeastOnceDelivery](#at-least-once-delivery)) using `TestActorRef` due to its *synchronous* nature. These traits need to be able to perform asynchronous tasks in the background in order to handle internal persistence related events. @@ -1123,6 +848,9 @@ When testing Persistence based projects always rely on @ref:[asynchronous messag There are several configuration properties for the persistence module, please refer to the @ref:[reference configuration](general/configuration.md#config-akka-persistence). +The @ref:[journal and snapshot store plugins](persistence-plugins.md) have specific configuration, see +reference documentation of the chosen plugin. + ## Multiple persistence plugin configurations By default, a persistent actor will use the "default" journal and snapshot store plugins @@ -1169,4 +897,5 @@ Java ## See also * @ref[Persistent FSM](persistence-fsm.md) +* @ref[Building a new storage backend](persistence-plugins.md) * @ref[Building a new storage backend](persistence-journals.md) diff --git a/akka-docs/src/main/paradox/testing.md b/akka-docs/src/main/paradox/testing.md index 2a815546fc..62f8d20964 100644 --- a/akka-docs/src/main/paradox/testing.md +++ b/akka-docs/src/main/paradox/testing.md @@ -850,7 +850,7 @@ instead of using `TestActorRef` whenever possible. Due to the synchronous nature of `TestActorRef` it will **not** work with some support traits that Akka provides as they require asynchronous behaviors to function properly. -Examples of traits that do not mix well with test actor refs are @ref:[PersistentActor](persistence.md#event-sourcing) +Examples of traits that do not mix well with test actor refs are @ref:[PersistentActor](persistence.md#example) and @ref:[AtLeastOnceDelivery](persistence.md#at-least-once-delivery) provided by @ref:[Akka Persistence](persistence.md). @@@ diff --git a/akka-docs/src/main/paradox/typed/persistence-snapshot.md b/akka-docs/src/main/paradox/typed/persistence-snapshot.md index 43f0d6254b..a2adc27577 100644 --- a/akka-docs/src/main/paradox/typed/persistence-snapshot.md +++ b/akka-docs/src/main/paradox/typed/persistence-snapshot.md @@ -2,7 +2,7 @@ ## Snapshots -As you model your domain using @ref:[EventSourced actors](persistence.md), you may notice that some actors may be +As you model your domain using @ref:[event sourced actors](persistence.md), you may notice that some actors may be prone to accumulating extremely long event logs and experiencing long recovery times. Sometimes, the right approach may be to split out into a set of shorter lived actors. However, when this is not an option, you can use snapshots to reduce recovery times drastically. diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index e1bd6888c8..85736a03ef 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -7,6 +7,7 @@ * [Persistence schema evolution](../persistence-schema-evolution.md) * [Persistence query](../persistence-query.md) * [Persistence query LevelDB](../persistence-query-leveldb.md) +* [Persistence Journals](../persistence-plugins.md) * [Persistence Journals](../persistence-journals.md) @@@ @@ -27,13 +28,49 @@ To use Akka Persistence Typed, add the module to your project: version=$akka.version$ } +You also have to select journal plugin and optionally snapshot store plugin, see +@ref:[Persistence Plugins](../persistence-plugins.md). + ## Introduction -Akka Persistence is a library for building event sourced actors. For background about how it works -see the @ref:[classic Akka Persistence section](../persistence.md). This documentation shows how the typed API for persistence -works and assumes you know what is meant by `Command`, `Event` and `State`. +Akka Persistence enables stateful actors to persist their state so that it can be recovered when an actor +is either restarted, such as after a JVM crash, by a supervisor or a manual stop-start, or migrated within a cluster. The key concept behind Akka +Persistence is that only the _events_ that are persisted by the actor are stored, not the actual state of the actor +(though actor state snapshot support is also available). The events are persisted by appending to storage (nothing is ever mutated) which +allows for very high transaction rates and efficient replication. A stateful actor is recovered by replaying the stored +events to the actor, allowing it to rebuild its state. This can be either the full history of changes +or starting from a checkpoint in a snapshot which can dramatically reduce recovery times. -## Example +@@@ note + +The General Data Protection Regulation (GDPR) requires that personal information must be deleted at the request of users. +Deleting or modifying events that carry personal information would be difficult. Data shredding can be used to forget +information instead of deleting or modifying it. This is achieved by encrypting the data with a key for a given data +subject id (person) and deleting the key when that data subject is to be forgotten. Lightbend's +[GDPR for Akka Persistence](https://doc.akka.io/docs/akka-enhancements/current/gdpr/index.html) +provides tools to facilitate in building GDPR capable systems. + +@@@ + +### Event sourcing concepts + +See an [introduction to EventSourcing](https://msdn.microsoft.com/en-us/library/jj591559.aspx) at MSDN. + +Another excellent article about "thinking in Events" is [Events As First-Class Citizens](https://hackernoon.com/events-as-first-class-citizens-8633e8479493) +by Randy Shoup. It is a short and recommended read if you're starting developing Events based applications. + +What follows is Akka's implementation via event sourced actors. + +An event sourced actor (also known as a persistent actor) receives a (non-persistent) command +which is first validated if it can be applied to the current state. Here validation can mean anything, from simple +inspection of a command message's fields up to a conversation with several external services, for example. +If validation succeeds, events are generated from the command, representing the effect of the command. These events +are then persisted and, after successful persistence, used to change the actor's state. When the event sourced actor +needs to be recovered, only the persisted events are replayed of which we know that they can be successfully applied. +In other words, events cannot fail when being replayed to a persistent actor, in contrast to commands. Event sourced +actors may also process commands that do not change application state such as query commands for example. + +## Example and core API Let's start with a simple example. The minimum required for a @apidoc[EventSourcedBehavior] is: @@ -178,7 +215,7 @@ Java ## Cluster Sharding and EventSourcedBehavior -In a use case where the number of persistent actors needed are higher than what would fit in the memory of one node or +In a use case where the number of persistent actors needed is higher than what would fit in the memory of one node or where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can resume operations @ref:[Cluster Sharding](cluster-sharding.md) is an excellent fit to spread persistent actors over a cluster and address them by id. @@ -287,7 +324,7 @@ Note that there is only one of these. It is not possible to both persist and say These are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory] and once created additional side effects can be added. -Most of them time this will be done with the `thenRun` method on the `Effect` above. You can factor out +Most of the time this will be done with the `thenRun` method on the `Effect` above. You can factor out common side effects into functions and reuse for several commands. For example: Scala @@ -301,6 +338,18 @@ Java Any side effects are executed on an at-once basis and will not be executed if the persist fails. The side effects are executed sequentially, it is not possible to execute side effects in parallel. +### Atomic writes + +It is possible to store several events atomically by using the `persistAll` effect. That means that all events +passed to that method are stored or none of them are stored if there is an error. + +The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by +`persistAll`. + +Some journals may not support atomic writes of several events and they will then reject the `persistAll` +command. This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` (typically with a +`UnsupportedOperationException`) and can be handled with a @ref[supervisor](fault-tolerance.md). + ## Replies The @ref:[Request-Response interaction pattern](interaction-patterns.md#request-response) is very common for @@ -377,8 +426,22 @@ recommendation if you don't have other preference. ## Recovery -It is strongly discouraged to perform side effects in `applyEvent`, -so side effects should be performed once recovery has completed as a reaction to the `RecoveryCompleted` signal @scala[`receiveSignal` handler] @java[by overriding `receiveSignal`] +An event sourced actor is automatically recovered on start and on restart by replaying journaled events. +New messages sent to the actor during recovery do not interfere with replayed events. +They are stashed and received by the `EventSourcedBehavior` after the recovery phase completes. + +The number of concurrent recoveries that can be in progress at the same time is limited +to not overload the system and the backend data store. When exceeding the limit the actors will wait +until other recoveries have been completed. This is configured by: + +``` +akka.persistence.max-concurrent-recoveries = 50 +``` + +The @ref:[event handler](#event-handler) is used for updating the state when replaying the journaled events. + +It is strongly discouraged to perform side effects in the event handler, so side effects should be performed +once recovery has completed as a reaction to the `RecoveryCompleted` signal @scala[in the `receiveSignal` handler] @java[by overriding `receiveSignal`] Scala : @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #recovery } @@ -388,8 +451,43 @@ Java The `RecoveryCompleted` contains the current `State`. +The actor will always receive a `RecoveryCompleted` signal, even if there are no events +in the journal and the snapshot store is empty, or if it's a new persistent actor with a previously +unused `PersistenceId`. + @ref[Snapshots](persistence-snapshot.md) can be used for optimizing recovery times. +### Replay filter + +There could be cases where event streams are corrupted and multiple writers (i.e. multiple persistent actor instances) +journaled different messages with the same sequence number. +In such a case, you can configure how you filter replayed messages from multiple writers, upon recovery. + +In your configuration, under the `akka.persistence.journal.xxx.replay-filter` section (where `xxx` is your journal plugin id), +you can select the replay filter `mode` from one of the following values: + + * repair-by-discard-old + * fail + * warn + * off + +For example, if you configure the replay filter for leveldb plugin, it looks like this: + +``` +# The replay filter can detect a corrupt event stream by inspecting +# sequence numbers and writerUuid when replaying events. +akka.persistence.journal.leveldb.replay-filter { + # What the filter should do when detecting invalid events. + # Supported values: + # `repair-by-discard-old` : discard events from old writers, + # warning is logged + # `fail` : fail the replay, error is logged + # `warn` : log warning but emit events untouched + # `off` : disable this feature completely + mode = repair-by-discard-old +} +``` + ## Tagging Persistence typed allows you to use event tags without using @ref[`EventAdapter`](../persistence.md#event-adapters): @@ -440,18 +538,22 @@ By default a `EventSourcedBehavior` will stop if an exception is thrown from the any `BackoffSupervisorStrategy`. It is not possible to use the normal supervision wrapping for this as it isn't valid to `resume` a behavior on a journal failure as it is not known if the event was persisted. - Scala : @@snip [BasicPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #supervision } Java : @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #supervision } -## Journal rejections +If there is a problem with recovering the state of the actor from the journal, `RecoveryFailed` signal is +emitted to the @scala[`receiveSignal` handler] @java[`receiveSignal` method] and the actor will be stopped +(or restarted with backoff). + +### Journal rejections Journals can reject events. The difference from a failure is that the journal must decide to reject an event before trying to persist it e.g. because of a serialization exception. If an event is rejected it definitely won't be in the journal. -This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` and can be handled with a @ref[supervisor](fault-tolerance.md). +This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` and can be handled with a @ref[supervisor](fault-tolerance.md). +Not all journal implementations use rejections and treat these kind of problems also as journal failures. ## Stash @@ -495,3 +597,25 @@ processed. It's allowed to stash messages while unstashing. Those newly added commands will not be processed by the `unstashAll` effect that was in progress and have to be unstashed by another `unstashAll`. + +## Scaling out + +In a use case where the number of persistent actors needed is higher than what would fit in the memory of one node or +where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can +resume operations @ref:[Cluster Sharding](cluster-sharding.md) is an excellent fit to spread persistent actors over a +cluster and address them by id. + +Akka Persistence is based on the single-writer principle. For a particular `PersistenceId` only one `EventSourcedBehavior` +instance should be active at one time. If multiple instances were to persist events at the same time, the events would +be interleaved and might not be interpreted correctly on replay. Cluster Sharding ensures that there is only one +active entity (`EventSourcedBehavior`) for each id within a data center. Lightbend's +[Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html) +supports active-active persistent entities across data centers. + +## Configuration + +There are several configuration properties for the persistence module, please refer +to the @ref:[reference configuration](../general/configuration.md#config-akka-persistence). + +The @ref:[journal and snapshot store plugins](../persistence-plugins.md) have specific configuration, see +reference documentation of the chosen plugin. diff --git a/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java b/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java index 757f0bafb2..38c90f6a2a 100644 --- a/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java @@ -4,14 +4,13 @@ package jdocs.persistence; -import static akka.pattern.Patterns.ask; - import java.sql.Connection; import java.time.Duration; import java.util.HashSet; import java.util.Set; import akka.NotUsed; +import akka.actor.typed.javadsl.AbstractBehavior; import akka.persistence.query.Sequence; import akka.persistence.query.Offset; import com.typesafe.config.Config; @@ -27,7 +26,6 @@ import org.reactivestreams.Subscriber; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; public class PersistenceQueryDocTest { @@ -60,6 +58,8 @@ public class PersistenceQueryDocTest { } // #advanced-journal-query-types + interface OrderCompleted {} + public // #my-read-journal static class MyReadJournalProvider implements ReadJournalProvider { @@ -266,13 +266,14 @@ public class PersistenceQueryDocTest { // #events-by-tag // assuming journal is able to work with numeric offsets we can: - final Source blueThings = - readJournal.eventsByTag("blue", new Sequence(0L)); + final Source completedOrders = + readJournal.eventsByTag("order-completed", new Sequence(0L)); - // find top 10 blue things: - final CompletionStage> top10BlueThings = - blueThings + // find first 10 completed orders: + final CompletionStage> firstCompleted = + completedOrders .map(EventEnvelope::event) + .collectType(OrderCompleted.class) .take(10) // cancels the query stream after pulling 10 elements .runFold( new ArrayList<>(10), @@ -283,7 +284,8 @@ public class PersistenceQueryDocTest { system); // start another query, from the known offset - Source blue = readJournal.eventsByTag("blue", new Sequence(10)); + Source furtherOrders = + readJournal.eventsByTag("order-completed", new Sequence(10)); // #events-by-tag } @@ -354,7 +356,7 @@ public class PersistenceQueryDocTest { } // #projection-into-different-store-simple-classes - class ExampleStore { + static class ExampleStore { CompletionStage save(Object any) { // ... // #projection-into-different-store-simple-classes @@ -383,7 +385,7 @@ public class PersistenceQueryDocTest { } // #projection-into-different-store - class MyResumableProjection { + static class MyResumableProjection { private final String name; public MyResumableProjection(String name) { @@ -406,40 +408,7 @@ public class PersistenceQueryDocTest { } // #projection-into-different-store - void demonstrateWritingIntoDifferentStoreWithResumableProjections() throws Exception { - final ActorSystem system = ActorSystem.create(); - - final MyJavadslReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor( - MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); - - // #projection-into-different-store-actor-run - final Duration timeout = Duration.ofSeconds(3); - - final MyResumableProjection bidProjection = new MyResumableProjection("bid"); - - final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid"); - final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer"); - - long startFromOffset = - bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS); - - readJournal - .eventsByTag("bid", new Sequence(startFromOffset)) - .mapAsync( - 8, - envelope -> { - final CompletionStage f = ask(writer, envelope.event(), timeout); - return f.thenApplyAsync(in -> envelope.offset(), system.dispatcher()); - }) - .mapAsync(1, offset -> bidProjection.saveProgress(offset)) - .runWith(Sink.ignore(), system); - } - - // #projection-into-different-store-actor-run - - class ComplexState { + static class ComplexState { boolean readyToSave() { return false; @@ -451,35 +420,4 @@ public class PersistenceQueryDocTest { return new Record(); } } - - // #projection-into-different-store-actor - final class TheOneWhoWritesToQueryJournal extends AbstractActor { - private final ExampleStore store; - - private ComplexState state = new ComplexState(); - - public TheOneWhoWritesToQueryJournal() { - store = new ExampleStore(); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .matchAny( - message -> { - state = updateState(state, message); - - // example saving logic that requires state to become ready: - if (state.readyToSave()) store.save(Record.of(state)); - }) - .build(); - } - - ComplexState updateState(ComplexState state, Object msg) { - // some complicated aggregation logic here ... - return state; - } - } - // #projection-into-different-store-actor - } diff --git a/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java b/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java new file mode 100644 index 0000000000..c30e49a5d3 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package jdocs.persistence; + +import akka.Done; +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.Adapter; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.actor.typed.javadsl.AskPattern; +import akka.persistence.query.PersistenceQuery; +import akka.persistence.query.Sequence; +import akka.stream.javadsl.Sink; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static jdocs.persistence.PersistenceQueryDocTest.*; + +public interface ResumableProjectionExample { + + public static void runQuery( + ActorSystem system, ActorRef writer) + throws Exception { + + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(Adapter.toClassic(system)) + .getReadJournalFor( + MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); + + // #projection-into-different-store-actor-run + final Duration timeout = Duration.ofSeconds(3); + + final MyResumableProjection bidProjection = new MyResumableProjection("bid"); + + long startFromOffset = + bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS); + + readJournal + .eventsByTag("bid", new Sequence(startFromOffset)) + .mapAsync( + 8, + envelope -> { + final CompletionStage f = + AskPattern.ask( + writer, + (ActorRef replyTo) -> + new TheOneWhoWritesToQueryJournal.Update(envelope.event(), replyTo), + timeout, + system.scheduler()); + return f.thenApplyAsync(in -> envelope.offset(), system.executionContext()); + }) + .mapAsync(1, offset -> bidProjection.saveProgress(offset)) + .runWith(Sink.ignore(), system); + } + + // #projection-into-different-store-actor-run + + // #projection-into-different-store-actor + static final class TheOneWhoWritesToQueryJournal + extends AbstractBehavior { + + interface Command {} + + static class Update implements Command { + public final Object payload; + public final ActorRef replyTo; + + Update(Object payload, ActorRef replyTo) { + this.payload = payload; + this.replyTo = replyTo; + } + } + + public static Behavior create(String id, ExampleStore store) { + return Behaviors.setup(context -> new TheOneWhoWritesToQueryJournal(store)); + } + + private final ExampleStore store; + + private ComplexState state = new ComplexState(); + + private TheOneWhoWritesToQueryJournal(ExampleStore store) { + this.store = store; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder().onMessage(Update.class, this::onUpdate).build(); + } + + private Behavior onUpdate(Update msg) { + state = updateState(state, msg); + if (state.readyToSave()) store.save(Record.of(state)); + return this; + } + + ComplexState updateState(ComplexState state, Update msg) { + // some complicated aggregation logic here ... + return state; + } + } + // #projection-into-different-store-actor + +} diff --git a/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala index 78f8ad6e75..da5a323e9f 100644 --- a/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -12,11 +12,14 @@ import akka.stream.javadsl import akka.testkit.AkkaSpec import akka.util.Timeout import org.reactivestreams.Subscriber - import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ + +import akka.Done +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors import com.typesafe.config.Config object PersistenceQueryDocSpec { @@ -134,7 +137,7 @@ object PersistenceQueryDocSpec { def readyToSave = false } case class Record(any: Any) - class DummyStore { def save(record: Record) = Future.successful(42L) } + class ExampleStore { def save(record: Record) = Future.successful(42L) } val JournalId = "akka.persistence.query.my-read-journal" @@ -164,24 +167,31 @@ object PersistenceQueryDocSpec { //#projection-into-different-store-rs } + import akka.actor.typed.ActorRef //#projection-into-different-store-actor - class TheOneWhoWritesToQueryJournal(id: String) extends Actor { - val store = new DummyStore() + object TheOneWhoWritesToQueryJournal { - var state: ComplexState = ComplexState() + sealed trait Command + final case class Update(payload: Any, replyTo: ActorRef[Done]) extends Command - def receive = { - case m => - state = updateState(state, m) - if (state.readyToSave) store.save(Record(state)) + def apply(id: String, store: ExampleStore): Behavior[Command] = { + updated(ComplexState(), store) } - def updateState(state: ComplexState, msg: Any): ComplexState = { + private def updated(state: ComplexState, store: ExampleStore): Behavior[Command] = { + Behaviors.receiveMessage { + case command: Update => + val newState = updateState(state, command) + if (state.readyToSave) store.save(Record(state)) + updated(newState, store) + } + } + + private def updateState(state: ComplexState, command: Command): ComplexState = { // some complicated aggregation logic here ... state } } - //#projection-into-different-store-actor } @@ -222,21 +232,24 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { readJournal.currentPersistenceIds() //#all-persistence-ids-snap + trait OrderCompleted + //#events-by-tag // assuming journal is able to work with numeric offsets we can: - val blueThings: Source[EventEnvelope, NotUsed] = - readJournal.eventsByTag("blue", Offset.noOffset) + val completedOrders: Source[EventEnvelope, NotUsed] = + readJournal.eventsByTag("order-completed", Offset.noOffset) - // find top 10 blue things: - val top10BlueThings: Future[Vector[Any]] = - blueThings + // find first 10 completed orders: + val firstCompleted: Future[Vector[OrderCompleted]] = + completedOrders .map(_.event) + .collectType[OrderCompleted] .take(10) // cancels the query stream after pulling 10 elements - .runFold(Vector.empty[Any])(_ :+ _) + .runFold(Vector.empty[OrderCompleted])(_ :+ _) // start another query, from the known offset - val furtherBlueThings = readJournal.eventsByTag("blue", offset = Sequence(10)) + val furtherOrders = readJournal.eventsByTag("order-completed", offset = Sequence(10)) //#events-by-tag //#events-by-persistent-id @@ -271,29 +284,36 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { //#projection-into-different-store class RunWithActor { - val readJournal = - PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](JournalId) + import akka.actor.typed.ActorSystem + import akka.actor.typed.ActorRef + import akka.actor.typed.scaladsl.adapter._ + import akka.actor.typed.scaladsl.AskPattern._ //#projection-into-different-store-actor-run - import akka.pattern.ask - import system.dispatcher - implicit val timeout = Timeout(3.seconds) + def runQuery(writer: ActorRef[TheOneWhoWritesToQueryJournal.Command])(implicit system: ActorSystem[_]): Unit = { - val bidProjection = new MyResumableProjection("bid") + val readJournal = + PersistenceQuery(system.toClassic).readJournalFor[MyScaladslReadJournal](JournalId) - val writerProps = Props(classOf[TheOneWhoWritesToQueryJournal], "bid") - val writer = system.actorOf(writerProps, "bid-projection-writer") + import system.executionContext + implicit val scheduler = system.scheduler + implicit val timeout = Timeout(3.seconds) - bidProjection.latestOffset.foreach { startFromOffset => - readJournal - .eventsByTag("bid", Sequence(startFromOffset)) - .mapAsync(8) { envelope => - (writer ? envelope.event).map(_ => envelope.offset) - } - .mapAsync(1) { offset => - bidProjection.saveProgress(offset) - } - .runWith(Sink.ignore) + val bidProjection = new MyResumableProjection("bid") + + bidProjection.latestOffset.foreach { startFromOffset => + readJournal + .eventsByTag("bid", Sequence(startFromOffset)) + .mapAsync(8) { envelope => + writer + .ask((replyTo: ActorRef[Done]) => TheOneWhoWritesToQueryJournal.Update(envelope.event, replyTo)) + .map(_ => envelope.offset) + } + .mapAsync(1) { offset => + bidProjection.saveProgress(offset) + } + .runWith(Sink.ignore) + } } //#projection-into-different-store-actor-run } diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java index fd1a45e932..96e2ddcd0b 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java @@ -26,6 +26,7 @@ import akka.persistence.typed.javadsl.SignalHandler; import java.time.Duration; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -309,6 +310,42 @@ public class BasicPersistentBehaviorTest { // #wrapPersistentBehavior } + interface TaggingQuery { + + public abstract class MyPersistentBehavior + extends EventSourcedBehavior< + MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> { + + interface Command {} + + interface Event {} + + interface OrderCompleted extends Event {} + + public static class State {} + + MyPersistentBehavior(String entityId) { + super(PersistenceId.of("ShoppingCart", entityId)); + this.entityId = entityId; + } + + // #tagging-query + private final String entityId; + + public static final int NUMBER_OF_ENTITY_GROUPS = 10; + + @Override + public Set tagsFor(Event event) { + String entityGroup = "group-" + Math.abs(entityId.hashCode() % NUMBER_OF_ENTITY_GROUPS); + Set tags = new HashSet<>(); + tags.add(entityGroup); + if (event instanceof OrderCompleted) tags.add("order-completed"); + return tags; + } + // #tagging-query + } + } + interface Snapshotting { public class MyPersistentBehavior diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index 3b568a9349..d098b0f3ba 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -121,6 +121,31 @@ object BasicPersistentBehaviorCompileOnly { //#tagging } + object TaggingBehavior2 { + sealed trait OrderCompleted extends Event + + //#tagging-query + val NumberOfEntityGroups = 10 + + def tagEvent(entityId: String, event: Event): Set[String] = { + val entityGroup = s"group-${math.abs(entityId.hashCode % NumberOfEntityGroups)}" + event match { + case _: OrderCompleted => Set(entityGroup, "order-completed") + case _ => Set(entityGroup) + } + } + + def apply(entityId: String): Behavior[Command] = { + EventSourcedBehavior[Command, Event, State]( + persistenceId = PersistenceId("ShoppingCart", entityId), + emptyState = State(), + commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), + eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) + .withTagger(event => tagEvent(entityId, event)) + } + //#tagging-query + } + object WrapBehavior { def apply(): Behavior[Command] = //#wrapPersistentBehavior