=per #18288 Add docs of leveldb queries

This commit is contained in:
Patrik Nordwall 2015-08-21 11:35:51 +02:00
parent dfba334fda
commit fed622eb9f
17 changed files with 683 additions and 30 deletions

View file

@ -0,0 +1,95 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence.query;
import java.util.HashSet;
import java.util.Set;
import scala.runtime.BoxedUnit;
import akka.actor.ActorSystem;
import akka.persistence.journal.WriteEventAdapter;
import akka.persistence.journal.EventSeq;
import akka.persistence.journal.leveldb.Tagged;
import akka.persistence.query.AllPersistenceIds;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.EventsByPersistenceId;
import akka.persistence.query.EventsByTag;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.ReadJournal;
import akka.persistence.query.journal.leveldb.LeveldbReadJournal;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
public class LeveldbPersistenceQueryDocTest {
final ActorSystem system = ActorSystem.create();
public void demonstrateReadJournal() {
//#get-read-journal
final ActorMaterializer mat = ActorMaterializer.create(system);
ReadJournal queries =
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
//#get-read-journal
}
public void demonstrateEventsByPersistenceId() {
//#EventsByPersistenceId
ReadJournal queries =
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
Source<EventEnvelope, BoxedUnit> source =
queries.query(EventsByPersistenceId.create("some-persistence-id", 0, Long.MAX_VALUE));
//#EventsByPersistenceId
}
public void demonstrateAllPersistenceIds() {
//#AllPersistenceIds
ReadJournal queries =
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
Source<String, BoxedUnit> source =
queries.query(AllPersistenceIds.getInstance());
//#AllPersistenceIds
}
public void demonstrateEventsByTag() {
//#EventsByTag
ReadJournal queries =
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
Source<EventEnvelope, BoxedUnit> source =
queries.query(EventsByTag.create("green", 0));
//#EventsByTag
}
static
//#tagger
public class MyEventAdapter implements WriteEventAdapter {
@Override
public Object toJournal(Object event) {
if (event instanceof String) {
String s = (String) event;
Set<String> tags = new HashSet<String>();
if (s.contains("green")) tags.add("green");
if (s.contains("black")) tags.add("black");
if (s.contains("blue")) tags.add("blue");
if (tags.isEmpty())
return event;
else
return new Tagged(event, tags);
} else {
return event;
}
}
@Override
public String manifest(Object event) {
return "";
}
}
//#tagger
}

View file

@ -13,4 +13,5 @@ Actors
fsm
persistence
persistence-query
persistence-query-leveldb
testing

View file

@ -331,7 +331,7 @@ It is possible to delete all messages (journaled by a single persistent actor) u
persistent actors may call the ``deleteMessages`` method.
Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)``
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessages(toSequenceNr)``
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
while still having access to the accumulated state during replays - by loading the snapshot.

View file

@ -0,0 +1,161 @@
.. _persistence-query-leveldb-java:
#############################
Persistence Query for LevelDB
#############################
This is documentation for the LevelDB implementation of the :ref:`persistence-query-java` API.
Note that implementations for other journals may have different semantics.
.. warning::
This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to
improve this API based on our users feedback, which implies that while we try to keep incompatible
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the
contents of the ``akka.persistence.query`` package.
Dependencies
============
Akka persistence LevelDB query implementation is bundled in the ``akka-persistence-query-experimental`` artifact.
Make sure that you have the following dependency in your project::
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence-query-experimental_@binVersion@</artifactId>
<version>@version@</version>
</dependency>
How to get the ReadJournal
==========================
The ``ReadJournal`` is retrieved via the ``akka.persistence.query.PersistenceQuery``
extension:
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#get-read-journal
Supported Queries
=================
EventsByPersistenceId
---------------------
``EventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor``
identified by ``persistenceId``.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByPersistenceId
You can retrieve a subset of all events by specifying ``fromSequenceNr`` and ``toSequenceNr``
or use ``0L`` and ``Long.MAX_VALUE`` respectively to retrieve all events. Note that
the corresponding sequence number of each event is provided in the ``EventEnvelope``,
which makes it possible to resume the stream at a later point from a given sequence number.
The returned event stream is ordered by sequence number, i.e. the same order as the
``PersistentActor`` persisted the events. The same prefix of stream elements (in same order)
are returned for multiple executions of the query, except for when events have been deleted.
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently stored events,
but it continues to push new events when new events are persisted. This is the
default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently stored events.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as events are persisted, but for
efficiency reasons the query side retrieves the events in batches that sometimes can
be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval``
hint.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
AllPersistenceIds
-----------------
``AllPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#AllPersistenceIds
The returned event stream is unordered and you can expect different order for multiple
executions of the query.
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently used ``persistenceIds``,
but it continues to push new ``persistenceIds`` when new persistent actors are created.
This is the default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently used ``persistenceIds``.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as new ``persistenceIds`` are
created and there is no periodic polling or batching involved in this query.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
EventsByTag
-----------
``EventsByTag`` is used for retrieving events that were marked with a given tag, e.g.
all domain events of an Aggregate Root type.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByTag
To tag events you create an :ref:`event-adapters-java` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged``
with the given ``tags``.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger
You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all
events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag.
Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible
to resume the stream at a later point from a given offset.
In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr``
for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the
``persistenceId`` that persisted the event. The ``persistenceId`` + ``sequenceNr`` is an unique
identifier for the event.
The returned event stream is ordered by the offset (tag sequence number), which corresponds
to the same order as the write journal stored the events. The same stream elements (in same order)
are returned for multiple executions of the query. Deleted events are not deleted from the
tagged event stream.
.. note::
Events deleted using ``deleteMessages(toSequenceNr)`` are not deleted from the "tagged stream".
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently stored events,
but it continues to push new events when new events are persisted. This is the
default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently stored events.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for
efficiency reasons the query side retrieves the events in batches that sometimes can
be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval``
hint.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
Configuration
=============
Configuration settings can be defined in the configuration section with the
absolute path corresponding to the identifier, which is ``"akka.persistence.query.journal.leveldb"``
for the default ``LeveldbReadJournal.Identifier``.
It can be configured with the following properties:
.. includecode:: ../../../akka-persistence-query/src/main/resources/reference.conf#query-leveldb

View file

@ -26,7 +26,11 @@ Dependencies
Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project::
"com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence-query-experimental_@binVersion@</artifactId>
<version>@version@</version>
</dependency>
Design overview
===============
@ -201,6 +205,8 @@ Query plugins
Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds
of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page.
The plugin for LevelDB is described in :ref:`persistence-query-leveldb-java`.
This section aims to provide tips and guide plugin developers through implementing a custom query plugin.
Most users will not need to implement journals themselves, except if targeting a not yet supported datastore.

View file

@ -334,7 +334,7 @@ It is possible to delete all messages (journaled by a single persistent actor) u
persistent actors may call the ``deleteMessages`` method.
Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)``
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessages(toSequenceNr)``
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
while still having access to the accumulated state during replays - by loading the snapshot.

View file

@ -0,0 +1,91 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence.query
import akka.persistence.journal.{ EventAdapter, EventSeq }
import akka.testkit.AkkaSpec
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.LeveldbReadJournal
import akka.persistence.journal.leveldb.Tagged
import akka.persistence.query.EventsByPersistenceId
import akka.stream.scaladsl.Source
import akka.persistence.query.EventEnvelope
import akka.stream.ActorMaterializer
import akka.persistence.query.AllPersistenceIds
import akka.persistence.query.EventsByTag
import scala.annotation.tailrec
object LeveldbPersistenceQueryDocSpec {
//#tagger
import akka.persistence.journal.WriteEventAdapter
import akka.persistence.journal.leveldb.Tagged
class ColorTagger extends WriteEventAdapter {
val colors = Set("green", "black", "blue")
override def toJournal(event: Any): Any = event match {
case s: String
var tags = colors.foldLeft(Set.empty[String]) { (acc, c)
if (s.contains(c)) acc + c else acc
}
if (tags.isEmpty) event
else Tagged(event, tags)
case _ event
}
override def manifest(event: Any): String = ""
}
//#tagger
}
class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) {
def this() = this("")
"LeveldbPersistentQuery" must {
"demonstrate how get ReadJournal" in {
//#get-read-journal
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.LeveldbReadJournal
val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)
//#get-read-journal
}
"demonstrate EventsByPersistenceId" in {
//#EventsByPersistenceId
import akka.persistence.query.EventsByPersistenceId
implicit val mat = ActorMaterializer()(system)
val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, Unit] =
queries.query(EventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue))
val events: Source[Any, Unit] = src.map(_.event)
//#EventsByPersistenceId
}
"demonstrate AllPersistenceIds" in {
//#AllPersistenceIds
import akka.persistence.query.AllPersistenceIds
implicit val mat = ActorMaterializer()(system)
val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)
val src: Source[String, Unit] = queries.query(AllPersistenceIds)
//#AllPersistenceIds
}
"demonstrate EventsByTag" in {
//#EventsByTag
import akka.persistence.query.EventsByTag
implicit val mat = ActorMaterializer()(system)
val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, Unit] =
queries.query(EventsByTag(tag = "green", offset = 0L))
//#EventsByTag
}
}
}

View file

@ -14,6 +14,7 @@ Actors
persistence
persistence-schema-evolution
persistence-query
persistence-query-leveldb
testing
actordsl
typed-actors

View file

@ -0,0 +1,156 @@
.. _persistence-query-leveldb-scala:
#############################
Persistence Query for LevelDB
#############################
This is documentation for the LevelDB implementation of the :ref:`persistence-query-scala` API.
Note that implementations for other journals may have different semantics.
.. warning::
This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to
improve this API based on our users feedback, which implies that while we try to keep incompatible
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the
contents of the ``akka.persistence.query`` package.
Dependencies
============
Akka persistence LevelDB query implementation is bundled in the ``akka-persistence-query-experimental`` artifact.
Make sure that you have the following dependency in your project::
"com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@
How to get the ReadJournal
==========================
The ``ReadJournal`` is retrieved via the ``akka.persistence.query.PersistenceQuery``
extension:
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#get-read-journal
Supported Queries
=================
EventsByPersistenceId
---------------------
``EventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor``
identified by ``persistenceId``.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#EventsByPersistenceId
You can retrieve a subset of all events by specifying ``fromSequenceNr`` and ``toSequenceNr``
or use ``0L`` and ``Long.MaxValue`` respectively to retrieve all events. Note that
the corresponding sequence number of each event is provided in the ``EventEnvelope``,
which makes it possible to resume the stream at a later point from a given sequence number.
The returned event stream is ordered by sequence number, i.e. the same order as the
``PersistentActor`` persisted the events. The same prefix of stream elements (in same order)
are returned for multiple executions of the query, except for when events have been deleted.
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently stored events,
but it continues to push new events when new events are persisted. This is the
default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently stored events.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as events are persisted, but for
efficiency reasons the query side retrieves the events in batches that sometimes can
be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval``
hint.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
AllPersistenceIds
-----------------
``AllPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#AllPersistenceIds
The returned event stream is unordered and you can expect different order for multiple
executions of the query.
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently used ``persistenceIds``,
but it continues to push new ``persistenceIds`` when new persistent actors are created.
This is the default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently used ``persistenceIds``.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as new ``persistenceIds`` are
created and there is no periodic polling or batching involved in this query.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
EventsByTag
-----------
``EventsByTag`` is used for retrieving events that were marked with a given tag, e.g.
all domain events of an Aggregate Root type.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#EventsByTag
To tag events you create an :ref:`event-adapters-scala` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged``
with the given ``tags``.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#tagger
You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all
events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag.
Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible
to resume the stream at a later point from a given offset.
In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr``
for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the
``persistenceId`` that persisted the event. The ``persistenceId`` + ``sequenceNr`` is an unique
identifier for the event.
The returned event stream is ordered by the offset (tag sequence number), which corresponds
to the same order as the write journal stored the events. The same stream elements (in same order)
are returned for multiple executions of the query. Deleted events are not deleted from the
tagged event stream.
.. note::
Events deleted using ``deleteMessages(toSequenceNr)`` are not deleted from the "tagged stream".
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently stored events,
but it continues to push new events when new events are persisted. This is the
default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently stored events.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for
efficiency reasons the query side retrieves the events in batches that sometimes can
be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval``
hint.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
Configuration
=============
Configuration settings can be defined in the configuration section with the
absolute path corresponding to the identifier, which is ``"akka.persistence.query.journal.leveldb"``
for the default ``LeveldbReadJournal.Identifier``.
It can be configured with the following properties:
.. includecode:: ../../../akka-persistence-query/src/main/resources/reference.conf#query-leveldb

View file

