From 8f723deda1b8eced72b617b139c37c1b66ed8984 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 20 Aug 2015 11:45:24 +0200 Subject: [PATCH] +per #18192 leveldb impl of EventsByTag query * also refactoring of EventsByPersistenceIdPublisher * increase test timeouts --- .../leveldb/AllPersistenceIdsPublisher.scala | 5 +- .../EventsByPersistenceIdPublisher.scala | 187 +++++++++++------ .../leveldb/EventsByTagPublisher.scala | 193 ++++++++++++++++++ .../journal/leveldb/LeveldbReadJournal.scala | 12 ++ .../leveldb/AllPersistenceIdsSpec.scala | 1 + .../leveldb/EventsByPersistenceIdSpec.scala | 37 ++++ .../journal/leveldb/EventsByTagSpec.scala | 163 +++++++++++++++ .../journal/leveldb/LeveldbJournal.scala | 55 ++++- .../journal/leveldb/LeveldbRecovery.scala | 34 ++- .../journal/leveldb/LeveldbStore.scala | 67 +++++- .../persistence/journal/leveldb/Tagged.scala | 14 ++ 11 files changed, 688 insertions(+), 80 deletions(-) create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala create mode 100644 akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala create mode 100644 akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala index 0cbedea47c..f48b9ef5db 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala @@ -23,7 +23,10 @@ private[akka] object AllPersistenceIdsPublisher { private case object Continue } -class AllPersistenceIdsPublisher(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String) +/** + * INTERNAL API + */ +private[akka] class AllPersistenceIdsPublisher(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String) extends ActorPublisher[String] with DeliveryBuffer[String] with ActorLogging { import AllPersistenceIdsPublisher._ diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala index 76b3789689..2e72f62672 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala @@ -20,16 +20,29 @@ import akka.persistence.query.EventEnvelope */ private[akka] object EventsByPersistenceIdPublisher { def props(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, refreshInterval: Option[FiniteDuration], - maxBufSize: Int, writeJournalPluginId: String): Props = - Props(new EventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, refreshInterval, - maxBufSize, writeJournalPluginId)) + maxBufSize: Int, writeJournalPluginId: String): Props = { + refreshInterval match { + case Some(interval) ⇒ + Props(new LiveEventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, interval, + maxBufSize, writeJournalPluginId)) + case None ⇒ + Props(new CurrentEventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, + maxBufSize, writeJournalPluginId)) + } + } - private case object Continue + /** + * INTERNAL API + */ + private[akka] case object Continue } -class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, - refreshInterval: Option[FiniteDuration], - maxBufSize: Int, writeJournalPluginId: String) +/** + * INTERNAL API + */ +private[akka] abstract class AbstractEventsByPersistenceIdPublisher( + val persistenceId: String, val fromSequenceNr: Long, + val maxBufSize: Int, val writeJournalPluginId: String) extends ActorPublisher[EventEnvelope] with DeliveryBuffer[EventEnvelope] with ActorLogging { import EventsByPersistenceIdPublisher._ @@ -37,48 +50,34 @@ class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long var currSeqNo = fromSequenceNr - val tickTask = refreshInterval.map { interval ⇒ - import context.dispatcher - context.system.scheduler.schedule(interval, interval, self, Continue) - } - - def nonLiveQuery: Boolean = refreshInterval.isEmpty - - override def postStop(): Unit = { - tickTask.foreach(_.cancel()) - } + def toSequenceNr: Long def receive = init def init: Receive = { - case _: Request ⇒ - journal ! LeveldbJournal.SubscribePersistenceId(persistenceId) - replay() - case Continue ⇒ // skip, wait for first Request - case Cancel ⇒ context.stop(self) + case _: Request ⇒ receiveInitialRequest() + case Continue ⇒ // skip, wait for first Request + case Cancel ⇒ context.stop(self) } + def receiveInitialRequest(): Unit + def idle: Receive = { case Continue | _: LeveldbJournal.EventAppended ⇒ if (timeForReplay) replay() case _: Request ⇒ - deliverBuf() - if (nonLiveQuery) { - if (buf.isEmpty) - onCompleteThenStop() - else - self ! Continue - } + receiveIdleRequest() case Cancel ⇒ context.stop(self) - } + def receiveIdleRequest(): Unit + def timeForReplay: Boolean = - buf.isEmpty || buf.size <= maxBufSize / 2 + (buf.isEmpty || buf.size <= maxBufSize / 2) && (currSeqNo <= toSequenceNr) def replay(): Unit = { val limit = maxBufSize - buf.size @@ -88,45 +87,105 @@ class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long } def replaying(limit: Int): Receive = { - var replayCount = 0 + case ReplayedMessage(p) ⇒ + buf :+= EventEnvelope( + offset = p.sequenceNr, + persistenceId = persistenceId, + sequenceNr = p.sequenceNr, + event = p.payload) + currSeqNo = p.sequenceNr + 1 + deliverBuf() - { - case ReplayedMessage(p) ⇒ - buf :+= EventEnvelope( - offset = p.sequenceNr, - persistenceId = persistenceId, - sequenceNr = p.sequenceNr, - event = p.payload) - currSeqNo = p.sequenceNr + 1 - replayCount += 1 - deliverBuf() + case RecoverySuccess(highestSeqNr) ⇒ + log.debug("replay completed for persistenceId [{}], currSeqNo [{}]", persistenceId, currSeqNo) + receiveRecoverySuccess(highestSeqNr) - case _: RecoverySuccess ⇒ - log.debug("replay completed for persistenceId [{}], currSeqNo [{}], replayCount [{}]", persistenceId, currSeqNo, replayCount) - deliverBuf() - if (buf.isEmpty && currSeqNo > toSequenceNr) - onCompleteThenStop() - else if (nonLiveQuery) { - if (buf.isEmpty && replayCount < limit) - onCompleteThenStop() - else - self ! Continue // more to fetch - } - context.become(idle) + case ReplayMessagesFailure(cause) ⇒ + log.debug("replay failed for persistenceId [{}], due to [{}]", persistenceId, cause.getMessage) + deliverBuf() + onErrorThenStop(cause) - case ReplayMessagesFailure(cause) ⇒ - log.debug("replay failed for persistenceId [{}], due to [{}]", persistenceId, cause.getMessage) - deliverBuf() - onErrorThenStop(cause) + case _: Request ⇒ + deliverBuf() - case _: Request ⇒ - deliverBuf() + case Continue | _: LeveldbJournal.EventAppended ⇒ // skip during replay - case Continue | _: LeveldbJournal.EventAppended ⇒ // skip during replay + case Cancel ⇒ + context.stop(self) + } - case Cancel ⇒ - context.stop(self) - } + def receiveRecoverySuccess(highestSeqNr: Long): Unit +} + +/** + * INTERNAL API + */ +private[akka] class LiveEventsByPersistenceIdPublisher( + persistenceId: String, fromSequenceNr: Long, override val toSequenceNr: Long, + refreshInterval: FiniteDuration, + maxBufSize: Int, writeJournalPluginId: String) + extends AbstractEventsByPersistenceIdPublisher( + persistenceId, fromSequenceNr, maxBufSize, writeJournalPluginId) { + import EventsByPersistenceIdPublisher._ + + val tickTask = + context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)(context.dispatcher) + + override def postStop(): Unit = + tickTask.cancel() + + override def receiveInitialRequest(): Unit = { + journal ! LeveldbJournal.SubscribePersistenceId(persistenceId) + replay() + } + + override def receiveIdleRequest(): Unit = { + deliverBuf() + if (buf.isEmpty && currSeqNo > toSequenceNr) + onCompleteThenStop() + } + + override def receiveRecoverySuccess(highestSeqNr: Long): Unit = { + deliverBuf() + if (buf.isEmpty && currSeqNo > toSequenceNr) + onCompleteThenStop() + context.become(idle) + } + +} + +/** + * INTERNAL API + */ +private[akka] class CurrentEventsByPersistenceIdPublisher( + persistenceId: String, fromSequenceNr: Long, var toSeqNr: Long, + maxBufSize: Int, writeJournalPluginId: String) + extends AbstractEventsByPersistenceIdPublisher( + persistenceId, fromSequenceNr, maxBufSize, writeJournalPluginId) { + import EventsByPersistenceIdPublisher._ + + override def toSequenceNr: Long = toSeqNr + + override def receiveInitialRequest(): Unit = + replay() + + override def receiveIdleRequest(): Unit = { + deliverBuf() + if (buf.isEmpty && currSeqNo > toSequenceNr) + onCompleteThenStop() + else + self ! Continue + } + + override def receiveRecoverySuccess(highestSeqNr: Long): Unit = { + deliverBuf() + if (highestSeqNr < toSequenceNr) + toSeqNr = highestSeqNr + if (highestSeqNr == 0L || (buf.isEmpty && currSeqNo > toSequenceNr)) + onCompleteThenStop() + else + self ! Continue // more to fetch + context.become(idle) } } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala new file mode 100644 index 0000000000..acf314d3a1 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala @@ -0,0 +1,193 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import scala.concurrent.duration._ +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.persistence.JournalProtocol._ +import akka.persistence.Persistence +import akka.stream.actor.ActorPublisher +import akka.stream.actor.ActorPublisherMessage.Cancel +import akka.stream.actor.ActorPublisherMessage.Request +import akka.persistence.journal.leveldb.LeveldbJournal +import akka.persistence.query.EventEnvelope +import akka.persistence.journal.leveldb.LeveldbJournal.ReplayTaggedMessages +import akka.persistence.journal.leveldb.LeveldbJournal.ReplayedTaggedMessage + +/** + * INTERNAL API + */ +private[akka] object EventsByTagPublisher { + def props(tag: String, fromOffset: Long, toOffset: Long, refreshInterval: Option[FiniteDuration], + maxBufSize: Int, writeJournalPluginId: String): Props = { + refreshInterval match { + case Some(interval) ⇒ + Props(new LiveEventsByTagPublisher(tag, fromOffset, toOffset, interval, + maxBufSize, writeJournalPluginId)) + case None ⇒ + Props(new CurrentEventsByTagPublisher(tag, fromOffset, toOffset, + maxBufSize, writeJournalPluginId)) + } + } + + /** + * INTERNAL API + */ + private[akka] case object Continue +} + +/** + * INTERNAL API + */ +private[akka] abstract class AbstractEventsByTagPublisher( + val tag: String, val fromOffset: Long, + val maxBufSize: Int, val writeJournalPluginId: String) + extends ActorPublisher[EventEnvelope] with DeliveryBuffer[EventEnvelope] with ActorLogging { + import EventsByTagPublisher._ + + val journal: ActorRef = Persistence(context.system).journalFor(writeJournalPluginId) + + var currOffset = fromOffset + + def toOffset: Long + + def receive = init + + def init: Receive = { + case _: Request ⇒ receiveInitialRequest() + case Continue ⇒ // skip, wait for first Request + case Cancel ⇒ context.stop(self) + } + + def receiveInitialRequest(): Unit + + def idle: Receive = { + case Continue | _: LeveldbJournal.TaggedEventAppended ⇒ + if (timeForReplay) + replay() + + case _: Request ⇒ + receiveIdleRequest() + + case Cancel ⇒ + context.stop(self) + } + + def receiveIdleRequest(): Unit + + def timeForReplay: Boolean = + (buf.isEmpty || buf.size <= maxBufSize / 2) && (currOffset <= toOffset) + + def replay(): Unit = { + val limit = maxBufSize - buf.size + log.debug("request replay for tag [{}] from [{}] to [{}] limit [{}]", tag, currOffset, toOffset, limit) + journal ! ReplayTaggedMessages(currOffset, toOffset, limit, tag, self) + context.become(replaying(limit)) + } + + def replaying(limit: Int): Receive = { + case ReplayedTaggedMessage(p, _, offset) ⇒ + buf :+= EventEnvelope( + offset = offset, + persistenceId = p.persistenceId, + sequenceNr = p.sequenceNr, + event = p.payload) + currOffset = offset + 1 + deliverBuf() + + case RecoverySuccess(highestSeqNr) ⇒ + log.debug("replay completed for tag [{}], currOffset [{}]", tag, currOffset) + receiveRecoverySuccess(highestSeqNr) + + case ReplayMessagesFailure(cause) ⇒ + log.debug("replay failed for tag [{}], due to [{}]", tag, cause.getMessage) + deliverBuf() + onErrorThenStop(cause) + + case _: Request ⇒ + deliverBuf() + + case Continue | _: LeveldbJournal.TaggedEventAppended ⇒ // skip during replay + + case Cancel ⇒ + context.stop(self) + } + + def receiveRecoverySuccess(highestSeqNr: Long): Unit +} + +/** + * INTERNAL API + */ +private[akka] class LiveEventsByTagPublisher( + tag: String, fromOffset: Long, override val toOffset: Long, + refreshInterval: FiniteDuration, + maxBufSize: Int, writeJournalPluginId: String) + extends AbstractEventsByTagPublisher( + tag, fromOffset, maxBufSize, writeJournalPluginId) { + import EventsByTagPublisher._ + + val tickTask = + context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)(context.dispatcher) + + override def postStop(): Unit = + tickTask.cancel() + + override def receiveInitialRequest(): Unit = { + journal ! LeveldbJournal.SubscribeTag(tag) + replay() + } + + override def receiveIdleRequest(): Unit = { + deliverBuf() + if (buf.isEmpty && currOffset > toOffset) + onCompleteThenStop() + } + + override def receiveRecoverySuccess(highestSeqNr: Long): Unit = { + deliverBuf() + if (buf.isEmpty && currOffset > toOffset) + onCompleteThenStop() + context.become(idle) + } + +} + +/** + * INTERNAL API + */ +private[akka] class CurrentEventsByTagPublisher( + tag: String, fromOffset: Long, var _toOffset: Long, + maxBufSize: Int, writeJournalPluginId: String) + extends AbstractEventsByTagPublisher( + tag, fromOffset, maxBufSize, writeJournalPluginId) { + import EventsByTagPublisher._ + + override def toOffset: Long = _toOffset + + override def receiveInitialRequest(): Unit = + replay() + + override def receiveIdleRequest(): Unit = { + deliverBuf() + if (buf.isEmpty && currOffset > toOffset) + onCompleteThenStop() + else + self ! Continue + } + + override def receiveRecoverySuccess(highestSeqNr: Long): Unit = { + deliverBuf() + if (highestSeqNr < toOffset) + _toOffset = highestSeqNr + if (highestSeqNr == 0L || (buf.isEmpty && currOffset > toOffset)) + onCompleteThenStop() + else + self ! Continue // more to fetch + context.become(idle) + } + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala index ea727d3391..5695747e31 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala @@ -17,6 +17,9 @@ import akka.persistence.query.RefreshInterval import com.typesafe.config.Config import akka.persistence.query.EventEnvelope import akka.persistence.query.AllPersistenceIds +import akka.persistence.query.EventsByTag +import akka.util.ByteString +import java.net.URLEncoder object LeveldbReadJournal { final val Identifier = "akka.persistence.query.journal.leveldb" @@ -33,6 +36,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends sc override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match { case EventsByPersistenceId(pid, from, to) ⇒ eventsByPersistenceId(pid, from, to, hints) case AllPersistenceIds ⇒ allPersistenceIds(hints) + case EventsByTag(tag, offset) ⇒ eventsByTag(tag, offset, hints) case unknown ⇒ unsupportedQueryType(unknown) } @@ -43,12 +47,20 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends sc } def allPersistenceIds(hints: Seq[Hint]): Source[String, Unit] = { + // no polling for this query, the write journal will push all changes, but + // we still use the `NoRefresh` hint as user API val liveQuery = refreshInterval(hints).isDefined Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery, maxBufSize, writeJournalPluginId)) .mapMaterializedValue(_ ⇒ ()) .named("allPersistenceIds") } + def eventsByTag(tag: String, offset: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = { + Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, + refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) + .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + } + private def refreshInterval(hints: Seq[Hint]): Option[FiniteDuration] = if (hints.contains(NoRefresh)) None diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala index 0330e84f11..e27758a31c 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala @@ -22,6 +22,7 @@ object AllPersistenceIdsSpec { akka.loglevel = INFO akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" akka.persistence.journal.leveldb.dir = "target/journal-AllPersistenceIdsSpec" + akka.test.single-expect-default = 10s """ } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index 5a57844b60..03f4327594 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -21,6 +21,7 @@ object EventsByPersistenceIdSpec { akka.loglevel = INFO akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" akka.persistence.journal.leveldb.dir = "target/journal-EventsByPersistenceIdSpec" + akka.test.single-expect-default = 10s """ } @@ -67,6 +68,24 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi .expectNext("b-1", "b-2") .expectComplete() } + + "not see new events after demand request" in { + val ref = setup("f") + val src = queries.query(EventsByPersistenceId("f", 0L, Long.MaxValue), NoRefresh) + val probe = src.map(_.event).runWith(TestSink.probe[Any]) + .request(2) + .expectNext("f-1", "f-2") + .expectNoMsg(100.millis) + + ref ! "f-4" + expectMsg("f-4-done") + + probe + .expectNoMsg(100.millis) + .request(5) + .expectNext("f-3") + .expectComplete() // f-4 not seen + } } "Leveldb live query EventsByPersistenceId" must { @@ -95,6 +114,24 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi probe.expectNext("d-4").expectComplete() } + + "find new events after demand request" in { + val ref = setup("e") + val src = queries.query(EventsByPersistenceId("e", 0L, Long.MaxValue), refreshInterval) + val probe = src.map(_.event).runWith(TestSink.probe[Any]) + .request(2) + .expectNext("e-1", "e-2") + .expectNoMsg(100.millis) + + ref ! "e-4" + expectMsg("e-4-done") + + probe + .expectNoMsg(100.millis) + .request(5) + .expectNext("e-3") + .expectNext("e-4") + } } } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala new file mode 100644 index 0000000000..fc8802371d --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -0,0 +1,163 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import scala.concurrent.duration._ +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.RefreshInterval +import akka.stream.ActorMaterializer +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.ImplicitSender +import akka.testkit.TestKit +import akka.persistence.query.NoRefresh +import akka.testkit.AkkaSpec +import akka.persistence.query.EventsByTag +import akka.persistence.journal.leveldb.Tagged +import akka.persistence.journal.EventSeq +import akka.persistence.journal.EventAdapter +import akka.persistence.query.EventEnvelope + +object EventsByTagSpec { + val config = """ + akka.loglevel = INFO + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.journal.leveldb { + dir = "target/journal-EventsByTagSpec" + event-adapters { + color-tagger = akka.persistence.query.journal.leveldb.ColorTagger + } + event-adapter-bindings = { + "java.lang.String" = color-tagger + } + } + akka.test.single-expect-default = 10s + """ + +} + +class ColorTagger extends EventAdapter { + val colors = Set("green", "black", "blue") + override def toJournal(event: Any): Any = event match { + case s: String ⇒ + var tags = colors.foldLeft(Set.empty[String])((acc, c) ⇒ if (s.contains(c)) acc + c else acc) + if (tags.isEmpty) event + else Tagged(event, tags) + case _ ⇒ event + } + + override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event) + + override def manifest(event: Any): String = "" +} + +class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) + with Cleanup with ImplicitSender { + import EventsByTagSpec._ + + implicit val mat = ActorMaterializer()(system) + + val refreshInterval = RefreshInterval(1.second) + + val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + + "Leveldb query EventsByTag" must { + "find existing events" in { + val a = system.actorOf(TestActor.props("a")) + val b = system.actorOf(TestActor.props("b")) + a ! "hello" + expectMsg(s"hello-done") + a ! "a green apple" + expectMsg(s"a green apple-done") + b ! "a black car" + expectMsg(s"a black car-done") + a ! "a green banana" + expectMsg(s"a green banana-done") + b ! "a green leaf" + expectMsg(s"a green leaf-done") + + val greenSrc = queries.query(EventsByTag(tag = "green", offset = 0L), NoRefresh) + greenSrc.runWith(TestSink.probe[Any]) + .request(2) + .expectNext(EventEnvelope(1L, "a", 2L, "a green apple")) + .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) + .expectNoMsg(500.millis) + .request(2) + .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) + .expectComplete() + + val blackSrc = queries.query(EventsByTag(tag = "black", offset = 0L), NoRefresh) + blackSrc.runWith(TestSink.probe[Any]) + .request(5) + .expectNext(EventEnvelope(1L, "b", 1L, "a black car")) + .expectComplete() + } + + "not see new events after demand request" in { + val c = system.actorOf(TestActor.props("c")) + + val greenSrc = queries.query(EventsByTag(tag = "green", offset = 0L), NoRefresh) + val probe = greenSrc.runWith(TestSink.probe[Any]) + .request(2) + .expectNext(EventEnvelope(1L, "a", 2L, "a green apple")) + .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) + .expectNoMsg(100.millis) + + c ! "a green cucumber" + expectMsg(s"a green cucumber-done") + + probe + .expectNoMsg(100.millis) + .request(5) + .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) + .expectComplete() // green cucumber not seen + } + + "find events from offset" in { + val greenSrc = queries.query(EventsByTag(tag = "green", offset = 2L), NoRefresh) + val probe = greenSrc.runWith(TestSink.probe[Any]) + .request(10) + .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) + .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) + .expectNext(EventEnvelope(4L, "c", 1L, "a green cucumber")) + .expectComplete() + } + } + + "Leveldb live query EventsByTag" must { + "find new events" in { + val d = system.actorOf(TestActor.props("d")) + + val blackSrc = queries.query(EventsByTag(tag = "black", offset = 0L), refreshInterval) + val probe = blackSrc.runWith(TestSink.probe[Any]) + .request(2) + .expectNext(EventEnvelope(1L, "b", 1L, "a black car")) + .expectNoMsg(100.millis) + + d ! "a black dog" + expectMsg(s"a black dog-done") + d ! "a black night" + expectMsg(s"a black night-done") + + probe + .expectNext(EventEnvelope(2L, "d", 1L, "a black dog")) + .expectNoMsg(100.millis) + .request(10) + .expectNext(EventEnvelope(3L, "d", 2L, "a black night")) + } + + "find events from offset" in { + val greenSrc = queries.query(EventsByTag(tag = "green", offset = 2L)) + val probe = greenSrc.runWith(TestSink.probe[Any]) + .request(10) + .expectNext(EventEnvelope(2L, "a", 3L, "a green banana")) + .expectNext(EventEnvelope(3L, "b", 2L, "a green leaf")) + .expectNext(EventEnvelope(4L, "c", 1L, "a green cucumber")) + .expectNoMsg(100.millis) + } + + } + +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 446b99b8ea..d37ab72d3c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -5,12 +5,16 @@ package akka.persistence.journal.leveldb import scala.concurrent.duration._ import scala.language.postfixOps - import akka.actor._ import akka.persistence.Persistence import akka.persistence.journal._ import akka.util.Timeout import akka.util.Helpers.ConfigOps +import akka.persistence.PersistentRepr +import scala.concurrent.Future +import akka.persistence.JournalProtocol.RecoverySuccess +import akka.persistence.JournalProtocol.ReplayMessagesFailure +import akka.pattern.pipe /** * INTERNAL API. @@ -21,12 +25,36 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi import LeveldbJournal._ override def receivePluginInternal: Receive = { + case r @ ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) ⇒ + import context.dispatcher + asyncReadHighestSequenceNr(tagAsPersistenceId(tag), fromSequenceNr) + .flatMap { highSeqNr ⇒ + val toSeqNr = math.min(toSequenceNr, highSeqNr) + if (highSeqNr == 0L || fromSequenceNr > toSeqNr) + Future.successful(highSeqNr) + else { + asyncReplayTaggedMessages(tag, fromSequenceNr, toSeqNr, max) { + case ReplayedTaggedMessage(p, tag, offset) ⇒ + adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ + replyTo.tell(ReplayedTaggedMessage(adaptedPersistentRepr, tag, offset), Actor.noSender) + } + }.map(_ ⇒ highSeqNr) + } + }.map { + highSeqNr ⇒ RecoverySuccess(highSeqNr) + }.recover { + case e ⇒ ReplayMessagesFailure(e) + }.pipeTo(replyTo) + case SubscribePersistenceId(persistenceId: String) ⇒ addPersistenceIdSubscriber(sender(), persistenceId) context.watch(sender()) case SubscribeAllPersistenceIds ⇒ addAllPersistenceIdsSubscriber(sender()) context.watch(sender()) + case SubscribeTag(tag: String) ⇒ + addTagSubscriber(sender(), tag) + context.watch(sender()) case Terminated(ref) ⇒ removeSubscriber(ref) } @@ -43,18 +71,31 @@ private[persistence] object LeveldbJournal { * Used by query-side. The journal will send [[EventAppended]] messages to * the subscriber when `asyncWriteMessages` has been called. */ - case class SubscribePersistenceId(persistenceId: String) extends SubscriptionCommand - case class EventAppended(persistenceId: String) extends DeadLetterSuppression + final case class SubscribePersistenceId(persistenceId: String) extends SubscriptionCommand + final case class EventAppended(persistenceId: String) extends DeadLetterSuppression /** - * Subscribe the `sender` to changes (appended events) for a specific `persistenceId`. + * Subscribe the `sender` to current and new persistenceIds. * Used by query-side. The journal will send one [[CurrentPersistenceIds]] to the * subscriber followed by [[PersistenceIdAdded]] messages when new persistenceIds * are created. */ - case object SubscribeAllPersistenceIds extends SubscriptionCommand - case class CurrentPersistenceIds(allPersistenceIds: Set[String]) extends DeadLetterSuppression - case class PersistenceIdAdded(persistenceId: String) extends DeadLetterSuppression + final case object SubscribeAllPersistenceIds extends SubscriptionCommand + final case class CurrentPersistenceIds(allPersistenceIds: Set[String]) extends DeadLetterSuppression + final case class PersistenceIdAdded(persistenceId: String) extends DeadLetterSuppression + + /** + * Subscribe the `sender` to changes (appended events) for a specific `tag`. + * Used by query-side. The journal will send [[TaggedEventAppended]] messages to + * the subscriber when `asyncWriteMessages` has been called. + */ + final case class SubscribeTag(tag: String) extends SubscriptionCommand + final case class TaggedEventAppended(tag: String) extends DeadLetterSuppression + + final case class ReplayTaggedMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, + tag: String, replyTo: ActorRef) extends SubscriptionCommand + final case class ReplayedTaggedMessage(persistent: PersistentRepr, tag: String, offset: Long) + extends DeadLetterSuppression with NoSerializationVerificationNeeded } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala index 1ac1af8bd6..f7e85259fd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala @@ -6,10 +6,10 @@ package akka.persistence.journal.leveldb import scala.concurrent.Future - import akka.persistence._ import akka.persistence.journal.AsyncRecovery import org.iq80.leveldb.DBIterator +import akka.persistence.journal.leveldb.LeveldbJournal.ReplayedTaggedMessage /** * INTERNAL API. @@ -74,6 +74,38 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb } } + def asyncReplayTaggedMessages(tag: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: ReplayedTaggedMessage ⇒ Unit): Future[Unit] = { + val tagNid = tagNumericId(tag) + Future(replayTaggedMessages(tag, tagNid, fromSequenceNr: Long, toSequenceNr, max: Long)(replayCallback))(replayDispatcher) + } + + def replayTaggedMessages(tag: String, tagNid: Int, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( + replayCallback: ReplayedTaggedMessage ⇒ Unit): Unit = { + + @scala.annotation.tailrec + def go(iter: DBIterator, key: Key, ctr: Long, replayCallback: ReplayedTaggedMessage ⇒ Unit) { + if (iter.hasNext) { + val nextEntry = iter.next() + val nextKey = keyFromBytes(nextEntry.getKey) + if (nextKey.sequenceNr > toSequenceNr) { + // end iteration here + } else if (key.persistenceId == nextKey.persistenceId) { + val msg = persistentFromBytes(nextEntry.getValue) + if (ctr < max) { + replayCallback(ReplayedTaggedMessage(msg, tag, nextKey.sequenceNr)) + go(iter, nextKey, ctr + 1L, replayCallback) + } + } + } + } + + withIterator { iter ⇒ + val startKey = Key(tagNid, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0) + iter.seek(keyToBytes(startKey)) + go(iter, startKey, 0L, replayCallback) + } + } + def readHighestSequenceNr(persistenceId: Int) = { val ro = leveldbSnapshot() try { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index a27ebe8136..9766de9715 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -34,8 +34,12 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with var leveldb: DB = _ private val persistenceIdSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef] + private val tagSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef] private var allPersistenceIdsSubscribers = Set.empty[ActorRef] + private var tagSequenceNr = Map.empty[String, Long] + private val tagPersistenceIdPrefix = "$$$" + def leveldbFactory = if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory else org.iq80.leveldb.impl.Iq80DBFactory.factory @@ -46,22 +50,34 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { var persistenceIds = Set.empty[String] - val hasSubscribers = hasPersistenceIdSubscribers + var allTags = Set.empty[String] + val result = Future.fromTry(Try { withBatch(batch ⇒ messages.map { a ⇒ Try { - a.payload.foreach(message ⇒ addToMessageBatch(message, batch)) - if (hasSubscribers) + a.payload.foreach { p ⇒ + val (p2, tags) = p.payload match { + case Tagged(payload, tags) ⇒ + (p.withPayload(payload), tags) + case _ ⇒ (p, Set.empty[String]) + } + if (tags.nonEmpty && hasTagSubscribers) + allTags ++= tags + addToMessageBatch(p2, tags, batch) + } + if (hasPersistenceIdSubscribers) persistenceIds += a.persistenceId } }) }) - if (hasSubscribers) { + if (hasPersistenceIdSubscribers) { persistenceIds.foreach { pid ⇒ notifyPersistenceIdChange(pid) } } + if (hasTagSubscribers && allTags.nonEmpty) + allTags.foreach(notifyTagChange) result } @@ -112,12 +128,35 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with def persistentToBytes(p: PersistentRepr): Array[Byte] = serialization.serialize(p).get def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get - private def addToMessageBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = { + private def addToMessageBatch(persistent: PersistentRepr, tags: Set[String], batch: WriteBatch): Unit = { + val persistentBytes = persistentToBytes(persistent) val nid = numericId(persistent.persistenceId) batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) - batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) + batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentBytes) + + tags.foreach { tag ⇒ + val tagNid = tagNumericId(tag) + val tagSeqNr = nextTagSequenceNr(tag) + batch.put(keyToBytes(counterKey(tagNid)), counterToBytes(tagSeqNr)) + batch.put(keyToBytes(Key(tagNid, tagSeqNr, 0)), persistentBytes) + } } + private def nextTagSequenceNr(tag: String): Long = { + val n = tagSequenceNr.get(tag) match { + case Some(n) ⇒ n + case None ⇒ readHighestSequenceNr(tagNumericId(tag)) + } + tagSequenceNr = tagSequenceNr.updated(tag, n + 1) + n + 1 + } + + def tagNumericId(tag: String): Int = + numericId(tagAsPersistenceId(tag)) + + def tagAsPersistenceId(tag: String): String = + tagPersistenceIdPrefix + tag + override def preStart() { leveldb = leveldbFactory.open(leveldbDir, if (nativeLeveldb) leveldbOptions else leveldbOptions.compressionType(CompressionType.NONE)) super.preStart() @@ -137,9 +176,17 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with val keys = persistenceIdSubscribers.collect { case (k, s) if s.contains(subscriber) ⇒ k } keys.foreach { key ⇒ persistenceIdSubscribers.removeBinding(key, subscriber) } + val tagKeys = tagSubscribers.collect { case (k, s) if s.contains(subscriber) ⇒ k } + tagKeys.foreach { key ⇒ tagSubscribers.removeBinding(key, subscriber) } + allPersistenceIdsSubscribers -= subscriber } + protected def hasTagSubscribers: Boolean = tagSubscribers.nonEmpty + + protected def addTagSubscriber(subscriber: ActorRef, tag: String): Unit = + tagSubscribers.addBinding(tag, subscriber) + protected def hasAllPersistenceIdsSubscribers: Boolean = allPersistenceIdsSubscribers.nonEmpty protected def addAllPersistenceIdsSubscriber(subscriber: ActorRef): Unit = { @@ -153,8 +200,14 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with persistenceIdSubscribers(persistenceId).foreach(_ ! changed) } + private def notifyTagChange(tag: String): Unit = + if (tagSubscribers.contains(tag)) { + val changed = LeveldbJournal.TaggedEventAppended(tag) + tagSubscribers(tag).foreach(_ ! changed) + } + override protected def newPersistenceIdAdded(id: String): Unit = { - if (hasAllPersistenceIdsSubscribers) { + if (hasAllPersistenceIdsSubscribers && !id.startsWith(tagPersistenceIdPrefix)) { val added = LeveldbJournal.PersistenceIdAdded(id) allPersistenceIdsSubscribers.foreach(_ ! added) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala new file mode 100644 index 0000000000..94e8274ba8 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/Tagged.scala @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.journal.leveldb + +/** + * The LevelDB journal supports tagging of events that are used + * by the `EventsByTag` query. To specify the tags you create an + * [[akka.persistence.journal.EventAdapter]] that wraps the events + * in a `Tagged` with the given `tags`. + * + * The journal will unwrap the event and store the `payload`. + */ +case class Tagged(payload: Any, tags: Set[String])