New EventEnvelope including Offset rather than Long #21722

This commit is contained in:
Johan Andrén 2016-10-27 15:32:10 +02:00 committed by GitHub
parent 3951cf4e68
commit 33ece118a8
13 changed files with 75 additions and 58 deletions

View file

@ -102,11 +102,11 @@ public class PersistenceQueryDocTest {
}
@Override
public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
public Source<EventEnvelope2, NotUsed> eventsByTag(String tag, Offset offset) {
if(offset instanceof Sequence){
Sequence sequenceOffset = (Sequence) offset;
final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval);
return Source.<EventEnvelope>actorPublisher(props).
return Source.<EventEnvelope2>actorPublisher(props).
mapMaterializedValue(m -> NotUsed.getInstance());
}
else
@ -160,7 +160,7 @@ public class PersistenceQueryDocTest {
}
@Override
public akka.stream.scaladsl.Source<EventEnvelope, NotUsed> eventsByTag(
public akka.stream.scaladsl.Source<EventEnvelope2, NotUsed> eventsByTag(
String tag, akka.persistence.query.Offset offset) {
return javadslReadJournal.eventsByTag(tag, offset).asScala();
}
@ -256,7 +256,7 @@ public class PersistenceQueryDocTest {
//#events-by-tag
// assuming journal is able to work with numeric offsets we can:
final Source<EventEnvelope, NotUsed> blueThings =
final Source<EventEnvelope2, NotUsed> blueThings =
readJournal.eventsByTag("blue", new Sequence(0L));
// find top 10 blue things:
@ -270,7 +270,7 @@ public class PersistenceQueryDocTest {
}, mat);
// start another query, from the known offset
Source<EventEnvelope, NotUsed> blue = readJournal.eventsByTag("blue", new Sequence(10));
Source<EventEnvelope2, NotUsed> blue = readJournal.eventsByTag("blue", new Sequence(10));
//#events-by-tag
}
@ -371,7 +371,7 @@ public class PersistenceQueryDocTest {
this.name = name;
}
public CompletionStage<Long> saveProgress(long offset) {
public CompletionStage<Long> saveProgress(Offset offset) {
// ...
//#projection-into-different-store
return null;

View file

@ -12,6 +12,7 @@ import akka.actor.ActorSystem;
import akka.persistence.journal.WriteEventAdapter;
import akka.persistence.journal.Tagged;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.EventEnvelope2;
import akka.persistence.query.Sequence;
import akka.persistence.query.javadsl.*;
import akka.persistence.query.PersistenceQuery;
@ -60,7 +61,7 @@ public class LeveldbPersistenceQueryDocTest {
PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
LeveldbReadJournal.Identifier());
Source<EventEnvelope, NotUsed> source =
Source<EventEnvelope2, NotUsed> source =
queries.eventsByTag("green", new Sequence(0L));
//#EventsByTag
}

View file

@ -6,7 +6,7 @@ package docs.persistence.query
import akka.NotUsed
import akka.persistence.journal.{ EventAdapter, EventSeq }
import akka.testkit.AkkaSpec
import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence }
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, PersistenceQuery, Sequence }
import akka.persistence.query.scaladsl._
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.journal.Tagged
@ -81,7 +81,7 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) {
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](
LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] =
val src: Source[EventEnvelope2, NotUsed] =
queries.eventsByTag(tag = "green", offset = Sequence(0L))
//#EventsByTag
}

View file

