parent
e29e06f850
commit
abaa8f394e
30 changed files with 105 additions and 253 deletions
|
|
@ -88,9 +88,9 @@ public class PersistenceQueryDocTest {
|
||||||
//#my-read-journal
|
//#my-read-journal
|
||||||
public class MyJavadslReadJournal implements
|
public class MyJavadslReadJournal implements
|
||||||
akka.persistence.query.javadsl.ReadJournal,
|
akka.persistence.query.javadsl.ReadJournal,
|
||||||
akka.persistence.query.javadsl.EventsByTagQuery2,
|
akka.persistence.query.javadsl.EventsByTagQuery,
|
||||||
akka.persistence.query.javadsl.EventsByPersistenceIdQuery,
|
akka.persistence.query.javadsl.EventsByPersistenceIdQuery,
|
||||||
akka.persistence.query.javadsl.AllPersistenceIdsQuery,
|
akka.persistence.query.javadsl.PersistenceIdsQuery,
|
||||||
akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
|
akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
|
||||||
|
|
||||||
private final FiniteDuration refreshInterval;
|
private final FiniteDuration refreshInterval;
|
||||||
|
|
@ -102,11 +102,11 @@ public class PersistenceQueryDocTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Source<EventEnvelope2, NotUsed> eventsByTag(String tag, Offset offset) {
|
public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
|
||||||
if(offset instanceof Sequence){
|
if(offset instanceof Sequence){
|
||||||
Sequence sequenceOffset = (Sequence) offset;
|
Sequence sequenceOffset = (Sequence) offset;
|
||||||
final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval);
|
final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval);
|
||||||
return Source.<EventEnvelope2>actorPublisher(props).
|
return Source.<EventEnvelope>actorPublisher(props).
|
||||||
mapMaterializedValue(m -> NotUsed.getInstance());
|
mapMaterializedValue(m -> NotUsed.getInstance());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
@ -121,7 +121,7 @@ public class PersistenceQueryDocTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Source<String, NotUsed> allPersistenceIds() {
|
public Source<String, NotUsed> persistenceIds() {
|
||||||
// implement in a similar way as eventsByTag
|
// implement in a similar way as eventsByTag
|
||||||
throw new UnsupportedOperationException("Not implemented yet");
|
throw new UnsupportedOperationException("Not implemented yet");
|
||||||
}
|
}
|
||||||
|
|
@ -148,9 +148,9 @@ public class PersistenceQueryDocTest {
|
||||||
//#my-read-journal
|
//#my-read-journal
|
||||||
public class MyScaladslReadJournal implements
|
public class MyScaladslReadJournal implements
|
||||||
akka.persistence.query.scaladsl.ReadJournal,
|
akka.persistence.query.scaladsl.ReadJournal,
|
||||||
akka.persistence.query.scaladsl.EventsByTagQuery2,
|
akka.persistence.query.scaladsl.EventsByTagQuery,
|
||||||
akka.persistence.query.scaladsl.EventsByPersistenceIdQuery,
|
akka.persistence.query.scaladsl.EventsByPersistenceIdQuery,
|
||||||
akka.persistence.query.scaladsl.AllPersistenceIdsQuery,
|
akka.persistence.query.scaladsl.PersistenceIdsQuery,
|
||||||
akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
|
akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
|
||||||
|
|
||||||
private final MyJavadslReadJournal javadslReadJournal;
|
private final MyJavadslReadJournal javadslReadJournal;
|
||||||
|
|
@ -160,7 +160,7 @@ public class PersistenceQueryDocTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public akka.stream.scaladsl.Source<EventEnvelope2, NotUsed> eventsByTag(
|
public akka.stream.scaladsl.Source<EventEnvelope, NotUsed> eventsByTag(
|
||||||
String tag, akka.persistence.query.Offset offset) {
|
String tag, akka.persistence.query.Offset offset) {
|
||||||
return javadslReadJournal.eventsByTag(tag, offset).asScala();
|
return javadslReadJournal.eventsByTag(tag, offset).asScala();
|
||||||
}
|
}
|
||||||
|
|
@ -173,8 +173,8 @@ public class PersistenceQueryDocTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public akka.stream.scaladsl.Source<String, NotUsed> allPersistenceIds() {
|
public akka.stream.scaladsl.Source<String, NotUsed> persistenceIds() {
|
||||||
return javadslReadJournal.allPersistenceIds().asScala();
|
return javadslReadJournal.persistenceIds().asScala();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -218,7 +218,7 @@ public class PersistenceQueryDocTest {
|
||||||
"akka.persistence.query.my-read-journal");
|
"akka.persistence.query.my-read-journal");
|
||||||
|
|
||||||
//#all-persistence-ids-live
|
//#all-persistence-ids-live
|
||||||
readJournal.allPersistenceIds();
|
readJournal.persistenceIds();
|
||||||
//#all-persistence-ids-live
|
//#all-persistence-ids-live
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -256,7 +256,7 @@ public class PersistenceQueryDocTest {
|
||||||
|
|
||||||
//#events-by-tag
|
//#events-by-tag
|
||||||
// assuming journal is able to work with numeric offsets we can:
|
// assuming journal is able to work with numeric offsets we can:
|
||||||
final Source<EventEnvelope2, NotUsed> blueThings =
|
final Source<EventEnvelope, NotUsed> blueThings =
|
||||||
readJournal.eventsByTag("blue", new Sequence(0L));
|
readJournal.eventsByTag("blue", new Sequence(0L));
|
||||||
|
|
||||||
// find top 10 blue things:
|
// find top 10 blue things:
|
||||||
|
|
@ -270,7 +270,7 @@ public class PersistenceQueryDocTest {
|
||||||
}, mat);
|
}, mat);
|
||||||
|
|
||||||
// start another query, from the known offset
|
// start another query, from the known offset
|
||||||
Source<EventEnvelope2, NotUsed> blue = readJournal.eventsByTag("blue", new Sequence(10));
|
Source<EventEnvelope, NotUsed> blue = readJournal.eventsByTag("blue", new Sequence(10));
|
||||||
//#events-by-tag
|
//#events-by-tag
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,9 +12,7 @@ import akka.actor.ActorSystem;
|
||||||
import akka.persistence.journal.WriteEventAdapter;
|
import akka.persistence.journal.WriteEventAdapter;
|
||||||
import akka.persistence.journal.Tagged;
|
import akka.persistence.journal.Tagged;
|
||||||
import akka.persistence.query.EventEnvelope;
|
import akka.persistence.query.EventEnvelope;
|
||||||
import akka.persistence.query.EventEnvelope2;
|
|
||||||
import akka.persistence.query.Sequence;
|
import akka.persistence.query.Sequence;
|
||||||
import akka.persistence.query.javadsl.*;
|
|
||||||
import akka.persistence.query.PersistenceQuery;
|
import akka.persistence.query.PersistenceQuery;
|
||||||
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
|
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
|
||||||
import akka.stream.ActorMaterializer;
|
import akka.stream.ActorMaterializer;
|
||||||
|
|
@ -51,7 +49,7 @@ public class LeveldbPersistenceQueryDocTest {
|
||||||
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
|
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
|
||||||
LeveldbReadJournal.Identifier());
|
LeveldbReadJournal.Identifier());
|
||||||
|
|
||||||
Source<String, NotUsed> source = queries.allPersistenceIds();
|
Source<String, NotUsed> source = queries.persistenceIds();
|
||||||
//#AllPersistenceIds
|
//#AllPersistenceIds
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -61,7 +59,7 @@ public class LeveldbPersistenceQueryDocTest {
|
||||||
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
|
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
|
||||||
LeveldbReadJournal.Identifier());
|
LeveldbReadJournal.Identifier());
|
||||||
|
|
||||||
Source<EventEnvelope2, NotUsed> source =
|
Source<EventEnvelope, NotUsed> source =
|
||||||
queries.eventsByTag("green", new Sequence(0L));
|
queries.eventsByTag("green", new Sequence(0L));
|
||||||
//#EventsByTag
|
//#EventsByTag
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package docs.persistence.query
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.persistence.journal.{ EventAdapter, EventSeq }
|
import akka.persistence.journal.{ EventAdapter, EventSeq }
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, PersistenceQuery, Sequence }
|
import akka.persistence.query.{ EventEnvelope, EventEnvelope, PersistenceQuery, Sequence }
|
||||||
import akka.persistence.query.scaladsl._
|
import akka.persistence.query.scaladsl._
|
||||||
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
||||||
import akka.persistence.journal.Tagged
|
import akka.persistence.journal.Tagged
|
||||||
|
|
@ -71,7 +71,7 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) {
|
||||||
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](
|
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](
|
||||||
LeveldbReadJournal.Identifier)
|
LeveldbReadJournal.Identifier)
|
||||||
|
|
||||||
val src: Source[String, NotUsed] = queries.allPersistenceIds()
|
val src: Source[String, NotUsed] = queries.persistenceIds()
|
||||||
//#AllPersistenceIds
|
//#AllPersistenceIds
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -81,7 +81,7 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) {
|
||||||
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](
|
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](
|
||||||
LeveldbReadJournal.Identifier)
|
LeveldbReadJournal.Identifier)
|
||||||
|
|
||||||
val src: Source[EventEnvelope2, NotUsed] =
|
val src: Source[EventEnvelope, NotUsed] =
|
||||||
queries.eventsByTag(tag = "green", offset = Sequence(0L))
|
queries.eventsByTag(tag = "green", offset = Sequence(0L))
|
||||||
//#EventsByTag
|
//#EventsByTag
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package docs.persistence.query
|
||||||
|
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.persistence.PersistentRepr
|
import akka.persistence.PersistentRepr
|
||||||
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Sequence }
|
import akka.persistence.query.{ EventEnvelope, EventEnvelope, Sequence }
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
import akka.stream.actor.ActorPublisher
|
import akka.stream.actor.ActorPublisher
|
||||||
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }
|
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }
|
||||||
|
|
@ -20,7 +20,7 @@ object MyEventsByTagPublisher {
|
||||||
|
|
||||||
//#events-by-tag-publisher
|
//#events-by-tag-publisher
|
||||||
class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration)
|
class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration)
|
||||||
extends ActorPublisher[EventEnvelope2] {
|
extends ActorPublisher[EventEnvelope] {
|
||||||
|
|
||||||
private case object Continue
|
private case object Continue
|
||||||
|
|
||||||
|
|
@ -28,7 +28,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD
|
||||||
|
|
||||||
private val Limit = 1000
|
private val Limit = 1000
|
||||||
private var currentOffset = offset
|
private var currentOffset = offset
|
||||||
var buf = Vector.empty[EventEnvelope2]
|
var buf = Vector.empty[EventEnvelope]
|
||||||
|
|
||||||
import context.dispatcher
|
import context.dispatcher
|
||||||
val continueTask = context.system.scheduler.schedule(
|
val continueTask = context.system.scheduler.schedule(
|
||||||
|
|
@ -81,7 +81,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD
|
||||||
buf = result.map {
|
buf = result.map {
|
||||||
case (id, bytes) =>
|
case (id, bytes) =>
|
||||||
val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get
|
val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get
|
||||||
EventEnvelope2(offset = Sequence(id), p.persistenceId, p.sequenceNr, p.payload)
|
EventEnvelope(offset = Sequence(id), p.persistenceId, p.sequenceNr, p.payload)
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception =>
|
case e: Exception =>
|
||||||
|
|
|
||||||
|
|
@ -46,23 +46,23 @@ object PersistenceQueryDocSpec {
|
||||||
|
|
||||||
class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config)
|
class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config)
|
||||||
extends akka.persistence.query.scaladsl.ReadJournal
|
extends akka.persistence.query.scaladsl.ReadJournal
|
||||||
with akka.persistence.query.scaladsl.EventsByTagQuery2
|
with akka.persistence.query.scaladsl.EventsByTagQuery
|
||||||
with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
|
with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
|
||||||
with akka.persistence.query.scaladsl.AllPersistenceIdsQuery
|
with akka.persistence.query.scaladsl.PersistenceIdsQuery
|
||||||
with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
|
with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
|
||||||
|
|
||||||
private val refreshInterval: FiniteDuration =
|
private val refreshInterval: FiniteDuration =
|
||||||
config.getDuration("refresh-interval", MILLISECONDS).millis
|
config.getDuration("refresh-interval", MILLISECONDS).millis
|
||||||
|
|
||||||
override def eventsByTag(
|
override def eventsByTag(
|
||||||
tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] = offset match {
|
tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match {
|
||||||
case Sequence(offsetValue) ⇒
|
case Sequence(offsetValue) ⇒
|
||||||
val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval)
|
val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval)
|
||||||
Source.actorPublisher[EventEnvelope](props)
|
Source.actorPublisher[EventEnvelope](props)
|
||||||
.mapMaterializedValue(_ => NotUsed)
|
.mapMaterializedValue(_ => NotUsed)
|
||||||
.map {
|
.map {
|
||||||
case EventEnvelope(offset, id, seqNr, event) =>
|
case EventEnvelope(offset, id, seqNr, event) =>
|
||||||
EventEnvelope2(Sequence(offset), id, seqNr, event)
|
EventEnvelope(Sequence(offset), id, seqNr, event)
|
||||||
}
|
}
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
|
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
|
||||||
|
|
@ -75,7 +75,7 @@ object PersistenceQueryDocSpec {
|
||||||
???
|
???
|
||||||
}
|
}
|
||||||
|
|
||||||
override def allPersistenceIds(): Source[String, NotUsed] = {
|
override def persistenceIds(): Source[String, NotUsed] = {
|
||||||
// implement in a similar way as eventsByTag
|
// implement in a similar way as eventsByTag
|
||||||
???
|
???
|
||||||
}
|
}
|
||||||
|
|
@ -98,13 +98,13 @@ object PersistenceQueryDocSpec {
|
||||||
|
|
||||||
class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal)
|
class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal)
|
||||||
extends akka.persistence.query.javadsl.ReadJournal
|
extends akka.persistence.query.javadsl.ReadJournal
|
||||||
with akka.persistence.query.javadsl.EventsByTagQuery2
|
with akka.persistence.query.javadsl.EventsByTagQuery
|
||||||
with akka.persistence.query.javadsl.EventsByPersistenceIdQuery
|
with akka.persistence.query.javadsl.EventsByPersistenceIdQuery
|
||||||
with akka.persistence.query.javadsl.AllPersistenceIdsQuery
|
with akka.persistence.query.javadsl.PersistenceIdsQuery
|
||||||
with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
|
with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
|
||||||
|
|
||||||
override def eventsByTag(
|
override def eventsByTag(
|
||||||
tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope2, NotUsed] =
|
tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] =
|
||||||
scaladslReadJournal.eventsByTag(tag, offset).asJava
|
scaladslReadJournal.eventsByTag(tag, offset).asJava
|
||||||
|
|
||||||
override def eventsByPersistenceId(
|
override def eventsByPersistenceId(
|
||||||
|
|
@ -113,8 +113,8 @@ object PersistenceQueryDocSpec {
|
||||||
scaladslReadJournal.eventsByPersistenceId(
|
scaladslReadJournal.eventsByPersistenceId(
|
||||||
persistenceId, fromSequenceNr, toSequenceNr).asJava
|
persistenceId, fromSequenceNr, toSequenceNr).asJava
|
||||||
|
|
||||||
override def allPersistenceIds(): javadsl.Source[String, NotUsed] =
|
override def persistenceIds(): javadsl.Source[String, NotUsed] =
|
||||||
scaladslReadJournal.allPersistenceIds().asJava
|
scaladslReadJournal.persistenceIds().asJava
|
||||||
|
|
||||||
override def currentPersistenceIds(): javadsl.Source[String, NotUsed] =
|
override def currentPersistenceIds(): javadsl.Source[String, NotUsed] =
|
||||||
scaladslReadJournal.currentPersistenceIds().asJava
|
scaladslReadJournal.currentPersistenceIds().asJava
|
||||||
|
|
@ -219,7 +219,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
|
||||||
//#basic-usage
|
//#basic-usage
|
||||||
|
|
||||||
//#all-persistence-ids-live
|
//#all-persistence-ids-live
|
||||||
readJournal.allPersistenceIds()
|
readJournal.persistenceIds()
|
||||||
//#all-persistence-ids-live
|
//#all-persistence-ids-live
|
||||||
|
|
||||||
//#all-persistence-ids-snap
|
//#all-persistence-ids-snap
|
||||||
|
|
@ -229,7 +229,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
|
||||||
//#events-by-tag
|
//#events-by-tag
|
||||||
// assuming journal is able to work with numeric offsets we can:
|
// assuming journal is able to work with numeric offsets we can:
|
||||||
|
|
||||||
val blueThings: Source[EventEnvelope2, NotUsed] =
|
val blueThings: Source[EventEnvelope, NotUsed] =
|
||||||
readJournal.eventsByTag("blue")
|
readJournal.eventsByTag("blue")
|
||||||
|
|
||||||
// find top 10 blue things:
|
// find top 10 blue things:
|
||||||
|
|
|
||||||
|
|
@ -8,17 +8,6 @@ package akka.persistence.query
|
||||||
* [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries.
|
* [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries.
|
||||||
*/
|
*/
|
||||||
final case class EventEnvelope(
|
final case class EventEnvelope(
|
||||||
offset: Long,
|
|
||||||
persistenceId: String,
|
|
||||||
sequenceNr: Long,
|
|
||||||
event: Any)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Event wrapper adding meta data for the events in the result stream of
|
|
||||||
* [[akka.persistence.query.scaladsl.EventsByTagQuery2]] query, or similar queries.
|
|
||||||
*/
|
|
||||||
// TODO: Rename it to EventEnvelope in Akka 2.5
|
|
||||||
final case class EventEnvelope2(
|
|
||||||
offset: Offset,
|
offset: Offset,
|
||||||
persistenceId: String,
|
persistenceId: String,
|
||||||
sequenceNr: Long,
|
sequenceNr: Long,
|
||||||
|
|
|
||||||
|
|
@ -17,11 +17,11 @@ object Offset {
|
||||||
|
|
||||||
trait Offset
|
trait Offset
|
||||||
|
|
||||||
final case class Sequence(val value: Long) extends Offset with Ordered[Sequence] {
|
final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
|
||||||
override def compare(that: Sequence): Int = value.compare(that.value)
|
override def compare(that: Sequence): Int = value.compare(that.value)
|
||||||
}
|
}
|
||||||
|
|
||||||
final case class TimeBasedUUID(val value: UUID) extends Offset with Ordered[TimeBasedUUID] {
|
final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] {
|
||||||
if (value == null || value.version != 1) {
|
if (value == null || value.version != 1) {
|
||||||
throw new IllegalArgumentException("UUID " + value + " is not a time-based UUID")
|
throw new IllegalArgumentException("UUID " + value + " is not a time-based UUID")
|
||||||
}
|
}
|
||||||
|
|
@ -34,4 +34,4 @@ final case object NoOffset extends Offset {
|
||||||
* Java API:
|
* Java API:
|
||||||
*/
|
*/
|
||||||
def getInstance: Offset = this
|
def getInstance: Offset = this
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,12 @@
|
||||||
package akka.persistence.query.javadsl
|
package akka.persistence.query.javadsl
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
|
import akka.persistence.query.{ EventEnvelope, Offset }
|
||||||
import akka.stream.javadsl.Source
|
import akka.stream.javadsl.Source
|
||||||
import akka.persistence.query.EventEnvelope
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A plugin may optionally support this query by implementing this interface.
|
* A plugin may optionally support this query by implementing this interface.
|
||||||
*/
|
*/
|
||||||
@deprecated("To be replaced by CurrentEventsByTagQuery2 from Akka 2.5", "2.4.11")
|
|
||||||
trait CurrentEventsByTagQuery extends ReadJournal {
|
trait CurrentEventsByTagQuery extends ReadJournal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -18,7 +17,6 @@ trait CurrentEventsByTagQuery extends ReadJournal {
|
||||||
* is completed immediately when it reaches the end of the "result set". Events that are
|
* is completed immediately when it reaches the end of the "result set". Events that are
|
||||||
* stored after the query is completed are not included in the event stream.
|
* stored after the query is completed are not included in the event stream.
|
||||||
*/
|
*/
|
||||||
def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed]
|
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,23 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
package akka.persistence.query.javadsl
|
|
||||||
|
|
||||||
import akka.NotUsed
|
|
||||||
import akka.persistence.query.{ EventEnvelope2, Offset }
|
|
||||||
import akka.stream.javadsl.Source
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A plugin may optionally support this query by implementing this interface.
|
|
||||||
*/
|
|
||||||
// TODO: Rename it to CurrentEventsByTagQuery in Akka 2.5
|
|
||||||
trait CurrentEventsByTagQuery2 extends ReadJournal {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream
|
|
||||||
* is completed immediately when it reaches the end of the "result set". Events that are
|
|
||||||
* stored after the query is completed are not included in the event stream.
|
|
||||||
*/
|
|
||||||
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed]
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -12,7 +12,7 @@ import akka.stream.javadsl.Source
|
||||||
trait CurrentPersistenceIdsQuery extends ReadJournal {
|
trait CurrentPersistenceIdsQuery extends ReadJournal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Same type of query as [[AllPersistenceIdsQuery#allPersistenceIds]] but the stream
|
* Same type of query as [[PersistenceIdsQuery#allPersistenceIds]] but the stream
|
||||||
* is completed immediately when it reaches the end of the "result set". Persistent
|
* is completed immediately when it reaches the end of the "result set". Persistent
|
||||||
* actors that are created after the query is completed are not included in the stream.
|
* actors that are created after the query is completed are not included in the stream.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,12 @@
|
||||||
package akka.persistence.query.javadsl
|
package akka.persistence.query.javadsl
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
|
import akka.persistence.query.{ EventEnvelope, Offset }
|
||||||
import akka.stream.javadsl.Source
|
import akka.stream.javadsl.Source
|
||||||
import akka.persistence.query.EventEnvelope
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A plugin may optionally support this query by implementing this interface.
|
* A plugin may optionally support this query by implementing this interface.
|
||||||
*/
|
*/
|
||||||
@deprecated("To be replaced by EventsByTagQuery2 from Akka 2.5", "2.4.11")
|
|
||||||
trait EventsByTagQuery extends ReadJournal {
|
trait EventsByTagQuery extends ReadJournal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -36,6 +35,6 @@ trait EventsByTagQuery extends ReadJournal {
|
||||||
* Corresponding query that is completed when it reaches the end of the currently
|
* Corresponding query that is completed when it reaches the end of the currently
|
||||||
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
|
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
|
||||||
*/
|
*/
|
||||||
def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed]
|
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,41 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
package akka.persistence.query.javadsl
|
|
||||||
|
|
||||||
import akka.NotUsed
|
|
||||||
import akka.persistence.query.{ EventEnvelope2, Offset }
|
|
||||||
import akka.stream.javadsl.Source
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A plugin may optionally support this query by implementing this interface.
|
|
||||||
*/
|
|
||||||
// TODO: Rename it to EventsByTagQuery in Akka 2.5
|
|
||||||
trait EventsByTagQuery2 extends ReadJournal {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Query events that have a specific tag. A tag can for example correspond to an
|
|
||||||
* aggregate root type (in DDD terminology).
|
|
||||||
*
|
|
||||||
* The consumer can keep track of its current position in the event stream by storing the
|
|
||||||
* `offset` and restart the query from a given `offset` after a crash/restart.
|
|
||||||
*
|
|
||||||
* The exact meaning of the `offset` depends on the journal and must be documented by the
|
|
||||||
* read journal plugin. It may be a sequential id number that uniquely identifies the
|
|
||||||
* position of each event within the event stream. Distributed data stores cannot easily
|
|
||||||
* support those semantics and they may use a weaker meaning. For example it may be a
|
|
||||||
* timestamp (taken when the event was created or stored). Timestamps are not unique and
|
|
||||||
* not strictly ordered, since clocks on different machines may not be synchronized.
|
|
||||||
*
|
|
||||||
* The returned event stream should be ordered by `offset` if possible, but this can also be
|
|
||||||
* difficult to fulfill for a distributed data store. The order must be documented by the
|
|
||||||
* read journal plugin.
|
|
||||||
*
|
|
||||||
* The stream is not completed when it reaches the end of the currently stored events,
|
|
||||||
* but it continues to push new events when new events are persisted.
|
|
||||||
* Corresponding query that is completed when it reaches the end of the currently
|
|
||||||
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
|
|
||||||
*/
|
|
||||||
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed]
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -9,7 +9,7 @@ import akka.stream.javadsl.Source
|
||||||
/**
|
/**
|
||||||
* A plugin may optionally support this query by implementing this interface.
|
* A plugin may optionally support this query by implementing this interface.
|
||||||
*/
|
*/
|
||||||
trait AllPersistenceIdsQuery extends ReadJournal {
|
trait PersistenceIdsQuery extends ReadJournal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Query all `PersistentActor` identifiers, i.e. as defined by the
|
* Query all `PersistentActor` identifiers, i.e. as defined by the
|
||||||
|
|
@ -20,6 +20,6 @@ trait AllPersistenceIdsQuery extends ReadJournal {
|
||||||
* Corresponding query that is completed when it reaches the end of the currently
|
* Corresponding query that is completed when it reaches the end of the currently
|
||||||
* currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]].
|
* currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]].
|
||||||
*/
|
*/
|
||||||
def allPersistenceIds(): Source[String, NotUsed]
|
def persistenceIds(): Source[String, NotUsed]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -12,7 +12,7 @@ package akka.persistence.query.javadsl
|
||||||
* The interface is very open so that different journals may implement specific queries.
|
* The interface is very open so that different journals may implement specific queries.
|
||||||
*
|
*
|
||||||
* There are a few pre-defined queries that a query implementation may implement,
|
* There are a few pre-defined queries that a query implementation may implement,
|
||||||
* such as [[EventsByPersistenceIdQuery]], [[AllPersistenceIdsQuery]] and [[EventsByTagQuery]]
|
* such as [[EventsByPersistenceIdQuery]], [[PersistenceIdsQuery]] and [[EventsByTagQuery]]
|
||||||
* Implementation of these queries are optional and query (journal) plugins may define
|
* Implementation of these queries are optional and query (journal) plugins may define
|
||||||
* their own specialized queries by implementing other methods.
|
* their own specialized queries by implementing other methods.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
package akka.persistence.query.journal.leveldb.javadsl
|
package akka.persistence.query.journal.leveldb.javadsl
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Offset }
|
import akka.persistence.query.{ EventEnvelope, EventEnvelope, Offset }
|
||||||
import akka.persistence.query.javadsl._
|
import akka.persistence.query.javadsl._
|
||||||
import akka.stream.javadsl.Source
|
import akka.stream.javadsl.Source
|
||||||
|
|
||||||
|
|
@ -26,14 +26,14 @@ import akka.stream.javadsl.Source
|
||||||
*/
|
*/
|
||||||
class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal)
|
class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal)
|
||||||
extends ReadJournal
|
extends ReadJournal
|
||||||
with AllPersistenceIdsQuery
|
with PersistenceIdsQuery
|
||||||
with CurrentPersistenceIdsQuery
|
with CurrentPersistenceIdsQuery
|
||||||
with EventsByPersistenceIdQuery
|
with EventsByPersistenceIdQuery
|
||||||
with CurrentEventsByPersistenceIdQuery
|
with CurrentEventsByPersistenceIdQuery
|
||||||
with EventsByTagQuery
|
with EventsByTagQuery
|
||||||
with EventsByTagQuery2
|
with EventsByTagQuery
|
||||||
with CurrentEventsByTagQuery
|
with CurrentEventsByTagQuery
|
||||||
with CurrentEventsByTagQuery2 {
|
with CurrentEventsByTagQuery {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* `allPersistenceIds` is used for retrieving all `persistenceIds` of all
|
* `allPersistenceIds` is used for retrieving all `persistenceIds` of all
|
||||||
|
|
@ -53,8 +53,8 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev
|
||||||
* The stream is completed with failure if there is a failure in executing the query in the
|
* The stream is completed with failure if there is a failure in executing the query in the
|
||||||
* backend journal.
|
* backend journal.
|
||||||
*/
|
*/
|
||||||
override def allPersistenceIds(): Source[String, NotUsed] =
|
override def persistenceIds(): Source[String, NotUsed] =
|
||||||
scaladslReadJournal.allPersistenceIds().asJava
|
scaladslReadJournal.persistenceIds().asJava
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Same type of query as [[#allPersistenceIds]] but the stream
|
* Same type of query as [[#allPersistenceIds]] but the stream
|
||||||
|
|
@ -138,7 +138,7 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev
|
||||||
* The stream is completed with failure if there is a failure in executing the query in the
|
* The stream is completed with failure if there is a failure in executing the query in the
|
||||||
* backend journal.
|
* backend journal.
|
||||||
*/
|
*/
|
||||||
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] =
|
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
|
||||||
scaladslReadJournal.eventsByTag(tag, offset).asJava
|
scaladslReadJournal.eventsByTag(tag, offset).asJava
|
||||||
|
|
||||||
override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
|
override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
|
||||||
|
|
@ -149,7 +149,7 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev
|
||||||
* is completed immediately when it reaches the end of the "result set". Events that are
|
* is completed immediately when it reaches the end of the "result set". Events that are
|
||||||
* stored after the query is completed are not included in the event stream.
|
* stored after the query is completed are not included in the event stream.
|
||||||
*/
|
*/
|
||||||
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] =
|
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
|
||||||
scaladslReadJournal.currentEventsByTag(tag, offset).asJava
|
scaladslReadJournal.currentEventsByTag(tag, offset).asJava
|
||||||
|
|
||||||
override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
|
override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import akka.NotUsed
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Offset, Sequence }
|
import akka.persistence.query.{ EventEnvelope, EventEnvelope, Offset, Sequence }
|
||||||
import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher
|
import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher
|
||||||
import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher
|
import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher
|
||||||
import akka.persistence.query.journal.leveldb.EventsByTagPublisher
|
import akka.persistence.query.journal.leveldb.EventsByTagPublisher
|
||||||
|
|
@ -35,14 +35,14 @@ import com.typesafe.config.Config
|
||||||
* for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`.
|
* for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`.
|
||||||
*/
|
*/
|
||||||
class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal
|
class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal
|
||||||
with AllPersistenceIdsQuery
|
with PersistenceIdsQuery
|
||||||
with CurrentPersistenceIdsQuery
|
with CurrentPersistenceIdsQuery
|
||||||
with EventsByPersistenceIdQuery
|
with EventsByPersistenceIdQuery
|
||||||
with CurrentEventsByPersistenceIdQuery
|
with CurrentEventsByPersistenceIdQuery
|
||||||
with EventsByTagQuery
|
with EventsByTagQuery
|
||||||
with EventsByTagQuery2
|
with EventsByTagQuery
|
||||||
with CurrentEventsByTagQuery
|
with CurrentEventsByTagQuery
|
||||||
with CurrentEventsByTagQuery2 {
|
with CurrentEventsByTagQuery {
|
||||||
|
|
||||||
private val serialization = SerializationExtension(system)
|
private val serialization = SerializationExtension(system)
|
||||||
private val refreshInterval = Some(config.getDuration("refresh-interval", MILLISECONDS).millis)
|
private val refreshInterval = Some(config.getDuration("refresh-interval", MILLISECONDS).millis)
|
||||||
|
|
@ -51,7 +51,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
|
||||||
|
|
||||||
private val envelopetoEnvelope2 = Flow[EventEnvelope].map {
|
private val envelopetoEnvelope2 = Flow[EventEnvelope].map {
|
||||||
case EventEnvelope(offset, persistenceId, sequenceNr, event) ⇒
|
case EventEnvelope(offset, persistenceId, sequenceNr, event) ⇒
|
||||||
EventEnvelope2(Sequence(offset), persistenceId, sequenceNr, event)
|
EventEnvelope(Sequence(offset), persistenceId, sequenceNr, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -72,7 +72,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
|
||||||
* The stream is completed with failure if there is a failure in executing the query in the
|
* The stream is completed with failure if there is a failure in executing the query in the
|
||||||
* backend journal.
|
* backend journal.
|
||||||
*/
|
*/
|
||||||
override def allPersistenceIds(): Source[String, NotUsed] = {
|
override def persistenceIds(): Source[String, NotUsed] = {
|
||||||
// no polling for this query, the write journal will push all changes, i.e.
|
// no polling for this query, the write journal will push all changes, i.e.
|
||||||
// no refreshInterval
|
// no refreshInterval
|
||||||
Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery = true, maxBufSize, writeJournalPluginId))
|
Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery = true, maxBufSize, writeJournalPluginId))
|
||||||
|
|
@ -171,7 +171,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
|
||||||
* The stream is completed with failure if there is a failure in executing the query in the
|
* The stream is completed with failure if there is a failure in executing the query in the
|
||||||
* backend journal.
|
* backend journal.
|
||||||
*/
|
*/
|
||||||
override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] =
|
override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
|
||||||
offset match {
|
offset match {
|
||||||
case Sequence(offsetValue) ⇒
|
case Sequence(offsetValue) ⇒
|
||||||
eventsByTag(tag, offsetValue).via(envelopetoEnvelope2)
|
eventsByTag(tag, offsetValue).via(envelopetoEnvelope2)
|
||||||
|
|
@ -191,7 +191,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
|
||||||
* is completed immediately when it reaches the end of the "result set". Events that are
|
* is completed immediately when it reaches the end of the "result set". Events that are
|
||||||
* stored after the query is completed are not included in the event stream.
|
* stored after the query is completed are not included in the event stream.
|
||||||
*/
|
*/
|
||||||
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] =
|
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
|
||||||
offset match {
|
offset match {
|
||||||
case Sequence(offsetValue) ⇒
|
case Sequence(offsetValue) ⇒
|
||||||
currentEventsByTag(tag, offsetValue).via(envelopetoEnvelope2)
|
currentEventsByTag(tag, offsetValue).via(envelopetoEnvelope2)
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,12 @@
|
||||||
package akka.persistence.query.scaladsl
|
package akka.persistence.query.scaladsl
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
|
import akka.persistence.query.{ EventEnvelope, Offset }
|
||||||
import akka.stream.scaladsl.Source
|
import akka.stream.scaladsl.Source
|
||||||
import akka.persistence.query.EventEnvelope
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A plugin may optionally support this query by implementing this trait.
|
* A plugin may optionally support this query by implementing this trait.
|
||||||
*/
|
*/
|
||||||
@deprecated("To be replaced by CurrentEventsByTagQuery2 from Akka 2.5", "2.4.11")
|
|
||||||
trait CurrentEventsByTagQuery extends ReadJournal {
|
trait CurrentEventsByTagQuery extends ReadJournal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -18,7 +17,7 @@ trait CurrentEventsByTagQuery extends ReadJournal {
|
||||||
* is completed immediately when it reaches the end of the "result set". Events that are
|
* is completed immediately when it reaches the end of the "result set". Events that are
|
||||||
* stored after the query is completed are not included in the event stream.
|
* stored after the query is completed are not included in the event stream.
|
||||||
*/
|
*/
|
||||||
def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed]
|
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,24 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
package akka.persistence.query.scaladsl
|
|
||||||
|
|
||||||
import akka.NotUsed
|
|
||||||
import akka.persistence.query.{ EventEnvelope2, Offset }
|
|
||||||
import akka.stream.scaladsl.Source
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A plugin may optionally support this query by implementing this trait.
|
|
||||||
*/
|
|
||||||
// TODO: Rename it to CurrentEventsByTagQuery in Akka 2.5
|
|
||||||
trait CurrentEventsByTagQuery2 extends ReadJournal {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream
|
|
||||||
* is completed immediately when it reaches the end of the "result set". Events that are
|
|
||||||
* stored after the query is completed are not included in the event stream.
|
|
||||||
*/
|
|
||||||
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed]
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -12,7 +12,7 @@ import akka.stream.scaladsl.Source
|
||||||
trait CurrentPersistenceIdsQuery extends ReadJournal {
|
trait CurrentPersistenceIdsQuery extends ReadJournal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Same type of query as [[AllPersistenceIdsQuery#allPersistenceIds]] but the stream
|
* Same type of query as [[PersistenceIdsQuery#allPersistenceIds]] but the stream
|
||||||
* is completed immediately when it reaches the end of the "result set". Persistent
|
* is completed immediately when it reaches the end of the "result set". Persistent
|
||||||
* actors that are created after the query is completed are not included in the stream.
|
* actors that are created after the query is completed are not included in the stream.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,12 @@
|
||||||
package akka.persistence.query.scaladsl
|
package akka.persistence.query.scaladsl
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
|
import akka.persistence.query.{ EventEnvelope, Offset }
|
||||||
import akka.stream.scaladsl.Source
|
import akka.stream.scaladsl.Source
|
||||||
import akka.persistence.query.EventEnvelope
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A plugin may optionally support this query by implementing this trait.
|
* A plugin may optionally support this query by implementing this trait.
|
||||||
*/
|
*/
|
||||||
@deprecated("To be replaced by EventsByTagQuery2 from Akka 2.5", "2.4.11")
|
|
||||||
trait EventsByTagQuery extends ReadJournal {
|
trait EventsByTagQuery extends ReadJournal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -36,7 +35,7 @@ trait EventsByTagQuery extends ReadJournal {
|
||||||
* Corresponding query that is completed when it reaches the end of the currently
|
* Corresponding query that is completed when it reaches the end of the currently
|
||||||
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
|
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
|
||||||
*/
|
*/
|
||||||
def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed]
|
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,42 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
package akka.persistence.query.scaladsl
|
|
||||||
|
|
||||||
import akka.NotUsed
|
|
||||||
import akka.persistence.query.{ EventEnvelope2, Offset }
|
|
||||||
import akka.stream.scaladsl.Source
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A plugin may optionally support this query by implementing this trait.
|
|
||||||
*/
|
|
||||||
// TODO: Rename it to EventsByTagQuery in Akka 2.5
|
|
||||||
trait EventsByTagQuery2 extends ReadJournal {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Query events that have a specific tag. A tag can for example correspond to an
|
|
||||||
* aggregate root type (in DDD terminology).
|
|
||||||
*
|
|
||||||
* The consumer can keep track of its current position in the event stream by storing the
|
|
||||||
* `offset` and restart the query from a given `offset` after a crash/restart.
|
|
||||||
*
|
|
||||||
* The exact meaning of the `offset` depends on the journal and must be documented by the
|
|
||||||
* read journal plugin. It may be a sequential id number that uniquely identifies the
|
|
||||||
* position of each event within the event stream. Distributed data stores cannot easily
|
|
||||||
* support those semantics and they may use a weaker meaning. For example it may be a
|
|
||||||
* timestamp (taken when the event was created or stored). Timestamps are not unique and
|
|
||||||
* not strictly ordered, since clocks on different machines may not be synchronized.
|
|
||||||
*
|
|
||||||
* The returned event stream should be ordered by `offset` if possible, but this can also be
|
|
||||||
* difficult to fulfill for a distributed data store. The order must be documented by the
|
|
||||||
* read journal plugin.
|
|
||||||
*
|
|
||||||
* The stream is not completed when it reaches the end of the currently stored events,
|
|
||||||
* but it continues to push new events when new events are persisted.
|
|
||||||
* Corresponding query that is completed when it reaches the end of the currently
|
|
||||||
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
|
|
||||||
*/
|
|
||||||
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed]
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -9,7 +9,7 @@ import akka.stream.scaladsl.Source
|
||||||
/**
|
/**
|
||||||
* A plugin may optionally support this query by implementing this trait.
|
* A plugin may optionally support this query by implementing this trait.
|
||||||
*/
|
*/
|
||||||
trait AllPersistenceIdsQuery extends ReadJournal {
|
trait PersistenceIdsQuery extends ReadJournal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Query all `PersistentActor` identifiers, i.e. as defined by the
|
* Query all `PersistentActor` identifiers, i.e. as defined by the
|
||||||
|
|
@ -20,6 +20,6 @@ trait AllPersistenceIdsQuery extends ReadJournal {
|
||||||
* Corresponding query that is completed when it reaches the end of the currently
|
* Corresponding query that is completed when it reaches the end of the currently
|
||||||
* currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]].
|
* currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]].
|
||||||
*/
|
*/
|
||||||
def allPersistenceIds(): Source[String, NotUsed]
|
def persistenceIds(): Source[String, NotUsed]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -12,7 +12,7 @@ package akka.persistence.query.scaladsl
|
||||||
* The interface is very open so that different journals may implement specific queries.
|
* The interface is very open so that different journals may implement specific queries.
|
||||||
*
|
*
|
||||||
* There are a few pre-defined queries that a query implementation may implement,
|
* There are a few pre-defined queries that a query implementation may implement,
|
||||||
* such as [[EventsByPersistenceIdQuery]], [[AllPersistenceIdsQuery]] and [[EventsByTagQuery]]
|
* such as [[EventsByPersistenceIdQuery]], [[PersistenceIdsQuery]] and [[EventsByTagQuery]]
|
||||||
* Implementation of these queries are optional and query (journal) plugins may define
|
* Implementation of these queries are optional and query (journal) plugins may define
|
||||||
* their own specialized queries by implementing other methods.
|
* their own specialized queries by implementing other methods.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ package akka.persistence.query;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
import akka.NotUsed;
|
import akka.NotUsed;
|
||||||
import akka.persistence.query.javadsl.AllPersistenceIdsQuery;
|
import akka.persistence.query.javadsl.PersistenceIdsQuery;
|
||||||
import akka.persistence.query.javadsl.ReadJournal;
|
import akka.persistence.query.javadsl.ReadJournal;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
|
|
||||||
|
|
@ -15,12 +15,12 @@ import akka.stream.javadsl.Source;
|
||||||
* Use for tests only!
|
* Use for tests only!
|
||||||
* Emits infinite stream of strings (representing queried for events).
|
* Emits infinite stream of strings (representing queried for events).
|
||||||
*/
|
*/
|
||||||
public class DummyJavaReadJournal implements ReadJournal, AllPersistenceIdsQuery {
|
public class DummyJavaReadJournal implements ReadJournal, PersistenceIdsQuery {
|
||||||
public static final String Identifier = "akka.persistence.query.journal.dummy-java";
|
public static final String Identifier = "akka.persistence.query.journal.dummy-java";
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Source<String, NotUsed> allPersistenceIds() {
|
public Source<String, NotUsed> persistenceIds() {
|
||||||
return Source.fromIterator(() -> new Iterator<String>() {
|
return Source.fromIterator(() -> new Iterator<String>() {
|
||||||
private int i = 0;
|
private int i = 0;
|
||||||
@Override public boolean hasNext() { return true; }
|
@Override public boolean hasNext() { return true; }
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import akka.NotUsed;
|
||||||
* Emits infinite stream of strings (representing queried for events).
|
* Emits infinite stream of strings (representing queried for events).
|
||||||
*/
|
*/
|
||||||
public class DummyJavaReadJournalForScala implements akka.persistence.query.scaladsl.ReadJournal,
|
public class DummyJavaReadJournalForScala implements akka.persistence.query.scaladsl.ReadJournal,
|
||||||
akka.persistence.query.scaladsl.AllPersistenceIdsQuery {
|
akka.persistence.query.scaladsl.PersistenceIdsQuery {
|
||||||
|
|
||||||
public static final String Identifier = DummyJavaReadJournal.Identifier;
|
public static final String Identifier = DummyJavaReadJournal.Identifier;
|
||||||
|
|
||||||
|
|
@ -22,8 +22,8 @@ public class DummyJavaReadJournalForScala implements akka.persistence.query.scal
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public akka.stream.scaladsl.Source<String, NotUsed> allPersistenceIds() {
|
public akka.stream.scaladsl.Source<String, NotUsed> persistenceIds() {
|
||||||
return readJournal.allPersistenceIds().asScala();
|
return readJournal.persistenceIds().asScala();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,6 @@ public class PersistenceQueryTest {
|
||||||
public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception {
|
public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception {
|
||||||
final DummyJavaReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(DummyJavaReadJournal.class,
|
final DummyJavaReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(DummyJavaReadJournal.class,
|
||||||
"noop-journal");
|
"noop-journal");
|
||||||
final akka.stream.javadsl.Source<String, NotUsed> ids = readJournal.allPersistenceIds();
|
final akka.stream.javadsl.Source<String, NotUsed> ids = readJournal.persistenceIds();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,8 +12,8 @@ import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
* Use for tests only!
|
* Use for tests only!
|
||||||
* Emits infinite stream of strings (representing queried for events).
|
* Emits infinite stream of strings (representing queried for events).
|
||||||
*/
|
*/
|
||||||
class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.AllPersistenceIdsQuery {
|
class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.PersistenceIdsQuery {
|
||||||
override def allPersistenceIds(): Source[String, NotUsed] =
|
override def persistenceIds(): Source[String, NotUsed] =
|
||||||
Source.fromIterator(() ⇒ Iterator.from(0)).map(_.toString)
|
Source.fromIterator(() ⇒ Iterator.from(0)).map(_.toString)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -21,9 +21,9 @@ object DummyReadJournal {
|
||||||
final val Identifier = "akka.persistence.query.journal.dummy"
|
final val Identifier = "akka.persistence.query.journal.dummy"
|
||||||
}
|
}
|
||||||
|
|
||||||
class DummyReadJournalForJava(readJournal: DummyReadJournal) extends javadsl.ReadJournal with javadsl.AllPersistenceIdsQuery {
|
class DummyReadJournalForJava(readJournal: DummyReadJournal) extends javadsl.ReadJournal with javadsl.PersistenceIdsQuery {
|
||||||
override def allPersistenceIds(): akka.stream.javadsl.Source[String, NotUsed] =
|
override def persistenceIds(): akka.stream.javadsl.Source[String, NotUsed] =
|
||||||
readJournal.allPersistenceIds().asJava
|
readJournal.persistenceIds().asJava
|
||||||
}
|
}
|
||||||
|
|
||||||
object DummyReadJournalProvider {
|
object DummyReadJournalProvider {
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import scala.concurrent.duration._
|
||||||
|
|
||||||
import akka.persistence.query.PersistenceQuery
|
import akka.persistence.query.PersistenceQuery
|
||||||
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
||||||
import akka.persistence.query.scaladsl.AllPersistenceIdsQuery
|
import akka.persistence.query.scaladsl.PersistenceIdsQuery
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
import akka.stream.testkit.scaladsl.TestSink
|
import akka.stream.testkit.scaladsl.TestSink
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
|
|
@ -32,7 +32,7 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config)
|
||||||
"Leveldb query AllPersistenceIds" must {
|
"Leveldb query AllPersistenceIds" must {
|
||||||
|
|
||||||
"implement standard AllPersistenceIdsQuery" in {
|
"implement standard AllPersistenceIdsQuery" in {
|
||||||
queries.isInstanceOf[AllPersistenceIdsQuery] should ===(true)
|
queries.isInstanceOf[PersistenceIdsQuery] should ===(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
"find existing persistenceIds" in {
|
"find existing persistenceIds" in {
|
||||||
|
|
@ -57,7 +57,7 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config)
|
||||||
system.actorOf(TestActor.props("d")) ! "d1"
|
system.actorOf(TestActor.props("d")) ! "d1"
|
||||||
expectMsg("d1-done")
|
expectMsg("d1-done")
|
||||||
|
|
||||||
val src = queries.allPersistenceIds()
|
val src = queries.persistenceIds()
|
||||||
val probe = src.runWith(TestSink.probe[String])
|
val probe = src.runWith(TestSink.probe[String])
|
||||||
probe.within(10.seconds) {
|
probe.within(10.seconds) {
|
||||||
probe.request(5)
|
probe.request(5)
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import scala.concurrent.duration._
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.persistence.query.PersistenceQuery
|
import akka.persistence.query.PersistenceQuery
|
||||||
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
||||||
import akka.persistence.query.scaladsl.EventsByTagQuery2
|
import akka.persistence.query.scaladsl.EventsByTagQuery
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
import akka.stream.testkit.scaladsl.TestSink
|
import akka.stream.testkit.scaladsl.TestSink
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
|
|
@ -49,7 +49,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
|
||||||
"Leveldb query EventsByPersistenceId" must {
|
"Leveldb query EventsByPersistenceId" must {
|
||||||
|
|
||||||
"implement standard EventsByTagQuery" in {
|
"implement standard EventsByTagQuery" in {
|
||||||
queries.isInstanceOf[EventsByTagQuery2] should ===(true)
|
queries.isInstanceOf[EventsByTagQuery] should ===(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
"find existing events" in {
|
"find existing events" in {
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,9 @@ package akka.persistence.query.journal.leveldb
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.persistence.journal.Tagged
|
import akka.persistence.journal.Tagged
|
||||||
import akka.persistence.journal.WriteEventAdapter
|
import akka.persistence.journal.WriteEventAdapter
|
||||||
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, PersistenceQuery, Sequence }
|
import akka.persistence.query.{ EventEnvelope, EventEnvelope, PersistenceQuery, Sequence }
|
||||||
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
||||||
import akka.persistence.query.scaladsl.EventsByTagQuery2
|
import akka.persistence.query.scaladsl.EventsByTagQuery
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
import akka.stream.testkit.scaladsl.TestSink
|
import akka.stream.testkit.scaladsl.TestSink
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
|
|
@ -55,7 +55,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
|
||||||
|
|
||||||
"Leveldb query EventsByTag" must {
|
"Leveldb query EventsByTag" must {
|
||||||
"implement standard EventsByTagQuery" in {
|
"implement standard EventsByTagQuery" in {
|
||||||
queries.isInstanceOf[EventsByTagQuery2] should ===(true)
|
queries.isInstanceOf[EventsByTagQuery] should ===(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
"find existing events" in {
|
"find existing events" in {
|
||||||
|
|
@ -75,17 +75,17 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
|
||||||
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
|
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
|
||||||
greenSrc.runWith(TestSink.probe[Any])
|
greenSrc.runWith(TestSink.probe[Any])
|
||||||
.request(2)
|
.request(2)
|
||||||
.expectNext(EventEnvelope2(Sequence(1L), "a", 2L, "a green apple"))
|
.expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple"))
|
||||||
.expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana"))
|
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana"))
|
||||||
.expectNoMsg(500.millis)
|
.expectNoMsg(500.millis)
|
||||||
.request(2)
|
.request(2)
|
||||||
.expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf"))
|
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
|
||||||
.expectComplete()
|
.expectComplete()
|
||||||
|
|
||||||
val blackSrc = queries.currentEventsByTag(tag = "black", offset = Sequence(0L))
|
val blackSrc = queries.currentEventsByTag(tag = "black", offset = Sequence(0L))
|
||||||
blackSrc.runWith(TestSink.probe[Any])
|
blackSrc.runWith(TestSink.probe[Any])
|
||||||
.request(5)
|
.request(5)
|
||||||
.expectNext(EventEnvelope2(Sequence(1L), "b", 1L, "a black car"))
|
.expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car"))
|
||||||
.expectComplete()
|
.expectComplete()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -95,8 +95,8 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
|
||||||
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
|
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
|
||||||
val probe = greenSrc.runWith(TestSink.probe[Any])
|
val probe = greenSrc.runWith(TestSink.probe[Any])
|
||||||
.request(2)
|
.request(2)
|
||||||
.expectNext(EventEnvelope2(Sequence(1L), "a", 2L, "a green apple"))
|
.expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple"))
|
||||||
.expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana"))
|
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana"))
|
||||||
.expectNoMsg(100.millis)
|
.expectNoMsg(100.millis)
|
||||||
|
|
||||||
c ! "a green cucumber"
|
c ! "a green cucumber"
|
||||||
|
|
@ -105,7 +105,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
|
||||||
probe
|
probe
|
||||||
.expectNoMsg(100.millis)
|
.expectNoMsg(100.millis)
|
||||||
.request(5)
|
.request(5)
|
||||||
.expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf"))
|
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
|
||||||
.expectComplete() // green cucumber not seen
|
.expectComplete() // green cucumber not seen
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -113,9 +113,9 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
|
||||||
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L))
|
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L))
|
||||||
val probe = greenSrc.runWith(TestSink.probe[Any])
|
val probe = greenSrc.runWith(TestSink.probe[Any])
|
||||||
.request(10)
|
.request(10)
|
||||||
.expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana"))
|
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana"))
|
||||||
.expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf"))
|
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
|
||||||
.expectNext(EventEnvelope2(Sequence(4L), "c", 1L, "a green cucumber"))
|
.expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber"))
|
||||||
.expectComplete()
|
.expectComplete()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -127,7 +127,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
|
||||||
val blackSrc = queries.eventsByTag(tag = "black", offset = Sequence(0L))
|
val blackSrc = queries.eventsByTag(tag = "black", offset = Sequence(0L))
|
||||||
val probe = blackSrc.runWith(TestSink.probe[Any])
|
val probe = blackSrc.runWith(TestSink.probe[Any])
|
||||||
.request(2)
|
.request(2)
|
||||||
.expectNext(EventEnvelope2(Sequence(1L), "b", 1L, "a black car"))
|
.expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car"))
|
||||||
.expectNoMsg(100.millis)
|
.expectNoMsg(100.millis)
|
||||||
|
|
||||||
d ! "a black dog"
|
d ! "a black dog"
|
||||||
|
|
@ -136,19 +136,19 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
|
||||||
expectMsg(s"a black night-done")
|
expectMsg(s"a black night-done")
|
||||||
|
|
||||||
probe
|
probe
|
||||||
.expectNext(EventEnvelope2(Sequence(2L), "d", 1L, "a black dog"))
|
.expectNext(EventEnvelope(Sequence(2L), "d", 1L, "a black dog"))
|
||||||
.expectNoMsg(100.millis)
|
.expectNoMsg(100.millis)
|
||||||
.request(10)
|
.request(10)
|
||||||
.expectNext(EventEnvelope2(Sequence(3L), "d", 2L, "a black night"))
|
.expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night"))
|
||||||
}
|
}
|
||||||
|
|
||||||
"find events from offset" in {
|
"find events from offset" in {
|
||||||
val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L))
|
val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L))
|
||||||
val probe = greenSrc.runWith(TestSink.probe[Any])
|
val probe = greenSrc.runWith(TestSink.probe[Any])
|
||||||
.request(10)
|
.request(10)
|
||||||
.expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana"))
|
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana"))
|
||||||
.expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf"))
|
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
|
||||||
.expectNext(EventEnvelope2(Sequence(4L), "c", 1L, "a green cucumber"))
|
.expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber"))
|
||||||
.expectNoMsg(100.millis)
|
.expectNoMsg(100.millis)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue