From 375c032604b2b9cfa924261622078d22e009bb58 Mon Sep 17 00:00:00 2001 From: Richard Imaoka Date: Mon, 17 Oct 2016 23:45:27 +0900 Subject: [PATCH] eventsByTag and currentEventsByTag to use Offset (#21615) * EventsByTagQuery2 and CurrentEventsByTagQuery2 to keep binary compatibility --- .../persistence/PersistenceQueryDocTest.java | 47 ++++++++----------- .../query/LeveldbPersistenceQueryDocTest.java | 4 +- .../LeveldbPersistenceQueryDocSpec.scala | 7 +-- .../query/PersistenceQueryDocSpec.scala | 28 ++++++----- .../scala/akka/persistence/query/Offset.scala | 23 +++++++++ .../javadsl/CurrentEventsByTagQuery.scala | 1 + .../javadsl/CurrentEventsByTagQuery2.scala | 24 ++++++++++ .../query/javadsl/EventsByTagQuery.scala | 1 + .../query/javadsl/EventsByTagQuery2.scala | 41 ++++++++++++++++ .../leveldb/javadsl/LeveldbReadJournal.scala | 12 ++++- .../leveldb/scaladsl/LeveldbReadJournal.scala | 31 ++++++++++-- .../scaladsl/CurrentEventsByTagQuery.scala | 1 + .../scaladsl/CurrentEventsByTagQuery2.scala | 24 ++++++++++ .../query/scaladsl/EventsByTagQuery.scala | 1 + .../query/scaladsl/EventsByTagQuery2.scala | 42 +++++++++++++++++ .../leveldb/EventsByPersistenceIdSpec.scala | 4 +- .../journal/leveldb/EventsByTagSpec.scala | 20 ++++---- 17 files changed, 247 insertions(+), 64 deletions(-) create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index da19d6d5b9..96d4355614 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -7,40 +7,26 @@ package docs.persistence; import static akka.pattern.PatternsCS.ask; import java.util.HashSet; import java.util.Set; -import java.util.Iterator; import akka.NotUsed; +import akka.persistence.query.Sequence; +import akka.persistence.query.Offset; import com.typesafe.config.Config; import akka.actor.*; -import akka.dispatch.Mapper; -import akka.event.EventStreamSpec; -import akka.japi.Function; -import akka.japi.Procedure; import akka.japi.pf.ReceiveBuilder; -import akka.pattern.BackoffSupervisor; -import akka.persistence.*; import akka.persistence.query.*; -import akka.persistence.query.javadsl.ReadJournal; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.Timeout; import docs.persistence.query.MyEventsByTagPublisher; -import docs.persistence.query.PersistenceQueryDocSpec; import org.reactivestreams.Subscriber; -import scala.collection.Seq; -import scala.collection.immutable.Vector; -import scala.concurrent.Await; import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import scala.runtime.Boxed; -import scala.runtime.BoxedUnit; -import java.io.Serializable; + import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -102,11 +88,11 @@ public class PersistenceQueryDocTest { //#my-read-journal public class MyJavadslReadJournal implements akka.persistence.query.javadsl.ReadJournal, - akka.persistence.query.javadsl.EventsByTagQuery, + akka.persistence.query.javadsl.EventsByTagQuery2, akka.persistence.query.javadsl.EventsByPersistenceIdQuery, akka.persistence.query.javadsl.AllPersistenceIdsQuery, akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { - + private final FiniteDuration refreshInterval; public MyJavadslReadJournal(ExtendedActorSystem system, Config config) { @@ -116,10 +102,15 @@ public class PersistenceQueryDocTest { } @Override - public Source eventsByTag(String tag, long offset) { - final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval); - return Source.actorPublisher(props). + public Source eventsByTag(String tag, Offset offset) { + if(offset instanceof Sequence){ + Sequence sequenceOffset = (Sequence) offset; + final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval); + return Source.actorPublisher(props). mapMaterializedValue(m -> NotUsed.getInstance()); + } + else + throw new IllegalArgumentException("MyJavadslReadJournal does not support " + offset.getClass().getName() + " offsets"); } @Override @@ -157,7 +148,7 @@ public class PersistenceQueryDocTest { //#my-read-journal public class MyScaladslReadJournal implements akka.persistence.query.scaladsl.ReadJournal, - akka.persistence.query.scaladsl.EventsByTagQuery, + akka.persistence.query.scaladsl.EventsByTagQuery2, akka.persistence.query.scaladsl.EventsByPersistenceIdQuery, akka.persistence.query.scaladsl.AllPersistenceIdsQuery, akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { @@ -170,7 +161,7 @@ public class PersistenceQueryDocTest { @Override public akka.stream.scaladsl.Source eventsByTag( - String tag, long offset) { + String tag, akka.persistence.query.Offset offset) { return javadslReadJournal.eventsByTag(tag, offset).asScala(); } @@ -266,7 +257,7 @@ public class PersistenceQueryDocTest { //#events-by-tag // assuming journal is able to work with numeric offsets we can: final Source blueThings = - readJournal.eventsByTag("blue", 0L); + readJournal.eventsByTag("blue", new Sequence(0L)); // find top 10 blue things: final Future> top10BlueThings = @@ -279,7 +270,7 @@ public class PersistenceQueryDocTest { }, mat); // start another query, from the known offset - Source blue = readJournal.eventsByTag("blue", 10); + Source blue = readJournal.eventsByTag("blue", new Sequence(10)); //#events-by-tag } @@ -366,7 +357,7 @@ public class PersistenceQueryDocTest { final ExampleStore store = new ExampleStore(); readJournal - .eventsByTag("bid", 0L) + .eventsByTag("bid", new Sequence(0L)) .mapAsync(1, store::save) .runWith(Sink.ignore(), mat); //#projection-into-different-store-simple @@ -416,7 +407,7 @@ public class PersistenceQueryDocTest { long startFromOffset = bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS); readJournal - .eventsByTag("bid", startFromOffset) + .eventsByTag("bid", new Sequence(startFromOffset)) .mapAsync(8, envelope -> { final CompletionStage f = ask(writer, envelope.event(), timeout); return f.thenApplyAsync(in -> envelope.offset(), system.dispatcher()); diff --git a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java index cfe5599698..5752c89979 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java @@ -12,6 +12,8 @@ import akka.actor.ActorSystem; import akka.persistence.journal.WriteEventAdapter; import akka.persistence.journal.Tagged; import akka.persistence.query.EventEnvelope; +import akka.persistence.query.Sequence; +import akka.persistence.query.javadsl.*; import akka.persistence.query.PersistenceQuery; import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; import akka.stream.ActorMaterializer; @@ -59,7 +61,7 @@ public class LeveldbPersistenceQueryDocTest { LeveldbReadJournal.Identifier()); Source source = - queries.eventsByTag("green", 0); + queries.eventsByTag("green", new Sequence(0L)); //#EventsByTag } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala index 35206b7cd8..d3486daedf 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala @@ -6,12 +6,13 @@ package docs.persistence.query import akka.NotUsed import akka.persistence.journal.{ EventAdapter, EventSeq } import akka.testkit.AkkaSpec -import akka.persistence.query.PersistenceQuery +import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence } +import akka.persistence.query.scaladsl._ import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.journal.Tagged import akka.stream.scaladsl.Source -import akka.persistence.query.EventEnvelope import akka.stream.ActorMaterializer + import scala.annotation.tailrec object LeveldbPersistenceQueryDocSpec { @@ -81,7 +82,7 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { LeveldbReadJournal.Identifier) val src: Source[EventEnvelope, NotUsed] = - queries.eventsByTag(tag = "green", offset = 0L) + queries.eventsByTag(tag = "green", offset = Sequence(0L)) //#EventsByTag } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index bc915b886d..c341a2652b 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -6,15 +6,16 @@ package docs.persistence.query import akka.NotUsed import akka.actor._ -import akka.persistence.{ Recovery, PersistentActor } +import akka.persistence.{ PersistentActor, Recovery } import akka.persistence.query._ -import akka.stream.{ FlowShape, ActorMaterializer } +import akka.stream.{ ActorMaterializer, FlowShape } import akka.stream.scaladsl.{ Flow, Sink, Source } import akka.stream.javadsl import akka.testkit.AkkaSpec import akka.util.Timeout -import docs.persistence.query.PersistenceQueryDocSpec.{ TheOneWhoWritesToQueryJournal } +import docs.persistence.query.PersistenceQueryDocSpec.TheOneWhoWritesToQueryJournal import org.reactivestreams.Subscriber + import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration @@ -45,7 +46,7 @@ object PersistenceQueryDocSpec { class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config) extends akka.persistence.query.scaladsl.ReadJournal - with akka.persistence.query.scaladsl.EventsByTagQuery + with akka.persistence.query.scaladsl.EventsByTagQuery2 with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery with akka.persistence.query.scaladsl.AllPersistenceIdsQuery with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { @@ -54,10 +55,13 @@ object PersistenceQueryDocSpec { config.getDuration("refresh-interval", MILLISECONDS).millis override def eventsByTag( - tag: String, offset: Long = 0L): Source[EventEnvelope, NotUsed] = { - val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval) - Source.actorPublisher[EventEnvelope](props) - .mapMaterializedValue(_ => NotUsed) + tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match { + case Sequence(offsetValue) ⇒ + val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval) + Source.actorPublisher[EventEnvelope](props) + .mapMaterializedValue(_ => NotUsed) + case _ ⇒ + throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") } override def eventsByPersistenceId( @@ -90,13 +94,13 @@ object PersistenceQueryDocSpec { class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal) extends akka.persistence.query.javadsl.ReadJournal - with akka.persistence.query.javadsl.EventsByTagQuery + with akka.persistence.query.javadsl.EventsByTagQuery2 with akka.persistence.query.javadsl.EventsByPersistenceIdQuery with akka.persistence.query.javadsl.AllPersistenceIdsQuery with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { override def eventsByTag( - tag: String, offset: Long = 0L): javadsl.Source[EventEnvelope, NotUsed] = + tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] = scaladslReadJournal.eventsByTag(tag, offset).asJava override def eventsByPersistenceId( @@ -232,7 +236,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { .runFold(Vector.empty[Any])(_ :+ _) // start another query, from the known offset - val furtherBlueThings = readJournal.eventsByTag("blue", offset = 10) + val furtherBlueThings = readJournal.eventsByTag("blue", offset = Sequence(10)) //#events-by-tag //#events-by-persistent-id @@ -279,7 +283,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { bidProjection.latestOffset.foreach { startFromOffset => readJournal - .eventsByTag("bid", startFromOffset) + .eventsByTag("bid", Sequence(startFromOffset)) .mapAsync(8) { envelope => (writer ? envelope.event).map(_ => envelope.offset) } .mapAsync(1) { offset => bidProjection.saveProgress(offset) } .runWith(Sink.ignore) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala new file mode 100644 index 0000000000..ccafd770c9 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ + +package akka.persistence.query + +import java.util.UUID + +trait Offset + +final case class Sequence(val value: Long) extends Offset with Ordered[Sequence] { + override def compare(that: Sequence): Int = value.compare(that.value) +} + +final case class TimeBasedUUID(val value: UUID) extends Offset with Ordered[TimeBasedUUID] { + if (value == null || value.version != 1) { + throw new IllegalArgumentException("UUID " + value + " is not a time-based UUID") + } + + override def compare(other: TimeBasedUUID): Int = value.compareTo(other.value) +} + +final case object NoOffset extends Offset \ No newline at end of file diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala index 8dc2e170cc..bab9e96c6c 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala @@ -10,6 +10,7 @@ import akka.persistence.query.EventEnvelope /** * A plugin may optionally support this query by implementing this interface. */ +@deprecated("To be replaced by CurrentEventsByTagQuery2 from Akka 2.5", "2.4.11") trait CurrentEventsByTagQuery extends ReadJournal { /** diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala new file mode 100644 index 0000000000..9bb7956914 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.persistence.query.javadsl + +import akka.NotUsed +import akka.persistence.query.{ EventEnvelope, Offset } +import akka.stream.javadsl.Source + +/** + * A plugin may optionally support this query by implementing this interface. + */ +// TODO: Rename it to CurrentEventsByTagQuery in Akka 2.5 +trait CurrentEventsByTagQuery2 extends ReadJournal { + + /** + * Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream + * is completed immediately when it reaches the end of the "result set". Events that are + * stored after the query is completed are not included in the event stream. + */ + def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] + +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala index 383e68c935..3851751052 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala @@ -10,6 +10,7 @@ import akka.persistence.query.EventEnvelope /** * A plugin may optionally support this query by implementing this interface. */ +@deprecated("To be replaced by EventsByTagQuery2 from Akka 2.5", "2.4.11") trait EventsByTagQuery extends ReadJournal { /** diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala new file mode 100644 index 0000000000..d376316c48 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.persistence.query.javadsl + +import akka.NotUsed +import akka.persistence.query.{ EventEnvelope, Offset } +import akka.stream.javadsl.Source + +/** + * A plugin may optionally support this query by implementing this interface. + */ +// TODO: Rename it to EventsByTagQuery in Akka 2.5 +trait EventsByTagQuery2 extends ReadJournal { + + /** + * Query events that have a specific tag. A tag can for example correspond to an + * aggregate root type (in DDD terminology). + * + * The consumer can keep track of its current position in the event stream by storing the + * `offset` and restart the query from a given `offset` after a crash/restart. + * + * The exact meaning of the `offset` depends on the journal and must be documented by the + * read journal plugin. It may be a sequential id number that uniquely identifies the + * position of each event within the event stream. Distributed data stores cannot easily + * support those semantics and they may use a weaker meaning. For example it may be a + * timestamp (taken when the event was created or stored). Timestamps are not unique and + * not strictly ordered, since clocks on different machines may not be synchronized. + * + * The returned event stream should be ordered by `offset` if possible, but this can also be + * difficult to fulfill for a distributed data store. The order must be documented by the + * read journal plugin. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. + */ + def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala index 60027385f4..b419674909 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala @@ -5,7 +5,7 @@ package akka.persistence.query.journal.leveldb.javadsl import akka.NotUsed -import akka.persistence.query.EventEnvelope +import akka.persistence.query.{ EventEnvelope, Offset } import akka.persistence.query.javadsl._ import akka.stream.javadsl.Source @@ -32,7 +32,9 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery with EventsByTagQuery - with CurrentEventsByTagQuery { + with EventsByTagQuery2 + with CurrentEventsByTagQuery + with CurrentEventsByTagQuery2 { /** * `allPersistenceIds` is used for retrieving all `persistenceIds` of all @@ -137,6 +139,9 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev * The stream is completed with failure if there is a failure in executing the query in the * backend journal. */ + override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = + scaladslReadJournal.eventsByTag(tag, offset).asJava + override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = scaladslReadJournal.eventsByTag(tag, offset).asJava @@ -145,6 +150,9 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ + override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = + scaladslReadJournal.currentEventsByTag(tag, offset).asJava + override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = scaladslReadJournal.currentEventsByTag(tag, offset).asJava } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala index 0187eb46b6..8a37db3d22 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala @@ -8,9 +8,8 @@ import java.net.URLEncoder import akka.NotUsed import scala.concurrent.duration._ - import akka.actor.ExtendedActorSystem -import akka.persistence.query.EventEnvelope +import akka.persistence.query.{ EventEnvelope, Offset, Sequence } import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher import akka.persistence.query.journal.leveldb.EventsByTagPublisher @@ -41,7 +40,9 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery with EventsByTagQuery - with CurrentEventsByTagQuery { + with EventsByTagQuery2 + with CurrentEventsByTagQuery + with CurrentEventsByTagQuery2 { private val serialization = SerializationExtension(system) private val refreshInterval = Some(config.getDuration("refresh-interval", MILLISECONDS).millis) @@ -165,7 +166,17 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * The stream is completed with failure if there is a failure in executing the query in the * backend journal. */ - override def eventsByTag(tag: String, offset: Long = 0L): Source[EventEnvelope, NotUsed] = { + override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = + offset match { + case Sequence(offsetValue) ⇒ + Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offsetValue, Long.MaxValue, + refreshInterval, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) + .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + case _ ⇒ + throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") + } + + override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = { Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, refreshInterval, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) @@ -176,7 +187,17 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - override def currentEventsByTag(tag: String, offset: Long = 0L): Source[EventEnvelope, NotUsed] = { + override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = + offset match { + case Sequence(offsetValue) ⇒ + Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offsetValue, Long.MaxValue, + None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) + .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + case _ ⇒ + throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") + } + + override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = { Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala index 1ec601f826..d6db612c3b 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala @@ -10,6 +10,7 @@ import akka.persistence.query.EventEnvelope /** * A plugin may optionally support this query by implementing this trait. */ +@deprecated("To be replaced by CurrentEventsByTagQuery2 from Akka 2.5", "2.4.11") trait CurrentEventsByTagQuery extends ReadJournal { /** diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala new file mode 100644 index 0000000000..3026fb78db --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.persistence.query.scaladsl + +import akka.NotUsed +import akka.persistence.query.{ EventEnvelope, Offset } +import akka.stream.scaladsl.Source + +/** + * A plugin may optionally support this query by implementing this trait. + */ +// TODO: Rename it to CurrentEventsByTagQuery in Akka 2.5 +trait CurrentEventsByTagQuery2 extends ReadJournal { + + /** + * Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream + * is completed immediately when it reaches the end of the "result set". Events that are + * stored after the query is completed are not included in the event stream. + */ + def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] + +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala index 673d775735..b699380350 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala @@ -10,6 +10,7 @@ import akka.persistence.query.EventEnvelope /** * A plugin may optionally support this query by implementing this trait. */ +@deprecated("To be replaced by EventsByTagQuery2 from Akka 2.5", "2.4.11") trait EventsByTagQuery extends ReadJournal { /** diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala new file mode 100644 index 0000000000..e3727f9b7f --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.persistence.query.scaladsl + +import akka.NotUsed +import akka.persistence.query.{ EventEnvelope, Offset } +import akka.stream.scaladsl.Source + +/** + * A plugin may optionally support this query by implementing this trait. + */ +// TODO: Rename it to EventsByTagQuery in Akka 2.5 +trait EventsByTagQuery2 extends ReadJournal { + + /** + * Query events that have a specific tag. A tag can for example correspond to an + * aggregate root type (in DDD terminology). + * + * The consumer can keep track of its current position in the event stream by storing the + * `offset` and restart the query from a given `offset` after a crash/restart. + * + * The exact meaning of the `offset` depends on the journal and must be documented by the + * read journal plugin. It may be a sequential id number that uniquely identifies the + * position of each event within the event stream. Distributed data stores cannot easily + * support those semantics and they may use a weaker meaning. For example it may be a + * timestamp (taken when the event was created or stored). Timestamps are not unique and + * not strictly ordered, since clocks on different machines may not be synchronized. + * + * The returned event stream should be ordered by `offset` if possible, but this can also be + * difficult to fulfill for a distributed data store. The order must be documented by the + * read journal plugin. + * + * The stream is not completed when it reaches the end of the currently stored events, + * but it continues to push new events when new events are persisted. + * Corresponding query that is completed when it reaches the end of the currently + * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. + */ + def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] + +} + diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index c323118942..61073986de 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.actor.ActorRef import akka.persistence.query.PersistenceQuery import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal -import akka.persistence.query.scaladsl.EventsByTagQuery +import akka.persistence.query.scaladsl.EventsByTagQuery2 import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec @@ -49,7 +49,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi "Leveldb query EventsByPersistenceId" must { "implement standard EventsByTagQuery" in { - queries.isInstanceOf[EventsByTagQuery] should ===(true) + queries.isInstanceOf[EventsByTagQuery2] should ===(true) } "find existing events" in { diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index 1272baba50..e61f6bc70d 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -4,13 +4,11 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ - import akka.persistence.journal.Tagged import akka.persistence.journal.WriteEventAdapter -import akka.persistence.query.EventEnvelope -import akka.persistence.query.PersistenceQuery +import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence } import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal -import akka.persistence.query.scaladsl.EventsByTagQuery +import akka.persistence.query.scaladsl.EventsByTagQuery2 import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec @@ -57,7 +55,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) "Leveldb query EventsByTag" must { "implement standard EventsByTagQuery" in { - queries.isInstanceOf[EventsByTagQuery] should ===(true) + queries.isInstanceOf[EventsByTagQuery2] should ===(true) } "find existing events" in { @@ -74,7 +72,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) b ! "a green leaf" expectMsg(s"a green leaf-done") - val greenSrc = queries.currentEventsByTag(tag = "green", offset = 0L) + val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L)) greenSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(1L, "a", 2L, "a green apple")) @@ -84,7 +82,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) .expectComplete() - val blackSrc = queries.currentEventsByTag(tag = "black", offset = 0L) + val blackSrc = queries.currentEventsByTag(tag = "black", offset = Sequence(0L)) blackSrc.runWith(TestSink.probe[Any]) .request(5) .expectNext(EventEnvelope(1L, "b", 1L, "a black car")) @@ -94,7 +92,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) "not see new events after demand request" in { val c = system.actorOf(TestActor.props("c")) - val greenSrc = queries.currentEventsByTag(tag = "green", offset = 0L) + val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(1L, "a", 2L, "a green apple")) @@ -112,7 +110,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) } "find events from offset" in { - val greenSrc = queries.currentEventsByTag(tag = "green", offset = 2L) + val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) @@ -126,7 +124,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) "find new events" in { val d = system.actorOf(TestActor.props("d")) - val blackSrc = queries.eventsByTag(tag = "black", offset = 0L) + val blackSrc = queries.eventsByTag(tag = "black", offset = Sequence(0L)) val probe = blackSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(1L, "b", 1L, "a black car")) @@ -145,7 +143,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) } "find events from offset" in { - val greenSrc = queries.eventsByTag(tag = "green", offset = 2L) + val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) .expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))