From d63e5417b781307a3218d12140403c7cbc34482f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 1 Sep 2015 08:14:59 +0200 Subject: [PATCH] !per #18360 Move Tagged to akka.persistence.journal breaking api since 2.4.0-RC1, but only related to new things in 2.4 anyway --- .../query/LeveldbPersistenceQueryDocTest.java | 4 +-- .../query/MyEventsByTagJavaPublisher.java | 9 ++++--- .../rst/java/persistence-query-leveldb.rst | 2 +- akka-docs/rst/java/persistence-query.rst | 25 +++++++++++++++---- .../docs/persistence/PersistenceDocSpec.scala | 2 +- .../LeveldbPersistenceQueryDocSpec.scala | 6 ++--- .../rst/scala/persistence-query-leveldb.rst | 2 +- akka-docs/rst/scala/persistence-query.rst | 25 +++++++++++++++---- .../journal/leveldb/LeveldbReadJournal.scala | 2 +- .../journal/leveldb/EventsByTagSpec.scala | 2 +- .../snapshot/japi/SnapshotStorePlugin.java | 3 ++- .../journal/{leveldb => }/Tagged.scala | 11 ++++---- .../journal/leveldb/LeveldbJournal.scala | 2 +- .../journal/leveldb/LeveldbStore.scala | 1 + 14 files changed, 66 insertions(+), 30 deletions(-) rename akka-persistence/src/main/scala/akka/persistence/journal/{leveldb => }/Tagged.scala (54%) diff --git a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java index 01638e2cf1..3f76442587 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java @@ -11,7 +11,7 @@ import scala.runtime.BoxedUnit; import akka.actor.ActorSystem; import akka.persistence.journal.WriteEventAdapter; import akka.persistence.journal.EventSeq; -import akka.persistence.journal.leveldb.Tagged; +import akka.persistence.journal.Tagged; import akka.persistence.query.AllPersistenceIds; import akka.persistence.query.EventEnvelope; import akka.persistence.query.EventsByPersistenceId; @@ -67,7 +67,7 @@ public class LeveldbPersistenceQueryDocTest { static //#tagger - public class MyEventAdapter implements WriteEventAdapter { + public class MyTaggingEventAdapter implements WriteEventAdapter { @Override public Object toJournal(Object event) { diff --git a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java index 0053246d0f..f46e92fd5b 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java +++ b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java @@ -69,8 +69,10 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { .build()); } - public static Props props(Connection conn, String tag, Long offset, FiniteDuration refreshInterval) { - return Props.create(() -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval)); + public static Props props(Connection conn, String tag, Long offset, + FiniteDuration refreshInterval) { + return Props.create(() -> + new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval)); } @Override @@ -102,7 +104,8 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { final Long id = in.first(); final byte[] bytes = in.second(); - final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); + final PersistentRepr p = + serialization.deserialize(bytes, PersistentRepr.class).get(); return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); }).collect(toList()); diff --git a/akka-docs/rst/java/persistence-query-leveldb.rst b/akka-docs/rst/java/persistence-query-leveldb.rst index 57621d5830..8d3c0fa6da 100644 --- a/akka-docs/rst/java/persistence-query-leveldb.rst +++ b/akka-docs/rst/java/persistence-query-leveldb.rst @@ -107,7 +107,7 @@ all domain events of an Aggregate Root type. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByTag -To tag events you create an :ref:`event-adapters-java` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged`` +To tag events you create an :ref:`event-adapters-java` that wraps the events in a ``akka.persistence.journal.Tagged`` with the given ``tags``. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst index c1e98e27ae..a3df47c3f1 100644 --- a/akka-docs/rst/java/persistence-query.rst +++ b/akka-docs/rst/java/persistence-query.rst @@ -75,6 +75,9 @@ significantly inefficient. The predefined queries are: +AllPersistenceIds +^^^^^^^^^^^^^^^^^ + ``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new persistence ids as they come into the system: @@ -86,6 +89,9 @@ If your usage does not require a live stream, you can disable refreshing by usin .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-snap +EventsByPersistenceId +^^^^^^^^^^^^^^^^^^^^^ + ``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve @@ -93,11 +99,20 @@ this, which can be configured using the ``RefreshInterval`` query hint: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh -``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to -implement in some journals or may need some additional preparation of the used data store to be executed efficiently, -please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query -is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented -depends on the used journal. +EventsByTag +^^^^^^^^^^^ + +``EventsByTag`` allows querying events regardless of which ``persistenceId`` they are associated with. This query is hard to +implement in some journals or may need some additional preparation of the used data store to be executed efficiently. +The goal of this query is to allow querying for all events which are "tagged" with a specific tag. +That includes the use case to query all domain events of an Aggregate Root type. +Please refer to your read journal plugin's documentation to find out if and how it is supported. + +Some journals may support tagging of events via an :ref:`event-adapters-java` that wraps the events in a +``akka.persistence.journal.Tagged`` with the given ``tags``. The journal may support other ways of doing tagging - again, +how exactly this is implemented depends on the used journal. Here is an example of such a tagging event adapter: + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger .. note:: A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag`` diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 3f806b2f86..1c60ac7916 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -8,7 +8,7 @@ import akka.actor._ import akka.pattern.BackoffSupervisor import akka.persistence._ import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Source, Sink, Flow} +import akka.stream.scaladsl.{ Source, Sink, Flow } import scala.concurrent.duration._ import scala.language.postfixOps diff --git a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala index e6f0573362..41248de7db 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala @@ -7,7 +7,7 @@ import akka.persistence.journal.{ EventAdapter, EventSeq } import akka.testkit.AkkaSpec import akka.persistence.query.PersistenceQuery import akka.persistence.query.journal.leveldb.LeveldbReadJournal -import akka.persistence.journal.leveldb.Tagged +import akka.persistence.journal.Tagged import akka.persistence.query.EventsByPersistenceId import akka.stream.scaladsl.Source import akka.persistence.query.EventEnvelope @@ -19,9 +19,9 @@ import scala.annotation.tailrec object LeveldbPersistenceQueryDocSpec { //#tagger import akka.persistence.journal.WriteEventAdapter - import akka.persistence.journal.leveldb.Tagged + import akka.persistence.journal.Tagged - class ColorTagger extends WriteEventAdapter { + class MyTaggingEventAdapter extends WriteEventAdapter { val colors = Set("green", "black", "blue") override def toJournal(event: Any): Any = event match { case s: String ⇒ diff --git a/akka-docs/rst/scala/persistence-query-leveldb.rst b/akka-docs/rst/scala/persistence-query-leveldb.rst index ca1e801afb..f583a15a5e 100644 --- a/akka-docs/rst/scala/persistence-query-leveldb.rst +++ b/akka-docs/rst/scala/persistence-query-leveldb.rst @@ -102,7 +102,7 @@ all domain events of an Aggregate Root type. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#EventsByTag -To tag events you create an :ref:`event-adapters-scala` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged`` +To tag events you create an :ref:`event-adapters-scala` that wraps the events in a ``akka.persistence.journal.Tagged`` with the given ``tags``. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#tagger diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst index 8e1a75106b..99b15bad06 100644 --- a/akka-docs/rst/scala/persistence-query.rst +++ b/akka-docs/rst/scala/persistence-query.rst @@ -71,6 +71,9 @@ significantly inefficient. The predefined queries are: +AllPersistenceIds +^^^^^^^^^^^^^^^^^ + ``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new persistence ids as they come into the system: @@ -82,6 +85,9 @@ If your usage does not require a live stream, you can disable refreshing by usin .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-snap +EventsByPersistenceId +^^^^^^^^^^^^^^^^^^^^^ + ``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve @@ -89,11 +95,20 @@ this, which can be configured using the ``RefreshInterval`` query hint: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh -``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to -implement in some journals or may need some additional preparation of the used data store to be executed efficiently, -please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query -is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented -depends on the used journal. +EventsByTag +^^^^^^^^^^^ + +``EventsByTag`` allows querying events regardless of which ``persistenceId`` they are associated with. This query is hard to +implement in some journals or may need some additional preparation of the used data store to be executed efficiently. +The goal of this query is to allow querying for all events which are "tagged" with a specific tag. +That includes the use case to query all domain events of an Aggregate Root type. +Please refer to your read journal plugin's documentation to find out if and how it is supported. + +Some journals may support tagging of events via an :ref:`event-adapters-scala` that wraps the events in a +``akka.persistence.journal.Tagged`` with the given ``tags``. The journal may support other ways of doing tagging - again, +how exactly this is implemented depends on the used journal. Here is an example of such a tagging event adapter: + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#tagger .. note:: A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag`` diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala index 0a12b686ed..aae17379cd 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala @@ -115,7 +115,7 @@ object LeveldbReadJournal { * a given tag, e.g. all events of an Aggregate Root type. * * To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events - * in a [[akka.persistence.journal.leveldb.Tagged]] with the given `tags`. + * in a [[akka.persistence.journal.Tagged]] with the given `tags`. * * You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all * events with a given tag. The `offset` corresponds to an ordered sequence number for diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index ceaa39d646..c9feba7f0e 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -15,7 +15,7 @@ import akka.testkit.TestKit import akka.persistence.query.NoRefresh import akka.testkit.AkkaSpec import akka.persistence.query.EventsByTag -import akka.persistence.journal.leveldb.Tagged +import akka.persistence.journal.Tagged import akka.persistence.journal.EventSeq import akka.persistence.journal.EventAdapter import akka.persistence.query.EventEnvelope diff --git a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java index 3c32516726..ec7d8895ae 100644 --- a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -21,7 +21,8 @@ interface SnapshotStorePlugin { * @param criteria * selection criteria for loading. */ - Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria); + Future> doLoadAsync(String persistenceId, + SnapshotSelectionCriteria criteria); /** * Java API, Plugin API: asynchronously saves a snapshot. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala b/akka-persistence/src/main/scala/akka/persistence/journal/Tagged.scala similarity index 54% rename from akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala rename to akka-persistence/src/main/scala/akka/persistence/journal/Tagged.scala index 988cd78c80..951cb427d0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/Tagged.scala @@ -1,16 +1,17 @@ /** * Copyright (C) 2015 Typesafe Inc. */ -package akka.persistence.journal.leveldb +package akka.persistence.journal -import java.util.ArrayList import scala.collection.JavaConverters._ /** - * The LevelDB journal supports tagging of events that are used - * by the `EventsByTag` query. To specify the tags you create an + * The journal may support tagging of events that are used by the + * `EventsByTag` query and it may support specifying the tags via an * [[akka.persistence.journal.EventAdapter]] that wraps the events - * in a `Tagged` with the given `tags`. + * in a `Tagged` with the given `tags`. The journal may support other + * ways of doing tagging. Please consult the documentation of the specific + * journal implementation for more information. * * The journal will unwrap the event and store the `payload`. */ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 696fa088de..de6036cebd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -88,7 +88,7 @@ private[persistence] object LeveldbJournal { * Subscribe the `sender` to changes (appended events) for a specific `tag`. * Used by query-side. The journal will send [[TaggedEventAppended]] messages to * the subscriber when `asyncWriteMessages` has been called. - * Events are tagged by wrapping in [[akka.persistence.journal.leveldb.Tagged]] + * Events are tagged by wrapping in [[akka.persistence.journal.Tagged]] * via an [[akka.persistence.journal.EventAdapter]]. */ final case class SubscribeTag(tag: String) extends SubscriptionCommand diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 8ab3a1c90c..d11194ed0a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -17,6 +17,7 @@ import scala.util._ import scala.concurrent.Future import scala.util.control.NonFatal import akka.persistence.journal.AsyncWriteJournal +import akka.persistence.journal.Tagged /** * INTERNAL API.