!per #18463 Make Persistence Query API explorable

* make the standard queries "single method interfaces" that may be implemented
  by a query journal plugin
* remove hints (major problems with varargs anyway), the hints for standard
  queries  should be given in configuration instead, e.g. refresh-interval
This commit is contained in:
Patrik Nordwall 2015-09-14 11:08:22 +02:00
parent a45f31cecb
commit 5bd245fbc8
46 changed files with 1487 additions and 948 deletions

View file

@ -5,12 +5,12 @@
package docs.persistence.query
import akka.actor._
import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.{ Recovery, PersistentActor }
import akka.persistence.query._
import akka.stream.{ FlowShape, ActorMaterializer }
import akka.stream.scaladsl.FlowGraph
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.javadsl
import akka.testkit.AkkaSpec
import akka.util.Timeout
import docs.persistence.query.PersistenceQueryDocSpec.{ DummyStore, TheOneWhoWritesToQueryJournal }
@ -25,42 +25,112 @@ object PersistenceQueryDocSpec {
implicit val timeout = Timeout(3.seconds)
//#advanced-journal-query-types
final case class RichEvent(tags: Set[String], payload: Any)
// a plugin can provide:
case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean)
//#advanced-journal-query-types
//#my-read-journal
class MyReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal {
class MyReadJournalProvider(system: ExtendedActorSystem, config: Config)
extends ReadJournalProvider {
private val defaulRefreshInterval = 3.seconds
override val scaladslReadJournal: MyScaladslReadJournal =
new MyScaladslReadJournal(system, config)
override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] =
q match {
case EventsByTag(tag, offset)
val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints))
Source.actorPublisher[EventEnvelope](props)
.mapMaterializedValue(_ noMaterializedValue)
override val javadslReadJournal: MyJavadslReadJournal =
new MyJavadslReadJournal(scaladslReadJournal)
}
case unsupported
Source.failed[T](
new UnsupportedOperationException(
s"Query $unsupported not supported by ${getClass.getName}"))
.mapMaterializedValue(_ noMaterializedValue)
}
class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config)
extends akka.persistence.query.scaladsl.ReadJournal
with akka.persistence.query.scaladsl.EventsByTagQuery
with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
with akka.persistence.query.scaladsl.AllPersistenceIdsQuery
with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
private def refreshInterval(hints: Seq[Hint]): FiniteDuration =
hints.collectFirst { case RefreshInterval(interval) interval }
.getOrElse(defaulRefreshInterval)
private val refreshInterval: FiniteDuration =
config.getDuration("refresh-interval", MILLISECONDS).millis
private def noMaterializedValue[M]: M =
null.asInstanceOf[M]
override def eventsByTag(
tag: String, offset: Long = 0L): Source[EventEnvelope, Unit] = {
val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval)
Source.actorPublisher[EventEnvelope](props)
.mapMaterializedValue(_ ())
}
override def eventsByPersistenceId(
persistenceId: String, fromSequenceNr: Long = 0L,
toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, Unit] = {
// implement in a similar way as eventsByTag
???
}
override def allPersistenceIds(): Source[String, Unit] = {
// implement in a similar way as eventsByTag
???
}
override def currentPersistenceIds(): Source[String, Unit] = {
// implement in a similar way as eventsByTag
???
}
// possibility to add more plugin specific queries
//#advanced-journal-query-definition
def byTagsWithMeta(tags: Set[String]): Source[RichEvent, QueryMetadata] = {
//#advanced-journal-query-definition
// implement in a similar way as eventsByTag
???
}
}
class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal)
extends akka.persistence.query.javadsl.ReadJournal
with akka.persistence.query.javadsl.EventsByTagQuery
with akka.persistence.query.javadsl.EventsByPersistenceIdQuery
with akka.persistence.query.javadsl.AllPersistenceIdsQuery
with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
override def eventsByTag(
tag: String, offset: Long = 0L): javadsl.Source[EventEnvelope, Unit] =
scaladslReadJournal.eventsByTag(tag, offset).asJava
override def eventsByPersistenceId(
persistenceId: String, fromSequenceNr: Long = 0L,
toSequenceNr: Long = Long.MaxValue): javadsl.Source[EventEnvelope, Unit] =
scaladslReadJournal.eventsByPersistenceId(
persistenceId, fromSequenceNr, toSequenceNr).asJava
override def allPersistenceIds(): javadsl.Source[String, Unit] =
scaladslReadJournal.allPersistenceIds().asJava
override def currentPersistenceIds(): javadsl.Source[String, Unit] =
scaladslReadJournal.currentPersistenceIds().asJava
// possibility to add more plugin specific queries
def byTagsWithMeta(
tags: java.util.Set[String]): javadsl.Source[RichEvent, QueryMetadata] = {
import scala.collection.JavaConverters._
scaladslReadJournal.byTagsWithMeta(tags.asScala.toSet).asJava
}
}
//#my-read-journal
case class ComplexState() {
def readyToSave = false
}
case class Record(any: Any)
class DummyStore { def save(record: Record) = Future.successful(42L) }
val JournalId = "akka.persistence.query.my-read-journal"
class X {
val JournalId = ""
def convertToReadSideTypes(in: Any): Any = ???
@ -72,13 +142,14 @@ object PersistenceQueryDocSpec {
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val readJournal = PersistenceQuery(system).readJournalFor(JournalId)
val readJournal =
PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](JournalId)
val dbBatchWriter: Subscriber[immutable.Seq[Any]] =
ReactiveStreamsCompatibleDBDriver.batchWriter
// Using an example (Reactive Streams) Database driver
readJournal
.query(EventsByPersistenceId("user-1337"))
.eventsByPersistenceId("user-1337")
.map(envelope => envelope.event)
.map(convertToReadSideTypes) // convert to datatype
.grouped(20) // batch inserts into groups of 20
@ -109,103 +180,81 @@ object PersistenceQueryDocSpec {
}
class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
import PersistenceQueryDocSpec._
def this() {
this(
"""
akka.persistence.query.noop-read-journal {
class = "docs.persistence.query.NoopReadJournal"
akka.persistence.query.my-read-journal {
class = "docs.persistence.query.PersistenceQueryDocSpec$MyReadJournalProvider"
refresh-interval = 3s
}
""".stripMargin)
""")
}
//#basic-usage
// obtain read journal by plugin id
val readJournal =
PersistenceQuery(system).readJournalFor("akka.persistence.query.noop-read-journal")
// issue query to journal
val source: Source[EventEnvelope, Unit] =
readJournal.query(EventsByPersistenceId("user-1337", 0, Long.MaxValue))
// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.runForeach { event => println("Event: " + event) }
//#basic-usage
//#all-persistence-ids-live
readJournal.query(AllPersistenceIds)
//#all-persistence-ids-live
class BasicUsage {
//#basic-usage
// obtain read journal by plugin id
val readJournal =
PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](
"akka.persistence.query.my-read-journal")
//#all-persistence-ids-snap
readJournal.query(AllPersistenceIds, hints = NoRefresh)
//#all-persistence-ids-snap
// issue query to journal
val source: Source[EventEnvelope, Unit] =
readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue)
//#events-by-tag
// assuming journal is able to work with numeric offsets we can:
// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.runForeach { event => println("Event: " + event) }
//#basic-usage
val blueThings: Source[EventEnvelope, Unit] =
readJournal.query(EventsByTag("blue"))
//#all-persistence-ids-live
readJournal.allPersistenceIds()
//#all-persistence-ids-live
// find top 10 blue things:
val top10BlueThings: Future[Vector[Any]] =
blueThings
.map(_.event)
.take(10) // cancels the query stream after pulling 10 elements
.runFold(Vector.empty[Any])(_ :+ _)
//#all-persistence-ids-snap
readJournal.currentPersistenceIds()
//#all-persistence-ids-snap
// start another query, from the known offset
val furtherBlueThings = readJournal.query(EventsByTag("blue", offset = 10))
//#events-by-tag
//#events-by-tag
// assuming journal is able to work with numeric offsets we can:
//#events-by-persistent-id-refresh
readJournal.query(EventsByPersistenceId("user-us-1337"), hints = RefreshInterval(1.second))
val blueThings: Source[EventEnvelope, Unit] =
readJournal.eventsByTag("blue")
//#events-by-persistent-id-refresh
// find top 10 blue things:
val top10BlueThings: Future[Vector[Any]] =
blueThings
.map(_.event)
.take(10) // cancels the query stream after pulling 10 elements
.runFold(Vector.empty[Any])(_ :+ _)
//#advanced-journal-query-definition
final case class RichEvent(tags: immutable.Set[String], payload: Any)
// start another query, from the known offset
val furtherBlueThings = readJournal.eventsByTag("blue", offset = 10)
//#events-by-tag
case class QueryStats(totalEvents: Long)
//#events-by-persistent-id
readJournal.eventsByPersistenceId("user-us-1337")
case class ByTagsWithStats(tags: immutable.Set[String])
extends Query[RichEvent, QueryStats]
//#events-by-persistent-id
//#advanced-journal-query-definition
//#advanced-journal-query-usage
val query: Source[RichEvent, QueryMetadata] =
readJournal.byTagsWithMeta(Set("red", "blue"))
//#advanced-journal-query-hints
query
.mapMaterializedValue { meta =>
println(s"The query is: " +
s"ordered deterministically: ${meta.deterministicOrder}, " +
s"infinite: ${meta.infinite}")
}
.map { event => println(s"Event payload: ${event.payload}") }
.runWith(Sink.ignore)
import scala.concurrent.duration._
readJournal.query(EventsByTag("blue"), hints = RefreshInterval(1.second))
//#advanced-journal-query-hints
//#advanced-journal-query-usage
val query: Source[RichEvent, QueryStats] =
readJournal.query(ByTagsWithStats(Set("red", "blue")))
query
.mapMaterializedValue { stats => println(s"Stats: $stats") }
.map { event => println(s"Event payload: ${event.payload}") }
.runWith(Sink.ignore)
//#advanced-journal-query-usage
//#materialized-query-metadata
// a plugin can provide:
case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean)
case object AllEvents extends Query[Any, QueryMetadata]
val events = readJournal.query(AllEvents)
events
.mapMaterializedValue { meta =>
println(s"The query is: " +
s"ordered deterministically: ${meta.deterministicOrder}, " +
s"infinite: ${meta.infinite}")
}
//#materialized-query-metadata
//#advanced-journal-query-usage
}
//#projection-into-different-store
class MyResumableProjection(name: String) {
@ -215,6 +264,9 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
//#projection-into-different-store
class RunWithActor {
val readJournal =
PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](JournalId)
//#projection-into-different-store-actor-run
import akka.pattern.ask
import system.dispatcher
@ -227,7 +279,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
bidProjection.latestOffset.foreach { startFromOffset =>
readJournal
.query(EventsByTag("bid", startFromOffset))
.eventsByTag("bid", startFromOffset)
.mapAsync(8) { envelope => (writer ? envelope.event).map(_ => envelope.offset) }
.mapAsync(1) { offset => bidProjection.saveProgress(offset) }
.runWith(Sink.ignore)
@ -236,6 +288,10 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
}
class RunWithAsyncFunction {
val readJournal =
PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](
"akka.persistence.query.my-read-journal")
//#projection-into-different-store-simple
trait ExampleStore {
def save(event: Any): Future[Unit]
@ -246,7 +302,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
val store: ExampleStore = ???
readJournal
.query(EventsByTag("bid"))
.eventsByTag("bid")
.mapAsync(1) { e => store.save(e) }
.runWith(Sink.ignore)
//#projection-into-different-store-simple
@ -254,7 +310,3 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
}
class NoopReadJournal(sys: ExtendedActorSystem) extends ReadJournal {
override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] =
Source.empty.mapMaterializedValue(_ => null.asInstanceOf[M])
}