diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java new file mode 100644 index 0000000000..b90031fa53 --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package docs.persistence; + +import akka.actor.*; +import akka.event.EventStreamSpec; +import akka.japi.Function; +import akka.japi.Procedure; +import akka.pattern.BackoffSupervisor; +import akka.persistence.*; +import akka.persistence.query.*; +import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.javadsl.Source; +import akka.util.Timeout; +import docs.persistence.query.MyEventsByTagPublisher; +import scala.collection.Seq; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +public class PersistenceQueryDocTest { + + final Timeout timeout = Timeout.durationToTimeout(FiniteDuration.create(3, TimeUnit.SECONDS)); + + //#my-read-journal + class MyReadJournal implements ReadJournal { + private final ExtendedActorSystem system; + + public MyReadJournal(ExtendedActorSystem system) { + this.system = system; + } + + final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); + + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + if (q instanceof EventsByTag) { + final EventsByTag eventsByTag = (EventsByTag) q; + final String tag = eventsByTag.tag(); + long offset = eventsByTag.offset(); + + final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); + + return (Source) Source.actorPublisher(props) + .mapMaterializedValue(noMaterializedValue()); + } else { + // unsuported + return Source.failed( + new UnsupportedOperationException( + "Query $unsupported not supported by " + getClass().getName())) + .mapMaterializedValue(noMaterializedValue()); + } + } + + private FiniteDuration refreshInterval(Hint[] hints) { + FiniteDuration ret = defaultRefreshInterval; + for (Hint hint : hints) + if (hint instanceof RefreshInterval) + ret = ((RefreshInterval) hint).interval(); + return ret; + } + + private akka.japi.function.Function noMaterializedValue () { + return param -> (M) null; + } + + } + //#my-read-journal +} diff --git a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java new file mode 100644 index 0000000000..b4353623ee --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query; + +import akka.actor.Cancellable; +import akka.japi.Pair; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.PersistentRepr; +import akka.serialization.Serialization; +import akka.serialization.SerializationExtension; +import akka.stream.actor.AbstractActorPublisher; +import scala.Int; + +import akka.actor.Props; +import akka.persistence.query.EventEnvelope; +import akka.stream.actor.ActorPublisherMessage.Cancel; + +import scala.concurrent.duration.FiniteDuration; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +//#events-by-tag-publisher +class MyEventsByTagJavaPublisher extends AbstractActorPublisher { + private final Serialization serialization = + SerializationExtension.get(context().system()); + + private final Connection connection; + + private final String tag; + + private final String CONTINUE = "CONTINUE"; + private final int LIMIT = 1000; + private long currentOffset; + private List buf = new ArrayList<>(); + + private Cancellable continueTask; + + public MyEventsByTagJavaPublisher(Connection connection, String tag, Long offset, FiniteDuration refreshInterval) { + this.connection = connection; + this.tag = tag; + this.currentOffset = offset; + + this.continueTask = context().system().scheduler().schedule(refreshInterval, refreshInterval, self(), CONTINUE, context().dispatcher(), self()); + receive(ReceiveBuilder + .matchEquals(CONTINUE, (in) -> { + query(); + deliverBuf(); + }) + .match(Cancel.class, (in) -> { + context().stop(self()); + }) + .build()); + } + + public static Props props(Connection conn, String tag, Long offset, FiniteDuration refreshInterval) { + return Props.create(() -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval)); + } + + @Override + public void postStop() { + continueTask.cancel(); + } + + private void query() { + if (buf.isEmpty()) { + try { + PreparedStatement s = connection.prepareStatement( + "SELECT id, persistent_repr " + + "FROM journal WHERE tag = ? AND id >= ? " + + "ORDER BY id LIMIT ?"); + + s.setString(1, tag); + s.setLong(2, currentOffset); + s.setLong(3, LIMIT); + final ResultSet rs = s.executeQuery(); + + final List> res = new ArrayList<>(LIMIT); + while (rs.next()) + res.add(Pair.create(rs.getLong(1), rs.getBytes(2))); + + if (!res.isEmpty()) { + currentOffset = res.get(res.size() - 1).first(); + } + + buf = res.stream().map(in -> { + final Long id = in.first(); + final byte[] bytes = in.second(); + + final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); + + return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); + }).collect(toList()); + } catch(Exception e) { + onErrorThenStop(e); + } + } + } + + private void deliverBuf() { + while (totalDemand() > 0 && !buf.isEmpty()) + onNext(buf.remove(0)); + } +} +//#events-by-tag-publisher \ No newline at end of file diff --git a/akka-docs/rst/java/index-actors.rst b/akka-docs/rst/java/index-actors.rst index 20b567295b..173a0fd58e 100644 --- a/akka-docs/rst/java/index-actors.rst +++ b/akka-docs/rst/java/index-actors.rst @@ -12,4 +12,5 @@ Actors routing fsm persistence + persistence-query testing diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst new file mode 100644 index 0000000000..cb816a1f26 --- /dev/null +++ b/akka-docs/rst/java/persistence-query.rst @@ -0,0 +1,234 @@ +.. _persistence-query-java: + +################# +Persistence Query +################# + +Akka persistence query complements :ref:`persistence-java` by providing a universal asynchronous stream based +query interface that various journal plugins can implement in order to expose their query capabilities. + +The most typical use case of persistence query is implementing the so-called query side (also known as "read side") +in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using akka +persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query +side of an application, however it can help to migrate data from the write side to the query side database. In very +simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly +recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. + +While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries +to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more +complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some +transformation or canculations along the way is the core use-case and recommended style of using Akka Persistence Query +- pulling out of one Journal and storing into another one. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence.query`` package. + +Dependencies +============ + +Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@ + +Design overview +=============== + +Akka persistence query is purposely designed to be a very loosely specified API. +This is in order to keep the provided APIs general enough for each journal implementation to be able to expose its best +features, e.g. a SQL journal can use complex SQL queries or if a journal is able to subscribe to a live event stream +this should also be possible to expose the same API - a typed stream of events. + +**Each read journal must explicitly document which types of queries it supports.** +Refer to the your journal's plugins documentation for details on which queries and semantics it supports. + +While Akka Persistence Query does not provide actual implementations of ReadJournals, it defines a number of pre-defined +query types for the most common query scenarios, that most journals are likely to implement (however they are not required to). + +Read Journals +============= + +In order to issue queries one has to first obtain an instance of a ``ReadJournal``. +Read journals are implemented as `Community plugins`_, each targeting a specific datastore (for example Cassandra or JDBC +databases). For example, given a library that provides a ``akka.persistence.query.noop-read-journal`` obtaining the related +journal is as simple as: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#basic-usage + +Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via +``getJournalFor(NoopJournal.identifier)``, however this is not enforced. + +Read journal implementations are available as `Community plugins`_. + + +Predefined queries +------------------ +Akka persistence query comes with a number of ``Query`` objects built in and suggests Journal implementors to implement +them according to the semantics described below. It is important to notice that while these query types are very common +a journal is not obliged to implement all of them - for example because in a given journal such query would be +significantly inefficient. + +.. note:: + Refer to the documentation of the :class:`ReadJournal` plugin you are using for a specific list of supported query types. + For example, Journal plugins should document their stream completion strategies. + +The predefined queries are: + +``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. +By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new +persistence ids as they come into the system: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-live + +If your usage does not require a live stream, you can disable refreshing by using *hints*, providing the built-in +``NoRefresh`` hint to the query: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-snap + +``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, +however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the +persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve +this, which can be configured using the ``RefreshInterval`` query hint: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh + +``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to +implement in some journals or may need some additional preparation of the used data store to be executed efficiently, +please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query +is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented +depends on the used journal. + +.. note:: + A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag`` + is that the order of events at which the events appear in the stream rarely is guaranteed (or stable between materializations). + + Journals *may* choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering + guarantee they provide - for example "*ordered by timestamp ascending, independently of persistenceId*" is easy to achieve + on relational databases, yet may be hard to implement efficiently on plain key-value datastores. + +In the example below we query all events which have been tagged (we assume this was performed by the write-side using an +:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this +tag - for example if the journal stored the events as json it may try to find those with the field ``tag`` set to this value etc.). + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-tag + +As you can see, we can use all the usual stream combinators available from `Akka Streams`_ on the resulting query stream, +including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in ``EventsByTag`` +query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. +For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore +that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. +Again, specific capabilities are specific to the journal you are using, so you have to + + +Materialized values of queries +------------------------------ +Journals are able to provide additional information related to a query by exposing `materialized values`_, +which are a feature of `Akka Streams`_ that allows to expose additional values at stream materialization time. + +More advanced query journals may use this technique to expose information about the character of the materialized +stream, for example if it's finite or infinite, strictly ordered or not ordered at all. The materialized value type +is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their +specialised query object, as demonstrated in the sample below: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata + +.. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-quickstart.html#Materialized_values +.. _Akka Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java.html +.. _Community plugins: http://akka.io/community/#plugins-to-akka-persistence-query + +Performance and denormalization +=============================== +When building systems using :ref:`event-sourcing` and CQRS (`Command & Query Responsibility Segragation`_) techniques +it is tremendously important to realise that the write-side has completely different needs from the read-side, +and separating those concerns into datastores that are optimised for either side makes it possible to offer the best +expirience for the write and read sides independently. + +For example, in a bidding system it is important to "take the write" and respond to the bidder that we have accepted +the bid as soon as possible, which means that write-throughput is of highest importance for the write-side – often this +means that data stores which are able to scale to accomodate these requirements have a less expressive query side. + +On the other hand the same application may have some complex statistics view or we may have analists working with the data +to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like +for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be +projected into the other read-optimised datastore. + +.. note:: + When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". + In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format + it may be more efficient or interesting to query it (instead of the source events directly). + +Materialize view to Reactive Streams compatible datastore +--------------------------------------------------------- + +If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection +is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-rs + +.. _Reactive Streams: http://reactive-streams.org + +Materialize view using mapAsync +------------------------------- + +If the target database does not provide a reactive streams ``Subscriber`` that can perform writes, +you may have to implement the write logic using plain functions or Actors instead. + +In case your write logic is state-less and you just need to convert the events from one data data type to another +before writing into the alternative datastore, then the projection is as simple as: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple + +Resumable projections +--------------------- + +Sometimes you may need to implement "resumable" projections, that will not start from the beginning of time each time +when run. In this case you will need to store the sequence number (or ``offset``) of the processed event and use it +the next time this projection is started. This pattern is not built-in, however is rather simple to implement yourself. + +The example below additionally highlights how you would use Actors to implement the write side, in case +you need to do some complex logic that would be best handled inside an Actor before persisting the event +into the other datastore: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-actor-run + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-actor + +.. _Command & Query Responsibility Segragation: https://msdn.microsoft.com/en-us/library/jj554200.aspx + +.. _read-journal-plugin-api-java: + +Query plugins +============= + +Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds +of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page. + +This section aims to provide tips and guide plugin developers through implementing a custom query plugin. +Most users will not need to implement journals themselves, except if targeting a not yet supported datastore. + +.. note:: + Since different data stores provide different query capabilities journal plugins **must extensively document** + their exposed semantics as well as handled query scenarios. + +ReadJournal plugin API +---------------------- + +Journals *MUST* return a *failed* ``Source`` if they are unable to execute the passed in query. +For example if the user accidentally passed in an ``SqlQuery()`` to a key-value journal. + +Below is a simple journal implementation: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#my-read-journal + +And the ``EventsByTag`` could be backed by such an Actor for example: + +.. includecode:: code/docs/persistence/query/MyEventsByTagJavaPublisher.java#events-by-tag-publisher + +Plugin TCK +---------- + +TODO, not available yet. + diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 71a2f8b2ce..8c7436d154 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -628,7 +628,7 @@ completely. Event Adapters help in situations where: -- **Version Migration** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation, +- **Version Migrations** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation, and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios the ``toJournal`` function is usually an identity function, however the ``fromJournal`` is implemented as ``v1.Event=>v2.Event``, performing the neccessary mapping inside the fromJournal method. diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala index ac8dab1925..2f36623436 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -22,13 +22,13 @@ object MyEventsByTagPublisher { //#events-by-tag-publisher class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration) extends ActorPublisher[EventEnvelope] { - import MyEventsByTagPublisher._ private case object Continue - private val limit = 1000 + private val connection: java.sql.Connection = ??? - private var currentId = 0L + private val Limit = 1000 + private var currentOffset = offset var buf = Vector.empty[EventEnvelope] import context.dispatcher @@ -48,14 +48,35 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD context.stop(self) } + object Select { + private def statement() = connection.prepareStatement( + """ + SELECT id, persistent_repr FROM journal + WHERE tag = ? AND id >= ? + ORDER BY id LIMIT ? + """) + + def run(tag: String, from: Long, limit: Int): Vector[(Long, Array[Byte])] = { + val s = statement() + try { + s.setString(1, tag) + s.setLong(2, from) + s.setLong(3, limit) + val rs = s.executeQuery() + + val b = Vector.newBuilder[(Long, Array[Byte])] + while (rs.next()) + b += (rs.getLong(1) -> rs.getBytes(2)) + b.result() + } finally s.close() + } + } + def query(): Unit = if (buf.isEmpty) { try { - // Could be an SQL query, for example: - // "SELECT id, persistent_repr FROM journal WHERE tag = like ? and " + - // "id >= ? ORDER BY id limit ?" - val result: Vector[(Long, Array[Byte])] = ??? - currentId = if (result.nonEmpty) result.last._1 else currentId + val result = Select.run(tag, currentOffset, Limit) + currentOffset = if (result.nonEmpty) result.last._1 else currentOffset val serialization = SerializationExtension(context.system) buf = result.map { diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index 0c4dc391b6..4d3963a04f 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -28,8 +28,7 @@ object PersistenceQueryDocSpec { //#my-read-journal class MyReadJournal(system: ExtendedActorSystem) extends ReadJournal { - // TODO from config - private val defaulRefreshInterval: FiniteDuration = 3.seconds + private val defaulRefreshInterval = 3.seconds override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match { diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst index 15435ba1e4..319e1ac56b 100644 --- a/akka-docs/rst/scala/persistence-query.rst +++ b/akka-docs/rst/scala/persistence-query.rst @@ -72,7 +72,7 @@ a journal is not obliged to implement all of them - for example because in a giv significantly inefficient. .. note:: - Refer to the documentation of the ``ReadJournal`` plugin you are using for a specific list of supported query types. + Refer to the documentation of the :class:`ReadJournal` plugin you are using for a specific list of supported query types. For example, Journal plugins should document their stream completion strategies. The predefined queries are: @@ -110,7 +110,7 @@ depends on the used journal. on relational databases, yet may be hard to implement efficiently on plain key-value datastores. In the example below we query all events which have been tagged (we assume this was performed by the write-side using an -:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this +:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this tag - for example if the journal stored the events as json it may try to find those with the field ``tag`` set to this value etc.). .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-tag @@ -168,6 +168,8 @@ is as simple as, using the read-journal and feeding it into the databases driver .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-rs +.. _Reactive Streams: http://reactive-streams.org + Materialize view using mapAsync ------------------------------- @@ -225,24 +227,6 @@ And the ``EventsByTag`` could be backed by such an Actor for example: .. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher -More journal example implementations ------------------------------------- - -In order to help implementers get get started with implementing read journals a number of reference implementaions -have been prepared, each highlighting a specific style a journal might need to be implemented in: - -* TODO LINK HERE – when the backing data store is unable to push events, nor does it expose an reactive streams interface, - yet has rich query capabilities (like an SQL database for example) -* TODO LINK HERE – when a `Reactive Streams`_ adapter or driver is available for the datastore, yet it is not able to handle - polling by itself. For example when using `Slick 3`_ along side with a typical SQL database. -* TODO LINK HERE – when the backing datastore already has a fully "reactive push/pull" adapter implemented, for example - such exist for Kafka (see the `Reactive Kafka`_ project by Krzysztof Ciesielski for details). - -.. _Reactive Kafka: https://github.com/softwaremill/reactive-kafka -.. _Reactive Streams: http://reactive-streams.org -.. _Slick 3: http://slick.typesafe.com/ - - Plugin TCK ---------- diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 0289298ec2..753d10c7e0 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -638,8 +638,6 @@ Event Adapters help in situations where: understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the json instead of serializing the object to its binary representation. -.. image:: ../images/persistence-eventadapter.png - Implementing an EventAdapter is rather stright forward: .. includecode:: code/docs/persistence/PersistenceEventAdapterDocSpec.scala#identity-event-adapter diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index bb93297b33..0010e50d0d 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -7,8 +7,9 @@ import java.util.concurrent.atomic.AtomicReference import akka.actor._ import akka.event.Logging +import akka.stream.javadsl.Source -import scala.annotation.tailrec +import scala.annotation.{varargs, tailrec} import scala.util.Failure /** @@ -62,7 +63,12 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { * Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry. */ final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal = - new javadsl.ReadJournal(readJournalFor(readJournalPluginId)) + new javadsl.ReadJournal { + val backing = readJournalFor(readJournalPluginId) + @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + backing.query(q, hints: _*).asJava + } + private def createPlugin(configPath: String): scaladsl.ReadJournal = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala index 7caef0a19b..c0edc644cd 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala @@ -29,7 +29,7 @@ import scala.annotation.varargs * }}} */ -final class ReadJournal(backing: akka.persistence.query.scaladsl.ReadJournal) { +trait ReadJournal { /** * Java API @@ -40,7 +40,7 @@ final class ReadJournal(backing: akka.persistence.query.scaladsl.ReadJournal) { * query, typically specific to the journal implementation. * */ - @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = - backing.query(q, hints: _*).asJava + @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] + } diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala similarity index 100% rename from akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala rename to akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala