Identical contents in scala/persistence-query.md and java/persistence-query.md
This commit is contained in:
parent
1d2748cad2
commit
af3b3d3732
3 changed files with 214 additions and 56 deletions
|
|
@ -14,15 +14,23 @@ recommend (in the spirit of CQRS) of splitting up the write/read sides into sepa
|
|||
|
||||
Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:
|
||||
|
||||
@@@vars
|
||||
```
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-persistence-query_$scala.binary_version$</artifactId>
|
||||
<version>$akka.version$</version>
|
||||
</dependency>
|
||||
```
|
||||
@@@
|
||||
Scala
|
||||
: @@@vars
|
||||
```
|
||||
"com.typesafe.akka" %% "akka-persistence-query" % "$akka.version$"
|
||||
```
|
||||
@@@
|
||||
|
||||
Java
|
||||
: @@@vars
|
||||
```
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-persistence-query_$scala.binary_version$</artifactId>
|
||||
<version>$akka.version$</version>
|
||||
</dependency>
|
||||
```
|
||||
@@@
|
||||
|
||||
## Design overview
|
||||
|
||||
|
|
@ -44,10 +52,14 @@ Read journals are implemented as [Community plugins](http://akka.io/community/#p
|
|||
databases). For example, given a library that provides a `akka.persistence.query.my-read-journal` obtaining the related
|
||||
journal is as simple as:
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #basic-usage }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #basic-usage }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #basic-usage }
|
||||
|
||||
Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via
|
||||
`getJournalFor(NoopJournal.class, NoopJournal.identifier)`, however this is not enforced.
|
||||
@scala[`readJournalFor[NoopJournal](NoopJournal.identifier)`]@java[`getJournalFor(NoopJournal.class, NoopJournal.identifier)`], however this is not enforced.
|
||||
|
||||
Read journal implementations are available as [Community plugins](http://akka.io/community/#plugins-to-akka-persistence-query).
|
||||
|
||||
|
|
@ -73,19 +85,31 @@ The predefined queries are:
|
|||
By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new
|
||||
persistence ids as they come into the system:
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #all-persistence-ids-live }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #all-persistence-ids-live }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #all-persistence-ids-live }
|
||||
|
||||
If your usage does not require a live stream, you can use the `currentPersistenceIds` query:
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #all-persistence-ids-snap }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #all-persistence-ids-snap }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #all-persistence-ids-snap }
|
||||
|
||||
#### EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery
|
||||
|
||||
`eventsByPersistenceId` is a query equivalent to replaying a @ref:[PersistentActor](persistence.md#event-sourcing),
|
||||
`eventsByPersistenceId` is a query equivalent to replaying a @ref:[PersistentActor](persistence.md#event-sourcing),
|
||||
however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the
|
||||
persistent actor identified by the given `persistenceId`.
|
||||
persistent actor identified by the given `persistenceId`.
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #events-by-persistent-id }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #events-by-persistent-id }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #events-by-persistent-id }
|
||||
|
||||
Most journals will have to revert to polling in order to achieve this,
|
||||
which can typically be configured with a `refresh-interval` configuration property.
|
||||
|
|
@ -104,7 +128,11 @@ Some journals may support tagging of events via an @ref:[Event Adapters](persist
|
|||
`akka.persistence.journal.Tagged` with the given `tags`. The journal may support other ways of doing tagging - again,
|
||||
how exactly this is implemented depends on the used journal. Here is an example of such a tagging event adapter:
|
||||
|
||||
@@snip [LeveldbPersistenceQueryDocTest.java]($code$/java/jdocs/persistence/query/LeveldbPersistenceQueryDocTest.java) { #tagger }
|
||||
Scala
|
||||
: @@snip [LeveldbPersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala) { #tagger }
|
||||
|
||||
Java
|
||||
: @@snip [LeveldbPersistenceQueryDocTest.java]($code$/java/jdocs/persistence/query/LeveldbPersistenceQueryDocTest.java) { #tagger }
|
||||
|
||||
@@@ note
|
||||
|
||||
|
|
@ -121,7 +149,11 @@ In the example below we query all events which have been tagged (we assume this
|
|||
@ref:[EventAdapter](persistence.md#event-adapters), or that the journal is smart enough that it can figure out what we mean by this
|
||||
tag - for example if the journal stored the events as json it may try to find those with the field `tag` set to this value etc.).
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #events-by-tag }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #events-by-tag }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #events-by-tag }
|
||||
|
||||
As you can see, we can use all the usual stream combinators available from @ref:[Streams](stream/index.md) on the resulting query stream,
|
||||
including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in `EventsByTag`
|
||||
|
|
@ -141,11 +173,25 @@ stream, for example if it's finite or infinite, strictly ordered or not ordered
|
|||
is defined as the second type parameter of the returned `Source`, which allows journals to provide users with their
|
||||
specialised query object, as demonstrated in the sample below:
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-types }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-types }
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-definition }
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-types }
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-usage }
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-definition }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-definition }
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-usage }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-usage }
|
||||
|
||||
## Performance and denormalization
|
||||
|
||||
|
|
@ -176,7 +222,11 @@ it may be more efficient or interesting to query it (instead of the source event
|
|||
If the read datastore exposes a [Reactive Streams](http://reactive-streams.org) interface then implementing a simple projection
|
||||
is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so:
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-rs }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-rs }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-rs }
|
||||
|
||||
### Materialize view using mapAsync
|
||||
|
||||
|
|
@ -186,9 +236,18 @@ you may have to implement the write logic using plain functions or Actors instea
|
|||
In case your write logic is state-less and you just need to convert the events from one data type to another
|
||||
before writing into the alternative datastore, then the projection is as simple as:
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-simple-classes }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-simple-classes }
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-simple }
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-simple-classes }
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-simple }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-simple }
|
||||
|
||||
### Resumable projections
|
||||
|
||||
|
|
@ -200,9 +259,18 @@ The example below additionally highlights how you would use Actors to implement
|
|||
you need to do some complex logic that would be best handled inside an Actor before persisting the event
|
||||
into the other datastore:
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-actor-run }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor-run }
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-actor }
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-actor-run }
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-actor }
|
||||
|
||||
<a id="read-journal-plugin-api"></a>
|
||||
## Query plugins
|
||||
|
|
@ -227,18 +295,26 @@ their exposed semantics as well as handled query scenarios.
|
|||
A read journal plugin must implement `akka.persistence.query.ReadJournalProvider` which
|
||||
creates instances of `akka.persistence.query.scaladsl.ReadJournal` and
|
||||
`akka.persistence.query.javaadsl.ReadJournal`. The plugin must implement both the `scaladsl`
|
||||
and the `javadsl` interfaces because the `akka.stream.scaladsl.Source` and
|
||||
and the `javadsl` @scala[traits]@java[interfaces] because the `akka.stream.scaladsl.Source` and
|
||||
`akka.stream.javadsl.Source` are different types and even though those types can easily be converted
|
||||
to each other it is most convenient for the end user to get access to the Java or Scala `Source` directly.
|
||||
As illustrated below one of the implementations can delegate to the other.
|
||||
|
||||
Below is a simple journal implementation:
|
||||
|
||||
@@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #my-read-journal }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #my-read-journal }
|
||||
|
||||
And the `EventsByTag` could be backed by such an Actor for example:
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #my-read-journal }
|
||||
|
||||
@@snip [MyEventsByTagJavaPublisher.java]($code$/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java) { #events-by-tag-publisher }
|
||||
And the `eventsByTag` could be backed by such an Actor for example:
|
||||
|
||||
Scala
|
||||
: @@snip [MyEventsByTagPublisher.scala]($code$/scala/docs/persistence/query/MyEventsByTagPublisher.scala) { #events-by-tag-publisher }
|
||||
|
||||
Java
|
||||
: @@snip [MyEventsByTagJavaPublisher.java]($code$/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java) { #events-by-tag-publisher }
|
||||
|
||||
The `ReadJournalProvider` class must have a constructor with one of these signatures:
|
||||
|
||||
|
|
@ -254,7 +330,7 @@ If the underlying datastore only supports queries that are completed when they r
|
|||
end of the "result set", the journal has to submit new queries after a while in order
|
||||
to support "infinite" event streams that include events stored after the initial query
|
||||
has completed. It is recommended that the plugin use a configuration property named
|
||||
`refresh-interval` for defining such a refresh interval.
|
||||
`refresh-interval` for defining such a refresh interval.
|
||||
|
||||
### Plugin TCK
|
||||
|
||||
|
|
|
|||
|
|
@ -14,11 +14,23 @@ recommend (in the spirit of CQRS) of splitting up the write/read sides into sepa
|
|||
|
||||
Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:
|
||||
|
||||
@@@vars
|
||||
```
|
||||
"com.typesafe.akka" %% "akka-persistence-query" % "$akka.version$"
|
||||
```
|
||||
@@@
|
||||
Scala
|
||||
: @@@vars
|
||||
```
|
||||
"com.typesafe.akka" %% "akka-persistence-query" % "$akka.version$"
|
||||
```
|
||||
@@@
|
||||
|
||||
Java
|
||||
: @@@vars
|
||||
```
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-persistence-query_$scala.binary_version$</artifactId>
|
||||
<version>$akka.version$</version>
|
||||
</dependency>
|
||||
```
|
||||
@@@
|
||||
|
||||
## Design overview
|
||||
|
||||
|
|
@ -40,10 +52,14 @@ Read journals are implemented as [Community plugins](http://akka.io/community/#p
|
|||
databases). For example, given a library that provides a `akka.persistence.query.my-read-journal` obtaining the related
|
||||
journal is as simple as:
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #basic-usage }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #basic-usage }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #basic-usage }
|
||||
|
||||
Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via
|
||||
`readJournalFor[NoopJournal](NoopJournal.identifier)`, however this is not enforced.
|
||||
@scala[`readJournalFor[NoopJournal](NoopJournal.identifier)`]@java[`getJournalFor(NoopJournal.class, NoopJournal.identifier)`], however this is not enforced.
|
||||
|
||||
Read journal implementations are available as [Community plugins](http://akka.io/community/#plugins-to-akka-persistence-query).
|
||||
|
||||
|
|
@ -69,11 +85,19 @@ The predefined queries are:
|
|||
By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new
|
||||
persistence ids as they come into the system:
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #all-persistence-ids-live }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #all-persistence-ids-live }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #all-persistence-ids-live }
|
||||
|
||||
If your usage does not require a live stream, you can use the `currentPersistenceIds` query:
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #all-persistence-ids-snap }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #all-persistence-ids-snap }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #all-persistence-ids-snap }
|
||||
|
||||
#### EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery
|
||||
|
||||
|
|
@ -81,7 +105,11 @@ If your usage does not require a live stream, you can use the `currentPersistenc
|
|||
however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the
|
||||
persistent actor identified by the given `persistenceId`.
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #events-by-persistent-id }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #events-by-persistent-id }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #events-by-persistent-id }
|
||||
|
||||
Most journals will have to revert to polling in order to achieve this,
|
||||
which can typically be configured with a `refresh-interval` configuration property.
|
||||
|
|
@ -100,7 +128,11 @@ Some journals may support tagging of events via an @ref:[Event Adapters](persist
|
|||
`akka.persistence.journal.Tagged` with the given `tags`. The journal may support other ways of doing tagging - again,
|
||||
how exactly this is implemented depends on the used journal. Here is an example of such a tagging event adapter:
|
||||
|
||||
@@snip [LeveldbPersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala) { #tagger }
|
||||
Scala
|
||||
: @@snip [LeveldbPersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala) { #tagger }
|
||||
|
||||
Java
|
||||
: @@snip [LeveldbPersistenceQueryDocTest.java]($code$/java/jdocs/persistence/query/LeveldbPersistenceQueryDocTest.java) { #tagger }
|
||||
|
||||
@@@ note
|
||||
|
||||
|
|
@ -117,7 +149,11 @@ In the example below we query all events which have been tagged (we assume this
|
|||
@ref:[EventAdapter](persistence.md#event-adapters), or that the journal is smart enough that it can figure out what we mean by this
|
||||
tag - for example if the journal stored the events as json it may try to find those with the field `tag` set to this value etc.).
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #events-by-tag }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #events-by-tag }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #events-by-tag }
|
||||
|
||||
As you can see, we can use all the usual stream combinators available from @ref:[Streams](stream/index.md) on the resulting query stream,
|
||||
including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in `EventsByTag`
|
||||
|
|
@ -137,11 +173,25 @@ stream, for example if it's finite or infinite, strictly ordered or not ordered
|
|||
is defined as the second type parameter of the returned `Source`, which allows journals to provide users with their
|
||||
specialised query object, as demonstrated in the sample below:
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-types }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-types }
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-definition }
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-types }
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-usage }
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-definition }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-definition }
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #advanced-journal-query-usage }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #advanced-journal-query-usage }
|
||||
|
||||
## Performance and denormalization
|
||||
|
||||
|
|
@ -172,7 +222,11 @@ it may be more efficient or interesting to query it (instead of the source event
|
|||
If the read datastore exposes a [Reactive Streams](http://reactive-streams.org) interface then implementing a simple projection
|
||||
is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so:
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-rs }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-rs }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-rs }
|
||||
|
||||
### Materialize view using mapAsync
|
||||
|
||||
|
|
@ -182,7 +236,18 @@ you may have to implement the write logic using plain functions or Actors instea
|
|||
In case your write logic is state-less and you just need to convert the events from one data type to another
|
||||
before writing into the alternative datastore, then the projection is as simple as:
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-simple }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-simple-classes }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-simple-classes }
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-simple }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-simple }
|
||||
|
||||
### Resumable projections
|
||||
|
||||
|
|
@ -194,9 +259,18 @@ The example below additionally highlights how you would use Actors to implement
|
|||
you need to do some complex logic that would be best handled inside an Actor before persisting the event
|
||||
into the other datastore:
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor-run }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor-run }
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor }
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-actor-run }
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #projection-into-different-store-actor }
|
||||
|
||||
<a id="read-journal-plugin-api"></a>
|
||||
## Query plugins
|
||||
|
|
@ -221,18 +295,26 @@ their exposed semantics as well as handled query scenarios.
|
|||
A read journal plugin must implement `akka.persistence.query.ReadJournalProvider` which
|
||||
creates instances of `akka.persistence.query.scaladsl.ReadJournal` and
|
||||
`akka.persistence.query.javaadsl.ReadJournal`. The plugin must implement both the `scaladsl`
|
||||
and the `javadsl` traits because the `akka.stream.scaladsl.Source` and
|
||||
and the `javadsl` @scala[traits]@java[interfaces] because the `akka.stream.scaladsl.Source` and
|
||||
`akka.stream.javadsl.Source` are different types and even though those types can easily be converted
|
||||
to each other it is most convenient for the end user to get access to the Java or Scala directly.
|
||||
to each other it is most convenient for the end user to get access to the Java or Scala `Source` directly.
|
||||
As illustrated below one of the implementations can delegate to the other.
|
||||
|
||||
Below is a simple journal implementation:
|
||||
|
||||
@@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #my-read-journal }
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #my-read-journal }
|
||||
|
||||
Java
|
||||
: @@snip [PersistenceQueryDocTest.java]($code$/java/jdocs/persistence/PersistenceQueryDocTest.java) { #my-read-journal }
|
||||
|
||||
And the `eventsByTag` could be backed by such an Actor for example:
|
||||
|
||||
@@snip [MyEventsByTagPublisher.scala]($code$/scala/docs/persistence/query/MyEventsByTagPublisher.scala) { #events-by-tag-publisher }
|
||||
Scala
|
||||
: @@snip [MyEventsByTagPublisher.scala]($code$/scala/docs/persistence/query/MyEventsByTagPublisher.scala) { #events-by-tag-publisher }
|
||||
|
||||
Java
|
||||
: @@snip [MyEventsByTagJavaPublisher.java]($code$/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java) { #events-by-tag-publisher }
|
||||
|
||||
The `ReadJournalProvider` class must have a constructor with one of these signatures:
|
||||
|
||||
|
|
|
|||
|
|
@ -308,11 +308,11 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
|
|||
PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](
|
||||
"akka.persistence.query.my-read-journal")
|
||||
|
||||
//#projection-into-different-store-simple
|
||||
//#projection-into-different-store-simple-classes
|
||||
trait ExampleStore {
|
||||
def save(event: Any): Future[Unit]
|
||||
}
|
||||
//#projection-into-different-store-simple
|
||||
//#projection-into-different-store-simple-classes
|
||||
|
||||
//#projection-into-different-store-simple
|
||||
val store: ExampleStore = ???
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue