diff --git a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java new file mode 100644 index 0000000000..01638e2cf1 --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package docs.persistence.query; + +import java.util.HashSet; +import java.util.Set; +import scala.runtime.BoxedUnit; + +import akka.actor.ActorSystem; +import akka.persistence.journal.WriteEventAdapter; +import akka.persistence.journal.EventSeq; +import akka.persistence.journal.leveldb.Tagged; +import akka.persistence.query.AllPersistenceIds; +import akka.persistence.query.EventEnvelope; +import akka.persistence.query.EventsByPersistenceId; +import akka.persistence.query.EventsByTag; +import akka.persistence.query.PersistenceQuery; +import akka.persistence.query.javadsl.ReadJournal; +import akka.persistence.query.journal.leveldb.LeveldbReadJournal; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Source; + +public class LeveldbPersistenceQueryDocTest { + + final ActorSystem system = ActorSystem.create(); + + public void demonstrateReadJournal() { + //#get-read-journal + final ActorMaterializer mat = ActorMaterializer.create(system); + + ReadJournal queries = + PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + //#get-read-journal + } + + public void demonstrateEventsByPersistenceId() { + //#EventsByPersistenceId + ReadJournal queries = + PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + + Source source = + queries.query(EventsByPersistenceId.create("some-persistence-id", 0, Long.MAX_VALUE)); + //#EventsByPersistenceId + } + + public void demonstrateAllPersistenceIds() { + //#AllPersistenceIds + ReadJournal queries = + PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + + Source source = + queries.query(AllPersistenceIds.getInstance()); + //#AllPersistenceIds + } + + public void demonstrateEventsByTag() { + //#EventsByTag + ReadJournal queries = + PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + + Source source = + queries.query(EventsByTag.create("green", 0)); + //#EventsByTag + } + + static + //#tagger + public class MyEventAdapter implements WriteEventAdapter { + + @Override + public Object toJournal(Object event) { + if (event instanceof String) { + String s = (String) event; + Set tags = new HashSet(); + if (s.contains("green")) tags.add("green"); + if (s.contains("black")) tags.add("black"); + if (s.contains("blue")) tags.add("blue"); + if (tags.isEmpty()) + return event; + else + return new Tagged(event, tags); + } else { + return event; + } + } + + @Override + public String manifest(Object event) { + return ""; + } + } + //#tagger +} diff --git a/akka-docs/rst/java/index-actors.rst b/akka-docs/rst/java/index-actors.rst index 173a0fd58e..7d82c3f873 100644 --- a/akka-docs/rst/java/index-actors.rst +++ b/akka-docs/rst/java/index-actors.rst @@ -13,4 +13,5 @@ Actors fsm persistence persistence-query + persistence-query-leveldb testing diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 34e7c48dfe..0f568410b0 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -331,7 +331,7 @@ It is possible to delete all messages (journaled by a single persistent actor) u persistent actors may call the ``deleteMessages`` method. Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with -:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessages(toSequenceNr)`` up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, while still having access to the accumulated state during replays - by loading the snapshot. diff --git a/akka-docs/rst/java/persistence-query-leveldb.rst b/akka-docs/rst/java/persistence-query-leveldb.rst new file mode 100644 index 0000000000..57621d5830 --- /dev/null +++ b/akka-docs/rst/java/persistence-query-leveldb.rst @@ -0,0 +1,161 @@ +.. _persistence-query-leveldb-java: + +############################# +Persistence Query for LevelDB +############################# + +This is documentation for the LevelDB implementation of the :ref:`persistence-query-java` API. +Note that implementations for other journals may have different semantics. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence.query`` package. + +Dependencies +============ + +Akka persistence LevelDB query implementation is bundled in the ``akka-persistence-query-experimental`` artifact. +Make sure that you have the following dependency in your project:: + + + com.typesafe.akka + akka-persistence-query-experimental_@binVersion@ + @version@ + + + +How to get the ReadJournal +========================== + +The ``ReadJournal`` is retrieved via the ``akka.persistence.query.PersistenceQuery`` +extension: + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#get-read-journal + +Supported Queries +================= + +EventsByPersistenceId +--------------------- + +``EventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor`` +identified by ``persistenceId``. + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByPersistenceId + +You can retrieve a subset of all events by specifying ``fromSequenceNr`` and ``toSequenceNr`` +or use ``0L`` and ``Long.MAX_VALUE`` respectively to retrieve all events. Note that +the corresponding sequence number of each event is provided in the ``EventEnvelope``, +which makes it possible to resume the stream at a later point from a given sequence number. + +The returned event stream is ordered by sequence number, i.e. the same order as the +``PersistentActor`` persisted the events. The same prefix of stream elements (in same order) +are returned for multiple executions of the query, except for when events have been deleted. + +The query supports two different completion modes: + +* The stream is not completed when it reaches the end of the currently stored events, + but it continues to push new events when new events are persisted. This is the + default mode that is used when no hints are given. It can also be specified with + hint ``RefreshInterval``. + +* The stream is completed when it reaches the end of the currently stored events. + This mode is specified with hint ``NoRefresh``. + +The LevelDB write journal is notifying the query side as soon as events are persisted, but for +efficiency reasons the query side retrieves the events in batches that sometimes can +be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval`` +hint. + +The stream is completed with failure if there is a failure in executing the query in the +backend journal. + +AllPersistenceIds +----------------- + +``AllPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors. + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#AllPersistenceIds + +The returned event stream is unordered and you can expect different order for multiple +executions of the query. + +The query supports two different completion modes: + +* The stream is not completed when it reaches the end of the currently used ``persistenceIds``, + but it continues to push new ``persistenceIds`` when new persistent actors are created. + This is the default mode that is used when no hints are given. It can also be specified with + hint ``RefreshInterval``. + +* The stream is completed when it reaches the end of the currently used ``persistenceIds``. + This mode is specified with hint ``NoRefresh``. + +The LevelDB write journal is notifying the query side as soon as new ``persistenceIds`` are +created and there is no periodic polling or batching involved in this query. + +The stream is completed with failure if there is a failure in executing the query in the +backend journal. + +EventsByTag +----------- + +``EventsByTag`` is used for retrieving events that were marked with a given tag, e.g. +all domain events of an Aggregate Root type. + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByTag + +To tag events you create an :ref:`event-adapters-java` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged`` +with the given ``tags``. + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger + +You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all +events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag. +Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible +to resume the stream at a later point from a given offset. + +In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr`` +for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the +``persistenceId`` that persisted the event. The ``persistenceId`` + ``sequenceNr`` is an unique +identifier for the event. + +The returned event stream is ordered by the offset (tag sequence number), which corresponds +to the same order as the write journal stored the events. The same stream elements (in same order) +are returned for multiple executions of the query. Deleted events are not deleted from the +tagged event stream. + +.. note:: + + Events deleted using ``deleteMessages(toSequenceNr)`` are not deleted from the "tagged stream". + +The query supports two different completion modes: + +* The stream is not completed when it reaches the end of the currently stored events, + but it continues to push new events when new events are persisted. This is the + default mode that is used when no hints are given. It can also be specified with + hint ``RefreshInterval``. + +* The stream is completed when it reaches the end of the currently stored events. + This mode is specified with hint ``NoRefresh``. + +The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for +efficiency reasons the query side retrieves the events in batches that sometimes can +be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval`` +hint. + +The stream is completed with failure if there is a failure in executing the query in the +backend journal. + +Configuration +============= + +Configuration settings can be defined in the configuration section with the +absolute path corresponding to the identifier, which is ``"akka.persistence.query.journal.leveldb"`` +for the default ``LeveldbReadJournal.Identifier``. + +It can be configured with the following properties: + +.. includecode:: ../../../akka-persistence-query/src/main/resources/reference.conf#query-leveldb diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst index f373b99938..c1e98e27ae 100644 --- a/akka-docs/rst/java/persistence-query.rst +++ b/akka-docs/rst/java/persistence-query.rst @@ -26,7 +26,11 @@ Dependencies Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:: - "com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@ + + com.typesafe.akka + akka-persistence-query-experimental_@binVersion@ + @version@ + Design overview =============== @@ -201,6 +205,8 @@ Query plugins Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page. +The plugin for LevelDB is described in :ref:`persistence-query-leveldb-java`. + This section aims to provide tips and guide plugin developers through implementing a custom query plugin. Most users will not need to implement journals themselves, except if targeting a not yet supported datastore. diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 80cadedf29..ca17bf366d 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -334,7 +334,7 @@ It is possible to delete all messages (journaled by a single persistent actor) u persistent actors may call the ``deleteMessages`` method. Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with -:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessages(toSequenceNr)`` up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, while still having access to the accumulated state during replays - by loading the snapshot. diff --git a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala new file mode 100644 index 0000000000..e6f0573362 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ +package docs.persistence.query + +import akka.persistence.journal.{ EventAdapter, EventSeq } +import akka.testkit.AkkaSpec +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.journal.leveldb.LeveldbReadJournal +import akka.persistence.journal.leveldb.Tagged +import akka.persistence.query.EventsByPersistenceId +import akka.stream.scaladsl.Source +import akka.persistence.query.EventEnvelope +import akka.stream.ActorMaterializer +import akka.persistence.query.AllPersistenceIds +import akka.persistence.query.EventsByTag +import scala.annotation.tailrec + +object LeveldbPersistenceQueryDocSpec { + //#tagger + import akka.persistence.journal.WriteEventAdapter + import akka.persistence.journal.leveldb.Tagged + + class ColorTagger extends WriteEventAdapter { + val colors = Set("green", "black", "blue") + override def toJournal(event: Any): Any = event match { + case s: String ⇒ + var tags = colors.foldLeft(Set.empty[String]) { (acc, c) ⇒ + if (s.contains(c)) acc + c else acc + } + if (tags.isEmpty) event + else Tagged(event, tags) + case _ ⇒ event + } + + override def manifest(event: Any): String = "" + } + //#tagger +} + +class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { + + def this() = this("") + + "LeveldbPersistentQuery" must { + "demonstrate how get ReadJournal" in { + //#get-read-journal + import akka.persistence.query.PersistenceQuery + import akka.persistence.query.journal.leveldb.LeveldbReadJournal + + val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + //#get-read-journal + } + + "demonstrate EventsByPersistenceId" in { + //#EventsByPersistenceId + import akka.persistence.query.EventsByPersistenceId + implicit val mat = ActorMaterializer()(system) + val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + + val src: Source[EventEnvelope, Unit] = + queries.query(EventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)) + + val events: Source[Any, Unit] = src.map(_.event) + //#EventsByPersistenceId + } + + "demonstrate AllPersistenceIds" in { + //#AllPersistenceIds + import akka.persistence.query.AllPersistenceIds + implicit val mat = ActorMaterializer()(system) + val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + + val src: Source[String, Unit] = queries.query(AllPersistenceIds) + //#AllPersistenceIds + } + + "demonstrate EventsByTag" in { + //#EventsByTag + import akka.persistence.query.EventsByTag + implicit val mat = ActorMaterializer()(system) + val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + + val src: Source[EventEnvelope, Unit] = + queries.query(EventsByTag(tag = "green", offset = 0L)) + //#EventsByTag + } + + } + +} diff --git a/akka-docs/rst/scala/index-actors.rst b/akka-docs/rst/scala/index-actors.rst index 31babc0126..cc92986f34 100644 --- a/akka-docs/rst/scala/index-actors.rst +++ b/akka-docs/rst/scala/index-actors.rst @@ -14,6 +14,7 @@ Actors persistence persistence-schema-evolution persistence-query + persistence-query-leveldb testing actordsl typed-actors diff --git a/akka-docs/rst/scala/persistence-query-leveldb.rst b/akka-docs/rst/scala/persistence-query-leveldb.rst new file mode 100644 index 0000000000..ca1e801afb --- /dev/null +++ b/akka-docs/rst/scala/persistence-query-leveldb.rst @@ -0,0 +1,156 @@ +.. _persistence-query-leveldb-scala: + +############################# +Persistence Query for LevelDB +############################# + +This is documentation for the LevelDB implementation of the :ref:`persistence-query-scala` API. +Note that implementations for other journals may have different semantics. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence.query`` package. + +Dependencies +============ + +Akka persistence LevelDB query implementation is bundled in the ``akka-persistence-query-experimental`` artifact. +Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@ + +How to get the ReadJournal +========================== + +The ``ReadJournal`` is retrieved via the ``akka.persistence.query.PersistenceQuery`` +extension: + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#get-read-journal + +Supported Queries +================= + +EventsByPersistenceId +--------------------- + +``EventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor`` +identified by ``persistenceId``. + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#EventsByPersistenceId + +You can retrieve a subset of all events by specifying ``fromSequenceNr`` and ``toSequenceNr`` +or use ``0L`` and ``Long.MaxValue`` respectively to retrieve all events. Note that +the corresponding sequence number of each event is provided in the ``EventEnvelope``, +which makes it possible to resume the stream at a later point from a given sequence number. + +The returned event stream is ordered by sequence number, i.e. the same order as the +``PersistentActor`` persisted the events. The same prefix of stream elements (in same order) +are returned for multiple executions of the query, except for when events have been deleted. + +The query supports two different completion modes: + +* The stream is not completed when it reaches the end of the currently stored events, + but it continues to push new events when new events are persisted. This is the + default mode that is used when no hints are given. It can also be specified with + hint ``RefreshInterval``. + +* The stream is completed when it reaches the end of the currently stored events. + This mode is specified with hint ``NoRefresh``. + +The LevelDB write journal is notifying the query side as soon as events are persisted, but for +efficiency reasons the query side retrieves the events in batches that sometimes can +be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval`` +hint. + +The stream is completed with failure if there is a failure in executing the query in the +backend journal. + +AllPersistenceIds +----------------- + +``AllPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors. + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#AllPersistenceIds + +The returned event stream is unordered and you can expect different order for multiple +executions of the query. + +The query supports two different completion modes: + +* The stream is not completed when it reaches the end of the currently used ``persistenceIds``, + but it continues to push new ``persistenceIds`` when new persistent actors are created. + This is the default mode that is used when no hints are given. It can also be specified with + hint ``RefreshInterval``. + +* The stream is completed when it reaches the end of the currently used ``persistenceIds``. + This mode is specified with hint ``NoRefresh``. + +The LevelDB write journal is notifying the query side as soon as new ``persistenceIds`` are +created and there is no periodic polling or batching involved in this query. + +The stream is completed with failure if there is a failure in executing the query in the +backend journal. + +EventsByTag +----------- + +``EventsByTag`` is used for retrieving events that were marked with a given tag, e.g. +all domain events of an Aggregate Root type. + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#EventsByTag + +To tag events you create an :ref:`event-adapters-scala` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged`` +with the given ``tags``. + +.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#tagger + +You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all +events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag. +Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible +to resume the stream at a later point from a given offset. + +In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr`` +for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the +``persistenceId`` that persisted the event. The ``persistenceId`` + ``sequenceNr`` is an unique +identifier for the event. + +The returned event stream is ordered by the offset (tag sequence number), which corresponds +to the same order as the write journal stored the events. The same stream elements (in same order) +are returned for multiple executions of the query. Deleted events are not deleted from the +tagged event stream. + +.. note:: + + Events deleted using ``deleteMessages(toSequenceNr)`` are not deleted from the "tagged stream". + +The query supports two different completion modes: + +* The stream is not completed when it reaches the end of the currently stored events, + but it continues to push new events when new events are persisted. This is the + default mode that is used when no hints are given. It can also be specified with + hint ``RefreshInterval``. + +* The stream is completed when it reaches the end of the currently stored events. + This mode is specified with hint ``NoRefresh``. + +The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for +efficiency reasons the query side retrieves the events in batches that sometimes can +be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval`` +hint. + +The stream is completed with failure if there is a failure in executing the query in the +backend journal. + +Configuration +============= + +Configuration settings can be defined in the configuration section with the +absolute path corresponding to the identifier, which is ``"akka.persistence.query.journal.leveldb"`` +for the default ``LeveldbReadJournal.Identifier``. + +It can be configured with the following properties: + +.. includecode:: ../../../akka-persistence-query/src/main/resources/reference.conf#query-leveldb diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst index 2dce052344..8e1a75106b 100644 --- a/akka-docs/rst/scala/persistence-query.rst +++ b/akka-docs/rst/scala/persistence-query.rst @@ -199,6 +199,8 @@ Query plugins Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page. +The plugin for LevelDB is described in :ref:`persistence-query-leveldb-scala`. + This section aims to provide tips and guide plugin developers through implementing a custom query plugin. Most users will not need to implement journals themselves, except if targeting a not yet supported datastore. diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index eeb10e3e4d..61229da62f 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -321,7 +321,7 @@ It is possible to delete all messages (journaled by a single persistent actor) u persistent actors may call the ``deleteMessages`` method. Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with -:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)`` +:ref:`snapshotting `, i.e. after a snapshot has been successfully stored, a ``deleteMessages(toSequenceNr)`` up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events, while still having access to the accumulated state during replays - by loading the snapshot. diff --git a/akka-persistence-query/src/main/resources/reference.conf b/akka-persistence-query/src/main/resources/reference.conf index 5a1d70a679..18e0a0f2d7 100644 --- a/akka-persistence-query/src/main/resources/reference.conf +++ b/akka-persistence-query/src/main/resources/reference.conf @@ -5,26 +5,25 @@ # This is the reference config file that contains all the default settings. # Make your edits in your application.conf in order to override these settings. -akka.persistence.query { - journal { - leveldb { - class = "akka.persistence.query.journal.leveldb.LeveldbReadJournal" - - # Absolute path to the write journal plugin configuration entry that this query journal - # will connect to. That must be a LeveldbJournal or SharedLeveldbJournal. - # If undefined (or "") it will connect to the default journal as specified by the - # akka.persistence.journal.plugin property. - write-plugin = "" - - # Look for more data with this interval. The query journal is also notified by - # the write journal when something is changed and thereby updated quickly, but - # when there are a lot of changes it falls back to periodic queries to avoid - # overloading the system with many small queries. - refresh-interval = 3s - - # How many events to fetch in one query and keep buffered until they - # are delivered downstreams. - max-buffer-size = 100 - } - } +#//#query-leveldb +# Configuration for the LeveldbReadJournal +akka.persistence.query.journal.leveldb { + # Implementation class of the LevelDB ReadJournal + class = "akka.persistence.query.journal.leveldb.LeveldbReadJournal" + + # Absolute path to the write journal plugin configuration entry that this + # query journal will connect to. That must be a LeveldbJournal or SharedLeveldbJournal. + # If undefined (or "") it will connect to the default journal as specified by the + # akka.persistence.journal.plugin property. + write-plugin = "" + + # The LevelDB write journal is notifying the query side as soon as things + # are persisted, but for efficiency reasons the query side retrieves the events + # in batches that sometimes can be delayed up to the configured `refresh-interval`. + refresh-interval = 3s + + # How many events to fetch in one query (replay) and keep buffered until they + # are delivered downstreams. + max-buffer-size = 100 } +#//#query-leveldb diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala index 5695747e31..0a12b686ed 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala @@ -22,9 +22,135 @@ import akka.util.ByteString import java.net.URLEncoder object LeveldbReadJournal { + /** + * The default identifier for [[LeveldbReadJournal]] to be used with + * [[akka.persistence.query.PersistenceQuery#readJournalFor]]. + * + * The value is `"akka.persistence.query.journal.leveldb"` and corresponds + * to the absolute path to the read journal configuration entry. + */ final val Identifier = "akka.persistence.query.journal.leveldb" } +/** + * [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB. + * + * It is retrieved with Scala API: + * {{{ + * val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + * }}} + * + * or with Java API: + * {{{ + * ReadJournal queries = + * PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier()); + * }}} + * + * Configuration settings can be defined in the configuration section with the + * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"` + * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`. + * + * The following queries are supported. + * + * == EventsByPersistenceId == + * + * [[akka.persistence.query.EventsByPersistenceId]] is used for retrieving events for a specific + * `PersistentActor` identified by `persistenceId`. + * + * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` + * or use `0L` and `Long.MaxValue` respectively to retrieve all events. Note that + * the corresponding sequence number of each event is provided in the + * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the + * stream at a later point from a given sequence number. + * + * The returned event stream is ordered by sequence number, i.e. the same order as the + * `PersistentActor` persisted the events. The same prefix of stream elements (in same order) + * are returned for multiple executions of the query, except for when events have been deleted. + * + * The query supports two different completion modes: + *
    + *
  • The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. This is the + * default mode that is used when no hints are given. It can also be specified with + * hint [[akka.persistence.query.RefreshInterval]].
  • + *
  • The stream is completed when it reaches the end of the currently stored events. + * This mode is specified with hint [[akka.persistence.query.NoRefresh]].
  • + *
+ * + * The LevelDB write journal is notifying the query side as soon as events are persisted, but for + * efficiency reasons the query side retrieves the events in batches that sometimes can + * be delayed up to the configured `refresh-interval` or given [[akka.persistence.query.RefreshInterval]] + * hint. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + * + * == AllPersistenceIds == + * + * [[akka.persistence.query.AllPersistenceIds]] is used for retrieving all `persistenceIds` of all + * persistent actors. + * + * The returned event stream is unordered and you can expect different order for multiple + * executions of the query. + * + * The query supports two different completion modes: + *
    + *
  • The stream is not completed when it reaches the end of the currently used `persistenceIds`, + * but it continues to push new `persistenceIds` when new persistent actors are created. + * This is the default mode that is used when no hints are given. It can also be specified with + * hint [[akka.persistence.query.RefreshInterval]].
  • + *
  • The stream is completed when it reaches the end of the currently used `persistenceIds`. + * This mode is specified with hint [[akka.persistence.query.NoRefresh]].
  • + *
+ * + * The LevelDB write journal is notifying the query side as soon as new `persistenceIds` are + * created and there is no periodic polling or batching involved in this query. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + * + * == EventsByTag == + * + * [[akka.persistence.query.EventsByTag]] is used for retrieving events that were marked with + * a given tag, e.g. all events of an Aggregate Root type. + * + * To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events + * in a [[akka.persistence.journal.leveldb.Tagged]] with the given `tags`. + * + * You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all + * events with a given tag. The `offset` corresponds to an ordered sequence number for + * the specific tag. Note that the corresponding offset of each event is provided in the + * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the + * stream at a later point from a given offset. + * + * In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr` + * for each event. The `sequenceNr` is the sequence number for the persistent actor with the + * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique + * identifier for the event. + * + * The returned event stream is ordered by the offset (tag sequence number), which corresponds + * to the same order as the write journal stored the events. The same stream elements (in same order) + * are returned for multiple executions of the query. Deleted events are not deleted from the + * tagged event stream. + * + * The query supports two different completion modes: + *
    + *
  • The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. This is the + * default mode that is used when no hints are given. It can also be specified with + * hint [[akka.persistence.query.RefreshInterval]].
  • + *
  • The stream is completed when it reaches the end of the currently stored events. + * This mode is specified with hint [[akka.persistence.query.NoRefresh]].
  • + *
+ * + * The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for + * efficiency reasons the query side retrieves the events in batches that sometimes can + * be delayed up to the configured `refresh-interval` or given [[akka.persistence.query.RefreshInterval]] + * hint. + * + * The stream is completed with failure if there is a failure in executing the query in the + * backend journal. + */ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends scaladsl.ReadJournal { private val serialization = SerializationExtension(system) diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index fc8802371d..ceaa39d646 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -19,6 +19,7 @@ import akka.persistence.journal.leveldb.Tagged import akka.persistence.journal.EventSeq import akka.persistence.journal.EventAdapter import akka.persistence.query.EventEnvelope +import akka.persistence.journal.WriteEventAdapter object EventsByTagSpec { val config = """ @@ -38,7 +39,7 @@ object EventsByTagSpec { } -class ColorTagger extends EventAdapter { +class ColorTagger extends WriteEventAdapter { val colors = Set("green", "black", "blue") override def toJournal(event: Any): Any = event match { case s: String ⇒ @@ -48,8 +49,6 @@ class ColorTagger extends EventAdapter { case _ ⇒ event } - override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event) - override def manifest(event: Any): String = "" } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index d37ab72d3c..696fa088de 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -88,6 +88,8 @@ private[persistence] object LeveldbJournal { * Subscribe the `sender` to changes (appended events) for a specific `tag`. * Used by query-side. The journal will send [[TaggedEventAppended]] messages to * the subscriber when `asyncWriteMessages` has been called. + * Events are tagged by wrapping in [[akka.persistence.journal.leveldb.Tagged]] + * via an [[akka.persistence.journal.EventAdapter]]. */ final case class SubscribeTag(tag: String) extends SubscriptionCommand final case class TaggedEventAppended(tag: String) extends DeadLetterSuppression diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 9766de9715..8ab3a1c90c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -63,6 +63,9 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with } if (tags.nonEmpty && hasTagSubscribers) allTags ++= tags + + require(!p2.persistenceId.startsWith(tagPersistenceIdPrefix), + s"persistenceId [${p.persistenceId}] must not start with $tagPersistenceIdPrefix") addToMessageBatch(p2, tags, batch) } if (hasPersistenceIdSubscribers) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala index 94e8274ba8..988cd78c80 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala @@ -3,6 +3,9 @@ */ package akka.persistence.journal.leveldb +import java.util.ArrayList +import scala.collection.JavaConverters._ + /** * The LevelDB journal supports tagging of events that are used * by the `EventsByTag` query. To specify the tags you create an @@ -11,4 +14,12 @@ package akka.persistence.journal.leveldb * * The journal will unwrap the event and store the `payload`. */ -case class Tagged(payload: Any, tags: Set[String]) +case class Tagged(payload: Any, tags: Set[String]) { + + /** + * Java API + */ + def this(payload: Any, tags: java.util.Set[String]) = { + this(payload, tags.asScala.toSet) + } +}