+per #18190 leveldb impl of EventsByPersistenceId query

* also changed EventsByPersistenceId query type to return
Source[EventEnvelope]
This commit is contained in:
Patrik Nordwall 2015-08-19 11:14:43 +02:00
parent a0bee97f26
commit 009d80dd35
16 changed files with 484 additions and 19 deletions

View file

@ -0,0 +1,30 @@
#######################################################
# Akka Persistence Query Reference Configuration File #
#######################################################
# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.
akka.persistence.query {
journal {
leveldb {
class = "akka.persistence.query.journal.leveldb.LeveldbReadJournal"
# Absolute path to the write journal plugin configuration entry that this query journal
# will connect to. That must be a LeveldbJournal or SharedLeveldbJournal.
# If undefined (or "") it will connect to the default journal as specified by the
# akka.persistence.journal.plugin property.
write-plugin = ""
# Look for more data with this interval. The query journal is also notified by
# the write journal when something is changed and thereby updated quickly, but
# when there are a lot of changes it falls back to periodic queries to avoid
# overloading the system with many small queries.
refresh-interval = 3s
# How many events to fetch in one query and keep buffered until they
# are delivered downstreams.
max-buffer-size = 100
}
}
}

View file

@ -4,12 +4,11 @@
package akka.persistence.query
import java.util.concurrent.atomic.AtomicReference
import akka.actor._
import akka.event.Logging
import scala.annotation.tailrec
import scala.util.Failure
import com.typesafe.config.Config
/**
* Persistence extension for queries.
@ -75,13 +74,15 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
// TODO remove duplication
val scalaPlugin =
if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass))
system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil))
.orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil))
.recoverWith {
case ex: Exception Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex))
}
else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass))
system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil))
.orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil))
.map(jj new scaladsl.ReadJournalAdapter(jj))
.recoverWith {

View file

@ -37,7 +37,7 @@ abstract class AllPersistenceIds extends Query[String, Unit]
* A plugin may optionally support this [[Query]].
*/
final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue)
extends Query[Any, Unit]
extends Query[EventEnvelope, Unit]
object EventsByPersistenceId {
/** Java API */
def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId =

View file

@ -0,0 +1,31 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import akka.stream.actor.ActorPublisher
/**
* INTERNAL API
*/
private[akka] trait DeliveryBuffer[T] { _: ActorPublisher[T]
var buf = Vector.empty[T]
def deliverBuf(): Unit =
if (buf.nonEmpty && totalDemand > 0) {
if (buf.size == 1) {
// optimize for this common case
onNext(buf.head)
buf = Vector.empty
} else if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
buf foreach onNext
buf = Vector.empty
}
}
}

View file

@ -0,0 +1,132 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.persistence.JournalProtocol._
import akka.persistence.Persistence
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import akka.stream.actor.ActorPublisherMessage.Request
import akka.persistence.journal.leveldb.LeveldbJournal
import akka.persistence.query.EventEnvelope
/**
* INTERNAL API
*/
private[akka] object EventsByPersistenceIdPublisher {
def props(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, refreshInterval: Option[FiniteDuration],
maxBufSize: Int, writeJournalPluginId: String): Props =
Props(new EventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, refreshInterval,
maxBufSize, writeJournalPluginId))
private case object Continue
}
class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long,
refreshInterval: Option[FiniteDuration],
maxBufSize: Int, writeJournalPluginId: String)
extends ActorPublisher[EventEnvelope] with DeliveryBuffer[EventEnvelope] with ActorLogging {
import EventsByPersistenceIdPublisher._
val journal: ActorRef = Persistence(context.system).journalFor(writeJournalPluginId)
var currSeqNo = fromSequenceNr
val tickTask = refreshInterval.map { interval
import context.dispatcher
context.system.scheduler.schedule(interval, interval, self, Continue)
}
def nonLiveQuery: Boolean = refreshInterval.isEmpty
override def postStop(): Unit = {
tickTask.foreach(_.cancel())
}
def receive = init
def init: Receive = {
case _: Request
journal ! LeveldbJournal.SubscribePersistenceId(persistenceId)
replay()
case Continue // skip, wait for first Request
case Cancel context.stop(self)
}
def idle: Receive = {
case Continue | _: LeveldbJournal.ChangedPersistenceId
if (timeForReplay)
replay()
case _: Request
deliverBuf()
if (nonLiveQuery) {
if (buf.isEmpty)
onCompleteThenStop()
else
self ! Continue
}
case Cancel
context.stop(self)
}
def timeForReplay: Boolean =
buf.isEmpty || buf.size <= maxBufSize / 2
def replay(): Unit = {
val limit = maxBufSize - buf.size
log.debug("request replay for persistenceId [{}] from [{}] to [{}] limit [{}]", persistenceId, currSeqNo, toSequenceNr, limit)
journal ! ReplayMessages(currSeqNo, toSequenceNr, limit, persistenceId, self)
context.become(replaying(limit))
}
def replaying(limit: Int): Receive = {
var replayCount = 0
{
case ReplayedMessage(p)
buf :+= EventEnvelope(
offset = p.sequenceNr,
persistenceId = persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload)
currSeqNo = p.sequenceNr + 1
replayCount += 1
deliverBuf()
case _: RecoverySuccess
log.debug("replay completed for persistenceId [{}], currSeqNo [{}], replayCount [{}]", persistenceId, currSeqNo, replayCount)
deliverBuf()
if (buf.isEmpty && currSeqNo > toSequenceNr)
onCompleteThenStop()
else if (nonLiveQuery) {
if (buf.isEmpty && replayCount < limit)
onCompleteThenStop()
else
self ! Continue // more to fetch
}
context.become(idle)
case ReplayMessagesFailure(cause)
log.debug("replay failed for persistenceId [{}], due to [{}]", persistenceId, cause.getMessage)
deliverBuf()
onErrorThenStop(cause)
case _: Request
deliverBuf()
case Continue | _: LeveldbJournal.ChangedPersistenceId // skip during replay
case Cancel
context.stop(self)
}
}
}

