From 3314de4cb92fe61559937a7c07e853755d9fa561 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 8 Jun 2015 12:26:19 +0200 Subject: [PATCH] +per #16541 add missing java samples for persistence query --- .../persistence/PersistenceQueryDocTest.java | 364 ++++++++++++++++-- .../query/MyEventsByTagJavaPublisher.java | 63 +-- akka-docs/rst/java/persistence-query.rst | 19 +- .../query/MyEventsByTagPublisher.scala | 9 +- akka-docs/rst/scala/persistence-query.rst | 17 +- .../query/javadsl/ReadJournal.java | 2 +- .../scala/akka/persistence/query/Hint.scala | 8 + .../persistence/query/PersistenceQuery.scala | 3 +- .../scala/akka/persistence/query/Query.scala | 24 +- lol | 172 +++++++++ 10 files changed, 589 insertions(+), 92 deletions(-) create mode 100644 lol diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 2af6572628..7da4adfb31 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -4,71 +4,373 @@ package docs.persistence; +import static akka.pattern.Patterns.ask; + import akka.actor.*; +import akka.dispatch.Mapper; import akka.event.EventStreamSpec; import akka.japi.Function; import akka.japi.Procedure; +import akka.japi.pf.ReceiveBuilder; import akka.pattern.BackoffSupervisor; import akka.persistence.*; import akka.persistence.query.*; import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.Timeout; import docs.persistence.query.MyEventsByTagPublisher; +import docs.persistence.query.PersistenceQueryDocSpec; +import org.reactivestreams.Subscriber; import scala.collection.Seq; +import scala.collection.immutable.Vector; +import scala.concurrent.Await; +import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import scala.runtime.Boxed; import scala.runtime.BoxedUnit; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; public class PersistenceQueryDocTest { - final Timeout timeout = Timeout.durationToTimeout(FiniteDuration.create(3, TimeUnit.SECONDS)); + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); //#my-read-journal - class MyReadJournal implements ReadJournal { - private final ExtendedActorSystem system; + class MyReadJournal implements ReadJournal { + private final ExtendedActorSystem system; public MyReadJournal(ExtendedActorSystem system) { this.system = system; } - final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); + final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); - @SuppressWarnings("unchecked") - public Source query(Query q, Hint... hints) { - if (q instanceof EventsByTag) { - final EventsByTag eventsByTag = (EventsByTag) q; - final String tag = eventsByTag.tag(); - long offset = eventsByTag.offset(); + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + if (q instanceof EventsByTag) { + final EventsByTag eventsByTag = (EventsByTag) q; + final String tag = eventsByTag.tag(); + long offset = eventsByTag.offset(); - final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); + final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); - return (Source) Source.actorPublisher(props) - .mapMaterializedValue(noMaterializedValue()); - } else { - // unsuported - return Source.failed( - new UnsupportedOperationException( - "Query $unsupported not supported by " + getClass().getName())) - .mapMaterializedValue(noMaterializedValue()); - } + return (Source) Source.actorPublisher(props) + .mapMaterializedValue(noMaterializedValue()); + } else { + // unsuported + return Source.failed( + new UnsupportedOperationException( + "Query " + q + " not supported by " + getClass().getName())) + .mapMaterializedValue(noMaterializedValue()); } + } - private FiniteDuration refreshInterval(Hint[] hints) { - FiniteDuration ret = defaultRefreshInterval; - for (Hint hint : hints) - if (hint instanceof RefreshInterval) - ret = ((RefreshInterval) hint).interval(); - return ret; - } + private FiniteDuration refreshInterval(Hint[] hints) { + for (Hint hint : hints) + if (hint instanceof RefreshInterval) + return ((RefreshInterval) hint).interval(); - private akka.japi.function.Function noMaterializedValue () { - return param -> (M) null; - } + return defaultRefreshInterval; + } + + private akka.japi.function.Function noMaterializedValue() { + return param -> (M) null; + } } - //#my-read-journal + //#my-read-journal + + void demonstrateBasicUsage() { + final ActorSystem system = ActorSystem.create(); + + //#basic-usage + // obtain read journal by plugin id + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + // issue query to journal + Source source = + readJournal.query(EventsByPersistenceId.create("user-1337", 0, Long.MAX_VALUE)); + + // materialize stream, consuming events + ActorMaterializer mat = ActorMaterializer.create(system); + source.runForeach(event -> System.out.println("Event: " + event), mat); + //#basic-usage + } + + void demonstrateAllPersistenceIdsLive() { + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#all-persistence-ids-live + readJournal.query(AllPersistenceIds.getInstance()); + //#all-persistence-ids-live + } + + void demonstrateNoRefresh() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#all-persistence-ids-snap + readJournal.query(AllPersistenceIds.getInstance(), NoRefresh.getInstance()); + //#all-persistence-ids-snap + } + + void demonstrateRefresh() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#events-by-persistent-id-refresh + final RefreshInterval refresh = RefreshInterval.create(1, TimeUnit.SECONDS); + readJournal.query(EventsByPersistenceId.create("user-us-1337"), refresh); + //#events-by-persistent-id-refresh + } + + void demonstrateEventsByTag() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#events-by-tag + // assuming journal is able to work with numeric offsets we can: + final Source blueThings = + readJournal.query(EventsByTag.create("blue")); + + // find top 10 blue things: + final Future> top10BlueThings = + (Future>) blueThings + .map(t -> t.event()) + .take(10) // cancels the query stream after pulling 10 elements + .>runFold(new ArrayList<>(10), (acc, e) -> { + acc.add(e); + return acc; + }, mat); + + // start another query, from the known offset + Source blue = readJournal.query(EventsByTag.create("blue", 10)); + //#events-by-tag + } + //#materialized-query-metadata-classes + // a plugin can provide: + + //#materialized-query-metadata-classes + + static + //#materialized-query-metadata-classes + final class QueryMetadata { + public final boolean deterministicOrder; + public final boolean infinite; + + public QueryMetadata(Boolean deterministicOrder, Boolean infinite) { + this.deterministicOrder = deterministicOrder; + this.infinite = infinite; + } + } + + //#materialized-query-metadata-classes + + static + //#materialized-query-metadata-classes + final class AllEvents implements Query { + private AllEvents() {} + private static AllEvents INSTANCE = new AllEvents(); + } + + //#materialized-query-metadata-classes + + void demonstrateMaterializedQueryValues() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#materialized-query-metadata + + final Source events = readJournal.query(AllEvents.INSTANCE); + + events.mapMaterializedValue(meta -> { + System.out.println("The query is: " + + "ordered deterministically: " + meta.deterministicOrder + " " + + "infinite: " + meta.infinite); + return meta; + }); + //#materialized-query-metadata + } + + class ReactiveStreamsCompatibleDBDriver { + Subscriber> batchWriter() { + return null; + } + } + + void demonstrateWritingIntoDifferentStore() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-rs + final ReactiveStreamsCompatibleDBDriver driver = new ReactiveStreamsCompatibleDBDriver(); + final Subscriber> dbBatchWriter = driver.batchWriter(); + + // Using an example (Reactive Streams) Database driver + readJournal + .query(EventsByPersistenceId.create("user-1337")) + .grouped(20) // batch inserts into groups of 20 + .runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database + //#projection-into-different-store-rs + } + + //#projection-into-different-store-simple-classes + class ExampleStore { + Future save(Object any) { + // ... + //#projection-into-different-store-simple-classes + return null; + //#projection-into-different-store-simple-classes + } + } + //#projection-into-different-store-simple-classes + + void demonstrateWritingIntoDifferentStoreWithMapAsync() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-simple + final ExampleStore store = new ExampleStore(); + + readJournal + .query(EventsByTag.create("bid")) + .mapAsync(1, store::save) + .runWith(Sink.ignore(), mat); + //#projection-into-different-store-simple + } + + //#projection-into-different-store + class MyResumableProjection { + private final String name; + + public MyResumableProjection(String name) { + this.name = name; + } + + public Future saveProgress(long offset) { + // ... + //#projection-into-different-store + return null; + //#projection-into-different-store + } + public Future latestOffset() { + // ... + //#projection-into-different-store + return null; + //#projection-into-different-store + } + } + //#projection-into-different-store + + + void demonstrateWritingIntoDifferentStoreWithResumableProjections() throws Exception { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-actor-run + final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS); + + final MyResumableProjection bidProjection = new MyResumableProjection("bid"); + + final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid"); + final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer"); + + long startFromOffset = Await.result(bidProjection.latestOffset(), timeout.duration()); + + readJournal + .query(EventsByTag.create("bid", startFromOffset)) + .mapAsync(8, envelope -> { + final Future f = ask(writer, envelope.event(), timeout); + return f.map(new Mapper() { + @Override public Long apply(Object in) { + return envelope.offset(); + } + }, system.dispatcher()); + }) + .mapAsync(1, offset -> bidProjection.saveProgress(offset)) + .runWith(Sink.ignore(), mat); + } + + //#projection-into-different-store-actor-run + + class ComplexState { + + boolean readyToSave() { + return false; + } + } + + static class Record { + static Record of(Object any) { + return new Record(); + } + } + + //#projection-into-different-store-actor + final class TheOneWhoWritesToQueryJournal extends AbstractActor { + private final ExampleStore store; + + private ComplexState state = new ComplexState(); + + public TheOneWhoWritesToQueryJournal() { + store = new ExampleStore(); + + receive(ReceiveBuilder.matchAny(message -> { + state = updateState(state, message); + + // example saving logic that requires state to become ready: + if (state.readyToSave()) + store.save(Record.of(state)); + + }).build()); + } + + + ComplexState updateState(ComplexState state, Object msg) { + // some complicated aggregation logic here ... + return state; + } + } + //#projection-into-different-store-actor + } diff --git a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java index b4353623ee..0053246d0f 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java +++ b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java @@ -5,6 +5,7 @@ package docs.persistence.query; import akka.actor.Cancellable; +import akka.actor.Scheduler; import akka.japi.Pair; import akka.japi.pf.ReceiveBuilder; import akka.persistence.PersistentRepr; @@ -23,6 +24,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import static java.util.stream.Collectors.toList; @@ -39,21 +41,28 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { private final String CONTINUE = "CONTINUE"; private final int LIMIT = 1000; private long currentOffset; - private List buf = new ArrayList<>(); + private List buf = new LinkedList<>(); private Cancellable continueTask; - public MyEventsByTagJavaPublisher(Connection connection, String tag, Long offset, FiniteDuration refreshInterval) { + public MyEventsByTagJavaPublisher(Connection connection, + String tag, + Long offset, + FiniteDuration refreshInterval) { this.connection = connection; this.tag = tag; this.currentOffset = offset; - this.continueTask = context().system().scheduler().schedule(refreshInterval, refreshInterval, self(), CONTINUE, context().dispatcher(), self()); + final Scheduler scheduler = context().system().scheduler(); + this.continueTask = scheduler + .schedule(refreshInterval, refreshInterval, self(), CONTINUE, + context().dispatcher(), self()); + receive(ReceiveBuilder - .matchEquals(CONTINUE, (in) -> { - query(); - deliverBuf(); - }) + .matchEquals(CONTINUE, (in) -> { + query(); + deliverBuf(); + }) .match(Cancel.class, (in) -> { context().stop(self()); }) @@ -71,33 +80,33 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { private void query() { if (buf.isEmpty()) { - try { - PreparedStatement s = connection.prepareStatement( - "SELECT id, persistent_repr " + - "FROM journal WHERE tag = ? AND id >= ? " + - "ORDER BY id LIMIT ?"); + final String query = "SELECT id, persistent_repr " + + "FROM journal WHERE tag = ? AND id >= ? " + + "ORDER BY id LIMIT ?"; + try (PreparedStatement s = connection.prepareStatement(query)) { s.setString(1, tag); s.setLong(2, currentOffset); s.setLong(3, LIMIT); - final ResultSet rs = s.executeQuery(); + try (ResultSet rs = s.executeQuery()) { - final List> res = new ArrayList<>(LIMIT); - while (rs.next()) - res.add(Pair.create(rs.getLong(1), rs.getBytes(2))); + final List> res = new ArrayList<>(LIMIT); + while (rs.next()) + res.add(Pair.create(rs.getLong(1), rs.getBytes(2))); - if (!res.isEmpty()) { - currentOffset = res.get(res.size() - 1).first(); + if (!res.isEmpty()) { + currentOffset = res.get(res.size() - 1).first(); + } + + buf = res.stream().map(in -> { + final Long id = in.first(); + final byte[] bytes = in.second(); + + final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); + + return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); + }).collect(toList()); } - - buf = res.stream().map(in -> { - final Long id = in.first(); - final byte[] bytes = in.second(); - - final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); - - return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); - }).collect(toList()); } catch(Exception e) { onErrorThenStop(e); } diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst index cb816a1f26..b35f934325 100644 --- a/akka-docs/rst/java/persistence-query.rst +++ b/akka-docs/rst/java/persistence-query.rst @@ -14,12 +14,6 @@ side of an application, however it can help to migrate data from the write side simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. -While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries -to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more -complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some -transformation or canculations along the way is the core use-case and recommended style of using Akka Persistence Query -- pulling out of one Journal and storing into another one. - .. warning:: This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to @@ -58,7 +52,7 @@ journal is as simple as: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#basic-usage -Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via +Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via ``getJournalFor(NoopJournal.identifier)``, however this is not enforced. Read journal implementations are available as `Community plugins`_. @@ -90,7 +84,7 @@ If your usage does not require a live stream, you can disable refreshing by usin ``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the -persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve +persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve this, which can be configured using the ``RefreshInterval`` query hint: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh @@ -120,7 +114,6 @@ including for example taking the first 10 and cancelling the stream. It is worth query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. -Again, specific capabilities are specific to the journal you are using, so you have to Materialized values of queries @@ -133,6 +126,7 @@ stream, for example if it's finite or infinite, strictly ordered or not ordered is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their specialised query object, as demonstrated in the sample below: +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata-classes .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata .. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-quickstart.html#Materialized_values @@ -152,18 +146,18 @@ means that data stores which are able to scale to accomodate these requirements On the other hand the same application may have some complex statistics view or we may have analists working with the data to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like -for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be +for example SQL or writing Spark jobs to analyse the data. Therefore the data stored in the write-side needs to be projected into the other read-optimised datastore. .. note:: When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". - In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format + In other words, it means that the view is created once, in order to be afterwards queried multiple times, as in this format it may be more efficient or interesting to query it (instead of the source events directly). Materialize view to Reactive Streams compatible datastore --------------------------------------------------------- -If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection +If the read datastore exposes an `Reactive Streams`_ interface then implementing a simple projection is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-rs @@ -179,6 +173,7 @@ you may have to implement the write logic using plain functions or Actors instea In case your write logic is state-less and you just need to convert the events from one data data type to another before writing into the alternative datastore, then the projection is as simple as: +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple-classes .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple Resumable projections diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala index 2f36623436..69d7a70047 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -11,7 +11,6 @@ import akka.serialization.SerializationExtension import akka.stream.actor.ActorPublisher import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } -import scala.annotation.tailrec import scala.concurrent.duration.FiniteDuration object MyEventsByTagPublisher { @@ -90,17 +89,15 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD } } - @tailrec final def deliverBuf(): Unit = + final def deliverBuf(): Unit = if (totalDemand > 0 && buf.nonEmpty) { if (totalDemand <= Int.MaxValue) { val (use, keep) = buf.splitAt(totalDemand.toInt) buf = keep use foreach onNext } else { - val (use, keep) = buf.splitAt(Int.MaxValue) - buf = keep - use foreach onNext - deliverBuf() + buf foreach onNext + buf = Vector.empty } } } diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst index 319e1ac56b..b5e9023f18 100644 --- a/akka-docs/rst/scala/persistence-query.rst +++ b/akka-docs/rst/scala/persistence-query.rst @@ -14,12 +14,6 @@ side of an application, however it can help to migrate data from the write side simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. -While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries -to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more -complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some -transformation or canculations along the way is the core use-case and recommended style of using Akka Persistence Query -- pulling out of one Journal and storing into another one. - .. warning:: This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to @@ -58,7 +52,7 @@ journal is as simple as: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#basic-usage -Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via +Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via ``journalFor(NoopJournal.identifier)``, however this is not enforced. Read journal implementations are available as `Community plugins`_. @@ -90,7 +84,7 @@ If your usage does not require a live stream, you can disable refreshing by usin ``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the -persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve +persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve this, which can be configured using the ``RefreshInterval`` query hint: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh @@ -120,7 +114,6 @@ including for example taking the first 10 and cancelling the stream. It is worth query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. -Again, specific capabilities are specific to the journal you are using, so you have to Materialized values of queries @@ -152,18 +145,18 @@ means that data stores which are able to scale to accomodate these requirements On the other hand the same application may have some complex statistics view or we may have analists working with the data to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like -for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be +for example SQL or writing Spark jobs to analyse the data. Therefore the data stored in the write-side needs to be projected into the other read-optimised datastore. .. note:: When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". - In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format + In other words, it means that the view is created once, in order to be afterwards queried multiple times, as in this format it may be more efficient or interesting to query it (instead of the source events directly). Materialize view to Reactive Streams compatible datastore --------------------------------------------------------- -If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection +If the read datastore exposes an `Reactive Streams`_ interface then implementing a simple projection is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-rs diff --git a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java index c776b92c37..6bf5be87f8 100644 --- a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java +++ b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java @@ -26,7 +26,7 @@ import scala.annotation.varargs; * * final Source<EventEnvelope, ?> events = * journal.query(new EventsByTag("mytag", 0L)); - * + * */ public interface ReadJournal { diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala index 5b6e35e263..62a48a942d 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala @@ -3,6 +3,8 @@ */ package akka.persistence.query +import java.util.concurrent.TimeUnit + import scala.concurrent.duration.FiniteDuration /** @@ -21,6 +23,12 @@ trait Hint * A plugin may optionally support this [[Hint]] for defining such a refresh interval. */ final case class RefreshInterval(interval: FiniteDuration) extends Hint +object RefreshInterval { + /** Java API */ + def create(length: Long, unit: TimeUnit): RefreshInterval = new RefreshInterval(FiniteDuration(length, unit)) + /** Java API */ + def create(interval: FiniteDuration): RefreshInterval = new RefreshInterval(interval) +} /** * Indicates that the event stream is supposed to be completed immediately when it diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index 57ec6fc9d8..9eb150e62c 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -67,10 +67,9 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { private def createPlugin(configPath: String): scaladsl.ReadJournal = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'") - val pluginActorName = configPath val pluginConfig = system.settings.config.getConfig(configPath) val pluginClassName = pluginConfig.getString("class") - log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") + log.debug(s"Create plugin: ${configPath} ${pluginClassName}") val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get // TODO remove duplication diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala index fdca0ec0c3..f63b263e6e 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala @@ -20,7 +20,11 @@ trait Query[T, M] * * A plugin may optionally support this [[Query]]. */ -final case object AllPersistenceIds extends Query[String, Unit] +final case object AllPersistenceIds extends AllPersistenceIds { + /** Java API */ + final def getInstance: AllPersistenceIds = this +} +abstract class AllPersistenceIds extends Query[String, Unit] /** * Query events for a specific `PersistentActor` identified by `persistenceId`. @@ -34,7 +38,19 @@ final case object AllPersistenceIds extends Query[String, Unit] */ final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue) extends Query[Any, Unit] +object EventsByPersistenceId { + /** Java API */ + def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId = + EventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + /** Java API */ + def create(persistenceId: String, fromSequenceNr: Long): EventsByPersistenceId = + EventsByPersistenceId(persistenceId, fromSequenceNr) + + /** Java API */ + def create(persistenceId: String): EventsByPersistenceId = + EventsByPersistenceId(persistenceId) +} /** * Query events that have a specific tag. A tag can for example correspond to an * aggregate root type (in DDD terminology). @@ -56,6 +72,12 @@ final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Lo * A plugin may optionally support this [[Query]]. */ final case class EventsByTag(tag: String, offset: Long = 0L) extends Query[EventEnvelope, Unit] +object EventsByTag { + /** Java API */ + def create(tag: String): EventsByTag = EventsByTag(tag) + /** Java API */ + def create(tag: String, offset: Long): EventsByTag = EventsByTag(tag) +} /** * Event wrapper adding meta data for the events in the result stream of diff --git a/lol b/lol new file mode 100644 index 0000000000..6c9058697d --- /dev/null +++ b/lol @@ -0,0 +1,172 @@ +adoc-api/akka/persistence/query/scaladsl/ReadJournal.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/scaladsl/ReadJournal.java:31: warning: no @param for +[error] public abstract akka.stream.scaladsl.Source query (akka.persistence.query.Query q, scala.collection.Seq hints) ; +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/scaladsl/ReadJournal.java:31: warning: no @param for +[error] public abstract akka.stream.scaladsl.Source query (akka.persistence.query.Query q, scala.collection.Seq hints) ; +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/scaladsl/ReadJournalAdapter.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/javadsl/ReadJournal.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for q +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for hints +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @return +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:23: error: end tag missing: +[error] *

+[error]         ^
+[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:29: error: unexpected end tag: 
+[error]  * 
+[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/javadsl/ReadJournalAdapter.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/AllPersistenceIds.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/AllPersistenceIds.java:4: warning: no @return +[error] static public final akka.persistence.query.AllPersistenceIds getInstance () { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/AllPersistenceIds$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventEnvelope.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventEnvelope$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByPersistenceId.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for fromSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for toSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @param for fromSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:18: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:18: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByPersistenceId$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for fromSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for toSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @param for fromSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:13: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:13: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByTag.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:24: warning: no @param for tag +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:24: warning: no @return +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @param for tag +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @param for offset +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @return +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByTag$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:9: warning: no @param for tag +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:9: warning: no @return +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @param for tag +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @param for offset +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @return +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/Hint.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/NoRefresh.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/NoRefresh.java:4: warning: no @return +[error] static public akka.persistence.query.NoRefresh getInstance () { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/NoRefresh$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/NoRefresh$.java:5: warning: empty

tag +[error] *

+[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.PluginHolder.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.PluginHolder$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/Query.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/RefreshInterval.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @param for length +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @param for unit +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @return +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:13: warning: no @param for interval +[error] static public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:13: warning: no @return +[error] static public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/RefreshInterval$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @param for length +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @param for unit +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @return +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:11: warning: no @param for interval +[error] public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:11: warning: no @return +[error] public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ \ No newline at end of file