From abaa8f394e8a3eea060d76bffead88c2c818f653 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Wed, 14 Dec 2016 14:04:55 +0100 Subject: [PATCH] =peq #21724 #19174 unify naming of query methods --- .../persistence/PersistenceQueryDocTest.java | 26 ++++++------ .../query/LeveldbPersistenceQueryDocTest.java | 6 +-- .../LeveldbPersistenceQueryDocSpec.scala | 6 +-- .../query/MyEventsByTagPublisher.scala | 8 ++-- .../query/PersistenceQueryDocSpec.scala | 24 +++++------ .../persistence/query/EventEnvelope.scala | 11 ----- .../scala/akka/persistence/query/Offset.scala | 6 +-- .../javadsl/CurrentEventsByTagQuery.scala | 6 +-- .../javadsl/CurrentEventsByTagQuery2.scala | 23 ---------- .../javadsl/CurrentPersistenceIdsQuery.scala | 2 +- .../query/javadsl/EventsByTagQuery.scala | 5 +-- .../query/javadsl/EventsByTagQuery2.scala | 41 ------------------ ...sQuery.scala => PersistenceIdsQuery.scala} | 4 +- .../query/javadsl/ReadJournal.scala | 2 +- .../leveldb/javadsl/LeveldbReadJournal.scala | 16 +++---- .../leveldb/scaladsl/LeveldbReadJournal.scala | 16 +++---- .../scaladsl/CurrentEventsByTagQuery.scala | 5 +-- .../scaladsl/CurrentEventsByTagQuery2.scala | 24 ----------- .../scaladsl/CurrentPersistenceIdsQuery.scala | 2 +- .../query/scaladsl/EventsByTagQuery.scala | 5 +-- .../query/scaladsl/EventsByTagQuery2.scala | 42 ------------------- ...sQuery.scala => PersistenceIdsQuery.scala} | 4 +- .../query/scaladsl/ReadJournal.scala | 2 +- .../query/DummyJavaReadJournal.java | 6 +-- .../query/DummyJavaReadJournalForScala.java | 6 +-- .../query/PersistenceQueryTest.java | 2 +- .../persistence/query/DummyReadJournal.scala | 10 ++--- .../leveldb/AllPersistenceIdsSpec.scala | 6 +-- .../leveldb/EventsByPersistenceIdSpec.scala | 4 +- .../journal/leveldb/EventsByTagSpec.scala | 38 ++++++++--------- 30 files changed, 105 insertions(+), 253 deletions(-) delete mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala delete mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala rename akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/{AllPersistenceIdsQuery.scala => PersistenceIdsQuery.scala} (88%) delete mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala delete mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala rename akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/{AllPersistenceIdsQuery.scala => PersistenceIdsQuery.scala} (88%) diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 1a76bc936f..c55689731c 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -88,9 +88,9 @@ public class PersistenceQueryDocTest { //#my-read-journal public class MyJavadslReadJournal implements akka.persistence.query.javadsl.ReadJournal, - akka.persistence.query.javadsl.EventsByTagQuery2, + akka.persistence.query.javadsl.EventsByTagQuery, akka.persistence.query.javadsl.EventsByPersistenceIdQuery, - akka.persistence.query.javadsl.AllPersistenceIdsQuery, + akka.persistence.query.javadsl.PersistenceIdsQuery, akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { private final FiniteDuration refreshInterval; @@ -102,11 +102,11 @@ public class PersistenceQueryDocTest { } @Override - public Source eventsByTag(String tag, Offset offset) { + public Source eventsByTag(String tag, Offset offset) { if(offset instanceof Sequence){ Sequence sequenceOffset = (Sequence) offset; final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval); - return Source.actorPublisher(props). + return Source.actorPublisher(props). mapMaterializedValue(m -> NotUsed.getInstance()); } else @@ -121,7 +121,7 @@ public class PersistenceQueryDocTest { } @Override - public Source allPersistenceIds() { + public Source persistenceIds() { // implement in a similar way as eventsByTag throw new UnsupportedOperationException("Not implemented yet"); } @@ -148,9 +148,9 @@ public class PersistenceQueryDocTest { //#my-read-journal public class MyScaladslReadJournal implements akka.persistence.query.scaladsl.ReadJournal, - akka.persistence.query.scaladsl.EventsByTagQuery2, + akka.persistence.query.scaladsl.EventsByTagQuery, akka.persistence.query.scaladsl.EventsByPersistenceIdQuery, - akka.persistence.query.scaladsl.AllPersistenceIdsQuery, + akka.persistence.query.scaladsl.PersistenceIdsQuery, akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { private final MyJavadslReadJournal javadslReadJournal; @@ -160,7 +160,7 @@ public class PersistenceQueryDocTest { } @Override - public akka.stream.scaladsl.Source eventsByTag( + public akka.stream.scaladsl.Source eventsByTag( String tag, akka.persistence.query.Offset offset) { return javadslReadJournal.eventsByTag(tag, offset).asScala(); } @@ -173,8 +173,8 @@ public class PersistenceQueryDocTest { } @Override - public akka.stream.scaladsl.Source allPersistenceIds() { - return javadslReadJournal.allPersistenceIds().asScala(); + public akka.stream.scaladsl.Source persistenceIds() { + return javadslReadJournal.persistenceIds().asScala(); } @Override @@ -218,7 +218,7 @@ public class PersistenceQueryDocTest { "akka.persistence.query.my-read-journal"); //#all-persistence-ids-live - readJournal.allPersistenceIds(); + readJournal.persistenceIds(); //#all-persistence-ids-live } @@ -256,7 +256,7 @@ public class PersistenceQueryDocTest { //#events-by-tag // assuming journal is able to work with numeric offsets we can: - final Source blueThings = + final Source blueThings = readJournal.eventsByTag("blue", new Sequence(0L)); // find top 10 blue things: @@ -270,7 +270,7 @@ public class PersistenceQueryDocTest { }, mat); // start another query, from the known offset - Source blue = readJournal.eventsByTag("blue", new Sequence(10)); + Source blue = readJournal.eventsByTag("blue", new Sequence(10)); //#events-by-tag } diff --git a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java index e8d999b552..711fbcf2fe 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java @@ -12,9 +12,7 @@ import akka.actor.ActorSystem; import akka.persistence.journal.WriteEventAdapter; import akka.persistence.journal.Tagged; import akka.persistence.query.EventEnvelope; -import akka.persistence.query.EventEnvelope2; import akka.persistence.query.Sequence; -import akka.persistence.query.javadsl.*; import akka.persistence.query.PersistenceQuery; import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; import akka.stream.ActorMaterializer; @@ -51,7 +49,7 @@ public class LeveldbPersistenceQueryDocTest { PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier()); - Source source = queries.allPersistenceIds(); + Source source = queries.persistenceIds(); //#AllPersistenceIds } @@ -61,7 +59,7 @@ public class LeveldbPersistenceQueryDocTest { PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier()); - Source source = + Source source = queries.eventsByTag("green", new Sequence(0L)); //#EventsByTag } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala index 0c28c2704c..e630e56bff 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala @@ -6,7 +6,7 @@ package docs.persistence.query import akka.NotUsed import akka.persistence.journal.{ EventAdapter, EventSeq } import akka.testkit.AkkaSpec -import akka.persistence.query.{ EventEnvelope, EventEnvelope2, PersistenceQuery, Sequence } +import akka.persistence.query.{ EventEnvelope, EventEnvelope, PersistenceQuery, Sequence } import akka.persistence.query.scaladsl._ import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.journal.Tagged @@ -71,7 +71,7 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal]( LeveldbReadJournal.Identifier) - val src: Source[String, NotUsed] = queries.allPersistenceIds() + val src: Source[String, NotUsed] = queries.persistenceIds() //#AllPersistenceIds } @@ -81,7 +81,7 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal]( LeveldbReadJournal.Identifier) - val src: Source[EventEnvelope2, NotUsed] = + val src: Source[EventEnvelope, NotUsed] = queries.eventsByTag(tag = "green", offset = Sequence(0L)) //#EventsByTag } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala index 0b2501c9e9..92e941f143 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -6,7 +6,7 @@ package docs.persistence.query import akka.actor.Props import akka.persistence.PersistentRepr -import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Sequence } +import akka.persistence.query.{ EventEnvelope, EventEnvelope, Sequence } import akka.serialization.SerializationExtension import akka.stream.actor.ActorPublisher import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } @@ -20,7 +20,7 @@ object MyEventsByTagPublisher { //#events-by-tag-publisher class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration) - extends ActorPublisher[EventEnvelope2] { + extends ActorPublisher[EventEnvelope] { private case object Continue @@ -28,7 +28,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD private val Limit = 1000 private var currentOffset = offset - var buf = Vector.empty[EventEnvelope2] + var buf = Vector.empty[EventEnvelope] import context.dispatcher val continueTask = context.system.scheduler.schedule( @@ -81,7 +81,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD buf = result.map { case (id, bytes) => val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get - EventEnvelope2(offset = Sequence(id), p.persistenceId, p.sequenceNr, p.payload) + EventEnvelope(offset = Sequence(id), p.persistenceId, p.sequenceNr, p.payload) } } catch { case e: Exception => diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index e749c11e49..e69c304b4e 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -46,23 +46,23 @@ object PersistenceQueryDocSpec { class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config) extends akka.persistence.query.scaladsl.ReadJournal - with akka.persistence.query.scaladsl.EventsByTagQuery2 + with akka.persistence.query.scaladsl.EventsByTagQuery with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery - with akka.persistence.query.scaladsl.AllPersistenceIdsQuery + with akka.persistence.query.scaladsl.PersistenceIdsQuery with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { private val refreshInterval: FiniteDuration = config.getDuration("refresh-interval", MILLISECONDS).millis override def eventsByTag( - tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] = offset match { + tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match { case Sequence(offsetValue) ⇒ val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval) Source.actorPublisher[EventEnvelope](props) .mapMaterializedValue(_ => NotUsed) .map { case EventEnvelope(offset, id, seqNr, event) => - EventEnvelope2(Sequence(offset), id, seqNr, event) + EventEnvelope(Sequence(offset), id, seqNr, event) } case _ ⇒ throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") @@ -75,7 +75,7 @@ object PersistenceQueryDocSpec { ??? } - override def allPersistenceIds(): Source[String, NotUsed] = { + override def persistenceIds(): Source[String, NotUsed] = { // implement in a similar way as eventsByTag ??? } @@ -98,13 +98,13 @@ object PersistenceQueryDocSpec { class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal) extends akka.persistence.query.javadsl.ReadJournal - with akka.persistence.query.javadsl.EventsByTagQuery2 + with akka.persistence.query.javadsl.EventsByTagQuery with akka.persistence.query.javadsl.EventsByPersistenceIdQuery - with akka.persistence.query.javadsl.AllPersistenceIdsQuery + with akka.persistence.query.javadsl.PersistenceIdsQuery with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { override def eventsByTag( - tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope2, NotUsed] = + tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] = scaladslReadJournal.eventsByTag(tag, offset).asJava override def eventsByPersistenceId( @@ -113,8 +113,8 @@ object PersistenceQueryDocSpec { scaladslReadJournal.eventsByPersistenceId( persistenceId, fromSequenceNr, toSequenceNr).asJava - override def allPersistenceIds(): javadsl.Source[String, NotUsed] = - scaladslReadJournal.allPersistenceIds().asJava + override def persistenceIds(): javadsl.Source[String, NotUsed] = + scaladslReadJournal.persistenceIds().asJava override def currentPersistenceIds(): javadsl.Source[String, NotUsed] = scaladslReadJournal.currentPersistenceIds().asJava @@ -219,7 +219,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { //#basic-usage //#all-persistence-ids-live - readJournal.allPersistenceIds() + readJournal.persistenceIds() //#all-persistence-ids-live //#all-persistence-ids-snap @@ -229,7 +229,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { //#events-by-tag // assuming journal is able to work with numeric offsets we can: - val blueThings: Source[EventEnvelope2, NotUsed] = + val blueThings: Source[EventEnvelope, NotUsed] = readJournal.eventsByTag("blue") // find top 10 blue things: diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala index a84297a13a..29faef7457 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala @@ -8,17 +8,6 @@ package akka.persistence.query * [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries. */ final case class EventEnvelope( - offset: Long, - persistenceId: String, - sequenceNr: Long, - event: Any) - -/** - * Event wrapper adding meta data for the events in the result stream of - * [[akka.persistence.query.scaladsl.EventsByTagQuery2]] query, or similar queries. - */ -// TODO: Rename it to EventEnvelope in Akka 2.5 -final case class EventEnvelope2( offset: Offset, persistenceId: String, sequenceNr: Long, diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala index bf96455b3f..75a61b3de3 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala @@ -17,11 +17,11 @@ object Offset { trait Offset -final case class Sequence(val value: Long) extends Offset with Ordered[Sequence] { +final case class Sequence(value: Long) extends Offset with Ordered[Sequence] { override def compare(that: Sequence): Int = value.compare(that.value) } -final case class TimeBasedUUID(val value: UUID) extends Offset with Ordered[TimeBasedUUID] { +final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] { if (value == null || value.version != 1) { throw new IllegalArgumentException("UUID " + value + " is not a time-based UUID") } @@ -34,4 +34,4 @@ final case object NoOffset extends Offset { * Java API: */ def getInstance: Offset = this -} \ No newline at end of file +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala index bab9e96c6c..a8ba1b61c2 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery.scala @@ -4,13 +4,12 @@ package akka.persistence.query.javadsl import akka.NotUsed +import akka.persistence.query.{ EventEnvelope, Offset } import akka.stream.javadsl.Source -import akka.persistence.query.EventEnvelope /** * A plugin may optionally support this query by implementing this interface. */ -@deprecated("To be replaced by CurrentEventsByTagQuery2 from Akka 2.5", "2.4.11") trait CurrentEventsByTagQuery extends ReadJournal { /** @@ -18,7 +17,6 @@ trait CurrentEventsByTagQuery extends ReadJournal { * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] - + def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala deleted file mode 100644 index 57cd48b7ba..0000000000 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Copyright (C) 2015-2016 Lightbend Inc. - */ -package akka.persistence.query.javadsl - -import akka.NotUsed -import akka.persistence.query.{ EventEnvelope2, Offset } -import akka.stream.javadsl.Source - -/** - * A plugin may optionally support this query by implementing this interface. - */ -// TODO: Rename it to CurrentEventsByTagQuery in Akka 2.5 -trait CurrentEventsByTagQuery2 extends ReadJournal { - - /** - * Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream - * is completed immediately when it reaches the end of the "result set". Events that are - * stored after the query is completed are not included in the event stream. - */ - def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] -} - diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentPersistenceIdsQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentPersistenceIdsQuery.scala index 66cd73b4f5..7484e2d3b6 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentPersistenceIdsQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentPersistenceIdsQuery.scala @@ -12,7 +12,7 @@ import akka.stream.javadsl.Source trait CurrentPersistenceIdsQuery extends ReadJournal { /** - * Same type of query as [[AllPersistenceIdsQuery#allPersistenceIds]] but the stream + * Same type of query as [[PersistenceIdsQuery#allPersistenceIds]] but the stream * is completed immediately when it reaches the end of the "result set". Persistent * actors that are created after the query is completed are not included in the stream. */ diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala index 3851751052..5aa4577e26 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery.scala @@ -4,13 +4,12 @@ package akka.persistence.query.javadsl import akka.NotUsed +import akka.persistence.query.{ EventEnvelope, Offset } import akka.stream.javadsl.Source -import akka.persistence.query.EventEnvelope /** * A plugin may optionally support this query by implementing this interface. */ -@deprecated("To be replaced by EventsByTagQuery2 from Akka 2.5", "2.4.11") trait EventsByTagQuery extends ReadJournal { /** @@ -36,6 +35,6 @@ trait EventsByTagQuery extends ReadJournal { * Corresponding query that is completed when it reaches the end of the currently * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. */ - def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] + def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala deleted file mode 100644 index 6e01540845..0000000000 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (C) 2015-2016 Lightbend Inc. - */ -package akka.persistence.query.javadsl - -import akka.NotUsed -import akka.persistence.query.{ EventEnvelope2, Offset } -import akka.stream.javadsl.Source - -/** - * A plugin may optionally support this query by implementing this interface. - */ -// TODO: Rename it to EventsByTagQuery in Akka 2.5 -trait EventsByTagQuery2 extends ReadJournal { - - /** - * Query events that have a specific tag. A tag can for example correspond to an - * aggregate root type (in DDD terminology). - * - * The consumer can keep track of its current position in the event stream by storing the - * `offset` and restart the query from a given `offset` after a crash/restart. - * - * The exact meaning of the `offset` depends on the journal and must be documented by the - * read journal plugin. It may be a sequential id number that uniquely identifies the - * position of each event within the event stream. Distributed data stores cannot easily - * support those semantics and they may use a weaker meaning. For example it may be a - * timestamp (taken when the event was created or stored). Timestamps are not unique and - * not strictly ordered, since clocks on different machines may not be synchronized. - * - * The returned event stream should be ordered by `offset` if possible, but this can also be - * difficult to fulfill for a distributed data store. The order must be documented by the - * read journal plugin. - * - * The stream is not completed when it reaches the end of the currently stored events, - * but it continues to push new events when new events are persisted. - * Corresponding query that is completed when it reaches the end of the currently - * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. - */ - def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] - -} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/AllPersistenceIdsQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/PersistenceIdsQuery.scala similarity index 88% rename from akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/AllPersistenceIdsQuery.scala rename to akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/PersistenceIdsQuery.scala index 6594631cc6..e924afa72e 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/AllPersistenceIdsQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/PersistenceIdsQuery.scala @@ -9,7 +9,7 @@ import akka.stream.javadsl.Source /** * A plugin may optionally support this query by implementing this interface. */ -trait AllPersistenceIdsQuery extends ReadJournal { +trait PersistenceIdsQuery extends ReadJournal { /** * Query all `PersistentActor` identifiers, i.e. as defined by the @@ -20,6 +20,6 @@ trait AllPersistenceIdsQuery extends ReadJournal { * Corresponding query that is completed when it reaches the end of the currently * currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]]. */ - def allPersistenceIds(): Source[String, NotUsed] + def persistenceIds(): Source[String, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala index 0bd21fec26..fbbff61dcf 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala @@ -12,7 +12,7 @@ package akka.persistence.query.javadsl * The interface is very open so that different journals may implement specific queries. * * There are a few pre-defined queries that a query implementation may implement, - * such as [[EventsByPersistenceIdQuery]], [[AllPersistenceIdsQuery]] and [[EventsByTagQuery]] + * such as [[EventsByPersistenceIdQuery]], [[PersistenceIdsQuery]] and [[EventsByTagQuery]] * Implementation of these queries are optional and query (journal) plugins may define * their own specialized queries by implementing other methods. * diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala index de424d3e68..635a62f75a 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala @@ -4,7 +4,7 @@ package akka.persistence.query.journal.leveldb.javadsl import akka.NotUsed -import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Offset } +import akka.persistence.query.{ EventEnvelope, EventEnvelope, Offset } import akka.persistence.query.javadsl._ import akka.stream.javadsl.Source @@ -26,14 +26,14 @@ import akka.stream.javadsl.Source */ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal) extends ReadJournal - with AllPersistenceIdsQuery + with PersistenceIdsQuery with CurrentPersistenceIdsQuery with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery with EventsByTagQuery - with EventsByTagQuery2 + with EventsByTagQuery with CurrentEventsByTagQuery - with CurrentEventsByTagQuery2 { + with CurrentEventsByTagQuery { /** * `allPersistenceIds` is used for retrieving all `persistenceIds` of all @@ -53,8 +53,8 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev * The stream is completed with failure if there is a failure in executing the query in the * backend journal. */ - override def allPersistenceIds(): Source[String, NotUsed] = - scaladslReadJournal.allPersistenceIds().asJava + override def persistenceIds(): Source[String, NotUsed] = + scaladslReadJournal.persistenceIds().asJava /** * Same type of query as [[#allPersistenceIds]] but the stream @@ -138,7 +138,7 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev * The stream is completed with failure if there is a failure in executing the query in the * backend journal. */ - override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] = + override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = scaladslReadJournal.eventsByTag(tag, offset).asJava override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = @@ -149,7 +149,7 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] = + override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = scaladslReadJournal.currentEventsByTag(tag, offset).asJava override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala index f37a096301..d32ec788c3 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala @@ -9,7 +9,7 @@ import akka.NotUsed import scala.concurrent.duration._ import akka.actor.ExtendedActorSystem -import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Offset, Sequence } +import akka.persistence.query.{ EventEnvelope, EventEnvelope, Offset, Sequence } import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher import akka.persistence.query.journal.leveldb.EventsByTagPublisher @@ -35,14 +35,14 @@ import com.typesafe.config.Config * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`. */ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal - with AllPersistenceIdsQuery + with PersistenceIdsQuery with CurrentPersistenceIdsQuery with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery with EventsByTagQuery - with EventsByTagQuery2 + with EventsByTagQuery with CurrentEventsByTagQuery - with CurrentEventsByTagQuery2 { + with CurrentEventsByTagQuery { private val serialization = SerializationExtension(system) private val refreshInterval = Some(config.getDuration("refresh-interval", MILLISECONDS).millis) @@ -51,7 +51,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re private val envelopetoEnvelope2 = Flow[EventEnvelope].map { case EventEnvelope(offset, persistenceId, sequenceNr, event) ⇒ - EventEnvelope2(Sequence(offset), persistenceId, sequenceNr, event) + EventEnvelope(Sequence(offset), persistenceId, sequenceNr, event) } /** @@ -72,7 +72,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * The stream is completed with failure if there is a failure in executing the query in the * backend journal. */ - override def allPersistenceIds(): Source[String, NotUsed] = { + override def persistenceIds(): Source[String, NotUsed] = { // no polling for this query, the write journal will push all changes, i.e. // no refreshInterval Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery = true, maxBufSize, writeJournalPluginId)) @@ -171,7 +171,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * The stream is completed with failure if there is a failure in executing the query in the * backend journal. */ - override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] = + override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match { case Sequence(offsetValue) ⇒ eventsByTag(tag, offsetValue).via(envelopetoEnvelope2) @@ -191,7 +191,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] = + override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match { case Sequence(offsetValue) ⇒ currentEventsByTag(tag, offsetValue).via(envelopetoEnvelope2) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala index d6db612c3b..2b65192377 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery.scala @@ -4,13 +4,12 @@ package akka.persistence.query.scaladsl import akka.NotUsed +import akka.persistence.query.{ EventEnvelope, Offset } import akka.stream.scaladsl.Source -import akka.persistence.query.EventEnvelope /** * A plugin may optionally support this query by implementing this trait. */ -@deprecated("To be replaced by CurrentEventsByTagQuery2 from Akka 2.5", "2.4.11") trait CurrentEventsByTagQuery extends ReadJournal { /** @@ -18,7 +17,7 @@ trait CurrentEventsByTagQuery extends ReadJournal { * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] + def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala deleted file mode 100644 index 9d611488f0..0000000000 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright (C) 2015-2016 Lightbend Inc. - */ -package akka.persistence.query.scaladsl - -import akka.NotUsed -import akka.persistence.query.{ EventEnvelope2, Offset } -import akka.stream.scaladsl.Source - -/** - * A plugin may optionally support this query by implementing this trait. - */ -// TODO: Rename it to CurrentEventsByTagQuery in Akka 2.5 -trait CurrentEventsByTagQuery2 extends ReadJournal { - - /** - * Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream - * is completed immediately when it reaches the end of the "result set". Events that are - * stored after the query is completed are not included in the event stream. - */ - def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] - -} - diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentPersistenceIdsQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentPersistenceIdsQuery.scala index f4ea569a3d..0dfabfe034 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentPersistenceIdsQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentPersistenceIdsQuery.scala @@ -12,7 +12,7 @@ import akka.stream.scaladsl.Source trait CurrentPersistenceIdsQuery extends ReadJournal { /** - * Same type of query as [[AllPersistenceIdsQuery#allPersistenceIds]] but the stream + * Same type of query as [[PersistenceIdsQuery#allPersistenceIds]] but the stream * is completed immediately when it reaches the end of the "result set". Persistent * actors that are created after the query is completed are not included in the stream. */ diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala index b699380350..ea5e8ec4ca 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery.scala @@ -4,13 +4,12 @@ package akka.persistence.query.scaladsl import akka.NotUsed +import akka.persistence.query.{ EventEnvelope, Offset } import akka.stream.scaladsl.Source -import akka.persistence.query.EventEnvelope /** * A plugin may optionally support this query by implementing this trait. */ -@deprecated("To be replaced by EventsByTagQuery2 from Akka 2.5", "2.4.11") trait EventsByTagQuery extends ReadJournal { /** @@ -36,7 +35,7 @@ trait EventsByTagQuery extends ReadJournal { * Corresponding query that is completed when it reaches the end of the currently * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. */ - def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] + def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala deleted file mode 100644 index 05c86f4ea2..0000000000 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright (C) 2015-2016 Lightbend Inc. - */ -package akka.persistence.query.scaladsl - -import akka.NotUsed -import akka.persistence.query.{ EventEnvelope2, Offset } -import akka.stream.scaladsl.Source - -/** - * A plugin may optionally support this query by implementing this trait. - */ -// TODO: Rename it to EventsByTagQuery in Akka 2.5 -trait EventsByTagQuery2 extends ReadJournal { - - /** - * Query events that have a specific tag. A tag can for example correspond to an - * aggregate root type (in DDD terminology). - * - * The consumer can keep track of its current position in the event stream by storing the - * `offset` and restart the query from a given `offset` after a crash/restart. - * - * The exact meaning of the `offset` depends on the journal and must be documented by the - * read journal plugin. It may be a sequential id number that uniquely identifies the - * position of each event within the event stream. Distributed data stores cannot easily - * support those semantics and they may use a weaker meaning. For example it may be a - * timestamp (taken when the event was created or stored). Timestamps are not unique and - * not strictly ordered, since clocks on different machines may not be synchronized. - * - * The returned event stream should be ordered by `offset` if possible, but this can also be - * difficult to fulfill for a distributed data store. The order must be documented by the - * read journal plugin. - * - * The stream is not completed when it reaches the end of the currently stored events, - * but it continues to push new events when new events are persisted. - * Corresponding query that is completed when it reaches the end of the currently - * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. - */ - def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] - -} - diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/AllPersistenceIdsQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/PersistenceIdsQuery.scala similarity index 88% rename from akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/AllPersistenceIdsQuery.scala rename to akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/PersistenceIdsQuery.scala index 28ebd5a1e9..b66342321b 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/AllPersistenceIdsQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/PersistenceIdsQuery.scala @@ -9,7 +9,7 @@ import akka.stream.scaladsl.Source /** * A plugin may optionally support this query by implementing this trait. */ -trait AllPersistenceIdsQuery extends ReadJournal { +trait PersistenceIdsQuery extends ReadJournal { /** * Query all `PersistentActor` identifiers, i.e. as defined by the @@ -20,6 +20,6 @@ trait AllPersistenceIdsQuery extends ReadJournal { * Corresponding query that is completed when it reaches the end of the currently * currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]]. */ - def allPersistenceIds(): Source[String, NotUsed] + def persistenceIds(): Source[String, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala index 88bcaafc26..889b5b5b46 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala @@ -12,7 +12,7 @@ package akka.persistence.query.scaladsl * The interface is very open so that different journals may implement specific queries. * * There are a few pre-defined queries that a query implementation may implement, - * such as [[EventsByPersistenceIdQuery]], [[AllPersistenceIdsQuery]] and [[EventsByTagQuery]] + * such as [[EventsByPersistenceIdQuery]], [[PersistenceIdsQuery]] and [[EventsByTagQuery]] * Implementation of these queries are optional and query (journal) plugins may define * their own specialized queries by implementing other methods. * diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournal.java b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournal.java index 3d05fd8d02..9a6c8b4a6d 100644 --- a/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournal.java +++ b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournal.java @@ -7,7 +7,7 @@ package akka.persistence.query; import java.util.Iterator; import akka.NotUsed; -import akka.persistence.query.javadsl.AllPersistenceIdsQuery; +import akka.persistence.query.javadsl.PersistenceIdsQuery; import akka.persistence.query.javadsl.ReadJournal; import akka.stream.javadsl.Source; @@ -15,12 +15,12 @@ import akka.stream.javadsl.Source; * Use for tests only! * Emits infinite stream of strings (representing queried for events). */ -public class DummyJavaReadJournal implements ReadJournal, AllPersistenceIdsQuery { +public class DummyJavaReadJournal implements ReadJournal, PersistenceIdsQuery { public static final String Identifier = "akka.persistence.query.journal.dummy-java"; @Override - public Source allPersistenceIds() { + public Source persistenceIds() { return Source.fromIterator(() -> new Iterator() { private int i = 0; @Override public boolean hasNext() { return true; } diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalForScala.java b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalForScala.java index b8a05e42bd..458df25caa 100644 --- a/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalForScala.java +++ b/akka-persistence-query/src/test/java/akka/persistence/query/DummyJavaReadJournalForScala.java @@ -11,7 +11,7 @@ import akka.NotUsed; * Emits infinite stream of strings (representing queried for events). */ public class DummyJavaReadJournalForScala implements akka.persistence.query.scaladsl.ReadJournal, - akka.persistence.query.scaladsl.AllPersistenceIdsQuery { + akka.persistence.query.scaladsl.PersistenceIdsQuery { public static final String Identifier = DummyJavaReadJournal.Identifier; @@ -22,8 +22,8 @@ public class DummyJavaReadJournalForScala implements akka.persistence.query.scal } @Override - public akka.stream.scaladsl.Source allPersistenceIds() { - return readJournal.allPersistenceIds().asScala(); + public akka.stream.scaladsl.Source persistenceIds() { + return readJournal.persistenceIds().asScala(); } } diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java index 6d13dc407e..01c3ac28bd 100644 --- a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java +++ b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java @@ -23,6 +23,6 @@ public class PersistenceQueryTest { public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception { final DummyJavaReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(DummyJavaReadJournal.class, "noop-journal"); - final akka.stream.javadsl.Source ids = readJournal.allPersistenceIds(); + final akka.stream.javadsl.Source ids = readJournal.persistenceIds(); } } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala index ddcb824d5b..4d71f3ab0e 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala @@ -12,8 +12,8 @@ import com.typesafe.config.{ Config, ConfigFactory } * Use for tests only! * Emits infinite stream of strings (representing queried for events). */ -class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.AllPersistenceIdsQuery { - override def allPersistenceIds(): Source[String, NotUsed] = +class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.PersistenceIdsQuery { + override def persistenceIds(): Source[String, NotUsed] = Source.fromIterator(() ⇒ Iterator.from(0)).map(_.toString) } @@ -21,9 +21,9 @@ object DummyReadJournal { final val Identifier = "akka.persistence.query.journal.dummy" } -class DummyReadJournalForJava(readJournal: DummyReadJournal) extends javadsl.ReadJournal with javadsl.AllPersistenceIdsQuery { - override def allPersistenceIds(): akka.stream.javadsl.Source[String, NotUsed] = - readJournal.allPersistenceIds().asJava +class DummyReadJournalForJava(readJournal: DummyReadJournal) extends javadsl.ReadJournal with javadsl.PersistenceIdsQuery { + override def persistenceIds(): akka.stream.javadsl.Source[String, NotUsed] = + readJournal.persistenceIds().asJava } object DummyReadJournalProvider { diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala index 937043f68c..90a2f232c6 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala @@ -7,7 +7,7 @@ import scala.concurrent.duration._ import akka.persistence.query.PersistenceQuery import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal -import akka.persistence.query.scaladsl.AllPersistenceIdsQuery +import akka.persistence.query.scaladsl.PersistenceIdsQuery import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec @@ -32,7 +32,7 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) "Leveldb query AllPersistenceIds" must { "implement standard AllPersistenceIdsQuery" in { - queries.isInstanceOf[AllPersistenceIdsQuery] should ===(true) + queries.isInstanceOf[PersistenceIdsQuery] should ===(true) } "find existing persistenceIds" in { @@ -57,7 +57,7 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) system.actorOf(TestActor.props("d")) ! "d1" expectMsg("d1-done") - val src = queries.allPersistenceIds() + val src = queries.persistenceIds() val probe = src.runWith(TestSink.probe[String]) probe.within(10.seconds) { probe.request(5) diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index 61073986de..c323118942 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.actor.ActorRef import akka.persistence.query.PersistenceQuery import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal -import akka.persistence.query.scaladsl.EventsByTagQuery2 +import akka.persistence.query.scaladsl.EventsByTagQuery import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec @@ -49,7 +49,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi "Leveldb query EventsByPersistenceId" must { "implement standard EventsByTagQuery" in { - queries.isInstanceOf[EventsByTagQuery2] should ===(true) + queries.isInstanceOf[EventsByTagQuery] should ===(true) } "find existing events" in { diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index f6df58a329..ea432a466f 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -6,9 +6,9 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ import akka.persistence.journal.Tagged import akka.persistence.journal.WriteEventAdapter -import akka.persistence.query.{ EventEnvelope, EventEnvelope2, PersistenceQuery, Sequence } +import akka.persistence.query.{ EventEnvelope, EventEnvelope, PersistenceQuery, Sequence } import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal -import akka.persistence.query.scaladsl.EventsByTagQuery2 +import akka.persistence.query.scaladsl.EventsByTagQuery import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec @@ -55,7 +55,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) "Leveldb query EventsByTag" must { "implement standard EventsByTagQuery" in { - queries.isInstanceOf[EventsByTagQuery2] should ===(true) + queries.isInstanceOf[EventsByTagQuery] should ===(true) } "find existing events" in { @@ -75,17 +75,17 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L)) greenSrc.runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope2(Sequence(1L), "a", 2L, "a green apple")) - .expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana")) + .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple")) + .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) .expectNoMsg(500.millis) .request(2) - .expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf")) + .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) .expectComplete() val blackSrc = queries.currentEventsByTag(tag = "black", offset = Sequence(0L)) blackSrc.runWith(TestSink.probe[Any]) .request(5) - .expectNext(EventEnvelope2(Sequence(1L), "b", 1L, "a black car")) + .expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car")) .expectComplete() } @@ -95,8 +95,8 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope2(Sequence(1L), "a", 2L, "a green apple")) - .expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana")) + .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple")) + .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) .expectNoMsg(100.millis) c ! "a green cucumber" @@ -105,7 +105,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) probe .expectNoMsg(100.millis) .request(5) - .expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf")) + .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) .expectComplete() // green cucumber not seen } @@ -113,9 +113,9 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) - .expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana")) - .expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf")) - .expectNext(EventEnvelope2(Sequence(4L), "c", 1L, "a green cucumber")) + .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) + .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) + .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber")) .expectComplete() } } @@ -127,7 +127,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val blackSrc = queries.eventsByTag(tag = "black", offset = Sequence(0L)) val probe = blackSrc.runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope2(Sequence(1L), "b", 1L, "a black car")) + .expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car")) .expectNoMsg(100.millis) d ! "a black dog" @@ -136,19 +136,19 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) expectMsg(s"a black night-done") probe - .expectNext(EventEnvelope2(Sequence(2L), "d", 1L, "a black dog")) + .expectNext(EventEnvelope(Sequence(2L), "d", 1L, "a black dog")) .expectNoMsg(100.millis) .request(10) - .expectNext(EventEnvelope2(Sequence(3L), "d", 2L, "a black night")) + .expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night")) } "find events from offset" in { val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) - .expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana")) - .expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf")) - .expectNext(EventEnvelope2(Sequence(4L), "c", 1L, "a green cucumber")) + .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) + .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) + .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber")) .expectNoMsg(100.millis) }