View file

@ -0,0 +1,51 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.actor.ExtendedActorSystem
import akka.persistence.query.EventsByPersistenceId
import akka.persistence.query.Hint
import akka.persistence.query.Query
import akka.persistence.query.scaladsl
import akka.serialization.SerializationExtension
import akka.stream.scaladsl.Source
import scala.concurrent.duration.FiniteDuration
import akka.persistence.query.NoRefresh
import akka.persistence.query.RefreshInterval
import com.typesafe.config.Config
import akka.persistence.query.EventEnvelope
object LeveldbReadJournal {
final val Identifier = "akka.persistence.query.journal.leveldb"
}
class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends scaladsl.ReadJournal {
private val serialization = SerializationExtension(system)
private val defaulRefreshInterval: Option[FiniteDuration] =
Some(config.getDuration("refresh-interval", MILLISECONDS).millis)
private val writeJournalPluginId: String = config.getString("write-plugin")
private val maxBufSize: Int = config.getInt("max-buffer-size")
override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match {
case EventsByPersistenceId(pid, from, to) eventsByPersistenceId(pid, from, to, hints)
case unknown unsupportedQueryType(unknown)
}
def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = {
Source.actorPublisher[EventEnvelope](EventsByPersistenceIdPublisher.props(persistenceId, fromSeqNr, toSeqNr,
refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ())
}
private def refreshInterval(hints: Seq[Hint]): Option[FiniteDuration] =
if (hints.contains(NoRefresh))
None
else
hints.collectFirst { case RefreshInterval(interval) interval }.orElse(defaulRefreshInterval)
private def unsupportedQueryType[M, T](unknown: Query[T, M]): Nothing =
throw new IllegalArgumentException(s"${getClass.getSimpleName} does not implement the ${unknown.getClass.getName} query type!")
}

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import akka.testkit.AkkaSpec
import java.io.File
import org.apache.commons.io.FileUtils
trait Cleanup { this: AkkaSpec
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() {
storageLocations.foreach(FileUtils.deleteDirectory)
}
override protected def afterTermination() {
storageLocations.foreach(FileUtils.deleteDirectory)
}
}

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.persistence.query.EventsByPersistenceId
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.RefreshInterval
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.persistence.query.NoRefresh
import akka.testkit.AkkaSpec
object EventsByPersistenceIdSpec {
val config = """
akka.loglevel = INFO
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/journal-EventsByPersistenceIdSpec"
"""
}
class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.config)
with Cleanup with ImplicitSender {
import EventsByPersistenceIdSpec._
implicit val mat = ActorMaterializer()(system)
val refreshInterval = RefreshInterval(1.second)
val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)
def setup(persistenceId: String): ActorRef = {
val ref = system.actorOf(TestActor.props(persistenceId))
ref ! s"$persistenceId-1"
ref ! s"$persistenceId-2"
ref ! s"$persistenceId-3"
expectMsg(s"$persistenceId-1-done")
expectMsg(s"$persistenceId-2-done")
expectMsg(s"$persistenceId-3-done")
ref
}
"Leveldb query EventsByPersistenceId" must {
"find existing events" in {
val ref = setup("a")
val src = queries.query(EventsByPersistenceId("a", 0L, Long.MaxValue), NoRefresh)
src.map(_.event).runWith(TestSink.probe[Any])
.request(2)
.expectNext("a-1", "a-2")
.expectNoMsg(500.millis)
.request(2)
.expectNext("a-3")
.expectComplete()
}
"find existing events up to a sequence number" in {
val ref = setup("b")
val src = queries.query(EventsByPersistenceId("b", 0L, 2L), NoRefresh)
src.map(_.event).runWith(TestSink.probe[Any])
.request(5)
.expectNext("b-1", "b-2")
.expectComplete()
}
}
"Leveldb live query EventsByPersistenceId" must {
"find new events" in {
val ref = setup("c")
val src = queries.query(EventsByPersistenceId("c", 0L, Long.MaxValue), refreshInterval)
val probe = src.map(_.event).runWith(TestSink.probe[Any])
.request(5)
.expectNext("c-1", "c-2", "c-3")
ref ! "c-4"
expectMsg("c-4-done")
probe.expectNext("c-4")
}
"find new events up to a sequence number" in {
val ref = setup("d")
val src = queries.query(EventsByPersistenceId("d", 0L, 4L), refreshInterval)
val probe = src.map(_.event).runWith(TestSink.probe[Any])
.request(5)
.expectNext("d-1", "d-2", "d-3")
ref ! "d-4"
expectMsg("d-4-done")
probe.expectNext("d-4").expectComplete()
}
}
}

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import akka.persistence.PersistentActor
import akka.actor.Props
object TestActor {
def props(persistenceId: String): Props =
Props(new TestActor(persistenceId))
}
class TestActor(override val persistenceId: String) extends PersistentActor {
val receiveRecover: Receive = {
case evt: String
}
val receiveCommand: Receive = {
case cmd: String
persist(cmd) { evt
sender() ! evt + "-done"
}
}
}