diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 4f7955e272..aecb3a43da 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -5,7 +5,9 @@ package docs.persistence; import static akka.pattern.Patterns.ask; - +import java.util.HashSet; +import java.util.Set; +import java.util.Iterator; import com.typesafe.config.Config; import akka.actor.*; @@ -44,48 +46,156 @@ public class PersistenceQueryDocTest { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); + + static + //#advanced-journal-query-types + public class RichEvent { + public final Settags; + public final Object payload; + + public RichEvent(Set tags, Object payload) { + this.tags = tags; + this.payload = payload; + } + } + //#advanced-journal-query-types + static + //#advanced-journal-query-types + // a plugin can provide: + public final class QueryMetadata{ + public final boolean deterministicOrder; + public final boolean infinite; + + public QueryMetadata(boolean deterministicOrder, boolean infinite) { + this.deterministicOrder = deterministicOrder; + this.infinite = infinite; + } + } + //#advanced-journal-query-types + + static //#my-read-journal - class MyReadJournal implements ReadJournal { - private final ExtendedActorSystem system; + public class MyReadJournalProvider implements ReadJournalProvider { + private final MyJavadslReadJournal javadslReadJournal; - public MyReadJournal(ExtendedActorSystem system, Config config) { - this.system = system; + public MyReadJournalProvider(ExtendedActorSystem system, Config config) { + this.javadslReadJournal = new MyJavadslReadJournal(system, config); + } + + @Override + public MyScaladslReadJournal scaladslReadJournal() { + return new MyScaladslReadJournal(javadslReadJournal); } - final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); + @Override + public MyJavadslReadJournal javadslReadJournal() { + return this.javadslReadJournal; + } + } + //#my-read-journal + + static + //#my-read-journal + public class MyJavadslReadJournal implements + akka.persistence.query.javadsl.ReadJournal, + akka.persistence.query.javadsl.EventsByTagQuery, + akka.persistence.query.javadsl.EventsByPersistenceIdQuery, + akka.persistence.query.javadsl.AllPersistenceIdsQuery, + akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { + + private final FiniteDuration refreshInterval; - @SuppressWarnings("unchecked") - public Source query(Query q, Hint... hints) { - if (q instanceof EventsByTag) { - final EventsByTag eventsByTag = (EventsByTag) q; - final String tag = eventsByTag.tag(); - long offset = eventsByTag.offset(); - - final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); - - return (Source) Source.actorPublisher(props) - .mapMaterializedValue(noMaterializedValue()); - } else { - // unsuported - return Source.failed( - new UnsupportedOperationException( - "Query " + q + " not supported by " + getClass().getName())) - .mapMaterializedValue(noMaterializedValue()); - } + public MyJavadslReadJournal(ExtendedActorSystem system, Config config) { + refreshInterval = + FiniteDuration.create(config.getDuration("refresh-interval", + TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } - private FiniteDuration refreshInterval(Hint[] hints) { - for (Hint hint : hints) - if (hint instanceof RefreshInterval) - return ((RefreshInterval) hint).interval(); - - return defaultRefreshInterval; + @Override + public Source eventsByTag(String tag, long offset) { + final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval); + return Source.actorPublisher(props). + mapMaterializedValue(m -> BoxedUnit.UNIT); } - private akka.japi.function.Function noMaterializedValue() { - return param -> (M) null; + @Override + public Source eventsByPersistenceId(String persistenceId, + long fromSequenceNr, long toSequenceNr) { + // implement in a similar way as eventsByTag + throw new UnsupportedOperationException("Not implemented yet"); } + + @Override + public Source allPersistenceIds() { + // implement in a similar way as eventsByTag + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Source currentPersistenceIds() { + // implement in a similar way as eventsByTag + throw new UnsupportedOperationException("Not implemented yet"); + } + + // possibility to add more plugin specific queries + + //#advanced-journal-query-definition + public Source byTagsWithMeta(Set tags) { + //#advanced-journal-query-definition + // implement in a similar way as eventsByTag + throw new UnsupportedOperationException("Not implemented yet"); + } + + } + //#my-read-journal + + static + //#my-read-journal + public class MyScaladslReadJournal implements + akka.persistence.query.scaladsl.ReadJournal, + akka.persistence.query.scaladsl.EventsByTagQuery, + akka.persistence.query.scaladsl.EventsByPersistenceIdQuery, + akka.persistence.query.scaladsl.AllPersistenceIdsQuery, + akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { + + private final MyJavadslReadJournal javadslReadJournal; + + public MyScaladslReadJournal(MyJavadslReadJournal javadslReadJournal) { + this.javadslReadJournal = javadslReadJournal; + } + + @Override + public akka.stream.scaladsl.Source eventsByTag( + String tag, long offset) { + return javadslReadJournal.eventsByTag(tag, offset).asScala(); + } + + @Override + public akka.stream.scaladsl.Source eventsByPersistenceId( + String persistenceId, long fromSequenceNr, long toSequenceNr) { + return javadslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, + toSequenceNr).asScala(); + } + + @Override + public akka.stream.scaladsl.Source allPersistenceIds() { + return javadslReadJournal.allPersistenceIds().asScala(); + } + + @Override + public akka.stream.scaladsl.Source currentPersistenceIds() { + return javadslReadJournal.currentPersistenceIds().asScala(); + } + + // possibility to add more plugin specific queries + + public akka.stream.scaladsl.Source byTagsWithMeta( + scala.collection.Set tags) { + Set jTags = scala.collection.JavaConversions.setAsJavaSet(tags); + return javadslReadJournal.byTagsWithMeta(jTags).asScala(); + } + } //#my-read-journal @@ -94,13 +204,13 @@ public class PersistenceQueryDocTest { //#basic-usage // obtain read journal by plugin id - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); // issue query to journal Source source = - readJournal.query(EventsByPersistenceId.create("user-1337", 0, Long.MAX_VALUE)); + readJournal.eventsByPersistenceId("user-1337", 0, Long.MAX_VALUE); // materialize stream, consuming events ActorMaterializer mat = ActorMaterializer.create(system); @@ -109,52 +219,51 @@ public class PersistenceQueryDocTest { } void demonstrateAllPersistenceIdsLive() { - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); //#all-persistence-ids-live - readJournal.query(AllPersistenceIds.getInstance()); + readJournal.allPersistenceIds(); //#all-persistence-ids-live } void demonstrateNoRefresh() { final ActorSystem system = ActorSystem.create(); - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); //#all-persistence-ids-snap - readJournal.query(AllPersistenceIds.getInstance(), NoRefresh.getInstance()); + readJournal.currentPersistenceIds(); //#all-persistence-ids-snap } void demonstrateRefresh() { final ActorSystem system = ActorSystem.create(); - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); - //#events-by-persistent-id-refresh - final RefreshInterval refresh = RefreshInterval.create(1, TimeUnit.SECONDS); - readJournal.query(EventsByPersistenceId.create("user-us-1337"), refresh); - //#events-by-persistent-id-refresh + //#events-by-persistent-id + readJournal.eventsByPersistenceId("user-us-1337", 0L, Long.MAX_VALUE); + //#events-by-persistent-id } void demonstrateEventsByTag() { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); //#events-by-tag // assuming journal is able to work with numeric offsets we can: final Source blueThings = - readJournal.query(EventsByTag.create("blue")); + readJournal.eventsByTag("blue", 0L); // find top 10 blue things: final Future> top10BlueThings = @@ -167,56 +276,39 @@ public class PersistenceQueryDocTest { }, mat); // start another query, from the known offset - Source blue = readJournal.query(EventsByTag.create("blue", 10)); + Source blue = readJournal.eventsByTag("blue", 10); //#events-by-tag } - //#materialized-query-metadata-classes - // a plugin can provide: - //#materialized-query-metadata-classes - - static - //#materialized-query-metadata-classes - final class QueryMetadata { - public final boolean deterministicOrder; - public final boolean infinite; - - public QueryMetadata(Boolean deterministicOrder, Boolean infinite) { - this.deterministicOrder = deterministicOrder; - this.infinite = infinite; - } - } - - //#materialized-query-metadata-classes - - static - //#materialized-query-metadata-classes - final class AllEvents implements Query { - private AllEvents() {} - private static AllEvents INSTANCE = new AllEvents(); - } - - //#materialized-query-metadata-classes void demonstrateMaterializedQueryValues() { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); - //#materialized-query-metadata + //#advanced-journal-query-usage - final Source events = readJournal.query(AllEvents.INSTANCE); - - events.mapMaterializedValue(meta -> { - System.out.println("The query is: " + - "ordered deterministically: " + meta.deterministicOrder + " " + - "infinite: " + meta.infinite); - return meta; - }); - //#materialized-query-metadata + Set tags = new HashSet(); + tags.add("red"); + tags.add("blue"); + final Source events = readJournal.byTagsWithMeta(tags) + .mapMaterializedValue(meta -> { + System.out.println("The query is: " + + "ordered deterministically: " + meta.deterministicOrder + " " + + "infinite: " + meta.infinite); + return meta; + }); + + events.map(event -> { + System.out.println("Event payload: " + event.payload); + return event.payload; + }).runWith(Sink.ignore(), mat); + + + //#advanced-journal-query-usage } class ReactiveStreamsCompatibleDBDriver { @@ -229,9 +321,9 @@ public class PersistenceQueryDocTest { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); //#projection-into-different-store-rs @@ -240,7 +332,7 @@ public class PersistenceQueryDocTest { // Using an example (Reactive Streams) Database driver readJournal - .query(EventsByPersistenceId.create("user-1337")) + .eventsByPersistenceId("user-1337", 0L, Long.MAX_VALUE) .map(envelope -> envelope.event()) .grouped(20) // batch inserts into groups of 20 .runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database @@ -262,16 +354,16 @@ public class PersistenceQueryDocTest { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); //#projection-into-different-store-simple final ExampleStore store = new ExampleStore(); readJournal - .query(EventsByTag.create("bid")) + .eventsByTag("bid", 0L) .mapAsync(1, store::save) .runWith(Sink.ignore(), mat); //#projection-into-different-store-simple @@ -305,9 +397,9 @@ public class PersistenceQueryDocTest { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); - final ReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor("akka.persistence.query.noop-read-journal"); + final MyJavadslReadJournal readJournal = + PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, + "akka.persistence.query.my-read-journal"); //#projection-into-different-store-actor-run @@ -321,7 +413,7 @@ public class PersistenceQueryDocTest { long startFromOffset = Await.result(bidProjection.latestOffset(), timeout.duration()); readJournal - .query(EventsByTag.create("bid", startFromOffset)) + .eventsByTag("bid", startFromOffset) .mapAsync(8, envelope -> { final Future f = ask(writer, envelope.event(), timeout); return f.map(new Mapper() { diff --git a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java index 3f76442587..5b7d14b5c2 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java @@ -10,15 +10,10 @@ import scala.runtime.BoxedUnit; import akka.actor.ActorSystem; import akka.persistence.journal.WriteEventAdapter; -import akka.persistence.journal.EventSeq; import akka.persistence.journal.Tagged; -import akka.persistence.query.AllPersistenceIds; import akka.persistence.query.EventEnvelope; -import akka.persistence.query.EventsByPersistenceId; -import akka.persistence.query.EventsByTag; import akka.persistence.query.PersistenceQuery; -import akka.persistence.query.javadsl.ReadJournal; -import akka.persistence.query.journal.leveldb.LeveldbReadJournal; +import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Source; @@ -30,38 +25,41 @@ public class LeveldbPersistenceQueryDocTest { //#get-read-journal final ActorMaterializer mat = ActorMaterializer.create(system); - ReadJournal queries = - PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + LeveldbReadJournal queries = + PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, + LeveldbReadJournal.Identifier()); //#get-read-journal } public void demonstrateEventsByPersistenceId() { //#EventsByPersistenceId - ReadJournal queries = - PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + LeveldbReadJournal queries = + PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, + LeveldbReadJournal.Identifier()); Source source = - queries.query(EventsByPersistenceId.create("some-persistence-id", 0, Long.MAX_VALUE)); + queries.eventsByPersistenceId("some-persistence-id", 0, Long.MAX_VALUE); //#EventsByPersistenceId } public void demonstrateAllPersistenceIds() { //#AllPersistenceIds - ReadJournal queries = - PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + LeveldbReadJournal queries = + PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, + LeveldbReadJournal.Identifier()); - Source source = - queries.query(AllPersistenceIds.getInstance()); + Source source = queries.allPersistenceIds(); //#AllPersistenceIds } public void demonstrateEventsByTag() { //#EventsByTag - ReadJournal queries = - PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + LeveldbReadJournal queries = + PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, + LeveldbReadJournal.Identifier()); Source source = - queries.query(EventsByTag.create("green", 0)); + queries.eventsByTag("green", 0); //#EventsByTag } diff --git a/akka-docs/rst/java/persistence-query-leveldb.rst b/akka-docs/rst/java/persistence-query-leveldb.rst index 8d3c0fa6da..b52cd159a6 100644 --- a/akka-docs/rst/java/persistence-query-leveldb.rst +++ b/akka-docs/rst/java/persistence-query-leveldb.rst @@ -38,10 +38,10 @@ extension: Supported Queries ================= -EventsByPersistenceId ---------------------- +EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery +---------------------------------------------------------------- -``EventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor`` +``eventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor`` identified by ``persistenceId``. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByPersistenceId @@ -55,15 +55,10 @@ The returned event stream is ordered by sequence number, i.e. the same order as ``PersistentActor`` persisted the events. The same prefix of stream elements (in same order) are returned for multiple executions of the query, except for when events have been deleted. -The query supports two different completion modes: - -* The stream is not completed when it reaches the end of the currently stored events, - but it continues to push new events when new events are persisted. This is the - default mode that is used when no hints are given. It can also be specified with - hint ``RefreshInterval``. - -* The stream is completed when it reaches the end of the currently stored events. - This mode is specified with hint ``NoRefresh``. +The stream is not completed when it reaches the end of the currently stored events, +but it continues to push new events when new events are persisted. +Corresponding query that is completed when it reaches the end of the currently +stored events is provided by ``currentEventsByPersistenceId``. The LevelDB write journal is notifying the query side as soon as events are persisted, but for efficiency reasons the query side retrieves the events in batches that sometimes can @@ -73,25 +68,20 @@ hint. The stream is completed with failure if there is a failure in executing the query in the backend journal. -AllPersistenceIds ------------------ +AllPersistenceIdsQuery and CurrentPersistenceIdsQuery +----------------------------------------------------- -``AllPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors. +``allPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#AllPersistenceIds The returned event stream is unordered and you can expect different order for multiple executions of the query. -The query supports two different completion modes: - -* The stream is not completed when it reaches the end of the currently used ``persistenceIds``, - but it continues to push new ``persistenceIds`` when new persistent actors are created. - This is the default mode that is used when no hints are given. It can also be specified with - hint ``RefreshInterval``. - -* The stream is completed when it reaches the end of the currently used ``persistenceIds``. - This mode is specified with hint ``NoRefresh``. +The stream is not completed when it reaches the end of the currently used `persistenceIds`, +but it continues to push new `persistenceIds` when new persistent actors are created. +Corresponding query that is completed when it reaches the end of the currently +currently used `persistenceIds` is provided by ``currentPersistenceIds``. The LevelDB write journal is notifying the query side as soon as new ``persistenceIds`` are created and there is no periodic polling or batching involved in this query. @@ -99,10 +89,10 @@ created and there is no periodic polling or batching involved in this query. The stream is completed with failure if there is a failure in executing the query in the backend journal. -EventsByTag ------------ +EventsByTag and CurrentEventsByTag +---------------------------------- -``EventsByTag`` is used for retrieving events that were marked with a given tag, e.g. +``eventsByTag`` is used for retrieving events that were marked with a given tag, e.g. all domain events of an Aggregate Root type. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByTag @@ -131,15 +121,10 @@ tagged event stream. Events deleted using ``deleteMessages(toSequenceNr)`` are not deleted from the "tagged stream". -The query supports two different completion modes: - -* The stream is not completed when it reaches the end of the currently stored events, - but it continues to push new events when new events are persisted. This is the - default mode that is used when no hints are given. It can also be specified with - hint ``RefreshInterval``. - -* The stream is completed when it reaches the end of the currently stored events. - This mode is specified with hint ``NoRefresh``. +The stream is not completed when it reaches the end of the currently stored events, +but it continues to push new events when new events are persisted. +Corresponding query that is completed when it reaches the end of the currently +stored events is provided by ``currentEventsByTag``. The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for efficiency reasons the query side retrieves the events in batches that sometimes can diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst index a3df47c3f1..91bfb64d2f 100644 --- a/akka-docs/rst/java/persistence-query.rst +++ b/akka-docs/rst/java/persistence-query.rst @@ -51,20 +51,20 @@ Read Journals In order to issue queries one has to first obtain an instance of a ``ReadJournal``. Read journals are implemented as `Community plugins`_, each targeting a specific datastore (for example Cassandra or JDBC -databases). For example, given a library that provides a ``akka.persistence.query.noop-read-journal`` obtaining the related +databases). For example, given a library that provides a ``akka.persistence.query.my-read-journal`` obtaining the related journal is as simple as: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#basic-usage Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via -``getJournalFor(NoopJournal.identifier)``, however this is not enforced. +``getJournalFor(NoopJournal.class, NoopJournal.identifier)``, however this is not enforced. Read journal implementations are available as `Community plugins`_. Predefined queries ------------------ -Akka persistence query comes with a number of ``Query`` objects built in and suggests Journal implementors to implement +Akka persistence query comes with a number of query interfaces built in and suggests Journal implementors to implement them according to the semantics described below. It is important to notice that while these query types are very common a journal is not obliged to implement all of them - for example because in a given journal such query would be significantly inefficient. @@ -75,34 +75,37 @@ significantly inefficient. The predefined queries are: -AllPersistenceIds -^^^^^^^^^^^^^^^^^ +AllPersistenceIdsQuery and CurrentPersistenceIdsQuery +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. +``allPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new persistence ids as they come into the system: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-live -If your usage does not require a live stream, you can disable refreshing by using *hints*, providing the built-in -``NoRefresh`` hint to the query: +If your usage does not require a live stream, you can use the ``currentPersistenceIds`` query: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-snap -EventsByPersistenceId -^^^^^^^^^^^^^^^^^^^^^ +EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, +``eventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the -persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve -this, which can be configured using the ``RefreshInterval`` query hint: +persistent actor identified by the given ``persistenceId``. -.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id -EventsByTag -^^^^^^^^^^^ +Most journals will have to revert to polling in order to achieve this, +which can typically be configured with a ``refresh-interval`` configuration property. -``EventsByTag`` allows querying events regardless of which ``persistenceId`` they are associated with. This query is hard to +If your usage does not require a live stream, you can use the ``currentEventsByPersistenceId`` query. + +EventsByTag and CurrentEventsByTag +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``eventsByTag`` allows querying events regardless of which ``persistenceId`` they are associated with. This query is hard to implement in some journals or may need some additional preparation of the used data store to be executed efficiently. The goal of this query is to allow querying for all events which are "tagged" with a specific tag. That includes the use case to query all domain events of an Aggregate Root type. @@ -134,6 +137,7 @@ query has an optionally supported offset parameter (of type ``Long``) which the For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. +If your usage does not require a live stream, you can use the ``currentEventsByTag`` query. Materialized values of queries ------------------------------ @@ -142,11 +146,14 @@ which are a feature of `Akka Streams`_ that allows to expose additional values a More advanced query journals may use this technique to expose information about the character of the materialized stream, for example if it's finite or infinite, strictly ordered or not ordered at all. The materialized value type -is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their +is defined as the second type parameter of the returned ``Source``, which allows journals to provide users with their specialised query object, as demonstrated in the sample below: -.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata-classes -.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#advanced-journal-query-types + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#advanced-journal-query-definition + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#advanced-journal-query-usage .. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-quickstart.html#Materialized_values .. _Akka Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java.html @@ -154,7 +161,7 @@ specialised query object, as demonstrated in the sample below: Performance and denormalization =============================== -When building systems using :ref:`event-sourcing-scala` and CQRS (`Command & Query Responsibility Segragation`_) techniques +When building systems using :ref:`event-sourcing-java` and CQRS (`Command & Query Responsibility Segragation`_) techniques it is tremendously important to realise that the write-side has completely different needs from the read-side, and separating those concerns into datastores that are optimised for either side makes it possible to offer the best expirience for the write and read sides independently. @@ -232,8 +239,13 @@ Most users will not need to implement journals themselves, except if targeting a ReadJournal plugin API ---------------------- -Journals *MUST* return a *failed* ``Source`` if they are unable to execute the passed in query. -For example if the user accidentally passed in an ``SqlQuery()`` to a key-value journal. +A read journal plugin must implement ``akka.persistence.query.ReadJournalProvider`` which +creates instances of ``akka.persistence.query.scaladsl.ReadJournal`` and +``akka.persistence.query.javaadsl.ReadJournal``. The plugin must implement both the ``scaladsl`` +and the ``javadsl`` interfaces because the ``akka.stream.scaladsl.Source`` and +``akka.stream.javadsl.Source`` are different types and even though those types can easily be converted +to each other it is most convenient for the end user to get access to the Java or Scala ``Source`` directly. +As illustrated below one of the implementations can delegate to the other. Below is a simple journal implementation: @@ -243,6 +255,12 @@ And the ``EventsByTag`` could be backed by such an Actor for example: .. includecode:: code/docs/persistence/query/MyEventsByTagJavaPublisher.java#events-by-tag-publisher +If the underlying datastore only supports queries that are completed when they reach the +end of the "result set", the journal has to submit new queries after a while in order +to support "infinite" event streams that include events stored after the initial query +has completed. It is recommended that the plugin use a configuration property named +``refresh-interval`` for defining such a refresh interval. + Plugin TCK ---------- diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala index 77e8eb041a..65f7a06986 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala @@ -136,7 +136,7 @@ trait DomainEvent case class Person(name: String, age: Int) extends DomainEvent case class Box(length: Int) extends DomainEvent -case class MyTaggingJournalModel(payload: Any, tags: immutable.Set[String]) +case class MyTaggingJournalModel(payload: Any, tags: Set[String]) //#identity-event-adapter class MyEventAdapter(system: ExtendedActorSystem) extends EventAdapter { @@ -230,4 +230,4 @@ object v1 { trait Event trait UserEvent extends v1.Event trait ItemEvent extends v1.Event -} \ No newline at end of file +} diff --git a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala index 41248de7db..1a3c365b4e 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala @@ -6,14 +6,11 @@ package docs.persistence.query import akka.persistence.journal.{ EventAdapter, EventSeq } import akka.testkit.AkkaSpec import akka.persistence.query.PersistenceQuery -import akka.persistence.query.journal.leveldb.LeveldbReadJournal +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.journal.Tagged -import akka.persistence.query.EventsByPersistenceId import akka.stream.scaladsl.Source import akka.persistence.query.EventEnvelope import akka.stream.ActorMaterializer -import akka.persistence.query.AllPersistenceIds -import akka.persistence.query.EventsByTag import scala.annotation.tailrec object LeveldbPersistenceQueryDocSpec { @@ -46,20 +43,21 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { "demonstrate how get ReadJournal" in { //#get-read-journal import akka.persistence.query.PersistenceQuery - import akka.persistence.query.journal.leveldb.LeveldbReadJournal + import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal - val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal]( + LeveldbReadJournal.Identifier) //#get-read-journal } "demonstrate EventsByPersistenceId" in { //#EventsByPersistenceId - import akka.persistence.query.EventsByPersistenceId implicit val mat = ActorMaterializer()(system) - val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal]( + LeveldbReadJournal.Identifier) val src: Source[EventEnvelope, Unit] = - queries.query(EventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)) + queries.eventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue) val events: Source[Any, Unit] = src.map(_.event) //#EventsByPersistenceId @@ -67,22 +65,22 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { "demonstrate AllPersistenceIds" in { //#AllPersistenceIds - import akka.persistence.query.AllPersistenceIds implicit val mat = ActorMaterializer()(system) - val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal]( + LeveldbReadJournal.Identifier) - val src: Source[String, Unit] = queries.query(AllPersistenceIds) + val src: Source[String, Unit] = queries.allPersistenceIds() //#AllPersistenceIds } "demonstrate EventsByTag" in { //#EventsByTag - import akka.persistence.query.EventsByTag implicit val mat = ActorMaterializer()(system) - val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal]( + LeveldbReadJournal.Identifier) val src: Source[EventEnvelope, Unit] = - queries.query(EventsByTag(tag = "green", offset = 0L)) + queries.eventsByTag(tag = "green", offset = 0L) //#EventsByTag } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index cf41f39279..49dc80de3f 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -5,12 +5,12 @@ package docs.persistence.query import akka.actor._ -import akka.persistence.query.scaladsl.ReadJournal import akka.persistence.{ Recovery, PersistentActor } import akka.persistence.query._ import akka.stream.{ FlowShape, ActorMaterializer } import akka.stream.scaladsl.FlowGraph import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.stream.javadsl import akka.testkit.AkkaSpec import akka.util.Timeout import docs.persistence.query.PersistenceQueryDocSpec.{ DummyStore, TheOneWhoWritesToQueryJournal } @@ -25,42 +25,112 @@ object PersistenceQueryDocSpec { implicit val timeout = Timeout(3.seconds) + //#advanced-journal-query-types + final case class RichEvent(tags: Set[String], payload: Any) + + // a plugin can provide: + case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean) + //#advanced-journal-query-types + //#my-read-journal - class MyReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal { + class MyReadJournalProvider(system: ExtendedActorSystem, config: Config) + extends ReadJournalProvider { - private val defaulRefreshInterval = 3.seconds + override val scaladslReadJournal: MyScaladslReadJournal = + new MyScaladslReadJournal(system, config) - override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = - q match { - case EventsByTag(tag, offset) ⇒ - val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)) - Source.actorPublisher[EventEnvelope](props) - .mapMaterializedValue(_ ⇒ noMaterializedValue) + override val javadslReadJournal: MyJavadslReadJournal = + new MyJavadslReadJournal(scaladslReadJournal) + } - case unsupported ⇒ - Source.failed[T]( - new UnsupportedOperationException( - s"Query $unsupported not supported by ${getClass.getName}")) - .mapMaterializedValue(_ ⇒ noMaterializedValue) - } + class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config) + extends akka.persistence.query.scaladsl.ReadJournal + with akka.persistence.query.scaladsl.EventsByTagQuery + with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery + with akka.persistence.query.scaladsl.AllPersistenceIdsQuery + with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { - private def refreshInterval(hints: Seq[Hint]): FiniteDuration = - hints.collectFirst { case RefreshInterval(interval) ⇒ interval } - .getOrElse(defaulRefreshInterval) + private val refreshInterval: FiniteDuration = + config.getDuration("refresh-interval", MILLISECONDS).millis - private def noMaterializedValue[M]: M = - null.asInstanceOf[M] + override def eventsByTag( + tag: String, offset: Long = 0L): Source[EventEnvelope, Unit] = { + val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval) + Source.actorPublisher[EventEnvelope](props) + .mapMaterializedValue(_ ⇒ ()) + } + + override def eventsByPersistenceId( + persistenceId: String, fromSequenceNr: Long = 0L, + toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, Unit] = { + // implement in a similar way as eventsByTag + ??? + } + + override def allPersistenceIds(): Source[String, Unit] = { + // implement in a similar way as eventsByTag + ??? + } + + override def currentPersistenceIds(): Source[String, Unit] = { + // implement in a similar way as eventsByTag + ??? + } + + // possibility to add more plugin specific queries + + //#advanced-journal-query-definition + def byTagsWithMeta(tags: Set[String]): Source[RichEvent, QueryMetadata] = { + //#advanced-journal-query-definition + // implement in a similar way as eventsByTag + ??? + } + + } + + class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal) + extends akka.persistence.query.javadsl.ReadJournal + with akka.persistence.query.javadsl.EventsByTagQuery + with akka.persistence.query.javadsl.EventsByPersistenceIdQuery + with akka.persistence.query.javadsl.AllPersistenceIdsQuery + with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { + + override def eventsByTag( + tag: String, offset: Long = 0L): javadsl.Source[EventEnvelope, Unit] = + scaladslReadJournal.eventsByTag(tag, offset).asJava + + override def eventsByPersistenceId( + persistenceId: String, fromSequenceNr: Long = 0L, + toSequenceNr: Long = Long.MaxValue): javadsl.Source[EventEnvelope, Unit] = + scaladslReadJournal.eventsByPersistenceId( + persistenceId, fromSequenceNr, toSequenceNr).asJava + + override def allPersistenceIds(): javadsl.Source[String, Unit] = + scaladslReadJournal.allPersistenceIds().asJava + + override def currentPersistenceIds(): javadsl.Source[String, Unit] = + scaladslReadJournal.currentPersistenceIds().asJava + + // possibility to add more plugin specific queries + + def byTagsWithMeta( + tags: java.util.Set[String]): javadsl.Source[RichEvent, QueryMetadata] = { + import scala.collection.JavaConverters._ + scaladslReadJournal.byTagsWithMeta(tags.asScala.toSet).asJava + } } //#my-read-journal + case class ComplexState() { def readyToSave = false } case class Record(any: Any) class DummyStore { def save(record: Record) = Future.successful(42L) } + val JournalId = "akka.persistence.query.my-read-journal" + class X { - val JournalId = "" def convertToReadSideTypes(in: Any): Any = ??? @@ -72,13 +142,14 @@ object PersistenceQueryDocSpec { implicit val system = ActorSystem() implicit val mat = ActorMaterializer() - val readJournal = PersistenceQuery(system).readJournalFor(JournalId) + val readJournal = + PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](JournalId) val dbBatchWriter: Subscriber[immutable.Seq[Any]] = ReactiveStreamsCompatibleDBDriver.batchWriter // Using an example (Reactive Streams) Database driver readJournal - .query(EventsByPersistenceId("user-1337")) + .eventsByPersistenceId("user-1337") .map(envelope => envelope.event) .map(convertToReadSideTypes) // convert to datatype .grouped(20) // batch inserts into groups of 20 @@ -109,103 +180,81 @@ object PersistenceQueryDocSpec { } class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { + import PersistenceQueryDocSpec._ def this() { this( """ - akka.persistence.query.noop-read-journal { - class = "docs.persistence.query.NoopReadJournal" + akka.persistence.query.my-read-journal { + class = "docs.persistence.query.PersistenceQueryDocSpec$MyReadJournalProvider" + refresh-interval = 3s } - """.stripMargin) + """) } - //#basic-usage - // obtain read journal by plugin id - val readJournal = - PersistenceQuery(system).readJournalFor("akka.persistence.query.noop-read-journal") - - // issue query to journal - val source: Source[EventEnvelope, Unit] = - readJournal.query(EventsByPersistenceId("user-1337", 0, Long.MaxValue)) - - // materialize stream, consuming events implicit val mat = ActorMaterializer() - source.runForeach { event => println("Event: " + event) } - //#basic-usage - //#all-persistence-ids-live - readJournal.query(AllPersistenceIds) - //#all-persistence-ids-live + class BasicUsage { + //#basic-usage + // obtain read journal by plugin id + val readJournal = + PersistenceQuery(system).readJournalFor[MyScaladslReadJournal]( + "akka.persistence.query.my-read-journal") - //#all-persistence-ids-snap - readJournal.query(AllPersistenceIds, hints = NoRefresh) - //#all-persistence-ids-snap + // issue query to journal + val source: Source[EventEnvelope, Unit] = + readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue) - //#events-by-tag - // assuming journal is able to work with numeric offsets we can: + // materialize stream, consuming events + implicit val mat = ActorMaterializer() + source.runForeach { event => println("Event: " + event) } + //#basic-usage - val blueThings: Source[EventEnvelope, Unit] = - readJournal.query(EventsByTag("blue")) + //#all-persistence-ids-live + readJournal.allPersistenceIds() + //#all-persistence-ids-live - // find top 10 blue things: - val top10BlueThings: Future[Vector[Any]] = - blueThings - .map(_.event) - .take(10) // cancels the query stream after pulling 10 elements - .runFold(Vector.empty[Any])(_ :+ _) + //#all-persistence-ids-snap + readJournal.currentPersistenceIds() + //#all-persistence-ids-snap - // start another query, from the known offset - val furtherBlueThings = readJournal.query(EventsByTag("blue", offset = 10)) - //#events-by-tag + //#events-by-tag + // assuming journal is able to work with numeric offsets we can: - //#events-by-persistent-id-refresh - readJournal.query(EventsByPersistenceId("user-us-1337"), hints = RefreshInterval(1.second)) + val blueThings: Source[EventEnvelope, Unit] = + readJournal.eventsByTag("blue") - //#events-by-persistent-id-refresh + // find top 10 blue things: + val top10BlueThings: Future[Vector[Any]] = + blueThings + .map(_.event) + .take(10) // cancels the query stream after pulling 10 elements + .runFold(Vector.empty[Any])(_ :+ _) - //#advanced-journal-query-definition - final case class RichEvent(tags: immutable.Set[String], payload: Any) + // start another query, from the known offset + val furtherBlueThings = readJournal.eventsByTag("blue", offset = 10) + //#events-by-tag - case class QueryStats(totalEvents: Long) + //#events-by-persistent-id + readJournal.eventsByPersistenceId("user-us-1337") - case class ByTagsWithStats(tags: immutable.Set[String]) - extends Query[RichEvent, QueryStats] + //#events-by-persistent-id - //#advanced-journal-query-definition + //#advanced-journal-query-usage + val query: Source[RichEvent, QueryMetadata] = + readJournal.byTagsWithMeta(Set("red", "blue")) - //#advanced-journal-query-hints + query + .mapMaterializedValue { meta => + println(s"The query is: " + + s"ordered deterministically: ${meta.deterministicOrder}, " + + s"infinite: ${meta.infinite}") + } + .map { event => println(s"Event payload: ${event.payload}") } + .runWith(Sink.ignore) - import scala.concurrent.duration._ - - readJournal.query(EventsByTag("blue"), hints = RefreshInterval(1.second)) - //#advanced-journal-query-hints - - //#advanced-journal-query-usage - val query: Source[RichEvent, QueryStats] = - readJournal.query(ByTagsWithStats(Set("red", "blue"))) - - query - .mapMaterializedValue { stats => println(s"Stats: $stats") } - .map { event => println(s"Event payload: ${event.payload}") } - .runWith(Sink.ignore) - - //#advanced-journal-query-usage - - //#materialized-query-metadata - // a plugin can provide: - case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean) - - case object AllEvents extends Query[Any, QueryMetadata] - - val events = readJournal.query(AllEvents) - events - .mapMaterializedValue { meta => - println(s"The query is: " + - s"ordered deterministically: ${meta.deterministicOrder}, " + - s"infinite: ${meta.infinite}") - } - - //#materialized-query-metadata + //#advanced-journal-query-usage + } //#projection-into-different-store class MyResumableProjection(name: String) { @@ -215,6 +264,9 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { //#projection-into-different-store class RunWithActor { + val readJournal = + PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](JournalId) + //#projection-into-different-store-actor-run import akka.pattern.ask import system.dispatcher @@ -227,7 +279,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { bidProjection.latestOffset.foreach { startFromOffset => readJournal - .query(EventsByTag("bid", startFromOffset)) + .eventsByTag("bid", startFromOffset) .mapAsync(8) { envelope => (writer ? envelope.event).map(_ => envelope.offset) } .mapAsync(1) { offset => bidProjection.saveProgress(offset) } .runWith(Sink.ignore) @@ -236,6 +288,10 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { } class RunWithAsyncFunction { + val readJournal = + PersistenceQuery(system).readJournalFor[MyScaladslReadJournal]( + "akka.persistence.query.my-read-journal") + //#projection-into-different-store-simple trait ExampleStore { def save(event: Any): Future[Unit] @@ -246,7 +302,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { val store: ExampleStore = ??? readJournal - .query(EventsByTag("bid")) + .eventsByTag("bid") .mapAsync(1) { e => store.save(e) } .runWith(Sink.ignore) //#projection-into-different-store-simple @@ -254,7 +310,3 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { } -class NoopReadJournal(sys: ExtendedActorSystem) extends ReadJournal { - override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = - Source.empty.mapMaterializedValue(_ => null.asInstanceOf[M]) -} diff --git a/akka-docs/rst/scala/persistence-query-leveldb.rst b/akka-docs/rst/scala/persistence-query-leveldb.rst index f583a15a5e..c88acef242 100644 --- a/akka-docs/rst/scala/persistence-query-leveldb.rst +++ b/akka-docs/rst/scala/persistence-query-leveldb.rst @@ -33,10 +33,10 @@ extension: Supported Queries ================= -EventsByPersistenceId ---------------------- +EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery +---------------------------------------------------------------- -``EventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor`` +``eventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor`` identified by ``persistenceId``. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#EventsByPersistenceId @@ -50,15 +50,10 @@ The returned event stream is ordered by sequence number, i.e. the same order as ``PersistentActor`` persisted the events. The same prefix of stream elements (in same order) are returned for multiple executions of the query, except for when events have been deleted. -The query supports two different completion modes: - -* The stream is not completed when it reaches the end of the currently stored events, - but it continues to push new events when new events are persisted. This is the - default mode that is used when no hints are given. It can also be specified with - hint ``RefreshInterval``. - -* The stream is completed when it reaches the end of the currently stored events. - This mode is specified with hint ``NoRefresh``. +The stream is not completed when it reaches the end of the currently stored events, +but it continues to push new events when new events are persisted. +Corresponding query that is completed when it reaches the end of the currently +stored events is provided by ``currentEventsByPersistenceId``. The LevelDB write journal is notifying the query side as soon as events are persisted, but for efficiency reasons the query side retrieves the events in batches that sometimes can @@ -68,25 +63,20 @@ hint. The stream is completed with failure if there is a failure in executing the query in the backend journal. -AllPersistenceIds ------------------ +AllPersistenceIdsQuery and CurrentPersistenceIdsQuery +----------------------------------------------------- -``AllPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors. +``allPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#AllPersistenceIds The returned event stream is unordered and you can expect different order for multiple executions of the query. -The query supports two different completion modes: - -* The stream is not completed when it reaches the end of the currently used ``persistenceIds``, - but it continues to push new ``persistenceIds`` when new persistent actors are created. - This is the default mode that is used when no hints are given. It can also be specified with - hint ``RefreshInterval``. - -* The stream is completed when it reaches the end of the currently used ``persistenceIds``. - This mode is specified with hint ``NoRefresh``. +The stream is not completed when it reaches the end of the currently used `persistenceIds`, +but it continues to push new `persistenceIds` when new persistent actors are created. +Corresponding query that is completed when it reaches the end of the currently +currently used `persistenceIds` is provided by ``currentPersistenceIds``. The LevelDB write journal is notifying the query side as soon as new ``persistenceIds`` are created and there is no periodic polling or batching involved in this query. @@ -94,10 +84,10 @@ created and there is no periodic polling or batching involved in this query. The stream is completed with failure if there is a failure in executing the query in the backend journal. -EventsByTag ------------ +EventsByTag and CurrentEventsByTag +---------------------------------- -``EventsByTag`` is used for retrieving events that were marked with a given tag, e.g. +``eventsByTag`` is used for retrieving events that were marked with a given tag, e.g. all domain events of an Aggregate Root type. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#EventsByTag @@ -126,15 +116,10 @@ tagged event stream. Events deleted using ``deleteMessages(toSequenceNr)`` are not deleted from the "tagged stream". -The query supports two different completion modes: - -* The stream is not completed when it reaches the end of the currently stored events, - but it continues to push new events when new events are persisted. This is the - default mode that is used when no hints are given. It can also be specified with - hint ``RefreshInterval``. - -* The stream is completed when it reaches the end of the currently stored events. - This mode is specified with hint ``NoRefresh``. +The stream is not completed when it reaches the end of the currently stored events, +but it continues to push new events when new events are persisted. +Corresponding query that is completed when it reaches the end of the currently +stored events is provided by ``currentEventsByTag``. The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for efficiency reasons the query side retrieves the events in batches that sometimes can diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst index 99b15bad06..00e1407d5b 100644 --- a/akka-docs/rst/scala/persistence-query.rst +++ b/akka-docs/rst/scala/persistence-query.rst @@ -47,20 +47,20 @@ Read Journals In order to issue queries one has to first obtain an instance of a ``ReadJournal``. Read journals are implemented as `Community plugins`_, each targeting a specific datastore (for example Cassandra or JDBC -databases). For example, given a library that provides a ``akka.persistence.query.noop-read-journal`` obtaining the related +databases). For example, given a library that provides a ``akka.persistence.query.my-read-journal`` obtaining the related journal is as simple as: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#basic-usage Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via -``journalFor(NoopJournal.identifier)``, however this is not enforced. +``readJournalFor[NoopJournal](NoopJournal.identifier)``, however this is not enforced. Read journal implementations are available as `Community plugins`_. Predefined queries ------------------ -Akka persistence query comes with a number of ``Query`` objects built in and suggests Journal implementors to implement +Akka persistence query comes with a number of query interfaces built in and suggests Journal implementors to implement them according to the semantics described below. It is important to notice that while these query types are very common a journal is not obliged to implement all of them - for example because in a given journal such query would be significantly inefficient. @@ -71,34 +71,37 @@ significantly inefficient. The predefined queries are: -AllPersistenceIds -^^^^^^^^^^^^^^^^^ +AllPersistenceIdsQuery and CurrentPersistenceIdsQuery +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. +``allPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new persistence ids as they come into the system: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-live -If your usage does not require a live stream, you can disable refreshing by using *hints*, providing the built-in -``NoRefresh`` hint to the query: +If your usage does not require a live stream, you can use the ``currentPersistenceIds`` query: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-snap -EventsByPersistenceId -^^^^^^^^^^^^^^^^^^^^^ +EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, +``eventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the -persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve -this, which can be configured using the ``RefreshInterval`` query hint: +persistent actor identified by the given ``persistenceId``. -.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id -EventsByTag -^^^^^^^^^^^ +Most journals will have to revert to polling in order to achieve this, +which can typically be configured with a ``refresh-interval`` configuration property. -``EventsByTag`` allows querying events regardless of which ``persistenceId`` they are associated with. This query is hard to +If your usage does not require a live stream, you can use the ``currentEventsByPersistenceId`` query. + +EventsByTag and CurrentEventsByTag +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``eventsByTag`` allows querying events regardless of which ``persistenceId`` they are associated with. This query is hard to implement in some journals or may need some additional preparation of the used data store to be executed efficiently. The goal of this query is to allow querying for all events which are "tagged" with a specific tag. That includes the use case to query all domain events of an Aggregate Root type. @@ -130,6 +133,7 @@ query has an optionally supported offset parameter (of type ``Long``) which the For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. +If your usage does not require a live stream, you can use the ``currentEventsByTag`` query. Materialized values of queries ------------------------------ @@ -138,10 +142,14 @@ which are a feature of `Akka Streams`_ that allows to expose additional values a More advanced query journals may use this technique to expose information about the character of the materialized stream, for example if it's finite or infinite, strictly ordered or not ordered at all. The materialized value type -is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their +is defined as the second type parameter of the returned ``Source``, which allows journals to provide users with their specialised query object, as demonstrated in the sample below: -.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#materialized-query-metadata +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#advanced-journal-query-types + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#advanced-journal-query-definition + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#advanced-journal-query-usage .. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-quickstart.html#Materialized_values .. _Akka Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala.html @@ -226,17 +234,28 @@ Most users will not need to implement journals themselves, except if targeting a ReadJournal plugin API ---------------------- -Journals *MUST* return a *failed* ``Source`` if they are unable to execute the passed in query. -For example if the user accidentally passed in an ``SqlQuery()`` to a key-value journal. +A read journal plugin must implement ``akka.persistence.query.ReadJournalProvider`` which +creates instances of ``akka.persistence.query.scaladsl.ReadJournal`` and +``akka.persistence.query.javaadsl.ReadJournal``. The plugin must implement both the ``scaladsl`` +and the ``javadsl`` traits because the ``akka.stream.scaladsl.Source`` and +``akka.stream.javadsl.Source`` are different types and even though those types can easily be converted +to each other it is most convenient for the end user to get access to the Java or Scala directly. +As illustrated below one of the implementations can delegate to the other. Below is a simple journal implementation: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#my-read-journal -And the ``EventsByTag`` could be backed by such an Actor for example: +And the ``eventsByTag`` could be backed by such an Actor for example: .. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher +If the underlying datastore only supports queries that are completed when they reach the +end of the "result set", the journal has to submit new queries after a while in order +to support "infinite" event streams that include events stored after the initial query +has completed. It is recommended that the plugin use a configuration property named +``refresh-interval`` for defining such a refresh interval. + Plugin TCK ---------- diff --git a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java deleted file mode 100644 index 6bf5be87f8..0000000000 --- a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ - -package akka.persistence.query.javadsl; - -import akka.persistence.query.Query; -import akka.persistence.query.Hint; -import akka.stream.javadsl.Source; -import scala.annotation.varargs; - -/** - * Java API - *

- * API for reading persistent events and information derived - * from stored persistent events. - *

- * The purpose of the API is not to enforce compatibility between different - * journal implementations, because the technical capabilities may be very different. - * The interface is very open so that different journals may implement specific queries. - *

- * Usage: - *


- * final ReadJournal journal =
- *   PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath);
- *
- * final Source<EventEnvelope, ?> events =
- *   journal.query(new EventsByTag("mytag", 0L));
- * 
- */ - -public interface ReadJournal { - - /** - * Java API - *

- * A query that returns a `Source` with output type `T` and materialized value `M`. - *

- * The `hints` are optional parameters that defines how to execute the - * query, typically specific to the journal implementation. - * - */ - Source query(Query q, Hint... hints); - -} diff --git a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java deleted file mode 100644 index 96866071fb..0000000000 --- a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ - -package akka.persistence.query.javadsl; - -import akka.japi.Util; -import akka.persistence.query.Hint; -import akka.persistence.query.Query; -import akka.stream.javadsl.Source; - -/** - * INTERNAL API - * - * Adapter from ScalaDSL {@link akka.persistence.query.scaladsl.ReadJournal} - * to JavaDSL {@link ReadJournal}. - */ -public final class ReadJournalAdapter implements ReadJournal { - - private final akka.persistence.query.scaladsl.ReadJournal backing; - - public ReadJournalAdapter(akka.persistence.query.scaladsl.ReadJournal backing) { - this.backing = backing; - } - - @Override - public Source query(Query q, Hint... hints) { - return backing.query(q, Util.immutableSeq(hints)).asJava(); - } - -} diff --git a/akka-persistence-query/src/main/resources/reference.conf b/akka-persistence-query/src/main/resources/reference.conf index 18e0a0f2d7..9c566097d8 100644 --- a/akka-persistence-query/src/main/resources/reference.conf +++ b/akka-persistence-query/src/main/resources/reference.conf @@ -8,8 +8,8 @@ #//#query-leveldb # Configuration for the LeveldbReadJournal akka.persistence.query.journal.leveldb { - # Implementation class of the LevelDB ReadJournal - class = "akka.persistence.query.journal.leveldb.LeveldbReadJournal" + # Implementation class of the LevelDB ReadJournalProvider + class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider" # Absolute path to the write journal plugin configuration entry that this # query journal will connect to. That must be a LeveldbJournal or SharedLeveldbJournal. diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala new file mode 100644 index 0000000000..fce035160f --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala @@ -0,0 +1,14 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +/** + * Event wrapper adding meta data for the events in the result stream of + * [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries. + */ +final case class EventEnvelope( + offset: Long, + persistenceId: String, + sequenceNr: Long, + event: Any) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala deleted file mode 100644 index 62a48a942d..0000000000 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package akka.persistence.query - -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.FiniteDuration - -/** - * A query hint that defines how to execute the query, - * typically specific to the journal implementation. - * - * A plugin may optionally support a [[Hint]]. - */ -trait Hint - -/** - * If the underlying datastore only supports queries that are completed when they reach the - * end of the "result set", the journal has to submit new queries after a while in order - * to support "infinite" event streams that include events stored after the initial query has completed. - * - * A plugin may optionally support this [[Hint]] for defining such a refresh interval. - */ -final case class RefreshInterval(interval: FiniteDuration) extends Hint -object RefreshInterval { - /** Java API */ - def create(length: Long, unit: TimeUnit): RefreshInterval = new RefreshInterval(FiniteDuration(length, unit)) - /** Java API */ - def create(interval: FiniteDuration): RefreshInterval = new RefreshInterval(interval) -} - -/** - * Indicates that the event stream is supposed to be completed immediately when it - * reaches the end of the "result set", as described in [[RefreshInterval]]. - * - */ -final case object NoRefresh extends NoRefresh { - /** Java API */ - def getInstance: NoRefresh = this -} -sealed class NoRefresh extends Hint - diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index 7fafe87fd5..f1cfb3ff6d 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -24,7 +24,9 @@ object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdPr def lookup() = PersistenceQuery /** INTERNAL API. */ - private[persistence] case class PluginHolder(plugin: scaladsl.ReadJournal) extends Extension + private[persistence] case class PluginHolder( + scaladslPlugin: scaladsl.ReadJournal, javadslPlugin: akka.persistence.query.javadsl.ReadJournal) + extends Extension } @@ -37,33 +39,38 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { private val readJournalPluginExtensionIds = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) /** - * Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given read journal configuration entry. + * Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given + * read journal configuration entry. */ - @tailrec final def readJournalFor(readJournalPluginId: String): scaladsl.ReadJournal = { + final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T = + readJournalPluginFor(readJournalPluginId).scaladslPlugin.asInstanceOf[T] + + /** + * Java API: Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given + * read journal configuration entry. + */ + final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T = + readJournalPluginFor(readJournalPluginId).javadslPlugin.asInstanceOf[T] + + @tailrec private def readJournalPluginFor(readJournalPluginId: String): PluginHolder = { val configPath = readJournalPluginId val extensionIdMap = readJournalPluginExtensionIds.get extensionIdMap.get(configPath) match { case Some(extensionId) ⇒ - extensionId(system).plugin + extensionId(system) case None ⇒ val extensionId = new ExtensionId[PluginHolder] { - override def createExtension(system: ExtendedActorSystem): PluginHolder = - PluginHolder(createPlugin(configPath)) + override def createExtension(system: ExtendedActorSystem): PluginHolder = { + val provider = createPlugin(configPath) + PluginHolder(provider.scaladslReadJournal(), provider.javadslReadJournal()) + } } readJournalPluginExtensionIds.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) - readJournalFor(readJournalPluginId) // Recursive invocation. + readJournalPluginFor(readJournalPluginId) // Recursive invocation. } } - /** - * Java API - * - * Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry. - */ - final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal = - new javadsl.ReadJournalAdapter(readJournalFor(readJournalPluginId)) - - private def createPlugin(configPath: String): scaladsl.ReadJournal = { + private def createPlugin(configPath: String): ReadJournalProvider = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'") val pluginConfig = system.settings.config.getConfig(configPath) @@ -71,26 +78,12 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { log.debug(s"Create plugin: ${configPath} ${pluginClassName}") val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get - // TODO remove duplication - val scalaPlugin = - if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass)) - system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) - .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)) - .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) - .recoverWith { - case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) - } - else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass)) - system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) - .orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)) - .orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil)) - .map(jj ⇒ new scaladsl.ReadJournalAdapter(jj)) - .recoverWith { - case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) - } - else throw new IllegalArgumentException(s"Configured class ${pluginClass} does not extend") - - scalaPlugin.get + system.dynamicAccess.createInstanceFor[ReadJournalProvider](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[ReadJournalProvider](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)) + .orElse(system.dynamicAccess.createInstanceFor[ReadJournalProvider](pluginClass, Nil)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + }.get } /** Check for default or missing identity. */ diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala deleted file mode 100644 index 30206e673d..0000000000 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package akka.persistence.query - -/** - * General interface for all queries. There are a few pre-defined queries, - * such as [[EventsByPersistenceId]], [[AllPersistenceIds]] and [[EventsByTag]] - * but implementation of these queries are optional. Query (journal) plugins - * may define their own specialized queries. - * - * If a query plugin does not support a query it will return a stream that - * will be completed with a failure of [[UnsupportedOperationException]]. - */ -trait Query[T, M] - -/** - * Query all `PersistentActor` identifiers, i.e. as defined by the - * `persistenceId` of the `PersistentActor`. - * - * A plugin may optionally support this [[Query]]. - */ -final case object AllPersistenceIds extends AllPersistenceIds { - /** Java API */ - final def getInstance: AllPersistenceIds = this -} -abstract class AllPersistenceIds extends Query[String, Unit] - -/** - * Query events for a specific `PersistentActor` identified by `persistenceId`. - * - * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` - * or use `0L` and `Long.MaxValue` respectively to retrieve all events. - * - * The returned event stream should be ordered by sequence number. - * - * A plugin may optionally support this [[Query]]. - */ -final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue) - extends Query[EventEnvelope, Unit] -object EventsByPersistenceId { - /** Java API */ - def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId = - EventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) - - /** Java API */ - def create(persistenceId: String, fromSequenceNr: Long): EventsByPersistenceId = - EventsByPersistenceId(persistenceId, fromSequenceNr) - - /** Java API */ - def create(persistenceId: String): EventsByPersistenceId = - EventsByPersistenceId(persistenceId) -} -/** - * Query events that have a specific tag. A tag can for example correspond to an - * aggregate root type (in DDD terminology). - * - * The consumer can keep track of its current position in the event stream by storing the - * `offset` and restart the query from a given `offset` after a crash/restart. - * - * The exact meaning of the `offset` depends on the journal and must be documented by the - * read journal plugin. It may be a sequential id number that uniquely identifies the - * position of each event within the event stream. Distributed data stores cannot easily - * support those semantics and they may use a weaker meaning. For example it may be a - * timestamp (taken when the event was created or stored). Timestamps are not unique and - * not strictly ordered, since clocks on different machines may not be synchronized. - * - * The returned event stream should be ordered by `offset` if possible, but this can also be - * difficult to fulfill for a distributed data store. The order must be documented by the - * read journal plugin. - * - * A plugin may optionally support this [[Query]]. - */ -final case class EventsByTag(tag: String, offset: Long = 0L) extends Query[EventEnvelope, Unit] -object EventsByTag { - /** Java API */ - def create(tag: String): EventsByTag = EventsByTag(tag) - /** Java API */ - def create(tag: String, offset: Long): EventsByTag = EventsByTag(tag) -} - -/** - * Event wrapper adding meta data for the events in the result stream of - * [[EventsByTag]] query, or similar queries. - */ -//#event-envelope -final case class EventEnvelope( - offset: Long, - persistenceId: String, - sequenceNr: Long, - event: Any) -//#event-envelope diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/ReadJournalProvider.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/ReadJournalProvider.scala new file mode 100644 index 0000000000..ee23f30353 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/ReadJournalProvider.scala @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query + +/** + * A query plugin must implement a class that implements this trait. + * It provides the concrete implementations for the Java and Scala APIs. + * + * A read journal plugin must provide implementations for both + * `akka.persistence.query.scaladsl.ReadJournal` and `akka.persistence.query.javaadsl.ReadJournal`. + * The plugin must implement both the `scaladsl` and the `javadsl` traits because the + * `akka.stream.scaladsl.Source` and `akka.stream.javadsl.Source` are different types + * and even though those types can easily be converted to each other it is most convenient + * for the end user to get access to the Java or Scala `Source` directly. + * One of the implementations can delegate to the other. + * + */ +trait ReadJournalProvider { + /** + * The `ReadJournal` implementation for the Scala API. + * This corresponds to the instance that is returned by [[PersistenceQuery#readJournalFor]]. + */ + def scaladslReadJournal(): scaladsl.ReadJournal + + /** + * The `ReadJournal` implementation for the Java API. + * This corresponds to the instance that is returned by [[PersistenceQuery#getReadJournalFor]]. + */ + def javadslReadJournal(): javadsl.ReadJournal +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/AllPersistenceIdsQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/AllPersistenceIdsQuery.scala new file mode 100644 index 0000000000..10bd575ff3 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/AllPersistenceIdsQuery.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.javadsl + +import akka.stream.javadsl.Source + +/** + * A plugin may optionally support this query by implementing this interface. + */ +trait AllPersistenceIdsQuery extends ReadJournal { + + /** + * Query all `PersistentActor` identifiers, i.e. as defined by the + * `persistenceId` of the `PersistentActor`. + * + * The stream is not completed when it reaches the end of the currently used `persistenceIds`, + * but it continues to push new `persistenceIds` when new persistent actors are created. + * Corresponding query that is completed when it reaches the end of the currently + * currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]]. + */ + def allPersistenceIds(): Source[String, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByPersistenceIdQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByPersistenceIdQuery.scala new file mode 100644 index 0000000000..fd53a9a56b --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByPersistenceIdQuery.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.javadsl + +import akka.stream.javadsl.Source +import akka.persistence.query.EventEnvelope + +/** + * A plugin may optionally support this query by implementing this interface. + */ +trait CurrentEventsByPersistenceIdQuery extends ReadJournal { + + /** + * Same type of query as [[EventsByPersistenceIdQuery#eventsByPersistenceId]] + * but the event stream is completed immediately when it reaches the end of + * the "result set". Events that are stored after the query is completed are + * not included in the event stream. + */ + def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala new file mode 100644 index 0000000000..1cb2658359 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.javadsl + +import akka.stream.javadsl.Source +import akka.persistence.query.EventEnvelope + +/** + * A plugin may optionally support this query by implementing this interface. + */ +trait CurrentEventsByTagQuery extends ReadJournal { + + /** + * Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream + * is completed immediately when it reaches the end of the "result set". Events that are + * stored after the query is completed are not included in the event stream. + */ + def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, Unit] + +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentPersistenceIdsQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentPersistenceIdsQuery.scala new file mode 100644 index 0000000000..107a55119a --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentPersistenceIdsQuery.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.javadsl + +import akka.stream.javadsl.Source + +/** + * A plugin may optionally support this query by implementing this interface. + */ +trait CurrentPersistenceIdsQuery extends ReadJournal { + + /** + * Same type of query as [[AllPersistenceIdsQuery#allPersistenceIds]] but the stream + * is completed immediately when it reaches the end of the "result set". Persistent + * actors that are created after the query is completed are not included in the stream. + */ + def currentPersistenceIds(): Source[String, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByPersistenceIdQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByPersistenceIdQuery.scala new file mode 100644 index 0000000000..553b7d5312 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByPersistenceIdQuery.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.javadsl + +import akka.stream.javadsl.Source +import akka.persistence.query.EventEnvelope + +/** + * A plugin may optionally support this query by implementing this interface. + */ +trait EventsByPersistenceIdQuery extends ReadJournal { + + /** + * Query events for a specific `PersistentActor` identified by `persistenceId`. + * + * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` + * or use `0L` and `Long.MaxValue` respectively to retrieve all events. + * + * The returned event stream should be ordered by sequence number. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[CurrentEventsByPersistenceIdQuery#currentEventsByPersistenceId]]. + */ + def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala new file mode 100644 index 0000000000..f011d572bf --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.javadsl + +import akka.stream.javadsl.Source +import akka.persistence.query.EventEnvelope + +/** + * A plugin may optionally support this query by implementing this interface. + */ +trait EventsByTagQuery extends ReadJournal { + + /** + * Query events that have a specific tag. A tag can for example correspond to an + * aggregate root type (in DDD terminology). + * + * The consumer can keep track of its current position in the event stream by storing the + * `offset` and restart the query from a given `offset` after a crash/restart. + * + * The exact meaning of the `offset` depends on the journal and must be documented by the + * read journal plugin. It may be a sequential id number that uniquely identifies the + * position of each event within the event stream. Distributed data stores cannot easily + * support those semantics and they may use a weaker meaning. For example it may be a + * timestamp (taken when the event was created or stored). Timestamps are not unique and + * not strictly ordered, since clocks on different machines may not be synchronized. + * + * The returned event stream should be ordered by `offset` if possible, but this can also be + * difficult to fulfill for a distributed data store. The order must be documented by the + * read journal plugin. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. + */ + def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala new file mode 100644 index 0000000000..b2edab6e07 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query.javadsl + +/** + * API for reading persistent events and information derived + * from stored persistent events. + * + * The purpose of the API is not to enforce compatibility between different + * journal implementations, because the technical capabilities may be very different. + * The interface is very open so that different journals may implement specific queries. + * + * There are a few pre-defined queries that a query implementation may implement, + * such as [[EventsByPersistenceIdQuery]], [[AllPersistenceIdsQuery]] and [[EventsByTagQuery]] + * Implementation of these queries are optional and query (journal) plugins may define + * their own specialized queries by implementing other methods. + * + * Usage: + * {{{ + * SomeCoolReadJournal journal = + * PersistenceQuery.get(system).getReadJournalFor(SomeCoolReadJournal.class, queryPluginConfigPath); + * Source events = journal.eventsByTag("mytag", 0L); + * }}} + * + * For Scala API see [[akka.persistence.query.scaladsl.ReadJournal]]. + */ +trait ReadJournal + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala deleted file mode 100644 index aae17379cd..0000000000 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.persistence.query.journal.leveldb - -import scala.concurrent.duration._ -import akka.actor.ExtendedActorSystem -import akka.persistence.query.EventsByPersistenceId -import akka.persistence.query.Hint -import akka.persistence.query.Query -import akka.persistence.query.scaladsl -import akka.serialization.SerializationExtension -import akka.stream.scaladsl.Source -import scala.concurrent.duration.FiniteDuration -import akka.persistence.query.NoRefresh -import akka.persistence.query.RefreshInterval -import com.typesafe.config.Config -import akka.persistence.query.EventEnvelope -import akka.persistence.query.AllPersistenceIds -import akka.persistence.query.EventsByTag -import akka.util.ByteString -import java.net.URLEncoder - -object LeveldbReadJournal { - /** - * The default identifier for [[LeveldbReadJournal]] to be used with - * [[akka.persistence.query.PersistenceQuery#readJournalFor]]. - * - * The value is `"akka.persistence.query.journal.leveldb"` and corresponds - * to the absolute path to the read journal configuration entry. - */ - final val Identifier = "akka.persistence.query.journal.leveldb" -} - -/** - * [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB. - * - * It is retrieved with Scala API: - * {{{ - * val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) - * }}} - * - * or with Java API: - * {{{ - * ReadJournal queries = - * PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); - * }}} - * - * Configuration settings can be defined in the configuration section with the - * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"` - * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`. - * - * The following queries are supported. - * - * == EventsByPersistenceId == - * - * [[akka.persistence.query.EventsByPersistenceId]] is used for retrieving events for a specific - * `PersistentActor` identified by `persistenceId`. - * - * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` - * or use `0L` and `Long.MaxValue` respectively to retrieve all events. Note that - * the corresponding sequence number of each event is provided in the - * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the - * stream at a later point from a given sequence number. - * - * The returned event stream is ordered by sequence number, i.e. the same order as the - * `PersistentActor` persisted the events. The same prefix of stream elements (in same order) - * are returned for multiple executions of the query, except for when events have been deleted. - * - * The query supports two different completion modes: - *

    - *
  • The stream is not completed when it reaches the end of the currently stored events, - * but it continues to push new events when new events are persisted. This is the - * default mode that is used when no hints are given. It can also be specified with - * hint [[akka.persistence.query.RefreshInterval]].
  • - *
  • The stream is completed when it reaches the end of the currently stored events. - * This mode is specified with hint [[akka.persistence.query.NoRefresh]].
  • - *
- * - * The LevelDB write journal is notifying the query side as soon as events are persisted, but for - * efficiency reasons the query side retrieves the events in batches that sometimes can - * be delayed up to the configured `refresh-interval` or given [[akka.persistence.query.RefreshInterval]] - * hint. - * - * The stream is completed with failure if there is a failure in executing the query in the - * backend journal. - * - * == AllPersistenceIds == - * - * [[akka.persistence.query.AllPersistenceIds]] is used for retrieving all `persistenceIds` of all - * persistent actors. - * - * The returned event stream is unordered and you can expect different order for multiple - * executions of the query. - * - * The query supports two different completion modes: - *
    - *
  • The stream is not completed when it reaches the end of the currently used `persistenceIds`, - * but it continues to push new `persistenceIds` when new persistent actors are created. - * This is the default mode that is used when no hints are given. It can also be specified with - * hint [[akka.persistence.query.RefreshInterval]].
  • - *
  • The stream is completed when it reaches the end of the currently used `persistenceIds`. - * This mode is specified with hint [[akka.persistence.query.NoRefresh]].
  • - *
- * - * The LevelDB write journal is notifying the query side as soon as new `persistenceIds` are - * created and there is no periodic polling or batching involved in this query. - * - * The stream is completed with failure if there is a failure in executing the query in the - * backend journal. - * - * == EventsByTag == - * - * [[akka.persistence.query.EventsByTag]] is used for retrieving events that were marked with - * a given tag, e.g. all events of an Aggregate Root type. - * - * To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events - * in a [[akka.persistence.journal.Tagged]] with the given `tags`. - * - * You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all - * events with a given tag. The `offset` corresponds to an ordered sequence number for - * the specific tag. Note that the corresponding offset of each event is provided in the - * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the - * stream at a later point from a given offset. - * - * In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr` - * for each event. The `sequenceNr` is the sequence number for the persistent actor with the - * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique - * identifier for the event. - * - * The returned event stream is ordered by the offset (tag sequence number), which corresponds - * to the same order as the write journal stored the events. The same stream elements (in same order) - * are returned for multiple executions of the query. Deleted events are not deleted from the - * tagged event stream. - * - * The query supports two different completion modes: - *
    - *
  • The stream is not completed when it reaches the end of the currently stored events, - * but it continues to push new events when new events are persisted. This is the - * default mode that is used when no hints are given. It can also be specified with - * hint [[akka.persistence.query.RefreshInterval]].
  • - *
  • The stream is completed when it reaches the end of the currently stored events. - * This mode is specified with hint [[akka.persistence.query.NoRefresh]].
  • - *
- * - * The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for - * efficiency reasons the query side retrieves the events in batches that sometimes can - * be delayed up to the configured `refresh-interval` or given [[akka.persistence.query.RefreshInterval]] - * hint. - * - * The stream is completed with failure if there is a failure in executing the query in the - * backend journal. - */ -class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends scaladsl.ReadJournal { - - private val serialization = SerializationExtension(system) - private val defaulRefreshInterval: Option[FiniteDuration] = - Some(config.getDuration("refresh-interval", MILLISECONDS).millis) - private val writeJournalPluginId: String = config.getString("write-plugin") - private val maxBufSize: Int = config.getInt("max-buffer-size") - - override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match { - case EventsByPersistenceId(pid, from, to) ⇒ eventsByPersistenceId(pid, from, to, hints) - case AllPersistenceIds ⇒ allPersistenceIds(hints) - case EventsByTag(tag, offset) ⇒ eventsByTag(tag, offset, hints) - case unknown ⇒ unsupportedQueryType(unknown) - } - - def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = { - Source.actorPublisher[EventEnvelope](EventsByPersistenceIdPublisher.props(persistenceId, fromSeqNr, toSeqNr, - refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) - .named("eventsByPersistenceId-" + persistenceId) - } - - def allPersistenceIds(hints: Seq[Hint]): Source[String, Unit] = { - // no polling for this query, the write journal will push all changes, but - // we still use the `NoRefresh` hint as user API - val liveQuery = refreshInterval(hints).isDefined - Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery, maxBufSize, writeJournalPluginId)) - .mapMaterializedValue(_ ⇒ ()) - .named("allPersistenceIds") - } - - def eventsByTag(tag: String, offset: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = { - Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, - refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) - .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) - } - - private def refreshInterval(hints: Seq[Hint]): Option[FiniteDuration] = - if (hints.contains(NoRefresh)) - None - else - hints.collectFirst { case RefreshInterval(interval) ⇒ interval }.orElse(defaulRefreshInterval) - - private def unsupportedQueryType[M, T](unknown: Query[T, M]): Nothing = - throw new IllegalArgumentException(s"${getClass.getSimpleName} does not implement the ${unknown.getClass.getName} query type!") -} - diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournalProvider.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournalProvider.scala new file mode 100644 index 0000000000..9d9ff96708 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournalProvider.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import akka.actor.ExtendedActorSystem +import akka.persistence.query.ReadJournalProvider +import com.typesafe.config.Config + +class LeveldbReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider { + + override val scaladslReadJournal: scaladsl.LeveldbReadJournal = + new scaladsl.LeveldbReadJournal(system, config) + + override val javadslReadJournal: javadsl.LeveldbReadJournal = + new javadsl.LeveldbReadJournal(scaladslReadJournal) + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala new file mode 100644 index 0000000000..e6a7e80a08 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb.javadsl + +import scala.concurrent.duration._ + +import akka.persistence.query.EventEnvelope +import akka.persistence.query.javadsl._ +import akka.stream.javadsl.Source + +/** + * Java API: [[akka.persistence.query.javadsl.ReadJournal]] implementation for LevelDB. + * + * It is retrieved with: + * {{{ + * LeveldbReadJournal queries = + * PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier()); + * }}} + * + * Corresponding Scala API is in [[akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal]]. + * + * Configuration settings can be defined in the configuration section with the + * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"` + * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`. + * + */ +class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal) + extends ReadJournal + with AllPersistenceIdsQuery + with CurrentPersistenceIdsQuery + with EventsByPersistenceIdQuery + with CurrentEventsByPersistenceIdQuery + with EventsByTagQuery + with CurrentEventsByTagQuery { + + /** + * `allPersistenceIds` is used for retrieving all `persistenceIds` of all + * persistent actors. + * + * The returned event stream is unordered and you can expect different order for multiple + * executions of the query. + * + * The stream is not completed when it reaches the end of the currently used `persistenceIds`, + * but it continues to push new `persistenceIds` when new persistent actors are created. + * Corresponding query that is completed when it reaches the end of the currently + * currently used `persistenceIds` is provided by [[#currentPersistenceIds]]. + * + * The LevelDB write journal is notifying the query side as soon as new `persistenceIds` are + * created and there is no periodic polling or batching involved in this query. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + */ + override def allPersistenceIds(): Source[String, Unit] = + scaladslReadJournal.allPersistenceIds().asJava + + /** + * Same type of query as [[#allPersistenceIds]] but the stream + * is completed immediately when it reaches the end of the "result set". Persistent + * actors that are created after the query is completed are not included in the stream. + */ + override def currentPersistenceIds(): Source[String, Unit] = + scaladslReadJournal.currentPersistenceIds().asJava + + /** + * `eventsByPersistenceId` is used for retrieving events for a specific + * `PersistentActor` identified by `persistenceId`. + * + * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` + * or use `0L` and `Long.MaxValue` respectively to retrieve all events. Note that + * the corresponding sequence number of each event is provided in the + * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the + * stream at a later point from a given sequence number. + * + * The returned event stream is ordered by sequence number, i.e. the same order as the + * `PersistentActor` persisted the events. The same prefix of stream elements (in same order) + * are returned for multiple executions of the query, except for when events have been deleted. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[#currentEventsByPersistenceId]]. + * + * The LevelDB write journal is notifying the query side as soon as events are persisted, but for + * efficiency reasons the query side retrieves the events in batches that sometimes can + * be delayed up to the configured `refresh-interval`. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + */ + override def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, Unit] = + scaladslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava + + /** + * Same type of query as [[#eventsByPersistenceId]] but the event stream + * is completed immediately when it reaches the end of the "result set". Events that are + * stored after the query is completed are not included in the event stream. + */ + override def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, Unit] = + scaladslReadJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava + + /** + * `eventsByTag` is used for retrieving events that were marked with + * a given tag, e.g. all events of an Aggregate Root type. + * + * To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events + * in a [[akka.persistence.journal.Tagged]] with the given `tags`. + * + * You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all + * events with a given tag. The `offset` corresponds to an ordered sequence number for + * the specific tag. Note that the corresponding offset of each event is provided in the + * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the + * stream at a later point from a given offset. + * + * In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr` + * for each event. The `sequenceNr` is the sequence number for the persistent actor with the + * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique + * identifier for the event. + * + * The returned event stream is ordered by the offset (tag sequence number), which corresponds + * to the same order as the write journal stored the events. The same stream elements (in same order) + * are returned for multiple executions of the query. Deleted events are not deleted from the + * tagged event stream. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[#currentEventsByTag]]. + * + * The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for + * efficiency reasons the query side retrieves the events in batches that sometimes can + * be delayed up to the configured `refresh-interval`. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + */ + override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, Unit] = + scaladslReadJournal.eventsByTag(tag, offset).asJava + + /** + * Same type of query as [[#eventsByTag]] but the event stream + * is completed immediately when it reaches the end of the "result set". Events that are + * stored after the query is completed are not included in the event stream. + */ + override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, Unit] = + scaladslReadJournal.currentEventsByTag(tag, offset).asJava +} + +object LeveldbReadJournal { + /** + * The default identifier for [[LeveldbReadJournal]] to be used with + * [[akka.persistence.query.PersistenceQuery#getReadJournalFor]]. + * + * The value is `"akka.persistence.query.journal.leveldb"` and corresponds + * to the absolute path to the read journal configuration entry. + */ + final val Identifier = akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal.Identifier +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala new file mode 100644 index 0000000000..8fe119d1ef --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala @@ -0,0 +1,195 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb.scaladsl + +import java.net.URLEncoder + +import scala.concurrent.duration._ + +import akka.actor.ExtendedActorSystem +import akka.persistence.query.EventEnvelope +import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher +import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher +import akka.persistence.query.journal.leveldb.EventsByTagPublisher +import akka.persistence.query.scaladsl._ +import akka.persistence.query.scaladsl.ReadJournal +import akka.serialization.SerializationExtension +import akka.stream.scaladsl.Source +import akka.util.ByteString +import com.typesafe.config.Config + +/** + * Scala API [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB. + * + * It is retrieved with: + * {{{ + * val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) + * }}} + * + * Corresponding Java API is in [[akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal]]. + * + * Configuration settings can be defined in the configuration section with the + * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"` + * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`. + */ +class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal + with AllPersistenceIdsQuery + with CurrentPersistenceIdsQuery + with EventsByPersistenceIdQuery + with CurrentEventsByPersistenceIdQuery + with EventsByTagQuery + with CurrentEventsByTagQuery { + + private val serialization = SerializationExtension(system) + private val refreshInterval = Some(config.getDuration("refresh-interval", MILLISECONDS).millis) + private val writeJournalPluginId: String = config.getString("write-plugin") + private val maxBufSize: Int = config.getInt("max-buffer-size") + + /** + * `allPersistenceIds` is used for retrieving all `persistenceIds` of all + * persistent actors. + * + * The returned event stream is unordered and you can expect different order for multiple + * executions of the query. + * + * The stream is not completed when it reaches the end of the currently used `persistenceIds`, + * but it continues to push new `persistenceIds` when new persistent actors are created. + * Corresponding query that is completed when it reaches the end of the currently + * currently used `persistenceIds` is provided by [[#currentPersistenceIds]]. + * + * The LevelDB write journal is notifying the query side as soon as new `persistenceIds` are + * created and there is no periodic polling or batching involved in this query. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + */ + override def allPersistenceIds(): Source[String, Unit] = { + // no polling for this query, the write journal will push all changes, i.e. + // no refreshInterval + Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery = true, maxBufSize, writeJournalPluginId)) + .mapMaterializedValue(_ ⇒ ()) + .named("allPersistenceIds") + } + + /** + * Same type of query as [[#allPersistenceIds]] but the stream + * is completed immediately when it reaches the end of the "result set". Persistent + * actors that are created after the query is completed are not included in the stream. + */ + override def currentPersistenceIds(): Source[String, Unit] = { + Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery = false, maxBufSize, writeJournalPluginId)) + .mapMaterializedValue(_ ⇒ ()) + .named("currentPersistenceIds") + } + + /** + * `eventsByPersistenceId` is used for retrieving events for a specific + * `PersistentActor` identified by `persistenceId`. + * + * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` + * or use `0L` and `Long.MaxValue` respectively to retrieve all events. Note that + * the corresponding sequence number of each event is provided in the + * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the + * stream at a later point from a given sequence number. + * + * The returned event stream is ordered by sequence number, i.e. the same order as the + * `PersistentActor` persisted the events. The same prefix of stream elements (in same order) + * are returned for multiple executions of the query, except for when events have been deleted. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[#currentEventsByPersistenceId]]. + * + * The LevelDB write journal is notifying the query side as soon as events are persisted, but for + * efficiency reasons the query side retrieves the events in batches that sometimes can + * be delayed up to the configured `refresh-interval`. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + */ + override def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, + toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, Unit] = { + Source.actorPublisher[EventEnvelope](EventsByPersistenceIdPublisher.props(persistenceId, fromSequenceNr, toSequenceNr, + refreshInterval, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) + .named("eventsByPersistenceId-" + persistenceId) + } + + /** + * Same type of query as [[#eventsByPersistenceId]] but the event stream + * is completed immediately when it reaches the end of the "result set". Events that are + * stored after the query is completed are not included in the event stream. + */ + override def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, + toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, Unit] = { + Source.actorPublisher[EventEnvelope](EventsByPersistenceIdPublisher.props(persistenceId, fromSequenceNr, toSequenceNr, + None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) + .named("currentEventsByPersistenceId-" + persistenceId) + } + + /** + * `eventsByTag` is used for retrieving events that were marked with + * a given tag, e.g. all events of an Aggregate Root type. + * + * To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events + * in a [[akka.persistence.journal.Tagged]] with the given `tags`. + * + * You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all + * events with a given tag. The `offset` corresponds to an ordered sequence number for + * the specific tag. Note that the corresponding offset of each event is provided in the + * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the + * stream at a later point from a given offset. + * + * In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr` + * for each event. The `sequenceNr` is the sequence number for the persistent actor with the + * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique + * identifier for the event. + * + * The returned event stream is ordered by the offset (tag sequence number), which corresponds + * to the same order as the write journal stored the events. The same stream elements (in same order) + * are returned for multiple executions of the query. Deleted events are not deleted from the + * tagged event stream. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[#currentEventsByTag]]. + * + * The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for + * efficiency reasons the query side retrieves the events in batches that sometimes can + * be delayed up to the configured `refresh-interval`. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + */ + override def eventsByTag(tag: String, offset: Long = 0L): Source[EventEnvelope, Unit] = { + Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, + refreshInterval, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) + .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + } + + /** + * Same type of query as [[#eventsByTag]] but the event stream + * is completed immediately when it reaches the end of the "result set". Events that are + * stored after the query is completed are not included in the event stream. + */ + override def currentEventsByTag(tag: String, offset: Long = 0L): Source[EventEnvelope, Unit] = { + Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, + None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) + .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + } + +} + +object LeveldbReadJournal { + /** + * The default identifier for [[LeveldbReadJournal]] to be used with + * [[akka.persistence.query.PersistenceQuery#readJournalFor]]. + * + * The value is `"akka.persistence.query.journal.leveldb"` and corresponds + * to the absolute path to the read journal configuration entry. + */ + final val Identifier = "akka.persistence.query.journal.leveldb" +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/AllPersistenceIdsQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/AllPersistenceIdsQuery.scala new file mode 100644 index 0000000000..cc4e72bfb5 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/AllPersistenceIdsQuery.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.scaladsl + +import akka.stream.scaladsl.Source + +/** + * A plugin may optionally support this query by implementing this trait. + */ +trait AllPersistenceIdsQuery extends ReadJournal { + + /** + * Query all `PersistentActor` identifiers, i.e. as defined by the + * `persistenceId` of the `PersistentActor`. + * + * The stream is not completed when it reaches the end of the currently used `persistenceIds`, + * but it continues to push new `persistenceIds` when new persistent actors are created. + * Corresponding query that is completed when it reaches the end of the currently + * currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]]. + */ + def allPersistenceIds(): Source[String, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByPersistenceIdQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByPersistenceIdQuery.scala new file mode 100644 index 0000000000..3f35391aab --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByPersistenceIdQuery.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.scaladsl + +import akka.stream.scaladsl.Source +import akka.persistence.query.EventEnvelope + +/** + * A plugin may optionally support this query by implementing this trait. + */ +trait CurrentEventsByPersistenceIdQuery extends ReadJournal { + + /** + * Same type of query as [[EventsByPersistenceIdQuery#eventsByPersistenceId]] + * but the event stream is completed immediately when it reaches the end of + * the "result set". Events that are stored after the query is completed are + * not included in the event stream. + */ + def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala new file mode 100644 index 0000000000..ce825bf7cd --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.scaladsl + +import akka.stream.scaladsl.Source +import akka.persistence.query.EventEnvelope + +/** + * A plugin may optionally support this query by implementing this trait. + */ +trait CurrentEventsByTagQuery extends ReadJournal { + + /** + * Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream + * is completed immediately when it reaches the end of the "result set". Events that are + * stored after the query is completed are not included in the event stream. + */ + def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, Unit] + +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentPersistenceIdsQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentPersistenceIdsQuery.scala new file mode 100644 index 0000000000..f355b3aa06 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentPersistenceIdsQuery.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.scaladsl + +import akka.stream.scaladsl.Source + +/** + * A plugin may optionally support this query by implementing this trait. + */ +trait CurrentPersistenceIdsQuery extends ReadJournal { + + /** + * Same type of query as [[AllPersistenceIdsQuery#allPersistenceIds]] but the stream + * is completed immediately when it reaches the end of the "result set". Persistent + * actors that are created after the query is completed are not included in the stream. + */ + def currentPersistenceIds(): Source[String, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByPersistenceIdQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByPersistenceIdQuery.scala new file mode 100644 index 0000000000..1c2e473bbc --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByPersistenceIdQuery.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.scaladsl + +import akka.stream.scaladsl.Source +import akka.persistence.query.EventEnvelope + +/** + * A plugin may optionally support this query by implementing this trait. + */ +trait EventsByPersistenceIdQuery extends ReadJournal { + + /** + * Query events for a specific `PersistentActor` identified by `persistenceId`. + * + * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` + * or use `0L` and `Long.MaxValue` respectively to retrieve all events. + * + * The returned event stream should be ordered by sequence number. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[CurrentEventsByPersistenceIdQuery#currentEventsByPersistenceId]]. + */ + def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, Unit] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala new file mode 100644 index 0000000000..a15e676b20 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.scaladsl + +import akka.stream.scaladsl.Source +import akka.persistence.query.EventEnvelope + +/** + * A plugin may optionally support this query by implementing this trait. + */ +trait EventsByTagQuery extends ReadJournal { + + /** + * Query events that have a specific tag. A tag can for example correspond to an + * aggregate root type (in DDD terminology). + * + * The consumer can keep track of its current position in the event stream by storing the + * `offset` and restart the query from a given `offset` after a crash/restart. + * + * The exact meaning of the `offset` depends on the journal and must be documented by the + * read journal plugin. It may be a sequential id number that uniquely identifies the + * position of each event within the event stream. Distributed data stores cannot easily + * support those semantics and they may use a weaker meaning. For example it may be a + * timestamp (taken when the event was created or stored). Timestamps are not unique and + * not strictly ordered, since clocks on different machines may not be synchronized. + * + * The returned event stream should be ordered by `offset` if possible, but this can also be + * difficult to fulfill for a distributed data store. The order must be documented by the + * read journal plugin. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. + */ + def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, Unit] + +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala index 936aadb7d9..395454cbdf 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala @@ -1,12 +1,8 @@ /* * Copyright (C) 2009-2015 Typesafe Inc. */ - package akka.persistence.query.scaladsl -import akka.persistence.query.{ Hint, Query } -import akka.stream.scaladsl.Source - /** * API for reading persistent events and information derived * from stored persistent events. @@ -15,30 +11,18 @@ import akka.stream.scaladsl.Source * journal implementations, because the technical capabilities may be very different. * The interface is very open so that different journals may implement specific queries. * + * There are a few pre-defined queries that a query implementation may implement, + * such as [[EventsByPersistenceIdQuery]], [[AllPersistenceIdsQuery]] and [[EventsByTagQuery]] + * Implementation of these queries are optional and query (journal) plugins may define + * their own specialized queries by implementing other methods. + * * Usage: * {{{ - * val journal = PersistenceQuery(system).readJournalFor(queryPluginConfigPath) + * val journal = PersistenceQuery(system).readJournalFor[SomeCoolReadJournal](queryPluginConfigPath) * val events = journal.query(EventsByTag("mytag", 0L)) * }}} * * For Java API see [[akka.persistence.query.javadsl.ReadJournal]]. */ -abstract class ReadJournal { +trait ReadJournal - /** - * A query that returns a `Source` with output type `T` and materialized - * value `M`. - * - * The `hints` are optional parameters that defines how to execute the - * query, typically specific to the journal implementation. - * - */ - def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] - -} - -/** INTERNAL API */ -private[akka] final class ReadJournalAdapter(backing: akka.persistence.query.javadsl.ReadJournal) extends ReadJournal { - override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = - backing.query(q, hints: _*).asScala -} \ No newline at end of file diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournal.java similarity index 52% rename from akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java rename to akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournal.java index 1b0050cd81..fcab48ffd7 100644 --- a/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java +++ b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournal.java @@ -4,29 +4,25 @@ package akka.persistence.query; -import akka.persistence.query.javadsl.ReadJournal; -import akka.stream.javadsl.Source; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; +import scala.runtime.BoxedUnit; import java.util.Iterator; +import akka.persistence.query.javadsl.AllPersistenceIdsQuery; +import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.javadsl.Source; + /** * Use for tests only! * Emits infinite stream of strings (representing queried for events). */ -class MockJavaReadJournal implements ReadJournal { - public static final String Identifier = "akka.persistence.query.journal.mock-java"; +public class DummyJavaReadJournal implements ReadJournal, AllPersistenceIdsQuery { + public static final String Identifier = "akka.persistence.query.journal.dummy-java"; - public static final Config config = ConfigFactory.parseString( - Identifier + " { \n" + - " class = \"" + MockJavaReadJournal.class.getCanonicalName() + "\" \n" + - " }\n\n"); @Override - @SuppressWarnings("unchecked") - public Source query(Query q, Hint... hints) { - return (Source) Source.fromIterator(() -> new Iterator() { + public Source allPersistenceIds() { + return Source.fromIterator(() -> new Iterator() { private int i = 0; @Override public boolean hasNext() { return true; } @@ -35,4 +31,5 @@ class MockJavaReadJournal implements ReadJournal { } }); } -} \ No newline at end of file +} + diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalForScala.java b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalForScala.java new file mode 100644 index 0000000000..2bfa9c66d4 --- /dev/null +++ b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalForScala.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query; + +import scala.runtime.BoxedUnit; + + +/** + * Use for tests only! + * Emits infinite stream of strings (representing queried for events). + */ +public class DummyJavaReadJournalForScala implements akka.persistence.query.scaladsl.ReadJournal, + akka.persistence.query.scaladsl.AllPersistenceIdsQuery { + + public static final String Identifier = DummyJavaReadJournal.Identifier; + + private final DummyJavaReadJournal readJournal; + + public DummyJavaReadJournalForScala(DummyJavaReadJournal readJournal) { + this.readJournal = readJournal; + } + + @Override + public akka.stream.scaladsl.Source allPersistenceIds() { + return readJournal.allPersistenceIds().asScala(); + } + +} diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalProvider.java b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalProvider.java new file mode 100644 index 0000000000..6bce25aa3a --- /dev/null +++ b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalProvider.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +public class DummyJavaReadJournalProvider implements ReadJournalProvider { + + public static final Config config = ConfigFactory.parseString(DummyJavaReadJournal.Identifier + " { \n" + + " class = \"" + DummyJavaReadJournalProvider.class.getCanonicalName() + "\" \n" + + " }\n\n"); + + private final DummyJavaReadJournal readJournal = new DummyJavaReadJournal(); + + @Override + public DummyJavaReadJournalForScala scaladslReadJournal() { + return new DummyJavaReadJournalForScala(readJournal); + } + + @Override + public DummyJavaReadJournal javadslReadJournal() { + return readJournal; + } + +} diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java index 2ab31f8301..2da3884560 100644 --- a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java +++ b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java @@ -5,7 +5,6 @@ package akka.persistence.query; import akka.actor.ActorSystem; -import akka.persistence.query.javadsl.ReadJournal; import akka.testkit.AkkaJUnitActorSystemResource; import org.junit.ClassRule; import scala.runtime.BoxedUnit; @@ -18,12 +17,11 @@ public class PersistenceQueryTest { private final ActorSystem system = actorSystemResource.getSystem(); - private final Hint hint = NoRefresh.getInstance(); - // compile-only test @SuppressWarnings("unused") public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception { - final ReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor("noop-journal"); - final akka.stream.javadsl.Source tag = readJournal.query(new EventsByTag("tag", 0L), hint, hint); // java varargs + final DummyJavaReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(DummyJavaReadJournal.class, + "noop-journal"); + final akka.stream.javadsl.Source ids = readJournal.allPersistenceIds(); } } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala new file mode 100644 index 0000000000..480420a102 --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query + +import akka.stream.scaladsl.Source +import com.typesafe.config.{ Config, ConfigFactory } +import scala.runtime.BoxedUnit + +/** + * Use for tests only! + * Emits infinite stream of strings (representing queried for events). + */ +class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.AllPersistenceIdsQuery { + override def allPersistenceIds(): Source[String, Unit] = + Source(() ⇒ Iterator.from(0)).map(_.toString) +} + +object DummyReadJournal { + final val Identifier = "akka.persistence.query.journal.dummy" +} + +class DummyReadJournalForJava(readJournal: DummyReadJournal) extends javadsl.ReadJournal with javadsl.AllPersistenceIdsQuery { + override def allPersistenceIds(): akka.stream.javadsl.Source[String, Unit] = + readJournal.allPersistenceIds().asJava +} + +object DummyReadJournalProvider { + final val config: Config = ConfigFactory.parseString( + s""" + |${DummyReadJournal.Identifier} { + | class = "${classOf[DummyReadJournalProvider].getCanonicalName}" + |} + """.stripMargin) +} + +class DummyReadJournalProvider extends ReadJournalProvider { + + override val scaladslReadJournal: DummyReadJournal = + new DummyReadJournal + + override val javadslReadJournal: DummyReadJournalForJava = + new DummyReadJournalForJava(scaladslReadJournal) +} diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala deleted file mode 100644 index ea813a1cd5..0000000000 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ - -package akka.persistence.query - -import akka.stream.scaladsl.Source -import com.typesafe.config.{ Config, ConfigFactory } - -/** - * Use for tests only! - * Emits infinite stream of strings (representing queried for events). - */ -class MockReadJournal extends scaladsl.ReadJournal { - override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = - Source(() ⇒ Iterator.from(0)).map(_.toString).asInstanceOf[Source[T, M]] -} - -object MockReadJournal { - final val Identifier = "akka.persistence.query.journal.mock" - - final val config: Config = ConfigFactory.parseString( - s""" - |$Identifier { - | class = "${classOf[MockReadJournal].getCanonicalName}" - |} - """.stripMargin) -} \ No newline at end of file diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala index 5e3aeff7d5..be0714963e 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -15,11 +15,9 @@ import scala.concurrent.duration._ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll { - val anything: Query[String, _] = null - val eventAdaptersConfig = s""" - |akka.persistence.query.journal.mock { + |akka.persistence.query.journal.dummy { | event-adapters { | adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName} | } @@ -29,35 +27,25 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte "ReadJournal" must { "be found by full config key" in { withActorSystem() { system ⇒ - PersistenceQuery.get(system).readJournalFor(MockReadJournal.Identifier) + PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier) } } "throw if unable to find query journal by config key" in { withActorSystem() { system ⇒ intercept[IllegalArgumentException] { - PersistenceQuery.get(system).readJournalFor(MockReadJournal.Identifier + "-unknown") + PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "-unknown") }.getMessage should include("missing persistence read journal") } } - "expose scaladsl implemented journal as javadsl.ReadJournal" in { - withActorSystem() { system ⇒ - val j: javadsl.ReadJournal = PersistenceQuery.get(system).getReadJournalFor(MockReadJournal.Identifier) - } - } - "expose javadsl implemented journal as scaladsl.ReadJournal" in { - withActorSystem() { system ⇒ - val j: scaladsl.ReadJournal = PersistenceQuery.get(system).readJournalFor(MockJavaReadJournal.Identifier) - } - } } private val systemCounter = new AtomicInteger() private def withActorSystem(conf: String = "")(block: ActorSystem ⇒ Unit): Unit = { val config = - MockReadJournal.config - .withFallback(MockJavaReadJournal.config) + DummyReadJournalProvider.config + .withFallback(DummyJavaReadJournalProvider.config) .withFallback(ConfigFactory.parseString(conf)) .withFallback(ConfigFactory.parseString(eventAdaptersConfig)) .withFallback(ConfigFactory.load()) @@ -75,3 +63,4 @@ object ExampleQueryModels { class PrefixStringWithPAdapter extends ReadEventAdapter { override def fromJournal(event: Any, manifest: String) = EventSeq.single("p-" + event) } + diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala index e27758a31c..f8db15ea33 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala @@ -4,18 +4,14 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.persistence.query.EventsByPersistenceId + import akka.persistence.query.PersistenceQuery -import akka.persistence.query.RefreshInterval +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.query.scaladsl.AllPersistenceIdsQuery import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink -import akka.testkit.ImplicitSender -import akka.testkit.TestKit -import akka.persistence.query.NoRefresh import akka.testkit.AkkaSpec -import akka.persistence.query.AllPersistenceIds +import akka.testkit.ImplicitSender object AllPersistenceIdsSpec { val config = """ @@ -32,9 +28,14 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) implicit val mat = ActorMaterializer()(system) - val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) "Leveldb query AllPersistenceIds" must { + + "implement standard AllPersistenceIdsQuery" in { + queries.isInstanceOf[AllPersistenceIdsQuery] should ===(true) + } + "find existing persistenceIds" in { system.actorOf(TestActor.props("a")) ! "a1" expectMsg("a1-done") @@ -43,7 +44,7 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) system.actorOf(TestActor.props("c")) ! "c1" expectMsg("c1-done") - val src = queries.query(AllPersistenceIds, NoRefresh) + val src = queries.currentPersistenceIds() src.runWith(TestSink.probe[String]) .request(5) .expectNextUnordered("a", "b", "c") @@ -55,7 +56,7 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) system.actorOf(TestActor.props("d")) ! "d1" expectMsg("d1-done") - val src = queries.query(AllPersistenceIds) + val src = queries.allPersistenceIds() val probe = src.runWith(TestSink.probe[String]) .request(5) .expectNextUnorderedN(List("a", "b", "c", "d")) diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index 03f4327594..c842d37c30 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -4,17 +4,15 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ + import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.persistence.query.EventsByPersistenceId import akka.persistence.query.PersistenceQuery -import akka.persistence.query.RefreshInterval +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.query.scaladsl.EventsByTagQuery import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink -import akka.testkit.ImplicitSender -import akka.testkit.TestKit -import akka.persistence.query.NoRefresh import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender object EventsByPersistenceIdSpec { val config = """ @@ -22,6 +20,7 @@ object EventsByPersistenceIdSpec { akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" akka.persistence.journal.leveldb.dir = "target/journal-EventsByPersistenceIdSpec" akka.test.single-expect-default = 10s + akka.persistence.query.journal.leveldb.refresh-interval = 1s """ } @@ -31,9 +30,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi implicit val mat = ActorMaterializer()(system) - val refreshInterval = RefreshInterval(1.second) - - val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) def setup(persistenceId: String): ActorRef = { val ref = system.actorOf(TestActor.props(persistenceId)) @@ -47,10 +44,15 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi } "Leveldb query EventsByPersistenceId" must { + + "implement standard EventsByTagQuery" in { + queries.isInstanceOf[EventsByTagQuery] should ===(true) + } + "find existing events" in { val ref = setup("a") - val src = queries.query(EventsByPersistenceId("a", 0L, Long.MaxValue), NoRefresh) + val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue) src.map(_.event).runWith(TestSink.probe[Any]) .request(2) .expectNext("a-1", "a-2") @@ -62,7 +64,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi "find existing events up to a sequence number" in { val ref = setup("b") - val src = queries.query(EventsByPersistenceId("b", 0L, 2L), NoRefresh) + val src = queries.currentEventsByPersistenceId("b", 0L, 2L) src.map(_.event).runWith(TestSink.probe[Any]) .request(5) .expectNext("b-1", "b-2") @@ -71,7 +73,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi "not see new events after demand request" in { val ref = setup("f") - val src = queries.query(EventsByPersistenceId("f", 0L, Long.MaxValue), NoRefresh) + val src = queries.currentEventsByPersistenceId("f", 0L, Long.MaxValue) val probe = src.map(_.event).runWith(TestSink.probe[Any]) .request(2) .expectNext("f-1", "f-2") @@ -91,7 +93,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi "Leveldb live query EventsByPersistenceId" must { "find new events" in { val ref = setup("c") - val src = queries.query(EventsByPersistenceId("c", 0L, Long.MaxValue), refreshInterval) + val src = queries.eventsByPersistenceId("c", 0L, Long.MaxValue) val probe = src.map(_.event).runWith(TestSink.probe[Any]) .request(5) .expectNext("c-1", "c-2", "c-3") @@ -104,7 +106,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi "find new events up to a sequence number" in { val ref = setup("d") - val src = queries.query(EventsByPersistenceId("d", 0L, 4L), refreshInterval) + val src = queries.eventsByPersistenceId("d", 0L, 4L) val probe = src.map(_.event).runWith(TestSink.probe[Any]) .request(5) .expectNext("d-1", "d-2", "d-3") @@ -117,7 +119,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi "find new events after demand request" in { val ref = setup("e") - val src = queries.query(EventsByPersistenceId("e", 0L, Long.MaxValue), refreshInterval) + val src = queries.eventsByPersistenceId("e", 0L, Long.MaxValue) val probe = src.map(_.event).runWith(TestSink.probe[Any]) .request(2) .expectNext("e-1", "e-2") diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index c9feba7f0e..278785c6b9 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -4,22 +4,17 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ -import akka.actor.ActorRef -import akka.actor.ActorSystem + +import akka.persistence.journal.Tagged +import akka.persistence.journal.WriteEventAdapter +import akka.persistence.query.EventEnvelope import akka.persistence.query.PersistenceQuery -import akka.persistence.query.RefreshInterval +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.query.scaladsl.EventsByTagQuery import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink -import akka.testkit.ImplicitSender -import akka.testkit.TestKit -import akka.persistence.query.NoRefresh import akka.testkit.AkkaSpec -import akka.persistence.query.EventsByTag -import akka.persistence.journal.Tagged -import akka.persistence.journal.EventSeq -import akka.persistence.journal.EventAdapter -import akka.persistence.query.EventEnvelope -import akka.persistence.journal.WriteEventAdapter +import akka.testkit.ImplicitSender object EventsByTagSpec { val config = """ @@ -34,6 +29,7 @@ object EventsByTagSpec { "java.lang.String" = color-tagger } } + akka.persistence.query.journal.leveldb.refresh-interval = 1s akka.test.single-expect-default = 10s """ @@ -58,11 +54,13 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) implicit val mat = ActorMaterializer()(system) - val refreshInterval = RefreshInterval(1.second) - - val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) "Leveldb query EventsByTag" must { + "implement standard EventsByTagQuery" in { + queries.isInstanceOf[EventsByTagQuery] should ===(true) + } + "find existing events" in { val a = system.actorOf(TestActor.props("a")) val b = system.actorOf(TestActor.props("b")) @@ -77,7 +75,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) b ! "a green leaf" expectMsg(s"a green leaf-done") - val greenSrc = queries.query(EventsByTag(tag = "green", offset = 0L), NoRefresh) + val greenSrc = queries.currentEventsByTag(tag = "green", offset = 0L) greenSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(1L, "a", 2L, "a green apple")) @@ -87,7 +85,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) .expectComplete() - val blackSrc = queries.query(EventsByTag(tag = "black", offset = 0L), NoRefresh) + val blackSrc = queries.currentEventsByTag(tag = "black", offset = 0L) blackSrc.runWith(TestSink.probe[Any]) .request(5) .expectNext(EventEnvelope(1L, "b", 1L, "a black car")) @@ -97,7 +95,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) "not see new events after demand request" in { val c = system.actorOf(TestActor.props("c")) - val greenSrc = queries.query(EventsByTag(tag = "green", offset = 0L), NoRefresh) + val greenSrc = queries.currentEventsByTag(tag = "green", offset = 0L) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(1L, "a", 2L, "a green apple")) @@ -115,7 +113,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) } "find events from offset" in { - val greenSrc = queries.query(EventsByTag(tag = "green", offset = 2L), NoRefresh) + val greenSrc = queries.currentEventsByTag(tag = "green", offset = 2L) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) @@ -129,7 +127,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) "find new events" in { val d = system.actorOf(TestActor.props("d")) - val blackSrc = queries.query(EventsByTag(tag = "black", offset = 0L), refreshInterval) + val blackSrc = queries.eventsByTag(tag = "black", offset = 0L) val probe = blackSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(1L, "b", 1L, "a black car")) @@ -148,7 +146,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) } "find events from offset" in { - val greenSrc = queries.query(EventsByTag(tag = "green", offset = 2L)) + val greenSrc = queries.eventsByTag(tag = "green", offset = 2L) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala index 65709fdbc6..16fe5f3117 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala @@ -86,11 +86,11 @@ object PersistentActorStashingSpec { class AsyncStashingPersistentActor(name: String) extends ExamplePersistentActor(name) { var stashed = false val receiveCommand: Receive = commonBehavior orElse { - case Cmd("a") ⇒ persistAsync(Evt("a"))(updateState) + case Cmd("a") ⇒ persistAsync(Evt("a"))(updateState) case Cmd("b") if !stashed ⇒ stash(); stashed = true - case Cmd("b") ⇒ persistAsync(Evt("b"))(updateState) - case Cmd("c") ⇒ persistAsync(Evt("c"))(updateState); unstashAll() + case Cmd("b") ⇒ persistAsync(Evt("b"))(updateState) + case Cmd("c") ⇒ persistAsync(Evt("c"))(updateState); unstashAll() } }