eventsByTag and currentEventsByTag to use Offset (#21615)

* EventsByTagQuery2 and CurrentEventsByTagQuery2 to keep binary compatibility
This commit is contained in:
Richard Imaoka 2016-10-17 23:45:27 +09:00 committed by Patrik Nordwall
parent c3abde60d5
commit 375c032604
17 changed files with 247 additions and 64 deletions

View file

@ -6,12 +6,13 @@ package docs.persistence.query
import akka.NotUsed
import akka.persistence.journal.{ EventAdapter, EventSeq }
import akka.testkit.AkkaSpec
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence }
import akka.persistence.query.scaladsl._
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.journal.Tagged
import akka.stream.scaladsl.Source
import akka.persistence.query.EventEnvelope
import akka.stream.ActorMaterializer
import scala.annotation.tailrec
object LeveldbPersistenceQueryDocSpec {
@ -81,7 +82,7 @@ class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) {
LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] =
queries.eventsByTag(tag = "green", offset = 0L)
queries.eventsByTag(tag = "green", offset = Sequence(0L))
//#EventsByTag
}

View file

@ -6,15 +6,16 @@ package docs.persistence.query
import akka.NotUsed
import akka.actor._
import akka.persistence.{ Recovery, PersistentActor }
import akka.persistence.{ PersistentActor, Recovery }
import akka.persistence.query._
import akka.stream.{ FlowShape, ActorMaterializer }
import akka.stream.{ ActorMaterializer, FlowShape }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.javadsl
import akka.testkit.AkkaSpec
import akka.util.Timeout
import docs.persistence.query.PersistenceQueryDocSpec.{ TheOneWhoWritesToQueryJournal }
import docs.persistence.query.PersistenceQueryDocSpec.TheOneWhoWritesToQueryJournal
import org.reactivestreams.Subscriber
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
@ -45,7 +46,7 @@ object PersistenceQueryDocSpec {
class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config)
extends akka.persistence.query.scaladsl.ReadJournal
with akka.persistence.query.scaladsl.EventsByTagQuery
with akka.persistence.query.scaladsl.EventsByTagQuery2
with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
with akka.persistence.query.scaladsl.AllPersistenceIdsQuery
with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
@ -54,10 +55,13 @@ object PersistenceQueryDocSpec {
config.getDuration("refresh-interval", MILLISECONDS).millis
override def eventsByTag(
tag: String, offset: Long = 0L): Source[EventEnvelope, NotUsed] = {
val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval)
Source.actorPublisher[EventEnvelope](props)
.mapMaterializedValue(_ => NotUsed)
tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match {
case Sequence(offsetValue)
val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval)
Source.actorPublisher[EventEnvelope](props)
.mapMaterializedValue(_ => NotUsed)
case _
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
}
override def eventsByPersistenceId(
@ -90,13 +94,13 @@ object PersistenceQueryDocSpec {
class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal)
extends akka.persistence.query.javadsl.ReadJournal
with akka.persistence.query.javadsl.EventsByTagQuery
with akka.persistence.query.javadsl.EventsByTagQuery2
with akka.persistence.query.javadsl.EventsByPersistenceIdQuery
with akka.persistence.query.javadsl.AllPersistenceIdsQuery
with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
override def eventsByTag(
tag: String, offset: Long = 0L): javadsl.Source[EventEnvelope, NotUsed] =
tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] =
scaladslReadJournal.eventsByTag(tag, offset).asJava
override def eventsByPersistenceId(
@ -232,7 +236,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
.runFold(Vector.empty[Any])(_ :+ _)
// start another query, from the known offset
val furtherBlueThings = readJournal.eventsByTag("blue", offset = 10)
val furtherBlueThings = readJournal.eventsByTag("blue", offset = Sequence(10))
//#events-by-tag
//#events-by-persistent-id
@ -279,7 +283,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
bidProjection.latestOffset.foreach { startFromOffset =>
readJournal
.eventsByTag("bid", startFromOffset)
.eventsByTag("bid", Sequence(startFromOffset))
.mapAsync(8) { envelope => (writer ? envelope.event).map(_ => envelope.offset) }
.mapAsync(1) { offset => bidProjection.saveProgress(offset) }
.runWith(Sink.ignore)