@ -199,6 +199,8 @@ Query plugins
Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds
of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page.
The plugin for LevelDB is described in :ref:`persistence-query-leveldb-scala`.
This section aims to provide tips and guide plugin developers through implementing a custom query plugin.
Most users will not need to implement journals themselves, except if targeting a not yet supported datastore.

View file

@ -321,7 +321,7 @@ It is possible to delete all messages (journaled by a single persistent actor) u
persistent actors may call the ``deleteMessages`` method.
Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)``
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessages(toSequenceNr)``
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
while still having access to the accumulated state during replays - by loading the snapshot.

View file

@ -5,26 +5,25 @@
# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.
akka.persistence.query {
journal {
leveldb {
#//#query-leveldb
# Configuration for the LeveldbReadJournal
akka.persistence.query.journal.leveldb {
# Implementation class of the LevelDB ReadJournal
class = "akka.persistence.query.journal.leveldb.LeveldbReadJournal"
# Absolute path to the write journal plugin configuration entry that this query journal
# will connect to. That must be a LeveldbJournal or SharedLeveldbJournal.
# Absolute path to the write journal plugin configuration entry that this
# query journal will connect to. That must be a LeveldbJournal or SharedLeveldbJournal.
# If undefined (or "") it will connect to the default journal as specified by the
# akka.persistence.journal.plugin property.
write-plugin = ""
# Look for more data with this interval. The query journal is also notified by
# the write journal when something is changed and thereby updated quickly, but
# when there are a lot of changes it falls back to periodic queries to avoid
# overloading the system with many small queries.
# The LevelDB write journal is notifying the query side as soon as things
# are persisted, but for efficiency reasons the query side retrieves the events
# in batches that sometimes can be delayed up to the configured `refresh-interval`.
refresh-interval = 3s
# How many events to fetch in one query and keep buffered until they
# How many events to fetch in one query (replay) and keep buffered until they
# are delivered downstreams.
max-buffer-size = 100
}
}
}
#//#query-leveldb

View file

@ -22,9 +22,135 @@ import akka.util.ByteString
import java.net.URLEncoder
object LeveldbReadJournal {
/**
* The default identifier for [[LeveldbReadJournal]] to be used with
* [[akka.persistence.query.PersistenceQuery#readJournalFor]].
*
* The value is `"akka.persistence.query.journal.leveldb"` and corresponds
* to the absolute path to the read journal configuration entry.
*/
final val Identifier = "akka.persistence.query.journal.leveldb"
}
/**
* [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB.
*
* It is retrieved with Scala API:
* {{{
* val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)
* }}}
*
* or with Java API:
* {{{
* ReadJournal queries =
* PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
* }}}
*
* Configuration settings can be defined in the configuration section with the
* absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"`
* for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`.
*
* The following queries are supported.
*
* == EventsByPersistenceId ==
*
* [[akka.persistence.query.EventsByPersistenceId]] is used for retrieving events for a specific
* `PersistentActor` identified by `persistenceId`.
*
* You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr`
* or use `0L` and `Long.MaxValue` respectively to retrieve all events. Note that
* the corresponding sequence number of each event is provided in the
* [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given sequence number.
*
* The returned event stream is ordered by sequence number, i.e. the same order as the
* `PersistentActor` persisted the events. The same prefix of stream elements (in same order)
* are returned for multiple executions of the query, except for when events have been deleted.
*
* The query supports two different completion modes:
* <ul>
* <li>The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted. This is the
* default mode that is used when no hints are given. It can also be specified with
* hint [[akka.persistence.query.RefreshInterval]].</li>
* <li>The stream is completed when it reaches the end of the currently stored events.
* This mode is specified with hint [[akka.persistence.query.NoRefresh]].</li>
* </ul>
*
* The LevelDB write journal is notifying the query side as soon as events are persisted, but for
* efficiency reasons the query side retrieves the events in batches that sometimes can
* be delayed up to the configured `refresh-interval` or given [[akka.persistence.query.RefreshInterval]]
* hint.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*
* == AllPersistenceIds ==
*
* [[akka.persistence.query.AllPersistenceIds]] is used for retrieving all `persistenceIds` of all
* persistent actors.
*
* The returned event stream is unordered and you can expect different order for multiple
* executions of the query.
*
* The query supports two different completion modes:
* <ul>
* <li>The stream is not completed when it reaches the end of the currently used `persistenceIds`,
* but it continues to push new `persistenceIds` when new persistent actors are created.
* This is the default mode that is used when no hints are given. It can also be specified with
* hint [[akka.persistence.query.RefreshInterval]].</li>
* <li>The stream is completed when it reaches the end of the currently used `persistenceIds`.
* This mode is specified with hint [[akka.persistence.query.NoRefresh]].</li>
* </ul>
*
* The LevelDB write journal is notifying the query side as soon as new `persistenceIds` are
* created and there is no periodic polling or batching involved in this query.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*
* == EventsByTag ==
*
* [[akka.persistence.query.EventsByTag]] is used for retrieving events that were marked with
* a given tag, e.g. all events of an Aggregate Root type.
*
* To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events
* in a [[akka.persistence.journal.leveldb.Tagged]] with the given `tags`.
*
* You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all
* events with a given tag. The `offset` corresponds to an ordered sequence number for
* the specific tag. Note that the corresponding offset of each event is provided in the
* [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given offset.
*
* In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr`
* for each event. The `sequenceNr` is the sequence number for the persistent actor with the
* `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique
* identifier for the event.
*
* The returned event stream is ordered by the offset (tag sequence number), which corresponds
* to the same order as the write journal stored the events. The same stream elements (in same order)
* are returned for multiple executions of the query. Deleted events are not deleted from the
* tagged event stream.
*
* The query supports two different completion modes:
* <ul>
* <li>The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted. This is the
* default mode that is used when no hints are given. It can also be specified with
* hint [[akka.persistence.query.RefreshInterval]].</li>
* <li>The stream is completed when it reaches the end of the currently stored events.
* This mode is specified with hint [[akka.persistence.query.NoRefresh]].</li>
* </ul>
*
* The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for
* efficiency reasons the query side retrieves the events in batches that sometimes can
* be delayed up to the configured `refresh-interval` or given [[akka.persistence.query.RefreshInterval]]
* hint.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends scaladsl.ReadJournal {
private val serialization = SerializationExtension(system)

View file

@ -19,6 +19,7 @@ import akka.persistence.journal.leveldb.Tagged
import akka.persistence.journal.EventSeq
import akka.persistence.journal.EventAdapter
import akka.persistence.query.EventEnvelope
import akka.persistence.journal.WriteEventAdapter
object EventsByTagSpec {
val config = """
@ -38,7 +39,7 @@ object EventsByTagSpec {
}
class ColorTagger extends EventAdapter {
class ColorTagger extends WriteEventAdapter {
val colors = Set("green", "black", "blue")
override def toJournal(event: Any): Any = event match {
case s: String
@ -48,8 +49,6 @@ class ColorTagger extends EventAdapter {
case _ event
}
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event)
override def manifest(event: Any): String = ""
}

View file

@ -88,6 +88,8 @@ private[persistence] object LeveldbJournal {
* Subscribe the `sender` to changes (appended events) for a specific `tag`.
* Used by query-side. The journal will send [[TaggedEventAppended]] messages to
* the subscriber when `asyncWriteMessages` has been called.
* Events are tagged by wrapping in [[akka.persistence.journal.leveldb.Tagged]]
* via an [[akka.persistence.journal.EventAdapter]].
*/
final case class SubscribeTag(tag: String) extends SubscriptionCommand
final case class TaggedEventAppended(tag: String) extends DeadLetterSuppression

View file

@ -63,6 +63,9 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
}
if (tags.nonEmpty && hasTagSubscribers)
allTags ++= tags
require(!p2.persistenceId.startsWith(tagPersistenceIdPrefix),
s"persistenceId [${p.persistenceId}] must not start with $tagPersistenceIdPrefix")
addToMessageBatch(p2, tags, batch)
}
if (hasPersistenceIdSubscribers)

View file

@ -3,6 +3,9 @@
*/
package akka.persistence.journal.leveldb
import java.util.ArrayList
import scala.collection.JavaConverters._
/**
* The LevelDB journal supports tagging of events that are used
* by the `EventsByTag` query. To specify the tags you create an
@ -11,4 +14,12 @@ package akka.persistence.journal.leveldb
*
* The journal will unwrap the event and store the `payload`.
*/
case class Tagged(payload: Any, tags: Set[String])
case class Tagged(payload: Any, tags: Set[String]) {
/**
* Java API
*/
def this(payload: Any, tags: java.util.Set[String]) = {
this(payload, tags.asScala.toSet)
}
}