@ -6,7 +6,7 @@ package docs.persistence.query
import akka.actor.Props
import akka.persistence.PersistentRepr
import akka.persistence.query.EventEnvelope
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Sequence }
import akka.serialization.SerializationExtension
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }
@ -20,7 +20,7 @@ object MyEventsByTagPublisher {
//#events-by-tag-publisher
class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration)
extends ActorPublisher[EventEnvelope] {
extends ActorPublisher[EventEnvelope2] {
private case object Continue
@ -28,7 +28,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD
private val Limit = 1000
private var currentOffset = offset
var buf = Vector.empty[EventEnvelope]
var buf = Vector.empty[EventEnvelope2]
import context.dispatcher
val continueTask = context.system.scheduler.schedule(
@ -81,7 +81,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD
buf = result.map {
case (id, bytes) =>
val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get
EventEnvelope(offset = id, p.persistenceId, p.sequenceNr, p.payload)
EventEnvelope2(offset = Sequence(id), p.persistenceId, p.sequenceNr, p.payload)
}
} catch {
case e: Exception =>

View file

@ -55,11 +55,15 @@ object PersistenceQueryDocSpec {
config.getDuration("refresh-interval", MILLISECONDS).millis
override def eventsByTag(
tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match {
tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] = offset match {
case Sequence(offsetValue)
val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval)
Source.actorPublisher[EventEnvelope](props)
.mapMaterializedValue(_ => NotUsed)
.map {
case EventEnvelope(offset, id, seqNr, event) =>
EventEnvelope2(Sequence(offset), id, seqNr, event)
}
case _
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
}
@ -100,7 +104,7 @@ object PersistenceQueryDocSpec {
with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
override def eventsByTag(
tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] =
tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope2, NotUsed] =
scaladslReadJournal.eventsByTag(tag, offset).asJava
override def eventsByPersistenceId(
@ -225,7 +229,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
//#events-by-tag
// assuming journal is able to work with numeric offsets we can:
val blueThings: Source[EventEnvelope, NotUsed] =
val blueThings: Source[EventEnvelope2, NotUsed] =
readJournal.eventsByTag("blue")
// find top 10 blue things:
@ -262,7 +266,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
//#projection-into-different-store
class MyResumableProjection(name: String) {
def saveProgress(offset: Long): Future[Long] = ???
def saveProgress(offset: Offset): Future[Long] = ???
def latestOffset: Future[Long] = ???
}
//#projection-into-different-store

View file

@ -12,3 +12,14 @@ final case class EventEnvelope(
persistenceId: String,
sequenceNr: Long,
event: Any)
/**
* Event wrapper adding meta data for the events in the result stream of
* [[akka.persistence.query.scaladsl.EventsByTagQuery2]] query, or similar queries.
*/
// TODO: Rename it to EventEnvelope in Akka 2.5
final case class EventEnvelope2(
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Any)

View file

@ -4,7 +4,7 @@
package akka.persistence.query.javadsl
import akka.NotUsed
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.persistence.query.{ EventEnvelope2, Offset }
import akka.stream.javadsl.Source
/**
@ -18,7 +18,6 @@ trait CurrentEventsByTagQuery2 extends ReadJournal {
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed]
}

View file

@ -4,7 +4,7 @@
package akka.persistence.query.javadsl
import akka.NotUsed
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.persistence.query.{ EventEnvelope2, Offset }
import akka.stream.javadsl.Source
/**
@ -36,6 +36,6 @@ trait EventsByTagQuery2 extends ReadJournal {
* Corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
*/
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed]
}

View file

@ -4,8 +4,7 @@
package akka.persistence.query.journal.leveldb.javadsl
import akka.NotUsed
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Offset }
import akka.persistence.query.javadsl._
import akka.stream.javadsl.Source
@ -139,7 +138,7 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] =
scaladslReadJournal.eventsByTag(tag, offset).asJava
override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
@ -150,7 +149,7 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed] =
scaladslReadJournal.currentEventsByTag(tag, offset).asJava
override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =

View file

@ -9,14 +9,14 @@ import akka.NotUsed
import scala.concurrent.duration._
import akka.actor.ExtendedActorSystem
import akka.persistence.query.{ EventEnvelope, Offset, Sequence }
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, Offset, Sequence }
import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher
import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher
import akka.persistence.query.journal.leveldb.EventsByTagPublisher
import akka.persistence.query.scaladsl._
import akka.persistence.query.scaladsl.ReadJournal
import akka.serialization.SerializationExtension
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Flow, Source }
import akka.util.ByteString
import com.typesafe.config.Config
@ -49,6 +49,11 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
private val writeJournalPluginId: String = config.getString("write-plugin")
private val maxBufSize: Int = config.getInt("max-buffer-size")
private val envelopetoEnvelope2 = Flow[EventEnvelope].map {
case EventEnvelope(offset, persistenceId, sequenceNr, event)
EventEnvelope2(Sequence(offset), persistenceId, sequenceNr, event)
}
/**
* `allPersistenceIds` is used for retrieving all `persistenceIds` of all
* persistent actors.
@ -166,19 +171,18 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] =
offset match {
case Sequence(offsetValue)
Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offsetValue, Long.MaxValue,
refreshInterval, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ NotUsed)
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
eventsByTag(tag, offsetValue).via(envelopetoEnvelope2)
case _
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
}
override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = {
Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue,
refreshInterval, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ NotUsed)
refreshInterval, maxBufSize, writeJournalPluginId))
.mapMaterializedValue(_ NotUsed)
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
}
@ -187,12 +191,11 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope2, NotUsed] =
offset match {
case Sequence(offsetValue)
Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offsetValue, Long.MaxValue,
None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ NotUsed)
.named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
currentEventsByTag(tag, offsetValue).via(envelopetoEnvelope2)
case _
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
}

View file

@ -4,7 +4,7 @@
package akka.persistence.query.scaladsl
import akka.NotUsed
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.persistence.query.{ EventEnvelope2, Offset }
import akka.stream.scaladsl.Source
/**
@ -18,7 +18,7 @@ trait CurrentEventsByTagQuery2 extends ReadJournal {
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed]
}

View file

@ -4,7 +4,7 @@
package akka.persistence.query.scaladsl
import akka.NotUsed
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.persistence.query.{ EventEnvelope2, Offset }
import akka.stream.scaladsl.Source
/**
@ -36,7 +36,7 @@ trait EventsByTagQuery2 extends ReadJournal {
* Corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
*/
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope2, NotUsed]
}

