diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala new file mode 100644 index 0000000000..0cbedea47c --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.persistence.JournalProtocol._ +import akka.persistence.Persistence +import akka.persistence.journal.leveldb.LeveldbJournal +import akka.stream.actor.ActorPublisher +import akka.stream.actor.ActorPublisherMessage.Cancel +import akka.stream.actor.ActorPublisherMessage.Request + +/** + * INTERNAL API + */ +private[akka] object AllPersistenceIdsPublisher { + def props(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String): Props = + Props(new AllPersistenceIdsPublisher(liveQuery, maxBufSize, writeJournalPluginId)) + + private case object Continue +} + +class AllPersistenceIdsPublisher(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String) + extends ActorPublisher[String] with DeliveryBuffer[String] with ActorLogging { + import AllPersistenceIdsPublisher._ + + val journal: ActorRef = Persistence(context.system).journalFor(writeJournalPluginId) + + def receive = init + + def init: Receive = { + case _: Request ⇒ + journal ! LeveldbJournal.SubscribeAllPersistenceIds + context.become(active) + case Cancel ⇒ context.stop(self) + } + + def active: Receive = { + case LeveldbJournal.CurrentPersistenceIds(allPersistenceIds) ⇒ + buf ++= allPersistenceIds + deliverBuf() + if (!liveQuery && buf.isEmpty) + onCompleteThenStop() + + case LeveldbJournal.PersistenceIdAdded(persistenceId) ⇒ + if (liveQuery) { + buf :+= persistenceId + deliverBuf() + } + + case _: Request ⇒ + deliverBuf() + if (!liveQuery && buf.isEmpty) + onCompleteThenStop() + + case Cancel ⇒ context.stop(self) + } + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala index 94a186c10b..76b3789689 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala @@ -59,7 +59,7 @@ class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long } def idle: Receive = { - case Continue | _: LeveldbJournal.ChangedPersistenceId ⇒ + case Continue | _: LeveldbJournal.EventAppended ⇒ if (timeForReplay) replay() @@ -122,7 +122,7 @@ class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long case _: Request ⇒ deliverBuf() - case Continue | _: LeveldbJournal.ChangedPersistenceId ⇒ // skip during replay + case Continue | _: LeveldbJournal.EventAppended ⇒ // skip during replay case Cancel ⇒ context.stop(self) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala index aee9535418..ea727d3391 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala @@ -16,6 +16,7 @@ import akka.persistence.query.NoRefresh import akka.persistence.query.RefreshInterval import com.typesafe.config.Config import akka.persistence.query.EventEnvelope +import akka.persistence.query.AllPersistenceIds object LeveldbReadJournal { final val Identifier = "akka.persistence.query.journal.leveldb" @@ -31,12 +32,21 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends sc override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match { case EventsByPersistenceId(pid, from, to) ⇒ eventsByPersistenceId(pid, from, to, hints) + case AllPersistenceIds ⇒ allPersistenceIds(hints) case unknown ⇒ unsupportedQueryType(unknown) } def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = { Source.actorPublisher[EventEnvelope](EventsByPersistenceIdPublisher.props(persistenceId, fromSeqNr, toSeqNr, refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) + .named("eventsByPersistenceId-" + persistenceId) + } + + def allPersistenceIds(hints: Seq[Hint]): Source[String, Unit] = { + val liveQuery = refreshInterval(hints).isDefined + Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery, maxBufSize, writeJournalPluginId)) + .mapMaterializedValue(_ ⇒ ()) + .named("allPersistenceIds") } private def refreshInterval(hints: Seq[Hint]): Option[FiniteDuration] = diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala new file mode 100644 index 0000000000..0330e84f11 --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import scala.concurrent.duration._ +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.persistence.query.EventsByPersistenceId +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.RefreshInterval +import akka.stream.ActorMaterializer +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.ImplicitSender +import akka.testkit.TestKit +import akka.persistence.query.NoRefresh +import akka.testkit.AkkaSpec +import akka.persistence.query.AllPersistenceIds + +object AllPersistenceIdsSpec { + val config = """ + akka.loglevel = INFO + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.journal.leveldb.dir = "target/journal-AllPersistenceIdsSpec" + """ +} + +class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) + with Cleanup with ImplicitSender { + import AllPersistenceIdsSpec._ + + implicit val mat = ActorMaterializer()(system) + + val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + + "Leveldb query AllPersistenceIds" must { + "find existing persistenceIds" in { + system.actorOf(TestActor.props("a")) ! "a1" + expectMsg("a1-done") + system.actorOf(TestActor.props("b")) ! "b1" + expectMsg("b1-done") + system.actorOf(TestActor.props("c")) ! "c1" + expectMsg("c1-done") + + val src = queries.query(AllPersistenceIds, NoRefresh) + src.runWith(TestSink.probe[String]) + .request(5) + .expectNextUnordered("a", "b", "c") + .expectComplete() + } + + "find new persistenceIds" in { + // a, b, c created by previous step + system.actorOf(TestActor.props("d")) ! "d1" + expectMsg("d1-done") + + val src = queries.query(AllPersistenceIds) + val probe = src.runWith(TestSink.probe[String]) + .request(5) + .expectNextUnorderedN(List("a", "b", "c", "d")) + + system.actorOf(TestActor.props("e")) ! "e1" + probe.expectNext("e") + + val more = (1 to 100).map("f" + _) + more.foreach { p ⇒ + system.actorOf(TestActor.props(p)) ! p + } + + probe.request(100) + probe.expectNextUnorderedN(more) + + } + } + +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala index 6fbec39b0a..d7fc2aa41f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala @@ -29,6 +29,10 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore case Some(v) ⇒ v } + def isNewPersistenceId(id: String): Boolean = !idMap.contains(id) + + def allPersistenceIds: Set[String] = idMap.keySet + private def readIdMap(): Map[String, Int] = withIterator { iter ⇒ iter.seek(keyToBytes(mappingKey(idOffset))) readIdMap(Map.empty, iter) @@ -48,9 +52,12 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore private def writeIdMapping(id: String, numericId: Int): Int = { idMap = idMap + (id -> numericId) leveldb.put(keyToBytes(mappingKey(numericId)), id.getBytes(UTF_8)) + newPersistenceIdAdded(id) numericId } + protected def newPersistenceIdAdded(id: String): Unit = () + override def preStart() { idMap = readIdMap() super.preStart() diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index ff509bc364..446b99b8ea 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -24,6 +24,9 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi case SubscribePersistenceId(persistenceId: String) ⇒ addPersistenceIdSubscriber(sender(), persistenceId) context.watch(sender()) + case SubscribeAllPersistenceIds ⇒ + addAllPersistenceIdsSubscriber(sender()) + context.watch(sender()) case Terminated(ref) ⇒ removeSubscriber(ref) } @@ -33,13 +36,25 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi * INTERNAL API. */ private[persistence] object LeveldbJournal { + sealed trait SubscriptionCommand + /** - * Subscribe the `sender` to changes (append events) for a specific `persistenceId`. - * Used by query-side. The journal will send [[ChangedPersistenceId]] messages to + * Subscribe the `sender` to changes (appended events) for a specific `persistenceId`. + * Used by query-side. The journal will send [[EventAppended]] messages to * the subscriber when `asyncWriteMessages` has been called. */ - case class SubscribePersistenceId(persistenceId: String) - case class ChangedPersistenceId(persistenceId: String) extends DeadLetterSuppression + case class SubscribePersistenceId(persistenceId: String) extends SubscriptionCommand + case class EventAppended(persistenceId: String) extends DeadLetterSuppression + + /** + * Subscribe the `sender` to changes (appended events) for a specific `persistenceId`. + * Used by query-side. The journal will send one [[CurrentPersistenceIds]] to the + * subscriber followed by [[PersistenceIdAdded]] messages when new persistenceIds + * are created. + */ + case object SubscribeAllPersistenceIds extends SubscriptionCommand + case class CurrentPersistenceIds(allPersistenceIds: Set[String]) extends DeadLetterSuppression + case class PersistenceIdAdded(persistenceId: String) extends DeadLetterSuppression } /** @@ -52,13 +67,13 @@ private[persistence] class SharedLeveldbJournal extends AsyncWriteProxy { "akka.persistence.journal.leveldb-shared.timeout") override def receivePluginInternal: Receive = { - case m: LeveldbJournal.SubscribePersistenceId ⇒ + case cmd: LeveldbJournal.SubscriptionCommand ⇒ // forward subscriptions, they are used by query-side store match { - case Some(s) ⇒ s.forward(m) + case Some(s) ⇒ s.forward(cmd) case None ⇒ - log.error("Failed SubscribePersistenceId({}) request. " + - "Store not initialized. Use `SharedLeveldbJournal.setStore(sharedStore, system)`", m.persistenceId) + log.error("Failed {} request. " + + "Store not initialized. Use `SharedLeveldbJournal.setStore(sharedStore, system)`", cmd) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index c68f46bb60..a27ebe8136 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -34,6 +34,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with var leveldb: DB = _ private val persistenceIdSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef] + private var allPersistenceIdsSubscribers = Set.empty[ActorRef] def leveldbFactory = if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory @@ -45,15 +46,22 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { var persistenceIds = Set.empty[String] + val hasSubscribers = hasPersistenceIdSubscribers val result = Future.fromTry(Try { withBatch(batch ⇒ messages.map { a ⇒ Try { a.payload.foreach(message ⇒ addToMessageBatch(message, batch)) - persistenceIds += a.persistenceId + if (hasSubscribers) + persistenceIds += a.persistenceId } }) }) - persistenceIds.foreach(notifyPersistenceIdChange) + + if (hasSubscribers) { + persistenceIds.foreach { pid ⇒ + notifyPersistenceIdChange(pid) + } + } result } @@ -128,13 +136,29 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with protected def removeSubscriber(subscriber: ActorRef): Unit = { val keys = persistenceIdSubscribers.collect { case (k, s) if s.contains(subscriber) ⇒ k } keys.foreach { key ⇒ persistenceIdSubscribers.removeBinding(key, subscriber) } + + allPersistenceIdsSubscribers -= subscriber + } + + protected def hasAllPersistenceIdsSubscribers: Boolean = allPersistenceIdsSubscribers.nonEmpty + + protected def addAllPersistenceIdsSubscriber(subscriber: ActorRef): Unit = { + allPersistenceIdsSubscribers += subscriber + subscriber ! LeveldbJournal.CurrentPersistenceIds(allPersistenceIds) } private def notifyPersistenceIdChange(persistenceId: String): Unit = if (persistenceIdSubscribers.contains(persistenceId)) { - val changed = LeveldbJournal.ChangedPersistenceId(persistenceId) + val changed = LeveldbJournal.EventAppended(persistenceId) persistenceIdSubscribers(persistenceId).foreach(_ ! changed) } + override protected def newPersistenceIdAdded(id: String): Unit = { + if (hasAllPersistenceIdsSubscribers) { + val added = LeveldbJournal.PersistenceIdAdded(id) + allPersistenceIdsSubscribers.foreach(_ ! added) + } + } + }