+per #18192 leveldb impl of EventsByTag query

* also refactoring of EventsByPersistenceIdPublisher

* increase test timeouts
This commit is contained in:
Patrik Nordwall 2015-08-20 11:45:24 +02:00
parent b9fecfd53b
commit 8f723deda1
11 changed files with 688 additions and 80 deletions

View file

@@ -23,7 +23,10 @@ private[akka] object AllPersistenceIdsPublisher {
private case object Continue
}
class AllPersistenceIdsPublisher(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String)
/**
* INTERNAL API
*/
private[akka] class AllPersistenceIdsPublisher(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String)
extends ActorPublisher[String] with DeliveryBuffer[String] with ActorLogging {
import AllPersistenceIdsPublisher._

View file

@@ -20,16 +20,29 @@ import akka.persistence.query.EventEnvelope
*/
private[akka] object EventsByPersistenceIdPublisher {
def props(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, refreshInterval: Option[FiniteDuration],
maxBufSize: Int, writeJournalPluginId: String): Props =
Props(new EventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, refreshInterval,
maxBufSize, writeJournalPluginId))
maxBufSize: Int, writeJournalPluginId: String): Props = {
refreshInterval match {
case Some(interval)
Props(new LiveEventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, interval,
maxBufSize, writeJournalPluginId))
case None
Props(new CurrentEventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr,
maxBufSize, writeJournalPluginId))
}
}
private case object Continue
/**
* INTERNAL API
*/
private[akka] case object Continue
}
class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long,
refreshInterval: Option[FiniteDuration],
maxBufSize: Int, writeJournalPluginId: String)
/**
* INTERNAL API
*/
private[akka] abstract class AbstractEventsByPersistenceIdPublisher(
val persistenceId: String, val fromSequenceNr: Long,
val maxBufSize: Int, val writeJournalPluginId: String)
extends ActorPublisher[EventEnvelope] with DeliveryBuffer[EventEnvelope] with ActorLogging {
import EventsByPersistenceIdPublisher._
@ -37,48 +50,34 @@ class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long
var currSeqNo = fromSequenceNr
val tickTask = refreshInterval.map { interval
import context.dispatcher
context.system.scheduler.schedule(interval, interval, self, Continue)
}
def nonLiveQuery: Boolean = refreshInterval.isEmpty
override def postStop(): Unit = {
tickTask.foreach(_.cancel())
}
def toSequenceNr: Long
def receive = init
def init: Receive = {
case _: Request
journal ! LeveldbJournal.SubscribePersistenceId(persistenceId)
replay()
case Continue // skip, wait for first Request
case Cancel context.stop(self)
case _: Request receiveInitialRequest()
case Continue // skip, wait for first Request
case Cancel context.stop(self)
}
def receiveInitialRequest(): Unit
def idle: Receive = {
case Continue | _: LeveldbJournal.EventAppended
if (timeForReplay)
replay()
case _: Request
deliverBuf()
if (nonLiveQuery) {
if (buf.isEmpty)
onCompleteThenStop()
else
self ! Continue
}
receiveIdleRequest()
case Cancel
context.stop(self)
}
def receiveIdleRequest(): Unit
def timeForReplay: Boolean =
buf.isEmpty || buf.size <= maxBufSize / 2
(buf.isEmpty || buf.size <= maxBufSize / 2) && (currSeqNo <= toSequenceNr)
def replay(): Unit = {
val limit = maxBufSize - buf.size
@ -88,45 +87,105 @@ class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long
}
def replaying(limit: Int): Receive = {
var replayCount = 0
case ReplayedMessage(p)
buf :+= EventEnvelope(
offset = p.sequenceNr,
persistenceId = persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload)
currSeqNo = p.sequenceNr + 1
deliverBuf()
{
case ReplayedMessage(p)
buf :+= EventEnvelope(
offset = p.sequenceNr,
persistenceId = persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload)
currSeqNo = p.sequenceNr + 1
replayCount += 1
deliverBuf()
case RecoverySuccess(highestSeqNr)
log.debug("replay completed for persistenceId [{}], currSeqNo [{}]", persistenceId, currSeqNo)
receiveRecoverySuccess(highestSeqNr)
case _: RecoverySuccess
log.debug("replay completed for persistenceId [{}], currSeqNo [{}], replayCount [{}]", persistenceId, currSeqNo, replayCount)
deliverBuf()
if (buf.isEmpty && currSeqNo > toSequenceNr)
onCompleteThenStop()
else if (nonLiveQuery) {
if (buf.isEmpty && replayCount < limit)
onCompleteThenStop()
else
self ! Continue // more to fetch
}
context.become(idle)
case ReplayMessagesFailure(cause)
log.debug("replay failed for persistenceId [{}], due to [{}]", persistenceId, cause.getMessage)
deliverBuf()
onErrorThenStop(cause)
case ReplayMessagesFailure(cause)
log.debug("replay failed for persistenceId [{}], due to [{}]", persistenceId, cause.getMessage)
deliverBuf()
onErrorThenStop(cause)
case _: Request
deliverBuf()
case _: Request
deliverBuf()
case Continue | _: LeveldbJournal.EventAppended // skip during replay
case Continue | _: LeveldbJournal.EventAppended // skip during replay
case Cancel
context.stop(self)
}
case Cancel
context.stop(self)
}
def receiveRecoverySuccess(highestSeqNr: Long): Unit
}
/**
 * INTERNAL API
 *
 * Live variant of the eventsByPersistenceId query: subscribes to the journal
 * for notifications of newly appended events and additionally polls on a
 * periodic tick, so the stream stays open until `toSequenceNr` is passed
 * or the stream is cancelled.
 */
private[akka] class LiveEventsByPersistenceIdPublisher(
persistenceId: String, fromSequenceNr: Long, override val toSequenceNr: Long,
refreshInterval: FiniteDuration,
maxBufSize: Int, writeJournalPluginId: String)
extends AbstractEventsByPersistenceIdPublisher(
persistenceId, fromSequenceNr, maxBufSize, writeJournalPluginId) {
import EventsByPersistenceIdPublisher._
// periodic Continue tick that drives re-polling of the journal
val tickTask =
context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)(context.dispatcher)
override def postStop(): Unit =
tickTask.cancel()
override def receiveInitialRequest(): Unit = {
// get notified of new events for this persistenceId, then replay what already exists
journal ! LeveldbJournal.SubscribePersistenceId(persistenceId)
replay()
}
override def receiveIdleRequest(): Unit = {
deliverBuf()
// only completes when the caller-supplied upper bound has been reached
if (buf.isEmpty && currSeqNo > toSequenceNr)
onCompleteThenStop()
}
override def receiveRecoverySuccess(highestSeqNr: Long): Unit = {
deliverBuf()
if (buf.isEmpty && currSeqNo > toSequenceNr)
onCompleteThenStop()
// stay alive and wait for ticks / EventAppended notifications
context.become(idle)
}
}
/**
 * INTERNAL API
 *
 * One-shot ("current") variant of the eventsByPersistenceId query: delivers
 * the events that exist at query time and then completes the stream. The
 * effective upper bound is lowered to the journal's highest sequence number
 * on the first successful replay.
 */
private[akka] class CurrentEventsByPersistenceIdPublisher(
persistenceId: String, fromSequenceNr: Long, var toSeqNr: Long,
maxBufSize: Int, writeJournalPluginId: String)
extends AbstractEventsByPersistenceIdPublisher(
persistenceId, fromSequenceNr, maxBufSize, writeJournalPluginId) {
import EventsByPersistenceIdPublisher._
override def toSequenceNr: Long = toSeqNr
override def receiveInitialRequest(): Unit =
replay()
override def receiveIdleRequest(): Unit = {
deliverBuf()
if (buf.isEmpty && currSeqNo > toSequenceNr)
onCompleteThenStop()
else
// more in the buffer or journal; trigger another delivery/replay round
self ! Continue
}
override def receiveRecoverySuccess(highestSeqNr: Long): Unit = {
deliverBuf()
// clamp the bound to what the journal actually contains, so the
// stream completes instead of waiting for future events
if (highestSeqNr < toSequenceNr)
toSeqNr = highestSeqNr
if (highestSeqNr == 0L || (buf.isEmpty && currSeqNo > toSequenceNr))
onCompleteThenStop()
else
self ! Continue // more to fetch
context.become(idle)
}
}

