Merge pull request #16794 from carrot-garden/persistence-multiple-plugins

+per #15587 Make it possible to use multiple persistence plugins
This commit is contained in:
Björn Antonsson 2015-03-04 15:19:43 +01:00
commit 8e4f1d966e
14 changed files with 452 additions and 242 deletions

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence;
import akka.persistence.UntypedPersistentActor;
public class PersistenceMultiDocTest {
//#default-plugins
abstract class ActorWithDefaultPlugins extends UntypedPersistentActor {
@Override
public String persistenceId() { return "123"; }
}
//#default-plugins
//#override-plugins
abstract class ActorWithOverridePlugins extends UntypedPersistentActor {
@Override
public String persistenceId() { return "123"; }
// Absolute path to the journal plugin configuration entry in the `reference.conf`
@Override
public String journalPluginId() { return "akka.persistence.chronicle.journal"; }
// Absolute path to the snapshot store plugin configuration entry in the `reference.conf`
@Override
public String snapshotPluginId() { return "akka.persistence.chronicle.snapshot-store"; }
}
//#override-plugins
}

View file

@ -613,3 +613,25 @@ or
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-native-config .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-native-config
in your Akka configuration. The LevelDB Java port is for testing purposes only. in your Akka configuration. The LevelDB Java port is for testing purposes only.
Multiple persistence plugin configurations
==========================================
By default, persistent actor or view will use "default" journal and snapshot store plugins
configured in the following sections of the ``reference.conf`` configuration resource:
.. includecode:: ../scala/code/docs/persistence/PersistenceMultiDocSpec.scala#default-config
Note that in this case actor or view overrides only ``persistenceId`` method:
.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#default-plugins
When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods,
the actor or view will be serviced by these specific persistence plugins instead of the defaults:
.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#override-plugins
Note that ``journalPluginId`` and ``snapshotPluginId`` must refer to properly configured ``reference.conf``
plugin entires with standard ``class`` property as well as settings which are specific for those plugins, i.e.:
.. includecode:: ../scala/code/docs/persistence/PersistenceMultiDocSpec.scala#override-config

View file

@ -666,3 +666,24 @@ Configuration
There are several configuration properties for the persistence module, please refer There are several configuration properties for the persistence module, please refer
to the :ref:`reference configuration <config-akka-persistence>`. to the :ref:`reference configuration <config-akka-persistence>`.
Multiple persistence plugin configurations
==========================================
By default, persistent actor or view will use "default" journal and snapshot store plugins
configured in the following sections of the ``reference.conf`` configuration resource:
.. includecode:: ../scala/code/docs/persistence/PersistenceMultiDocSpec.scala#default-config
Note that in this case actor or view overrides only ``persistenceId`` method:
.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#default-plugins
When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods,
the actor or view will be serviced by these specific persistence plugins instead of the defaults:
.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#override-plugins
Note that ``journalPluginId`` and ``snapshotPluginId`` must refer to properly configured ``reference.conf``
plugin entires with standard ``class`` property as well as settings which are specific for those plugins, i.e.:
.. includecode:: ../scala/code/docs/persistence/PersistenceMultiDocSpec.scala#override-config

View file

@ -178,8 +178,3 @@ persistent actor on the sending side.
Read more about at-least-once delivery in the :ref:`documentation for Scala <at-least-once-delivery>` and Read more about at-least-once delivery in the :ref:`documentation for Scala <at-least-once-delivery>` and
:ref:`documentation for Java <at-least-once-delivery-java>`. :ref:`documentation for Java <at-least-once-delivery-java>`.

View file

@ -0,0 +1,57 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
import akka.persistence.PersistentActor
object PersistenceMultiDocSpec {
val DefaultConfig = """
//#default-config
# Absolute path to the default journal plugin configuration entry.
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
# Absolute path to the default snapshot store plugin configuration entry.
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
//#default-config
"""
//#default-plugins
trait ActorWithDefaultPlugins extends PersistentActor {
override def persistenceId = "123"
}
//#default-plugins
val OverrideConfig = """
//#override-config
# Configuration entry for the custom journal plugin, see `journalPluginId`.
akka.persistence.chronicle.journal {
# Standard persistence extension property: provider FQCN.
class = "akka.persistence.chronicle.ChronicleSyncJournal"
# Custom setting specific for the journal `ChronicleSyncJournal`.
folder = ${user.dir}/store/journal
# Standard persistence extension property: plugin actor uses config injection.
inject-config = true
}
# Configuration entry for the custom snapshot store plugin, see `snapshotPluginId`.
akka.persistence.chronicle.snapshot-store {
# Standard persistence extension property: provider FQCN.
class = "akka.persistence.chronicle.ChronicleSnapshotStore"
# Custom setting specific for the snapshot store `ChronicleSnapshotStore`.
folder = ${user.dir}/store/snapshot
# Standard persistence extension property: plugin actor uses config injection.
inject-config = true
}
//#override-config
"""
//#override-plugins
trait ActorWithOverridePlugins extends PersistentActor {
override def persistenceId = "123"
// Absolute path to the journal plugin configuration entry in the `reference.conf`.
override def journalPluginId = "akka.persistence.chronicle.journal"
// Absolute path to the snapshot store plugin configuration entry in the `reference.conf`.
override def snapshotPluginId = "akka.persistence.chronicle.snapshot-store"
}
//#override-plugins
}

View file

@ -668,3 +668,24 @@ Configuration
There are several configuration properties for the persistence module, please refer There are several configuration properties for the persistence module, please refer
to the :ref:`reference configuration <config-akka-persistence>`. to the :ref:`reference configuration <config-akka-persistence>`.
Multiple persistence plugin configurations
==========================================
By default, persistent actor or view will use "default" journal and snapshot store plugins
configured in the following sections of the ``reference.conf`` configuration resource:
.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#default-config
Note that in this case actor or view overrides only ``persistenceId`` method:
.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#default-plugins
When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods,
the actor or view will be serviced by these specific persistence plugins instead of the defaults:
.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#override-plugins
Note that ``journalPluginId`` and ``snapshotPluginId`` must refer to properly configured ``reference.conf``
plugin entires with standard ``class`` property as well as settings which are specific for those plugins, i.e.:
.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#override-config

View file

@ -1,182 +1,163 @@
########################################## ###########################################################
# Akka Persistence Reference Config File # # Akka Persistence Extension Reference Configuration File #
########################################## ###########################################################
# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.
# Note that both journal and snapshot store plugin configuration entries require few fields:
# `class` : Fully qualified class name providing journal-plugin-api or snapshot-store-plugin-api implementation.
# `inject-config` : Plugin actor has a constructor which expects plugin configuration entry. This boolean field is optional.
# `plugin-dispatcher` : Absolute configuration path to the akka dispatcher configuration entry. This string field is optional.
akka { # Note that journal and snapshot store plugins included with the extension are suitable for testing purposes only.
# You should change extension defaults or override `journalPluginId` and `snapshotPluginId` in the persistent actor or view.
# Protobuf serialization for persistent messages # Directory of persistence journal and snapshot store plugins is available at the Akka Community Projects page http://akka.io/community/
actor {
serializers {
akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer"
akka-persistence-message = "akka.persistence.serialization.MessageSerializer"
}
serialization-bindings {
"akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
"akka.persistence.serialization.Message" = akka-persistence-message
}
}
persistence {
# Default persistence extension settings.
akka.persistence {
# Default journal settings.
journal { journal {
# Absolute path to the journal plugin configuration entry used by persistent actor or view by default.
# Persistent actor or view can override `journalPluginId` method in order to rely on a different journal plugin.
plugin = "akka.persistence.journal.inmem"
# Maximum size of a persistent message batch written to the journal.
max-message-batch-size = 200
# Maximum size of a deletion batch written to the journal.
max-deletion-batch-size = 10000
}
# Default snapshot store settings.
snapshot-store {
# Absolute path to the snapshot plugin configuration entry used by persistent actor or view by default.
# Persistent actor or view can override `snapshotPluginId` method in order to rely on a different snapshot plugin.
plugin = "akka.persistence.snapshot-store.local"
}
# Default persistent view settings.
view {
# Automated incremental view update.
auto-update = on
# Interval between incremental updates.
auto-update-interval = 5s
# Maximum number of messages to replay per incremental view update. Set to -1 for no upper limit.
auto-update-replay-max = -1
}
# Default reliable delivery settings.
at-least-once-delivery {
# Interval between re-delivery attempts.
redeliver-interval = 5s
# Maximum number of unconfirmed messages that will be sent in one re-delivery burst.
redelivery-burst-limit = 10000
# After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`, message will be sent to the actor.
warn-after-number-of-unconfirmed-attempts = 5
# Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is allowed to hold in memory.
max-unconfirmed-messages = 100000
}
# Default persistent extension thread pools.
dispatchers {
# Dispatcher used by every plugin which does not declare explicit `plugin-dispatcher` field.
default-plugin-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
default-replay-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
default-stream-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
}
}
# Maximum size of a persistent message batch written to the journal. # Protobuf serialization for the persistent extension messages.
max-message-batch-size = 200 akka.actor {
serializers {
akka-persistence-message = "akka.persistence.serialization.MessageSerializer"
akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer"
}
serialization-bindings {
"akka.persistence.serialization.Message" = akka-persistence-message
"akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
}
serialization-identifiers {
"akka.persistence.serialization.MessageSerializer" = 7
"akka.persistence.serialization.SnapshotSerializer" = 8
}
}
# Maximum size of a deletion batch written to the journal. ###################################################
max-deletion-batch-size = 10000 # Persistence plugins included with the extension #
###################################################
# Path to the journal plugin to be used # In-memory journal plugin.
plugin = "akka.persistence.journal.leveldb" akka.persistence.journal.inmem {
# Class name of the plugin.
class = "akka.persistence.journal.inmem.InmemJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
}
# In-memory journal plugin. # Local file system snapshot store plugin.
inmem { akka.persistence.snapshot-store.local {
# Class name of the plugin.
class = "akka.persistence.snapshot.local.LocalSnapshotStore"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for streaming snapshot IO.
stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher"
# Storage location of snapshot files.
dir = "snapshots"
}
# Class name of the plugin. # LevelDB journal plugin.
class = "akka.persistence.journal.inmem.InmemJournal" # TODO move to separate module: https://github.com/akka/akka/issues/15884
akka.persistence.journal.leveldb {
# Dispatcher for the plugin actor. # Class name of the plugin.
plugin-dispatcher = "akka.actor.default-dispatcher" class = "akka.persistence.journal.leveldb.LeveldbJournal"
} # Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# LevelDB journal plugin. # Dispatcher for message replay.
leveldb { replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
# Storage location of LevelDB files.
# Class name of the plugin. dir = "journal"
class = "akka.persistence.journal.leveldb.LeveldbJournal" # Use fsync on write.
fsync = on
# Dispatcher for the plugin actor. # Verify checksum on read.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" checksum = off
# Native LevelDB (via JNI) or LevelDB Java port.
native = on
}
# Shared LevelDB journal plugin (for testing only).
# TODO move to separate module: https://github.com/akka/akka/issues/15884
akka.persistence.journal.leveldb-shared {
# Class name of the plugin.
class = "akka.persistence.journal.leveldb.SharedLeveldbJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
# Timeout for async journal operations.
timeout = 10s
store {
# Dispatcher for shared store actor.
store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay. # Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Storage location of LevelDB files. # Storage location of LevelDB files.
dir = "journal" dir = "journal"
# Use fsync on write.
# Use fsync on write
fsync = on fsync = on
# Verify checksum on read. # Verify checksum on read.
checksum = off checksum = off
# Native LevelDB (via JNI) or LevelDB Java port.
# Native LevelDB (via JNI) or LevelDB Java port
native = on native = on
}
# Shared LevelDB journal plugin (for testing only).
leveldb-shared {
# Class name of the plugin.
class = "akka.persistence.journal.leveldb.SharedLeveldbJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
# timeout for async journal operations
timeout = 10s
store {
# Dispatcher for shared store actor.
store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Storage location of LevelDB files.
dir = "journal"
# Use fsync on write
fsync = on
# Verify checksum on read.
checksum = off
# Native LevelDB (via JNI) or LevelDB Java port
native = on
}
}
} }
snapshot-store {
# Path to the snapshot store plugin to be used
plugin = "akka.persistence.snapshot-store.local"
# Local filesystem snapshot store plugin.
local {
# Class name of the plugin.
class = "akka.persistence.snapshot.local.LocalSnapshotStore"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for streaming snapshot IO.
stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher"
# Storage location of snapshot files.
dir = "snapshots"
}
}
view {
# Automated incremental view update.
auto-update = on
# Interval between incremental updates
auto-update-interval = 5s
# Maximum number of messages to replay per incremental view update. Set to
# -1 for no upper limit.
auto-update-replay-max = -1
}
at-least-once-delivery {
# Interval between redelivery attempts
redeliver-interval = 5s
# Maximum number of unconfirmed messages that will be sent in one redelivery burst
redelivery-burst-limit = 10000
# After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`
# message will be sent to the actor.
warn-after-number-of-unconfirmed-attempts = 5
# Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is
# allowed to hold in memory.
max-unconfirmed-messages = 100000
}
dispatchers {
default-plugin-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
default-replay-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
default-stream-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
}
}
} }

