Merge pull request #18290 from akka/wip-18288-leveldb-query-doc-patriknw

=per #18288 Add docs of leveldb queries
This commit is contained in:
Patrik Nordwall 2015-08-21 15:37:49 +02:00
commit 3d4b5f57b0
17 changed files with 683 additions and 30 deletions

View file

@ -0,0 +1,95 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence.query;
import java.util.HashSet;
import java.util.Set;
import scala.runtime.BoxedUnit;
import akka.actor.ActorSystem;
import akka.persistence.journal.WriteEventAdapter;
import akka.persistence.journal.EventSeq;
import akka.persistence.journal.leveldb.Tagged;
import akka.persistence.query.AllPersistenceIds;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.EventsByPersistenceId;
import akka.persistence.query.EventsByTag;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.ReadJournal;
import akka.persistence.query.journal.leveldb.LeveldbReadJournal;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
public class LeveldbPersistenceQueryDocTest {
final ActorSystem system = ActorSystem.create();
public void demonstrateReadJournal() {
//#get-read-journal
final ActorMaterializer mat = ActorMaterializer.create(system);
ReadJournal queries =
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
//#get-read-journal
}
public void demonstrateEventsByPersistenceId() {
//#EventsByPersistenceId
ReadJournal queries =
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
Source<EventEnvelope, BoxedUnit> source =
queries.query(EventsByPersistenceId.create("some-persistence-id", 0, Long.MAX_VALUE));
//#EventsByPersistenceId
}
public void demonstrateAllPersistenceIds() {
//#AllPersistenceIds
ReadJournal queries =
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
Source<String, BoxedUnit> source =
queries.query(AllPersistenceIds.getInstance());
//#AllPersistenceIds
}
public void demonstrateEventsByTag() {
//#EventsByTag
ReadJournal queries =
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
Source<EventEnvelope, BoxedUnit> source =
queries.query(EventsByTag.create("green", 0));
//#EventsByTag
}
static
//#tagger
public class MyEventAdapter implements WriteEventAdapter {
@Override
public Object toJournal(Object event) {
if (event instanceof String) {
String s = (String) event;
Set<String> tags = new HashSet<String>();
if (s.contains("green")) tags.add("green");
if (s.contains("black")) tags.add("black");
if (s.contains("blue")) tags.add("blue");
if (tags.isEmpty())
return event;
else
return new Tagged(event, tags);
} else {
return event;
}
}
@Override
public String manifest(Object event) {
return "";
}
}
//#tagger
}

View file

@ -13,4 +13,5 @@ Actors
fsm
persistence
persistence-query
persistence-query-leveldb
testing

View file

@ -331,7 +331,7 @@ It is possible to delete all messages (journaled by a single persistent actor) u
persistent actors may call the ``deleteMessages`` method.
Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)``
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessages(toSequenceNr)``
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
while still having access to the accumulated state during replays - by loading the snapshot.

View file

@ -0,0 +1,161 @@
.. _persistence-query-leveldb-java:
#############################
Persistence Query for LevelDB
#############################
This is documentation for the LevelDB implementation of the :ref:`persistence-query-java` API.
Note that implementations for other journals may have different semantics.
.. warning::
This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to
improve this API based on our users feedback, which implies that while we try to keep incompatible
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the
contents of the ``akka.persistence.query`` package.
Dependencies
============
Akka persistence LevelDB query implementation is bundled in the ``akka-persistence-query-experimental`` artifact.
Make sure that you have the following dependency in your project::
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence-query-experimental_@binVersion@</artifactId>
<version>@version@</version>
</dependency>
How to get the ReadJournal
==========================
The ``ReadJournal`` is retrieved via the ``akka.persistence.query.PersistenceQuery``
extension:
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#get-read-journal
Supported Queries
=================
EventsByPersistenceId
---------------------
``EventsByPersistenceId`` is used for retrieving events for a specific ``PersistentActor``
identified by ``persistenceId``.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByPersistenceId
You can retrieve a subset of all events by specifying ``fromSequenceNr`` and ``toSequenceNr``
or use ``0L`` and ``Long.MAX_VALUE`` respectively to retrieve all events. Note that
the corresponding sequence number of each event is provided in the ``EventEnvelope``,
which makes it possible to resume the stream at a later point from a given sequence number.
The returned event stream is ordered by sequence number, i.e. the same order as the
``PersistentActor`` persisted the events. The same prefix of stream elements (in same order)
are returned for multiple executions of the query, except for when events have been deleted.
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently stored events,
but it continues to push new events when new events are persisted. This is the
default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently stored events.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as events are persisted, but for
efficiency reasons the query side retrieves the events in batches that sometimes can
be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval``
hint.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
AllPersistenceIds
-----------------
``AllPersistenceIds`` is used for retrieving all ``persistenceIds`` of all persistent actors.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#AllPersistenceIds
The returned event stream is unordered and you can expect different order for multiple
executions of the query.
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently used ``persistenceIds``,
but it continues to push new ``persistenceIds`` when new persistent actors are created.
This is the default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently used ``persistenceIds``.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as new ``persistenceIds`` are
created and there is no periodic polling or batching involved in this query.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
EventsByTag
-----------
``EventsByTag`` is used for retrieving events that were marked with a given tag, e.g.
all domain events of an Aggregate Root type.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#EventsByTag
To tag events you create an :ref:`event-adapters-java` that wraps the events in a ``akka.persistence.journal.leveldb.Tagged``
with the given ``tags``.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger
You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all
events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag.
Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible
to resume the stream at a later point from a given offset.
In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr``
for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the
``persistenceId`` that persisted the event. The ``persistenceId`` + ``sequenceNr`` is an unique
identifier for the event.
The returned event stream is ordered by the offset (tag sequence number), which corresponds
to the same order as the write journal stored the events. The same stream elements (in same order)
are returned for multiple executions of the query. Deleted events are not deleted from the
tagged event stream.
.. note::
Events deleted using ``deleteMessages(toSequenceNr)`` are not deleted from the "tagged stream".
The query supports two different completion modes:
* The stream is not completed when it reaches the end of the currently stored events,
but it continues to push new events when new events are persisted. This is the
default mode that is used when no hints are given. It can also be specified with
hint ``RefreshInterval``.
* The stream is completed when it reaches the end of the currently stored events.
This mode is specified with hint ``NoRefresh``.
The LevelDB write journal is notifying the query side as soon as tagged events are persisted, but for
efficiency reasons the query side retrieves the events in batches that sometimes can
be delayed up to the configured ``refresh-interval`` or given ``RefreshInterval``
hint.
The stream is completed with failure if there is a failure in executing the query in the
backend journal.
Configuration
=============
Configuration settings can be defined in the configuration section with the
absolute path corresponding to the identifier, which is ``"akka.persistence.query.journal.leveldb"``
for the default ``LeveldbReadJournal.Identifier``.
It can be configured with the following properties:
.. includecode:: ../../../akka-persistence-query/src/main/resources/reference.conf#query-leveldb

View file

@ -26,7 +26,11 @@ Dependencies
Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project::
"com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence-query-experimental_@binVersion@</artifactId>
<version>@version@</version>
</dependency>
Design overview
===============
@ -201,6 +205,8 @@ Query plugins
Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds
of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page.
The plugin for LevelDB is described in :ref:`persistence-query-leveldb-java`.
This section aims to provide tips and guide plugin developers through implementing a custom query plugin.
Most users will not need to implement journals themselves, except if targeting a not yet supported datastore.

View file

@ -334,7 +334,7 @@ It is possible to delete all messages (journaled by a single persistent actor) u
persistent actors may call the ``deleteMessages`` method.
Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessagess(toSequenceNr)``
:ref:`snapshotting <snapshots>`, i.e. after a snapshot has been successfully stored, a ``deleteMessages(toSequenceNr)``
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
while still having access to the accumulated state during replays - by loading the snapshot.