View file

@@ -0,0 +1,193 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.persistence.JournalProtocol._
import akka.persistence.Persistence
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import akka.stream.actor.ActorPublisherMessage.Request
import akka.persistence.journal.leveldb.LeveldbJournal
import akka.persistence.query.EventEnvelope
import akka.persistence.journal.leveldb.LeveldbJournal.ReplayTaggedMessages
import akka.persistence.journal.leveldb.LeveldbJournal.ReplayedTaggedMessage
/**
 * INTERNAL API
 *
 * Factory for the eventsByTag publisher actors. A defined `refreshInterval`
 * selects the live (continuously polling) variant; `None` selects the
 * one-shot "current" variant that completes once existing events are delivered.
 */
private[akka] object EventsByTagPublisher {
  def props(tag: String, fromOffset: Long, toOffset: Long, refreshInterval: Option[FiniteDuration],
            maxBufSize: Int, writeJournalPluginId: String): Props = {
    refreshInterval match {
      case Some(interval) ⇒
        Props(new LiveEventsByTagPublisher(tag, fromOffset, toOffset, interval,
          maxBufSize, writeJournalPluginId))
      case None ⇒
        Props(new CurrentEventsByTagPublisher(tag, fromOffset, toOffset,
          maxBufSize, writeJournalPluginId))
    }
  }

  /**
   * INTERNAL API
   * Self-message used as the periodic poll / retry trigger.
   */
  private[akka] case object Continue
}
/**
 * INTERNAL API
 *
 * Common base for the live and current eventsByTag publishers. Replays
 * tagged messages from the leveldb journal in chunks bounded by `maxBufSize`
 * and delivers them downstream according to subscriber demand.
 */
private[akka] abstract class AbstractEventsByTagPublisher(
  val tag: String, val fromOffset: Long,
  val maxBufSize: Int, val writeJournalPluginId: String)
  extends ActorPublisher[EventEnvelope] with DeliveryBuffer[EventEnvelope] with ActorLogging {
  import EventsByTagPublisher._

  val journal: ActorRef = Persistence(context.system).journalFor(writeJournalPluginId)

  // next offset to replay from; advances as replayed events arrive
  var currOffset = fromOffset

  // inclusive upper bound of the query, defined by the concrete subclass
  def toOffset: Long

  def receive = init

  def init: Receive = {
    case _: Request ⇒ receiveInitialRequest()
    case Continue   ⇒ // skip, wait for first Request
    case Cancel     ⇒ context.stop(self)
  }

  def receiveInitialRequest(): Unit

  def idle: Receive = {
    case Continue | _: LeveldbJournal.TaggedEventAppended ⇒
      if (timeForReplay)
        replay()
    case _: Request ⇒
      receiveIdleRequest()
    case Cancel ⇒
      context.stop(self)
  }

  def receiveIdleRequest(): Unit

  // replay only when the buffer has room and there may still be events left
  def timeForReplay: Boolean =
    (buf.isEmpty || buf.size <= maxBufSize / 2) && (currOffset <= toOffset)

  def replay(): Unit = {
    val limit = maxBufSize - buf.size
    log.debug("request replay for tag [{}] from [{}] to [{}] limit [{}]", tag, currOffset, toOffset, limit)
    journal ! ReplayTaggedMessages(currOffset, toOffset, limit, tag, self)
    context.become(replaying(limit))
  }

  def replaying(limit: Int): Receive = {
    case ReplayedTaggedMessage(p, _, offset) ⇒
      buf :+= EventEnvelope(
        offset = offset,
        persistenceId = p.persistenceId,
        sequenceNr = p.sequenceNr,
        event = p.payload)
      currOffset = offset + 1
      deliverBuf()
    case RecoverySuccess(highestSeqNr) ⇒
      log.debug("replay completed for tag [{}], currOffset [{}]", tag, currOffset)
      receiveRecoverySuccess(highestSeqNr)
    case ReplayMessagesFailure(cause) ⇒
      log.debug("replay failed for tag [{}], due to [{}]", tag, cause.getMessage)
      deliverBuf()
      onErrorThenStop(cause)
    case _: Request ⇒
      deliverBuf()
    case Continue | _: LeveldbJournal.TaggedEventAppended ⇒ // skip during replay
    case Cancel ⇒
      context.stop(self)
  }

  def receiveRecoverySuccess(highestSeqNr: Long): Unit
}
/**
 * INTERNAL API
 *
 * Live variant of the eventsByTag query: subscribes to journal notifications
 * for newly tagged events and additionally polls on a periodic tick, so the
 * stream stays open until `toOffset` is passed or the stream is cancelled.
 */
private[akka] class LiveEventsByTagPublisher(
tag: String, fromOffset: Long, override val toOffset: Long,
refreshInterval: FiniteDuration,
maxBufSize: Int, writeJournalPluginId: String)
extends AbstractEventsByTagPublisher(
tag, fromOffset, maxBufSize, writeJournalPluginId) {
import EventsByTagPublisher._
// periodic Continue tick that drives re-polling of the journal
val tickTask =
context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)(context.dispatcher)
override def postStop(): Unit =
tickTask.cancel()
override def receiveInitialRequest(): Unit = {
// get notified of new tagged events, then replay what already exists
journal ! LeveldbJournal.SubscribeTag(tag)
replay()
}
override def receiveIdleRequest(): Unit = {
deliverBuf()
// only completes when the caller-supplied upper bound has been reached
if (buf.isEmpty && currOffset > toOffset)
onCompleteThenStop()
}
override def receiveRecoverySuccess(highestSeqNr: Long): Unit = {
deliverBuf()
if (buf.isEmpty && currOffset > toOffset)
onCompleteThenStop()
// stay alive and wait for ticks / TaggedEventAppended notifications
context.become(idle)
}
}
/**
 * INTERNAL API
 *
 * One-shot ("current") variant of the eventsByTag query: delivers the tagged
 * events that exist at query time and then completes. The effective upper
 * bound is lowered to the journal's highest replayed offset on recovery.
 */
private[akka] class CurrentEventsByTagPublisher(
tag: String, fromOffset: Long, var _toOffset: Long,
maxBufSize: Int, writeJournalPluginId: String)
extends AbstractEventsByTagPublisher(
tag, fromOffset, maxBufSize, writeJournalPluginId) {
import EventsByTagPublisher._
override def toOffset: Long = _toOffset
override def receiveInitialRequest(): Unit =
replay()
override def receiveIdleRequest(): Unit = {
deliverBuf()
if (buf.isEmpty && currOffset > toOffset)
onCompleteThenStop()
else
// more in the buffer or journal; trigger another delivery/replay round
self ! Continue
}
override def receiveRecoverySuccess(highestSeqNr: Long): Unit = {
deliverBuf()
// clamp the bound to what the journal actually contains, so the
// stream completes instead of waiting for future events
if (highestSeqNr < toOffset)
_toOffset = highestSeqNr
if (highestSeqNr == 0L || (buf.isEmpty && currOffset > toOffset))
onCompleteThenStop()
else
self ! Continue // more to fetch
context.become(idle)
}
}

View file

@ -17,6 +17,9 @@ import akka.persistence.query.RefreshInterval
import com.typesafe.config.Config
import akka.persistence.query.EventEnvelope
import akka.persistence.query.AllPersistenceIds
import akka.persistence.query.EventsByTag
import akka.util.ByteString
import java.net.URLEncoder
object LeveldbReadJournal {
final val Identifier = "akka.persistence.query.journal.leveldb"
@ -33,6 +36,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends sc
// Dispatches a query object to the matching implementation; EventsByTag is
// the query added by this change.
override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match {
  case EventsByPersistenceId(pid, from, to) ⇒ eventsByPersistenceId(pid, from, to, hints)
  case AllPersistenceIds                    ⇒ allPersistenceIds(hints)
  case EventsByTag(tag, offset)             ⇒ eventsByTag(tag, offset, hints)
  case unknown                              ⇒ unsupportedQueryType(unknown)
}
@ -43,12 +47,20 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends sc
}
/**
 * Stream of all persistenceIds known to the journal.
 */
