Merge pull request #18374 from akka/wip-18360-tagged-patriknw
!per #18360 Move Tagged to akka.persistence.journal
This commit is contained in:
commit
9d3eea3287
14 changed files with 66 additions and 30 deletions
|
|
@ -11,7 +11,7 @@ import scala.runtime.BoxedUnit;
|
|||
import akka.actor.ActorSystem;
|
||||
import akka.persistence.journal.WriteEventAdapter;
|
||||
import akka.persistence.journal.EventSeq;
|
||||
import akka.persistence.journal.leveldb.Tagged;
|
||||
import akka.persistence.journal.Tagged;
|
||||
import akka.persistence.query.AllPersistenceIds;
|
||||
import akka.persistence.query.EventEnvelope;
|
||||
import akka.persistence.query.EventsByPersistenceId;
|
||||
|
|
@ -67,7 +67,7 @@ public class LeveldbPersistenceQueryDocTest {
|
|||
|
||||
static
|
||||
//#tagger
|
||||
public class MyEventAdapter implements WriteEventAdapter {
|
||||
public class MyTaggingEventAdapter implements WriteEventAdapter {
|
||||
|
||||
@Override
|
||||
public Object toJournal(Object event) {
|
||||
|
|
|
|||
|
|
@ -69,8 +69,10 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher<EventEnvelope> {
|
|||
.build());
|
||||
}
|
||||
|
||||
public static Props props(Connection conn, String tag, Long offset, FiniteDuration refreshInterval) {
|
||||
return Props.create(() -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval));
|
||||
public static Props props(Connection conn, String tag, Long offset,
|
||||
FiniteDuration refreshInterval) {
|
||||
return Props.create(() ->
|
||||
new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -102,7 +104,8 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher<EventEnvelope> {
|
|||
final Long id = in.first();
|
||||
final byte[] bytes = in.second();
|
||||
|
||||
final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get();
|
||||
final PersistentRepr p =
|
||||
serialization.deserialize(bytes, PersistentRepr.class).get();
|
||||
|
||||
return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload());
|
||||
}).collect(toList());
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ all domain events of an Aggregate Root type.
|
|||
|
||||
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByTag
|
||||
|
||||
To tag events you create an :ref:`event-adapters-java` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged``
|
||||
To tag events you create an :ref:`event-adapters-java` that wraps the events in a ``akka.persistence.journal.Tagged``
|
||||
with the given ``tags``.
|
||||
|
||||
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger
|
||||
|
|
|
|||
|
|
@ -75,6 +75,9 @@ significantly inefficient.
|
|||
|
||||
The predefined queries are:
|
||||
|
||||
AllPersistenceIds
|
||||
^^^^^^^^^^^^^^^^^
|
||||
|
||||
``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system.
|
||||
By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new
|
||||
persistence ids as they come into the system:
|
||||
|
|
@ -86,6 +89,9 @@ If your usage does not require a live stream, you can disable refreshing by usin
|
|||
|
||||
.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-snap
|
||||
|
||||
EventsByPersistenceId
|
||||
^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor <event-sourcing-scala>`,
|
||||
however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the
|
||||
persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve
|
||||
|
|
@ -93,11 +99,20 @@ this, which can be configured using the ``RefreshInterval`` query hint:
|
|||
|
||||
.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh
|
||||
|
||||
``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to
|
||||
implement in some journals or may need some additional preparation of the used data store to be executed efficiently,
|
||||
please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query
|
||||
is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented
|
||||
depends on the used journal.
|
||||
EventsByTag
|
||||
^^^^^^^^^^^
|
||||
|
||||
``EventsByTag`` allows querying events regardless of which ``persistenceId`` they are associated with. This query is hard to
|
||||
implement in some journals or may need some additional preparation of the used data store to be executed efficiently.
|
||||
The goal of this query is to allow querying for all events which are "tagged" with a specific tag.
|
||||
That includes the use case to query all domain events of an Aggregate Root type.
|
||||
Please refer to your read journal plugin's documentation to find out if and how it is supported.
|
||||
|
||||
Some journals may support tagging of events via an :ref:`event-adapters-java` that wraps the events in a
|
||||
``akka.persistence.journal.Tagged`` with the given ``tags``. The journal may support other ways of doing tagging - again,
|
||||
how exactly this is implemented depends on the used journal. Here is an example of such a tagging event adapter:
|
||||
|
||||
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger
|
||||
|
||||
.. note::
|
||||
A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag``
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import akka.actor._
|
|||
import akka.pattern.BackoffSupervisor
|
||||
import akka.persistence._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl.{Source, Sink, Flow}
|
||||
import akka.stream.scaladsl.{ Source, Sink, Flow }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import akka.persistence.journal.{ EventAdapter, EventSeq }
|
|||
import akka.testkit.AkkaSpec
|
||||
import akka.persistence.query.PersistenceQuery
|
||||
import akka.persistence.query.journal.leveldb.LeveldbReadJournal
|
||||
import akka.persistence.journal.leveldb.Tagged
|
||||
import akka.persistence.journal.Tagged
|
||||
import akka.persistence.query.EventsByPersistenceId
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.persistence.query.EventEnvelope
|
||||
|
|
@ -19,9 +19,9 @@ import scala.annotation.tailrec
|
|||
object LeveldbPersistenceQueryDocSpec {
|
||||
//#tagger
|
||||
import akka.persistence.journal.WriteEventAdapter
|
||||
import akka.persistence.journal.leveldb.Tagged
|
||||
import akka.persistence.journal.Tagged
|
||||
|
||||
class ColorTagger extends WriteEventAdapter {
|
||||
class MyTaggingEventAdapter extends WriteEventAdapter {
|
||||
val colors = Set("green", "black", "blue")
|
||||
override def toJournal(event: Any): Any = event match {
|
||||
case s: String ⇒
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ all domain events of an Aggregate Root type.
|
|||
|
||||
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#EventsByTag
|
||||
|
||||
To tag events you create an :ref:`event-adapters-scala` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged``
|
||||
To tag events you create an :ref:`event-adapters-scala` that wraps the events in a ``akka.persistence.journal.Tagged``
|
||||
with the given ``tags``.
|
||||
|
||||
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#tagger
|
||||
|
|
|
|||
|
|
@ -71,6 +71,9 @@ significantly inefficient.
|
|||
|
||||
The predefined queries are:
|
||||
|
||||
AllPersistenceIds
|
||||
^^^^^^^^^^^^^^^^^
|
||||
|
||||
``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system.
|
||||
By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new
|
||||
persistence ids as they come into the system:
|
||||
|
|
@ -82,6 +85,9 @@ If your usage does not require a live stream, you can disable refreshing by usin
|
|||
|
||||
.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-snap
|
||||
|
||||
EventsByPersistenceId
|
||||
^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor <event-sourcing-scala>`,
|
||||
however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the
|
||||
persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve
|
||||
|
|
@ -89,11 +95,20 @@ this, which can be configured using the ``RefreshInterval`` query hint:
|
|||
|
||||
.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh
|
||||
|
||||
``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to
|
||||
implement in some journals or may need some additional preparation of the used data store to be executed efficiently,
|
||||
please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query
|
||||
is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented
|
||||
depends on the used journal.
|
||||
EventsByTag
|
||||
^^^^^^^^^^^
|
||||
|
||||
``EventsByTag`` allows querying events regardless of which ``persistenceId`` they are associated with. This query is hard to
|
||||
implement in some journals or may need some additional preparation of the used data store to be executed efficiently.
|
||||
The goal of this query is to allow querying for all events which are "tagged" with a specific tag.
|
||||
That includes the use case to query all domain events of an Aggregate Root type.
|
||||
Please refer to your read journal plugin's documentation to find out if and how it is supported.
|
||||
|
||||
Some journals may support tagging of events via an :ref:`event-adapters-scala` that wraps the events in a
|
||||
``akka.persistence.journal.Tagged`` with the given ``tags``. The journal may support other ways of doing tagging - again,
|
||||
how exactly this is implemented depends on the used journal. Here is an example of such a tagging event adapter:
|
||||
|
||||
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#tagger
|
||||
|
||||
.. note::
|
||||
A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag``
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ object LeveldbReadJournal {
|
|||
* a given tag, e.g. all events of an Aggregate Root type.
|
||||
*
|
||||
* To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events
|
||||
* in a [[akka.persistence.journal.leveldb.Tagged]] with the given `tags`.
|
||||
* in a [[akka.persistence.journal.Tagged]] with the given `tags`.
|
||||
*
|
||||
* You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all
|
||||
* events with a given tag. The `offset` corresponds to an ordered sequence number for
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import akka.testkit.TestKit
|
|||
import akka.persistence.query.NoRefresh
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.persistence.query.EventsByTag
|
||||
import akka.persistence.journal.leveldb.Tagged
|
||||
import akka.persistence.journal.Tagged
|
||||
import akka.persistence.journal.EventSeq
|
||||
import akka.persistence.journal.EventAdapter
|
||||
import akka.persistence.query.EventEnvelope
|
||||
|
|
|
|||
|
|
@ -21,7 +21,8 @@ interface SnapshotStorePlugin {
|
|||
* @param criteria
|
||||
* selection criteria for loading.
|
||||
*/
|
||||
Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria);
|
||||
Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId,
|
||||
SnapshotSelectionCriteria criteria);
|
||||
|
||||
/**
|
||||
* Java API, Plugin API: asynchronously saves a snapshot.
|
||||
|
|
|
|||
|
|
@ -1,16 +1,17 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.journal.leveldb
|
||||
package akka.persistence.journal
|
||||
|
||||
import java.util.ArrayList
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
* The LevelDB journal supports tagging of events that are used
|
||||
* by the `EventsByTag` query. To specify the tags you create an
|
||||
* The journal may support tagging of events that are used by the
|
||||
* `EventsByTag` query and it may support specifying the tags via an
|
||||
* [[akka.persistence.journal.EventAdapter]] that wraps the events
|
||||
* in a `Tagged` with the given `tags`.
|
||||
* in a `Tagged` with the given `tags`. The journal may support other
|
||||
* ways of doing tagging. Please consult the documentation of the specific
|
||||
* journal implementation for more information.
|
||||
*
|
||||
* The journal will unwrap the event and store the `payload`.
|
||||
*/
|
||||
|
|
@ -88,7 +88,7 @@ private[persistence] object LeveldbJournal {
|
|||
* Subscribe the `sender` to changes (appended events) for a specific `tag`.
|
||||
* Used by query-side. The journal will send [[TaggedEventAppended]] messages to
|
||||
* the subscriber when `asyncWriteMessages` has been called.
|
||||
* Events are tagged by wrapping in [[akka.persistence.journal.leveldb.Tagged]]
|
||||
* Events are tagged by wrapping in [[akka.persistence.journal.Tagged]]
|
||||
* via an [[akka.persistence.journal.EventAdapter]].
|
||||
*/
|
||||
final case class SubscribeTag(tag: String) extends SubscriptionCommand
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import scala.util._
|
|||
import scala.concurrent.Future
|
||||
import scala.util.control.NonFatal
|
||||
import akka.persistence.journal.AsyncWriteJournal
|
||||
import akka.persistence.journal.Tagged
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue