Merge pull request #18264 from akka/wip-18190-leveldb-EventsByPersistenceId-patriknw

+per #18190 leveldb impl of EventsByPersistenceId query
This commit is contained in:
Patrik Nordwall 2015-08-19 20:16:01 +02:00
commit 87bc51220d
16 changed files with 484 additions and 19 deletions

View file

@ -28,7 +28,7 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash
private var isInitialized = false
private var isInitTimedOut = false
private var store: Option[ActorRef] = None
protected var store: Option[ActorRef] = None
private val storeNotInitialized =
Future.failed(new TimeoutException("Store not initialized. " +
"Use `SharedLeveldbJournal.setStore(sharedStore, system)`"))

View file

@ -17,7 +17,30 @@ import akka.util.Helpers.ConfigOps
*
* Journal backed by a local LevelDB store. For production use.
*/
private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with AsyncWriteJournal with LeveldbStore
private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with AsyncWriteJournal with LeveldbStore {
import LeveldbJournal._
override def receivePluginInternal: Receive = {
case SubscribePersistenceId(persistenceId: String)
addPersistenceIdSubscriber(sender(), persistenceId)
context.watch(sender())
case Terminated(ref)
removeSubscriber(ref)
}
}
/**
* INTERNAL API.
*/
private[persistence] object LeveldbJournal {
/**
* Subscribe the `sender` to changes (append events) for a specific `persistenceId`.
* Used by query-side. The journal will send [[ChangedPersistenceId]] messages to
* the subscriber when `asyncWriteMessages` has been called.
*/
case class SubscribePersistenceId(persistenceId: String)
case class ChangedPersistenceId(persistenceId: String) extends DeadLetterSuppression
}
/**
* INTERNAL API.
@ -27,6 +50,18 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi
private[persistence] class SharedLeveldbJournal extends AsyncWriteProxy {
val timeout: Timeout = context.system.settings.config.getMillisDuration(
"akka.persistence.journal.leveldb-shared.timeout")
override def receivePluginInternal: Receive = {
case m: LeveldbJournal.SubscribePersistenceId
// forward subscriptions, they are used by query-side
store match {
case Some(s) s.forward(m)
case None
log.error("Failed SubscribePersistenceId({}) request. " +
"Store not initialized. Use `SharedLeveldbJournal.setStore(sharedStore, system)`", m.persistenceId)
}
}
}
object SharedLeveldbJournal {

View file

@ -6,6 +6,7 @@
package akka.persistence.journal.leveldb
import java.io.File
import scala.collection.mutable
import akka.actor._
import akka.persistence._
import akka.persistence.journal.{ WriteJournalBase, AsyncWriteTarget }
@ -32,6 +33,8 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
val leveldbDir = new File(config.getString("dir"))
var leveldb: DB = _
private val persistenceIdSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
def leveldbFactory =
if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory
else org.iq80.leveldb.impl.Iq80DBFactory.factory
@ -40,12 +43,19 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
import Key._
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
Future.fromTry(Try {
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
var persistenceIds = Set.empty[String]
val result = Future.fromTry(Try {
withBatch(batch messages.map { a
Try(a.payload.foreach(message addToMessageBatch(message, batch)))
Try {
a.payload.foreach(message addToMessageBatch(message, batch))
persistenceIds += a.persistenceId
}
})
})
persistenceIds.foreach(notifyPersistenceIdChange)
result
}
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
try Future.successful {
@ -109,5 +119,22 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
leveldb.close()
super.postStop()
}
protected def hasPersistenceIdSubscribers: Boolean = persistenceIdSubscribers.nonEmpty
protected def addPersistenceIdSubscriber(subscriber: ActorRef, persistenceId: String): Unit =
persistenceIdSubscribers.addBinding(persistenceId, subscriber)
protected def removeSubscriber(subscriber: ActorRef): Unit = {
val keys = persistenceIdSubscribers.collect { case (k, s) if s.contains(subscriber) k }
keys.foreach { key persistenceIdSubscribers.removeBinding(key, subscriber) }
}
private def notifyPersistenceIdChange(persistenceId: String): Unit =
if (persistenceIdSubscribers.contains(persistenceId)) {
val changed = LeveldbJournal.ChangedPersistenceId(persistenceId)
persistenceIdSubscribers(persistenceId).foreach(_ ! changed)
}
}