View file

@ -6,7 +6,7 @@ package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.persistence.journal.Tagged
import akka.persistence.journal.WriteEventAdapter
import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence }
import akka.persistence.query.{ EventEnvelope, EventEnvelope2, PersistenceQuery, Sequence }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.scaladsl.EventsByTagQuery2
import akka.stream.ActorMaterializer
@ -75,17 +75,17 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
greenSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(1L, "a", 2L, "a green apple"))
.expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))
.expectNext(EventEnvelope2(Sequence(1L), "a", 2L, "a green apple"))
.expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana"))
.expectNoMsg(500.millis)
.request(2)
.expectNext(EventEnvelope(3L, "b", 2L, "a green leaf"))
.expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf"))
.expectComplete()
val blackSrc = queries.currentEventsByTag(tag = "black", offset = Sequence(0L))
blackSrc.runWith(TestSink.probe[Any])
.request(5)
.expectNext(EventEnvelope(1L, "b", 1L, "a black car"))
.expectNext(EventEnvelope2(Sequence(1L), "b", 1L, "a black car"))
.expectComplete()
}
@ -95,8 +95,8 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
val probe = greenSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(1L, "a", 2L, "a green apple"))
.expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))
.expectNext(EventEnvelope2(Sequence(1L), "a", 2L, "a green apple"))
.expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana"))
.expectNoMsg(100.millis)
c ! "a green cucumber"
@ -105,7 +105,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
probe
.expectNoMsg(100.millis)
.request(5)
.expectNext(EventEnvelope(3L, "b", 2L, "a green leaf"))
.expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf"))
.expectComplete() // green cucumber not seen
}
@ -113,9 +113,9 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L))
val probe = greenSrc.runWith(TestSink.probe[Any])
.request(10)
.expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))
.expectNext(EventEnvelope(3L, "b", 2L, "a green leaf"))
.expectNext(EventEnvelope(4L, "c", 1L, "a green cucumber"))
.expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana"))
.expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf"))
.expectNext(EventEnvelope2(Sequence(4L), "c", 1L, "a green cucumber"))
.expectComplete()
}
}
@ -127,7 +127,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
val blackSrc = queries.eventsByTag(tag = "black", offset = Sequence(0L))
val probe = blackSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(1L, "b", 1L, "a black car"))
.expectNext(EventEnvelope2(Sequence(1L), "b", 1L, "a black car"))
.expectNoMsg(100.millis)
d ! "a black dog"
@ -136,19 +136,19 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
expectMsg(s"a black night-done")
probe
.expectNext(EventEnvelope(2L, "d", 1L, "a black dog"))
.expectNext(EventEnvelope2(Sequence(2L), "d", 1L, "a black dog"))
.expectNoMsg(100.millis)
.request(10)
.expectNext(EventEnvelope(3L, "d", 2L, "a black night"))
.expectNext(EventEnvelope2(Sequence(3L), "d", 2L, "a black night"))
}
"find events from offset" in {
val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L))
val probe = greenSrc.runWith(TestSink.probe[Any])
.request(10)
.expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))
.expectNext(EventEnvelope(3L, "b", 2L, "a green leaf"))
.expectNext(EventEnvelope(4L, "c", 1L, "a green cucumber"))
.expectNext(EventEnvelope2(Sequence(2L), "a", 3L, "a green banana"))
.expectNext(EventEnvelope2(Sequence(3L), "b", 2L, "a green leaf"))
.expectNext(EventEnvelope2(Sequence(4L), "c", 1L, "a green cucumber"))
.expectNoMsg(100.millis)
}