View file

@ -12,6 +12,7 @@ import akka.actor.Stash
import akka.actor.StashFactory import akka.actor.StashFactory
import akka.event.Logging import akka.event.Logging
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.actor.ActorRef
/** /**
* INTERNAL API * INTERNAL API
@ -36,13 +37,15 @@ private[persistence] object Eventsourced {
* Scala API and implementation details of [[PersistentActor]], [[AbstractPersistentActor]] and * Scala API and implementation details of [[PersistentActor]], [[AbstractPersistentActor]] and
* [[UntypedPersistentActor]]. * [[UntypedPersistentActor]].
*/ */
private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory { private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory with PersistenceIdentity {
import JournalProtocol._ import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult import SnapshotProtocol.LoadSnapshotResult
import Eventsourced._ import Eventsourced._
private val extension = Persistence(context.system) private val extension = Persistence(context.system)
private lazy val journal = extension.journalFor(persistenceId)
private[persistence] lazy val journal = extension.journalFor(journalPluginId)
private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId)
private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement() private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement()
@ -68,11 +71,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
case _ true case _ true
} }
/**
* Id of the persistent entity for which messages should be replayed.
*/
def persistenceId: String
/** /**
* Returns `persistenceId`. * Returns `persistenceId`.
*/ */
@ -114,15 +112,18 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def preStart(): Unit = override def preStart(): Unit =
self ! Recover() self ! Recover()
/** /** INTERNAL API. */
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
currentState.stateReceive(receive, message) currentState.stateReceive(receive, message)
/** /** INTERNAL API. */
* INTERNAL API. override protected[akka] def aroundPreStart(): Unit = {
*/ // Fail fast on missing plugins.
val j = journal; val s = snapshotStore
super.aroundPreStart()
}
/** INTERNAL API. */
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
try { try {
internalStash.unstashAll() internalStash.unstashAll()
@ -145,9 +146,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
} }
} }
/** /** INTERNAL API. */
* INTERNAL API.
*/
override protected[akka] def aroundPostStop(): Unit = override protected[akka] def aroundPostStop(): Unit =
try { try {
internalStash.unstashAll() internalStash.unstashAll()

View file

@ -5,13 +5,15 @@
package akka.persistence package akka.persistence
import scala.concurrent.duration._ import scala.concurrent.duration._
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.actor._ import akka.actor._
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.persistence.journal.AsyncWriteJournal import akka.persistence.journal.AsyncWriteJournal
import akka.util.Helpers.ConfigOps import akka.util.Helpers.ConfigOps
import akka.event.LoggingAdapter
import akka.event.Logging
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
/** /**
* Persistence configuration. * Persistence configuration.
@ -70,68 +72,142 @@ final class PersistenceSettings(config: Config) {
} }
/** /**
* Persistence extension. * Identification of [[PersistentActor]] or [[PersistentView]].
*/
//#persistence-identity
trait PersistenceIdentity {
/**
* Id of the persistent entity for which messages should be replayed.
*/
def persistenceId: String
/**
* Configuration id of the journal plugin servicing this persistent actor or view.
* When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path.
* When configured, uses `journalPluginId` as absolute path to the journal configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/
def journalPluginId: String = ""
/**
* Configuration id of the snapshot plugin servicing this persistent actor or view.
* When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path.
* When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/
def snapshotPluginId: String = ""
}
//#persistence-identity
/**
* Persistence extension provider.
*/ */
object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
/** /** Java API. */
* Java API.
*/
override def get(system: ActorSystem): Persistence = super.get(system) override def get(system: ActorSystem): Persistence = super.get(system)
def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system) def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system)
def lookup() = Persistence def lookup() = Persistence
/** INTERNAL API. */
private[persistence] case class PluginHolder(actor: ActorRef) extends Extension
} }
/** /**
* Persistence extension. * Persistence extension.
*/ */
class Persistence(val system: ExtendedActorSystem) extends Extension { class Persistence(val system: ExtendedActorSystem) extends Extension {
import Persistence._
private def log: LoggingAdapter = Logging(system, getClass.getName)
private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher" private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher"
private val config = system.settings.config.getConfig("akka.persistence") private val config = system.settings.config.getConfig("akka.persistence")
// Lazy, so user is not forced to configure defaults when she is not using them.
private lazy val defaultJournalPluginId = config.getString("journal.plugin")
// Lazy, so user is not forced to configure defaults when she is not using them.
private lazy val defaultSnapshotPluginId = config.getString("snapshot-store.plugin")
val settings = new PersistenceSettings(config) val settings = new PersistenceSettings(config)
private val snapshotStore = createPlugin("snapshot-store") { _ private def journalDispatchSelector(klaz: Class[_]): String =
DefaultPluginDispatcherId if (classOf[AsyncWriteJournal].isAssignableFrom(klaz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId
}
private val journal = createPlugin("journal") { clazz private def snapshotDispatchSelector(klaz: Class[_]): String =
if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId DefaultPluginDispatcherId
else DefaultPluginDispatcherId
/** Check for default identity. */
private def isDefault(text: String) = text == null || text.length == 0
/** Discovered persistence journal plugins. */
private val journalPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
/** Discovered persistence snapshot store plugins. */
private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
/**
* Returns a journal plugin actor identified by `journalPluginId`.
* When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path.
* When configured, uses `journalPluginId` as absolute path to the journal configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/
@tailrec final def journalFor(journalPluginId: String): ActorRef = {
val configPath = if (isDefault(journalPluginId)) defaultJournalPluginId else journalPluginId
val extensionIdMap = journalPluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system).actor
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder =
PluginHolder(createPlugin(configPath)(journalDispatchSelector))
}
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
journalFor(journalPluginId) // Recursive invocation.
}
} }
/** /**
* Creates a canonical persistent actor id from a persistent actor ref. * Returns a snapshot store plugin actor identified by `snapshotPluginId`.
* When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path.
* When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/ */
@tailrec final def snapshotStoreFor(snapshotPluginId: String): ActorRef = {
val configPath = if (isDefault(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId
val extensionIdMap = snapshotPluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system).actor
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder =
PluginHolder(createPlugin(configPath)(snapshotDispatchSelector))
}
snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
snapshotStoreFor(snapshotPluginId) // Recursive invocation.
}
}
private def createPlugin(configPath: String)(dispatcherSelector: Class[_] String) = {
val pluginActorName = configPath
val pluginConfig = system.settings.config.getConfig(configPath)
val pluginClassName = pluginConfig.getString("class")
log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil
val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs)
system.systemActorOf(pluginActorProps, pluginActorName)
}
/** Creates a canonical persistent actor id from a persistent actor ref. */
def persistenceId(persistentActor: ActorRef): String = id(persistentActor) def persistenceId(persistentActor: ActorRef): String = id(persistentActor)
/**
* Returns a snapshot store for a persistent actor identified by `persistenceId`.
*/
def snapshotStoreFor(persistenceId: String): ActorRef = {
// Currently returns a snapshot store singleton but this methods allows for later
// optimizations where each persistent actor can have its own snapshot store actor.
snapshotStore
}
/**
* Returns a journal for a persistent actor identified by `persistenceId`.
*/
def journalFor(persistenceId: String): ActorRef = {
// Currently returns a journal singleton but this methods allows for later
// optimizations where each persistent actor can have its own journal actor.
journal
}
private def createPlugin(pluginType: String)(dispatcherSelector: Class[_] String) = {
val pluginConfigPath = config.getString(s"${pluginType}.plugin")
val pluginConfig = system.settings.config.getConfig(pluginConfigPath)
val pluginClassName = pluginConfig.getString("class")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
system.systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType)
}
private def id(ref: ActorRef) = ref.path.toStringWithoutAddress private def id(ref: ActorRef) = ref.path.toStringWithoutAddress
} }

View file

@ -116,14 +116,14 @@ object Recover {
/** /**
* An persistent Actor - can be used to implement command or event sourcing. * An persistent Actor - can be used to implement command or event sourcing.
*/ */
trait PersistentActor extends Eventsourced { trait PersistentActor extends Eventsourced with PersistenceIdentity {
def receive = receiveCommand def receive = receiveCommand
} }
/** /**
* Java API: an persistent actor - can be used to implement command or event sourcing. * Java API: an persistent actor - can be used to implement command or event sourcing.
*/ */
abstract class UntypedPersistentActor extends UntypedActor with Eventsourced { abstract class UntypedPersistentActor extends UntypedActor with Eventsourced with PersistenceIdentity {
final def onReceive(message: Any) = onReceiveCommand(message) final def onReceive(message: Any) = onReceiveCommand(message)

View file

@ -80,7 +80,7 @@ private[akka] object PersistentView {
* - [[autoUpdate]] for turning automated updates on or off * - [[autoUpdate]] for turning automated updates on or off
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle * - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
*/ */
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory { trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity {
import PersistentView._ import PersistentView._
import JournalProtocol._ import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult import SnapshotProtocol.LoadSnapshotResult
@ -88,7 +88,9 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
private val extension = Persistence(context.system) private val extension = Persistence(context.system)
private val viewSettings = extension.settings.view private val viewSettings = extension.settings.view
private lazy val journal = extension.journalFor(persistenceId)
private[persistence] lazy val journal = extension.journalFor(journalPluginId)
private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId)
private var schedule: Option[Cancellable] = None private var schedule: Option[Cancellable] = None
@ -118,11 +120,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
*/ */
def viewId: String def viewId: String
/**
* Id of the persistent entity for which messages should be replayed.
*/
def persistenceId: String
/** /**
* Returns `viewId`. * Returns `viewId`.
*/ */
@ -187,13 +184,18 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
self, ScheduledUpdate(autoUpdateReplayMax))) self, ScheduledUpdate(autoUpdateReplayMax)))
} }
/** /** INTERNAL API. */
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = { override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = {
currentState.stateReceive(receive, message) currentState.stateReceive(receive, message)
} }
/** INTERNAL API. */
override protected[akka] def aroundPreStart(): Unit = {
// Fail fast on missing plugins.
val j = journal; val s = snapshotStore
super.aroundPreStart()
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = { override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try internalStash.unstashAll() finally super.preRestart(reason, message) try internalStash.unstashAll() finally super.preRestart(reason, message)
} }

