+per #18191 Leveldb impl of AllPersistenceIds query

This commit is contained in:
Patrik Nordwall 2015-08-19 20:17:15 +02:00
parent 009d80dd35
commit 75535bba43
7 changed files with 207 additions and 13 deletions

View file

@ -0,0 +1,62 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.persistence.JournalProtocol._
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.LeveldbJournal
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import akka.stream.actor.ActorPublisherMessage.Request
/**
* INTERNAL API
*/
private[akka] object AllPersistenceIdsPublisher {
def props(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String): Props =
Props(new AllPersistenceIdsPublisher(liveQuery, maxBufSize, writeJournalPluginId))
private case object Continue
}
class AllPersistenceIdsPublisher(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String)
extends ActorPublisher[String] with DeliveryBuffer[String] with ActorLogging {
import AllPersistenceIdsPublisher._
val journal: ActorRef = Persistence(context.system).journalFor(writeJournalPluginId)
def receive = init
def init: Receive = {
case _: Request
journal ! LeveldbJournal.SubscribeAllPersistenceIds
context.become(active)
case Cancel context.stop(self)
}
def active: Receive = {
case LeveldbJournal.CurrentPersistenceIds(allPersistenceIds)
buf ++= allPersistenceIds
deliverBuf()
if (!liveQuery && buf.isEmpty)
onCompleteThenStop()
case LeveldbJournal.PersistenceIdAdded(persistenceId)
if (liveQuery) {
buf :+= persistenceId
deliverBuf()
}
case _: Request
deliverBuf()
if (!liveQuery && buf.isEmpty)
onCompleteThenStop()
case Cancel context.stop(self)
}
}

View file

@ -59,7 +59,7 @@ class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long
}
def idle: Receive = {
case Continue | _: LeveldbJournal.ChangedPersistenceId
case Continue | _: LeveldbJournal.EventAppended
if (timeForReplay)
replay()
@ -122,7 +122,7 @@ class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long
case _: Request
deliverBuf()
case Continue | _: LeveldbJournal.ChangedPersistenceId // skip during replay
case Continue | _: LeveldbJournal.EventAppended // skip during replay
case Cancel
context.stop(self)

View file

@ -16,6 +16,7 @@ import akka.persistence.query.NoRefresh
import akka.persistence.query.RefreshInterval
import com.typesafe.config.Config
import akka.persistence.query.EventEnvelope
import akka.persistence.query.AllPersistenceIds
object LeveldbReadJournal {
final val Identifier = "akka.persistence.query.journal.leveldb"
@ -31,12 +32,21 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends sc
override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match {
case EventsByPersistenceId(pid, from, to) eventsByPersistenceId(pid, from, to, hints)
case AllPersistenceIds allPersistenceIds(hints)
case unknown unsupportedQueryType(unknown)
}
def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = {
Source.actorPublisher[EventEnvelope](EventsByPersistenceIdPublisher.props(persistenceId, fromSeqNr, toSeqNr,
refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ())
.named("eventsByPersistenceId-" + persistenceId)
}
def allPersistenceIds(hints: Seq[Hint]): Source[String, Unit] = {
val liveQuery = refreshInterval(hints).isDefined
Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery, maxBufSize, writeJournalPluginId))
.mapMaterializedValue(_ ())
.named("allPersistenceIds")
}
private def refreshInterval(hints: Seq[Hint]): Option[FiniteDuration] =

View file

@ -0,0 +1,76 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.persistence.query.EventsByPersistenceId
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.RefreshInterval
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.persistence.query.NoRefresh
import akka.testkit.AkkaSpec
import akka.persistence.query.AllPersistenceIds
object AllPersistenceIdsSpec {
val config = """
akka.loglevel = INFO
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/journal-AllPersistenceIdsSpec"
"""
}
class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config)
with Cleanup with ImplicitSender {
import AllPersistenceIdsSpec._
implicit val mat = ActorMaterializer()(system)
val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)
"Leveldb query AllPersistenceIds" must {
"find existing persistenceIds" in {
system.actorOf(TestActor.props("a")) ! "a1"
expectMsg("a1-done")
system.actorOf(TestActor.props("b")) ! "b1"
expectMsg("b1-done")
system.actorOf(TestActor.props("c")) ! "c1"
expectMsg("c1-done")
val src = queries.query(AllPersistenceIds, NoRefresh)
src.runWith(TestSink.probe[String])
.request(5)
.expectNextUnordered("a", "b", "c")
.expectComplete()
}
"find new persistenceIds" in {
// a, b, c created by previous step
system.actorOf(TestActor.props("d")) ! "d1"
expectMsg("d1-done")
val src = queries.query(AllPersistenceIds)
val probe = src.runWith(TestSink.probe[String])
.request(5)
.expectNextUnorderedN(List("a", "b", "c", "d"))
system.actorOf(TestActor.props("e")) ! "e1"
probe.expectNext("e")
val more = (1 to 100).map("f" + _)
more.foreach { p
system.actorOf(TestActor.props(p)) ! p
}
probe.request(100)
probe.expectNextUnorderedN(more)
}
}
}