def allPersistenceIds(hints: Seq[Hint]): Source[String, Unit] = {
  // no polling for this query, the write journal will push all changes, but
  // we still use the `NoRefresh` hint as user API
  val liveQuery = refreshInterval(hints).isDefined
  Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery, maxBufSize, writeJournalPluginId))
    .mapMaterializedValue(_ ⇒ ())
    .named("allPersistenceIds")
}
/**
 * Stream of events tagged with `tag`, starting from `offset`. The `NoRefresh`
 * hint selects the one-shot variant; otherwise the stream is live.
 */
def eventsByTag(tag: String, offset: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = {
  Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue,
    refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ())
    // URL-encode the tag since stream names must not contain arbitrary characters
    .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
}
private def refreshInterval(hints: Seq[Hint]): Option[FiniteDuration] =
if (hints.contains(NoRefresh))
None

View file

@ -22,6 +22,7 @@ object AllPersistenceIdsSpec {
akka.loglevel = INFO
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/journal-AllPersistenceIdsSpec"
akka.test.single-expect-default = 10s
"""
}

View file

@ -21,6 +21,7 @@ object EventsByPersistenceIdSpec {
akka.loglevel = INFO
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/journal-EventsByPersistenceIdSpec"
akka.test.single-expect-default = 10s
"""
}
@ -67,6 +68,24 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
.expectNext("b-1", "b-2")
.expectComplete()
}
"not see new events after demand request" in {
val ref = setup("f")
val src = queries.query(EventsByPersistenceId("f", 0L, Long.MaxValue), NoRefresh)
val probe = src.map(_.event).runWith(TestSink.probe[Any])
.request(2)
.expectNext("f-1", "f-2")
.expectNoMsg(100.millis)
ref ! "f-4"
expectMsg("f-4-done")
probe
.expectNoMsg(100.millis)
.request(5)
.expectNext("f-3")
.expectComplete() // f-4 not seen
}
}
"Leveldb live query EventsByPersistenceId" must {
@ -95,6 +114,24 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
probe.expectNext("d-4").expectComplete()
}
"find new events after demand request" in {
val ref = setup("e")
val src = queries.query(EventsByPersistenceId("e", 0L, Long.MaxValue), refreshInterval)
val probe = src.map(_.event).runWith(TestSink.probe[Any])
.request(2)
.expectNext("e-1", "e-2")
.expectNoMsg(100.millis)
ref ! "e-4"
expectMsg("e-4-done")
probe
.expectNoMsg(100.millis)
.request(5)
.expectNext("e-3")
.expectNext("e-4")
}
}
}

View file

@@ -0,0 +1,163 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.RefreshInterval
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.persistence.query.NoRefresh
import akka.testkit.AkkaSpec
import akka.persistence.query.EventsByTag
import akka.persistence.journal.leveldb.Tagged
import akka.persistence.journal.EventSeq
import akka.persistence.journal.EventAdapter
import akka.persistence.query.EventEnvelope
object EventsByTagSpec {
// Test configuration: leveldb journal with the ColorTagger event adapter
// bound to String events, and a generous expect timeout for slow CI machines.
val config = """
akka.loglevel = INFO
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb {
dir = "target/journal-EventsByTagSpec"
event-adapters {
color-tagger = akka.persistence.query.journal.leveldb.ColorTagger
}
event-adapter-bindings = {
"java.lang.String" = color-tagger
}
}
akka.test.single-expect-default = 10s
"""
}
/**
 * Event adapter used by the spec: tags String events with each color name
 * ("green", "black", "blue") that occurs in the event text; other events
 * pass through unchanged.
 */
class ColorTagger extends EventAdapter {
  val colors = Set("green", "black", "blue")