View file

@ -11,7 +11,9 @@ import akka.persistence.SnapshotProtocol._
* Snapshot API on top of the internal snapshot protocol. * Snapshot API on top of the internal snapshot protocol.
*/ */
trait Snapshotter extends Actor { trait Snapshotter extends Actor {
private lazy val snapshotStore = Persistence(context.system).snapshotStoreFor(snapshotterId)
/** Snapshot store plugin actor. */
private[persistence] def snapshotStore: ActorRef
/** /**
* Snapshotter id. * Snapshotter id.

View file

@ -30,8 +30,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
val PersistentImplClass = classOf[PersistentImpl] val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap] val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
def identifier: Int = 7 val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]]
def includeManifest: Boolean = true override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""")
override val includeManifest: Boolean = true
private lazy val transportInformation: Option[Serialization.Information] = { private lazy val transportInformation: Option[Serialization.Information] = {
val address = system.provider.getDefaultAddress val address = system.provider.getDefaultAddress

View file

@ -31,8 +31,10 @@ private[serialization] final case class SnapshotHeader(serializerId: Int, manife
* [[Snapshot]] serializer. * [[Snapshot]] serializer.
*/ */
class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
def identifier: Int = 8
def includeManifest: Boolean = false val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]]
override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""")
override val includeManifest: Boolean = false
private lazy val transportInformation: Option[Serialization.Information] = { private lazy val transportInformation: Option[Serialization.Information] = {
val address = system.provider.getDefaultAddress val address = system.provider.getDefaultAddress