diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 96d4355614..1a76bc936f 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -102,11 +102,11 @@ public class PersistenceQueryDocTest { } @Override - public Source eventsByTag(String tag, Offset offset) { + public Source eventsByTag(String tag, Offset offset) { if(offset instanceof Sequence){ Sequence sequenceOffset = (Sequence) offset; final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval); - return Source.actorPublisher(props). + return Source.actorPublisher(props). mapMaterializedValue(m -> NotUsed.getInstance()); } else @@ -160,7 +160,7 @@ public class PersistenceQueryDocTest { } @Override - public akka.stream.scaladsl.Source eventsByTag( + public akka.stream.scaladsl.Source eventsByTag( String tag, akka.persistence.query.Offset offset) { return javadslReadJournal.eventsByTag(tag, offset).asScala(); } @@ -256,7 +256,7 @@ public class PersistenceQueryDocTest { //#events-by-tag // assuming journal is able to work with numeric offsets we can: - final Source blueThings = + final Source blueThings = readJournal.eventsByTag("blue", new Sequence(0L)); // find top 10 blue things: @@ -270,7 +270,7 @@ public class PersistenceQueryDocTest { }, mat); // start another query, from the known offset - Source blue = readJournal.eventsByTag("blue", new Sequence(10)); + Source blue = readJournal.eventsByTag("blue", new Sequence(10)); //#events-by-tag } @@ -371,7 +371,7 @@ public class PersistenceQueryDocTest { this.name = name; } - public CompletionStage saveProgress(long offset) { + public CompletionStage saveProgress(Offset offset) { // ... //#projection-into-different-store return null; diff --git a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java index 5752c89979..e8d999b552 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java @@ -12,6 +12,7 @@ import akka.actor.ActorSystem; import akka.persistence.journal.WriteEventAdapter; import akka.persistence.journal.Tagged; import akka.persistence.query.EventEnvelope; +import akka.persistence.query.EventEnvelope2; import akka.persistence.query.Sequence; import akka.persistence.query.javadsl.*; import akka.persistence.query.PersistenceQuery; @@ -60,7 +61,7 @@ public class LeveldbPersistenceQueryDocTest { PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier()); - Source source = + Source source = queries.eventsByTag("green", new Sequence(0L)); //#EventsByTag } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala index d3486daedf..0c28c2704c 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala @@ -6,7 +6,7 @@ package docs.persistence.query import akka.NotUsed import akka.persistence.journal.{ EventAdapter, EventSeq } import akka.testkit.AkkaSpec -import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence } +import akka.persistence.query.{ EventEnvelope, EventEnvelope2, PersistenceQuery, Sequence } import akka.persistence.query.scaladsl._ import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.journal.Tagged @@ -81,7 +81,7 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal]( LeveldbReadJournal.Identifier) - val src: Source[EventEnvelope, NotUsed] = + val src: Source[EventEnvelope2, NotUsed] = queries.eventsByTag(tag = "green", offset = Sequence(0L)) //#EventsByTag } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala index a0f972e95c..0b2501c9e9 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -6,7 +6,7 @@ package docs.persistence.query import akka.actor.Props import akka.persistence.PersistentRepr -import akka.persistence.query.EventEnvelope +import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Sequence } import akka.serialization.SerializationExtension import akka.stream.actor.ActorPublisher import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } @@ -20,7 +20,7 @@ object MyEventsByTagPublisher { //#events-by-tag-publisher class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration) - extends ActorPublisher[EventEnvelope] { + extends ActorPublisher[EventEnvelope2] { private case object Continue @@ -28,7 +28,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD private val Limit = 1000 private var currentOffset = offset - var buf = Vector.empty[EventEnvelope] + var buf = Vector.empty[EventEnvelope2] import context.dispatcher val continueTask = context.system.scheduler.schedule( @@ -81,7 +81,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD buf = result.map { case (id, bytes) => val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get - EventEnvelope(offset = id, p.persistenceId, p.sequenceNr, p.payload) + EventEnvelope2(offset = Sequence(id), p.persistenceId, p.sequenceNr, p.payload) } } catch { case e: Exception => diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index c341a2652b..e749c11e49 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -55,11 +55,15 @@ object PersistenceQueryDocSpec { config.getDuration("refresh-interval", MILLISECONDS).millis override def eventsByTag( - tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match { + tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] = offset match { case Sequence(offsetValue) ⇒ val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval) Source.actorPublisher[EventEnvelope](props) .mapMaterializedValue(_ => NotUsed) + .map { + case EventEnvelope(offset, id, seqNr, event) => + EventEnvelope2(Sequence(offset), id, seqNr, event) + } case _ ⇒ throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") } @@ -100,7 +104,7 @@ object PersistenceQueryDocSpec { with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { override def eventsByTag( - tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] = + tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope2, NotUsed] = scaladslReadJournal.eventsByTag(tag, offset).asJava override def eventsByPersistenceId( @@ -225,7 +229,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { //#events-by-tag // assuming journal is able to work with numeric offsets we can: - val blueThings: Source[EventEnvelope, NotUsed] = + val blueThings: Source[EventEnvelope2, NotUsed] = readJournal.eventsByTag("blue") // find top 10 blue things: @@ -262,7 +266,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { //#projection-into-different-store class MyResumableProjection(name: String) { - def saveProgress(offset: Long): Future[Long] = ??? + def saveProgress(offset: Offset): Future[Long] = ??? def latestOffset: Future[Long] = ??? } //#projection-into-different-store diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala index 67ac0c15ae..a84297a13a 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala @@ -12,3 +12,14 @@ final case class EventEnvelope( persistenceId: String, sequenceNr: Long, event: Any) + +/** + * Event wrapper adding meta data for the events in the result stream of + * [[akka.persistence.query.scaladsl.EventsByTagQuery2]] query, or similar queries. + */ +// TODO: Rename it to EventEnvelope in Akka 2.5 +final case class EventEnvelope2( + offset: Offset, + persistenceId: String, + sequenceNr: Long, + event: Any) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala index 9bb7956914..57cd48b7ba 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/CurrentEventsByTagQuery2.scala @@ -4,7 +4,7 @@ package akka.persistence.query.javadsl import akka.NotUsed -import akka.persistence.query.{ EventEnvelope, Offset } +import akka.persistence.query.{ EventEnvelope2, Offset } import akka.stream.javadsl.Source /** @@ -18,7 +18,6 @@ trait CurrentEventsByTagQuery2 extends ReadJournal { * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] - + def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala index d376316c48..6e01540845 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/EventsByTagQuery2.scala @@ -4,7 +4,7 @@ package akka.persistence.query.javadsl import akka.NotUsed -import akka.persistence.query.{ EventEnvelope, Offset } +import akka.persistence.query.{ EventEnvelope2, Offset } import akka.stream.javadsl.Source /** @@ -36,6 +36,6 @@ trait EventsByTagQuery2 extends ReadJournal { * Corresponding query that is completed when it reaches the end of the currently * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. */ - def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] + def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala index b419674909..de424d3e68 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala @@ -4,8 +4,7 @@ package akka.persistence.query.journal.leveldb.javadsl import akka.NotUsed - -import akka.persistence.query.{ EventEnvelope, Offset } +import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Offset } import akka.persistence.query.javadsl._ import akka.stream.javadsl.Source @@ -139,7 +138,7 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev * The stream is completed with failure if there is a failure in executing the query in the * backend journal. */ - override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = + override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] = scaladslReadJournal.eventsByTag(tag, offset).asJava override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = @@ -150,7 +149,7 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = + override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] = scaladslReadJournal.currentEventsByTag(tag, offset).asJava override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala index 8a37db3d22..f37a096301 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala @@ -9,14 +9,14 @@ import akka.NotUsed import scala.concurrent.duration._ import akka.actor.ExtendedActorSystem -import akka.persistence.query.{ EventEnvelope, Offset, Sequence } +import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Offset, Sequence } import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher import akka.persistence.query.journal.leveldb.EventsByTagPublisher import akka.persistence.query.scaladsl._ import akka.persistence.query.scaladsl.ReadJournal import akka.serialization.SerializationExtension -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Flow, Source } import akka.util.ByteString import com.typesafe.config.Config @@ -49,6 +49,11 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re private val writeJournalPluginId: String = config.getString("write-plugin") private val maxBufSize: Int = config.getInt("max-buffer-size") + private val envelopetoEnvelope2 = Flow[EventEnvelope].map { + case EventEnvelope(offset, persistenceId, sequenceNr, event) ⇒ + EventEnvelope2(Sequence(offset), persistenceId, sequenceNr, event) + } + /** * `allPersistenceIds` is used for retrieving all `persistenceIds` of all * persistent actors. @@ -166,19 +171,18 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * The stream is completed with failure if there is a failure in executing the query in the * backend journal. */ - override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = + override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] = offset match { case Sequence(offsetValue) ⇒ - Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offsetValue, Long.MaxValue, - refreshInterval, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) - .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + eventsByTag(tag, offsetValue).via(envelopetoEnvelope2) case _ ⇒ throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") } override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = { Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, - refreshInterval, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) + refreshInterval, maxBufSize, writeJournalPluginId)) + .mapMaterializedValue(_ ⇒ NotUsed) .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) } @@ -187,12 +191,11 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = + override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] = offset match { case Sequence(offsetValue) ⇒ - Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offsetValue, Long.MaxValue, - None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) - .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + currentEventsByTag(tag, offsetValue).via(envelopetoEnvelope2) + case _ ⇒ throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala index 3026fb78db..9d611488f0 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/CurrentEventsByTagQuery2.scala @@ -4,7 +4,7 @@ package akka.persistence.query.scaladsl import akka.NotUsed -import akka.persistence.query.{ EventEnvelope, Offset } +import akka.persistence.query.{ EventEnvelope2, Offset } import akka.stream.scaladsl.Source /** @@ -18,7 +18,7 @@ trait CurrentEventsByTagQuery2 extends ReadJournal { * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ - def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] + def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala index e3727f9b7f..05c86f4ea2 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/EventsByTagQuery2.scala @@ -4,7 +4,7 @@ package akka.persistence.query.scaladsl import akka.NotUsed -import akka.persistence.query.{ EventEnvelope, Offset } +import akka.persistence.query.{ EventEnvelope2, Offset } import akka.stream.scaladsl.Source /** @@ -36,7 +36,7 @@ trait EventsByTagQuery2 extends ReadJournal { * Corresponding query that is completed when it reaches the end of the currently * stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]]. */ - def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] + def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index e61f6bc70d..f6df58a329 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -6,7 +6,7 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ import akka.persistence.journal.Tagged import akka.persistence.journal.WriteEventAdapter -import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence } +import akka.persistence.query.{ EventEnvelope, EventEnvelope2, PersistenceQuery, Sequence } import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.query.scaladsl.EventsByTagQuery2 import akka.stream.ActorMaterializer @@ -75,17 +75,17 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L)) greenSrc.runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope(1L, "a", 2L, "a green apple")) - .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) + .expectNext(EventEnvelope2(Sequence(1L), "a", 2L, "a green apple")) + .expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana")) .expectNoMsg(500.millis) .request(2) - .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) + .expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf")) .expectComplete() val blackSrc = queries.currentEventsByTag(tag = "black", offset = Sequence(0L)) blackSrc.runWith(TestSink.probe[Any]) .request(5) - .expectNext(EventEnvelope(1L, "b", 1L, "a black car")) + .expectNext(EventEnvelope2(Sequence(1L), "b", 1L, "a black car")) .expectComplete() } @@ -95,8 +95,8 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope(1L, "a", 2L, "a green apple")) - .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) + .expectNext(EventEnvelope2(Sequence(1L), "a", 2L, "a green apple")) + .expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana")) .expectNoMsg(100.millis) c ! "a green cucumber" @@ -105,7 +105,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) probe .expectNoMsg(100.millis) .request(5) - .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) + .expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf")) .expectComplete() // green cucumber not seen } @@ -113,9 +113,9 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) - .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) - .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) - .expectNext(EventEnvelope(4L, "c", 1L, "a green cucumber")) + .expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana")) + .expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf")) + .expectNext(EventEnvelope2(Sequence(4L), "c", 1L, "a green cucumber")) .expectComplete() } } @@ -127,7 +127,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val blackSrc = queries.eventsByTag(tag = "black", offset = Sequence(0L)) val probe = blackSrc.runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope(1L, "b", 1L, "a black car")) + .expectNext(EventEnvelope2(Sequence(1L), "b", 1L, "a black car")) .expectNoMsg(100.millis) d ! "a black dog" @@ -136,19 +136,19 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) expectMsg(s"a black night-done") probe - .expectNext(EventEnvelope(2L, "d", 1L, "a black dog")) + .expectNext(EventEnvelope2(Sequence(2L), "d", 1L, "a black dog")) .expectNoMsg(100.millis) .request(10) - .expectNext(EventEnvelope(3L, "d", 2L, "a black night")) + .expectNext(EventEnvelope2(Sequence(3L), "d", 2L, "a black night")) } "find events from offset" in { val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) - .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) - .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) - .expectNext(EventEnvelope(4L, "c", 1L, "a green cucumber")) + .expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana")) + .expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf")) + .expectNext(EventEnvelope2(Sequence(4L), "c", 1L, "a green cucumber")) .expectNoMsg(100.millis) }