  override def toJournal(event: Any): Any = event match {
    case s: String ⇒
      // collect every color mentioned in the string (val: never reassigned)
      val tags = colors.foldLeft(Set.empty[String])((acc, c) ⇒ if (s.contains(c)) acc + c else acc)
      if (tags.isEmpty) event
      else Tagged(event, tags)
    case _ ⇒ event
  }

  override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event)

  override def manifest(event: Any): String = ""
}
// Spec for the leveldb EventsByTag query. NOTE(review): the test cases are
// order-dependent — later tests rely on events persisted by earlier ones
// (offsets accumulate across the shared journal directory).
class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
with Cleanup with ImplicitSender {
import EventsByTagSpec._
implicit val mat = ActorMaterializer()(system)
val refreshInterval = RefreshInterval(1.second)
val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)
"Leveldb query EventsByTag" must {
"find existing events" in {
val a = system.actorOf(TestActor.props("a"))
val b = system.actorOf(TestActor.props("b"))
// "hello" contains no color, so it is not tagged
a ! "hello"
expectMsg(s"hello-done")
a ! "a green apple"
expectMsg(s"a green apple-done")
b ! "a black car"
expectMsg(s"a black car-done")
a ! "a green banana"
expectMsg(s"a green banana-done")
b ! "a green leaf"
expectMsg(s"a green leaf-done")
val greenSrc = queries.query(EventsByTag(tag = "green", offset = 0L), NoRefresh)
greenSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(1L, "a", 2L, "a green apple"))
.expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))
.expectNoMsg(500.millis)
.request(2)
.expectNext(EventEnvelope(3L, "b", 2L, "a green leaf"))
.expectComplete()
val blackSrc = queries.query(EventsByTag(tag = "black", offset = 0L), NoRefresh)
blackSrc.runWith(TestSink.probe[Any])
.request(5)
.expectNext(EventEnvelope(1L, "b", 1L, "a black car"))
.expectComplete()
}
"not see new events after demand request" in {
val c = system.actorOf(TestActor.props("c"))
val greenSrc = queries.query(EventsByTag(tag = "green", offset = 0L), NoRefresh)
val probe = greenSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(1L, "a", 2L, "a green apple"))
.expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))
.expectNoMsg(100.millis)
// persisted after the query started; a NoRefresh stream must not see it
c ! "a green cucumber"
expectMsg(s"a green cucumber-done")
probe
.expectNoMsg(100.millis)
.request(5)
.expectNext(EventEnvelope(3L, "b", 2L, "a green leaf"))
.expectComplete() // green cucumber not seen
}
"find events from offset" in {
// offset = 2L skips the first tagged event
val greenSrc = queries.query(EventsByTag(tag = "green", offset = 2L), NoRefresh)
val probe = greenSrc.runWith(TestSink.probe[Any])
.request(10)
.expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))
.expectNext(EventEnvelope(3L, "b", 2L, "a green leaf"))
.expectNext(EventEnvelope(4L, "c", 1L, "a green cucumber"))
.expectComplete()
}
}
"Leveldb live query EventsByTag" must {
"find new events" in {
val d = system.actorOf(TestActor.props("d"))
val blackSrc = queries.query(EventsByTag(tag = "black", offset = 0L), refreshInterval)
val probe = blackSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(1L, "b", 1L, "a black car"))
.expectNoMsg(100.millis)
// live query picks up events persisted after the stream started
d ! "a black dog"
expectMsg(s"a black dog-done")
d ! "a black night"
expectMsg(s"a black night-done")
probe
.expectNext(EventEnvelope(2L, "d", 1L, "a black dog"))
.expectNoMsg(100.millis)
.request(10)
.expectNext(EventEnvelope(3L, "d", 2L, "a black night"))
}
"find events from offset" in {
val greenSrc = queries.query(EventsByTag(tag = "green", offset = 2L))
val probe = greenSrc.runWith(TestSink.probe[Any])
.request(10)
.expectNext(EventEnvelope(2L, "a", 3L, "a green banana"))
.expectNext(EventEnvelope(3L, "b", 2L, "a green leaf"))
.expectNext(EventEnvelope(4L, "c", 1L, "a green cucumber"))
// live stream: no completion expected, just no further matches yet
.expectNoMsg(100.millis)
}
}
}