fix #23618 : Support for persistence dynamic configuration at runtime (#23841)

This commit is contained in:
Prada Souvanlasy 2018-03-26 13:52:31 +02:00 committed by Patrik Nordwall
parent 0a426a7ab0
commit 46c662965f
16 changed files with 918 additions and 144 deletions

View file

@ -1434,3 +1434,17 @@ Note that `journalPluginId` and `snapshotPluginId` must refer to properly config
plugin entries with a standard `class` property as well as settings which are specific for those plugins, i.e.:
@@snip [PersistenceMultiDocSpec.scala]($code$/scala/docs/persistence/PersistenceMultiDocSpec.scala) { #override-config }
## Give persistence plugin configurations at runtime
By default, a persistent actor will use the configuration loaded at `ActorSystem` creation time to create journal and snapshot store plugins.
When the persistent actor overrides the `journalPluginConfig` and `snapshotPluginConfig` methods,
the actor will use the declared `Config` objects with a fallback on the default configuration.
It allows a dynamic configuration of the journal and the snapshot store at runtime:
Scala
: @@snip [PersistenceMultiDocSpec.scala]($code$/scala/docs/persistence/PersistenceMultiDocSpec.scala) { #runtime-config }
Java
: @@snip [PersistenceMultiDocTest.java]($code$/java/jdocs/persistence/PersistenceMultiDocTest.java) { #runtime-config }

View file

@ -4,28 +4,85 @@
package jdocs.persistence;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.RuntimePluginConfig;
import akka.persistence.UntypedPersistentActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class PersistenceMultiDocTest {
//#default-plugins
abstract class ActorWithDefaultPlugins extends UntypedPersistentActor {
abstract class AbstractPersistentActorWithDefaultPlugins extends AbstractPersistentActor {
@Override
public String persistenceId() { return "123"; }
public String persistenceId() {
return "123";
}
}
//#default-plugins
//#override-plugins
abstract class ActorWithOverridePlugins extends UntypedPersistentActor {
abstract class AbstractPersistentActorWithOverridePlugins extends AbstractPersistentActor {
@Override
public String persistenceId() { return "123"; }
public String persistenceId() {
return "123";
}
// Absolute path to the journal plugin configuration entry in the `reference.conf`
@Override
public String journalPluginId() { return "akka.persistence.chronicle.journal"; }
public String journalPluginId() {
return "akka.persistence.chronicle.journal";
}
// Absolute path to the snapshot store plugin configuration entry in the `reference.conf`
@Override
public String snapshotPluginId() { return "akka.persistence.chronicle.snapshot-store"; }
public String snapshotPluginId() {
return "akka.persistence.chronicle.snapshot-store";
}
}
//#override-plugins
//#runtime-config
abstract class AbstractPersistentActorWithRuntimePluginConfig extends AbstractPersistentActor implements RuntimePluginConfig {
// Variable that is retrieved at runtime, from an external service for instance.
String runtimeDistinction = "foo";
@Override
public String persistenceId() {
return "123";
}
// Absolute path to the journal plugin configuration entry in the `reference.conf`
@Override
public String journalPluginId() {
return "journal-plugin-" + runtimeDistinction;
}
// Absolute path to the snapshot store plugin configuration entry in the `reference.conf`
@Override
public String snapshotPluginId() {
return "snapshot-store-plugin-" + runtimeDistinction;
}
// Configuration which contains the journal plugin id defined above
@Override
public Config journalPluginConfig() {
return ConfigFactory.empty().withValue(
"journal-plugin-" + runtimeDistinction,
getContext().getSystem().settings().config().getValue("journal-plugin") // or a very different configuration coming from an external service.
);
}
// Configuration which contains the snapshot store plugin id defined above
@Override
public Config snapshotPluginConfig() {
return ConfigFactory.empty().withValue(
"snapshot-plugin-" + runtimeDistinction,
getContext().getSystem().settings().config().getValue("snapshot-store-plugin") // or a very different configuration coming from an external service.
);
}
}
//#runtime-config
}

View file

@ -20,6 +20,7 @@ import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.Timeout;
import com.typesafe.config.ConfigFactory;
import docs.persistence.query.MyEventsByTagPublisher;
import org.reactivestreams.Subscriber;
import scala.concurrent.duration.FiniteDuration;

View file

@ -2,11 +2,13 @@
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
import akka.persistence.PersistentActor
import akka.persistence.{ RuntimePluginConfig, PersistentActor }
import com.typesafe.config.ConfigFactory
object PersistenceMultiDocSpec {
val DefaultConfig = """
val DefaultConfig =
"""
//#default-config
# Absolute path to the default journal plugin configuration entry.
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
@ -19,9 +21,11 @@ object PersistenceMultiDocSpec {
trait ActorWithDefaultPlugins extends PersistentActor {
override def persistenceId = "123"
}
//#default-plugins
val OverrideConfig = s"""
val OverrideConfig =
s"""
//#override-config
# Configuration entry for the custom journal plugin, see `journalPluginId`.
akka.persistence.chronicle.journal {
@ -43,11 +47,42 @@ object PersistenceMultiDocSpec {
//#override-plugins
trait ActorWithOverridePlugins extends PersistentActor {
override def persistenceId = "123"
// Absolute path to the journal plugin configuration entry in the `reference.conf`.
override def journalPluginId = "akka.persistence.chronicle.journal"
// Absolute path to the snapshot store plugin configuration entry in the `reference.conf`.
override def snapshotPluginId = "akka.persistence.chronicle.snapshot-store"
}
//#override-plugins
//#runtime-config
trait ActorWithRuntimePluginConfig extends PersistentActor with RuntimePluginConfig {
// Variable that is retrieved at runtime, from an external service for instance.
val runtimeDistinction = "foo"
override def persistenceId = "123"
// Absolute path to the journal plugin configuration entry, not defined in the `reference.conf`.
override def journalPluginId = s"journal-plugin-$runtimeDistinction"
// Absolute path to the snapshot store plugin configuration entry, not defined in the `reference.conf`.
override def snapshotPluginId = s"snapshot-store-plugin-$runtimeDistinction"
// Configuration which contains the journal plugin id defined above
override def journalPluginConfig = ConfigFactory.empty().withValue(
s"journal-plugin-$runtimeDistinction",
context.system.settings.config.getValue("journal-plugin") // or a very different configuration coming from an external service.
)
// Configuration which contains the snapshot store plugin id defined above
override def snapshotPluginConfig = ConfigFactory.empty().withValue(
s"snapshot-plugin-$runtimeDistinction",
context.system.settings.config.getValue("snapshot-store-plugin") // or a very different configuration coming from an external service.
)
}
//#runtime-config
}

View file

@ -5,11 +5,13 @@
package akka.persistence.query
import java.util.concurrent.atomic.AtomicReference
import akka.actor._
import akka.event.Logging
import scala.annotation.tailrec
import scala.util.Failure
import com.typesafe.config.Config
import com.typesafe.config.{ Config, ConfigFactory }
/**
* Persistence extension for queries.
@ -39,21 +41,33 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
/** Discovered query plugins. */
private val readJournalPluginExtensionIds = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
/**
* Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given
* read journal configuration entry.
*
* The provided readJournalPluginConfig will be used to configure the journal plugin instead of the actor system
* config.
*/
final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T =
readJournalPluginFor(readJournalPluginId, readJournalPluginConfig).scaladslPlugin.asInstanceOf[T]
/**
* Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given
* read journal configuration entry.
*/
final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T =
readJournalPluginFor(readJournalPluginId).scaladslPlugin.asInstanceOf[T]
readJournalFor(readJournalPluginId, ConfigFactory.empty)
/**
* Java API: Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given
* read journal configuration entry.
*/
final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T =
readJournalPluginFor(readJournalPluginId).javadslPlugin.asInstanceOf[T]
final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T =
readJournalPluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T]
@tailrec private def readJournalPluginFor(readJournalPluginId: String): PluginHolder = {
final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T = getReadJournalFor[T](clazz, readJournalPluginId, ConfigFactory.empty())
@tailrec private def readJournalPluginFor(readJournalPluginId: String, readJournalPluginConfig: Config): PluginHolder = {
val configPath = readJournalPluginId
val extensionIdMap = readJournalPluginExtensionIds.get
extensionIdMap.get(configPath) match {
@ -62,20 +76,21 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val provider = createPlugin(configPath)
val provider = createPlugin(configPath, readJournalPluginConfig)
PluginHolder(provider.scaladslReadJournal(), provider.javadslReadJournal())
}
}
readJournalPluginExtensionIds.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
readJournalPluginFor(readJournalPluginId) // Recursive invocation.
readJournalPluginFor(readJournalPluginId, readJournalPluginConfig) // Recursive invocation.
}
}
private def createPlugin(configPath: String): ReadJournalProvider = {
private def createPlugin(configPath: String, readJournalPluginConfig: Config): ReadJournalProvider = {
val mergedConfig = readJournalPluginConfig.withFallback(system.settings.config)
require(
!isEmpty(configPath) && system.settings.config.hasPath(configPath),
!isEmpty(configPath) && mergedConfig.hasPath(configPath),
s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'")
val pluginConfig = system.settings.config.getConfig(configPath)
val pluginConfig = mergedConfig.getConfig(configPath)
val pluginClassName = pluginConfig.getString("class")
log.debug(s"Create plugin: ${configPath} ${pluginClassName}")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get

View file

@ -7,6 +7,7 @@ package akka.persistence.query;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.testkit.AkkaJUnitActorSystemResource;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;

View file

@ -13,7 +13,7 @@ import akka.actor.ExtendedActorSystem
* Use for tests only!
* Emits infinite stream of strings (representing queried for events).
*/
class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.PersistenceIdsQuery {
class DummyReadJournal(val dummyValue: String) extends scaladsl.ReadJournal with scaladsl.PersistenceIdsQuery {
override def persistenceIds(): Source[String, NotUsed] =
Source.fromIterator(() Iterator.from(0)).map(_.toString)
}
@ -42,13 +42,19 @@ object DummyReadJournalProvider {
${DummyReadJournal.Identifier}4 {
class = "${classOf[DummyReadJournalProvider4].getCanonicalName}"
}
${DummyReadJournal.Identifier}5 {
class = "${classOf[DummyReadJournalProvider5].getCanonicalName}"
}
""")
}
class DummyReadJournalProvider extends ReadJournalProvider {
class DummyReadJournalProvider(dummyValue: String) extends ReadJournalProvider {
// mandatory zero-arg constructor
def this() = this("dummy")
override val scaladslReadJournal: DummyReadJournal =
new DummyReadJournal
new DummyReadJournal(dummyValue)
override val javadslReadJournal: DummyReadJournalForJava =
new DummyReadJournalForJava(scaladslReadJournal)
@ -60,3 +66,7 @@ class DummyReadJournalProvider3(sys: ExtendedActorSystem, conf: Config) extends
class DummyReadJournalProvider4(sys: ExtendedActorSystem, conf: Config, confPath: String) extends DummyReadJournalProvider
class DummyReadJournalProvider5(sys: ExtendedActorSystem) extends DummyReadJournalProvider
class CustomDummyReadJournalProvider5(sys: ExtendedActorSystem) extends DummyReadJournalProvider("custom")

View file

@ -5,11 +5,12 @@
package akka.persistence.query
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.persistence.journal.EventSeq
import akka.persistence.journal.ReadEventAdapter
import com.typesafe.config.ConfigFactory
import akka.persistence.journal.{ EventSeq, ReadEventAdapter }
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import scala.concurrent.Await
import scala.concurrent.duration._
@ -17,21 +18,45 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte
val eventAdaptersConfig =
s"""
|akka.persistence.query.journal.dummy {
| event-adapters {
| adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName}
| }
|}
|akka.persistence.query.journal.dummy {
| event-adapters {
| adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName}
| }
|}
""".stripMargin
val customReadJournalPluginConfig =
s"""
|${DummyReadJournal.Identifier}5 {
| class = "${classOf[CustomDummyReadJournalProvider5].getCanonicalName}"
|}
|${DummyReadJournal.Identifier}6 {
| class = "${classOf[DummyReadJournalProvider].getCanonicalName}"
|}
""".stripMargin
"ReadJournal" must {
"be found by full config key" in {
withActorSystem() { system
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier)
val readJournalPluginConfig: Config = ConfigFactory.parseString(customReadJournalPluginConfig)
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](
DummyReadJournal.Identifier, readJournalPluginConfig)
// other combinations of constructor parameters
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "2")
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "3")
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "4")
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](
DummyReadJournal.Identifier + "2", readJournalPluginConfig)
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](
DummyReadJournal.Identifier + "3", readJournalPluginConfig)
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](
DummyReadJournal.Identifier + "4", readJournalPluginConfig)
// config key existing within both the provided readJournalPluginConfig
// and the actorSystem config. The journal must be created from the provided config then.
val dummyReadJournal5 = PersistenceQuery.get(system).readJournalFor[DummyReadJournal](
DummyReadJournal.Identifier + "5", readJournalPluginConfig)
dummyReadJournal5.dummyValue should equal("custom")
// config key directly coming from the provided readJournalPluginConfig,
// and does not exist within the actorSystem config
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](
DummyReadJournal.Identifier + "6", readJournalPluginConfig)
}
}
@ -46,6 +71,7 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte
}
private val systemCounter = new AtomicInteger()
private def withActorSystem(conf: String = "")(block: ActorSystem Unit): Unit = {
val config =
DummyReadJournalProvider.config
@ -60,8 +86,13 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte
}
object ExampleQueryModels {
case class OldModel(value: String) { def promote = NewModel(value) }
case class OldModel(value: String) {
def promote = NewModel(value)
}
case class NewModel(value: String)
}
class PrefixStringWithPAdapter extends ReadEventAdapter {

View file

@ -60,7 +60,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
src.map(_.event).runWith(TestSink.probe[Any])
.request(2)
.expectNext("a-1", "a-2")
.expectNoMsg(500.millis)
.expectNoMessage(500.millis)
.request(2)
.expectNext("a-3")
.expectComplete()
@ -81,13 +81,13 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
val probe = src.map(_.event).runWith(TestSink.probe[Any])
.request(2)
.expectNext("f-1", "f-2")
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
ref ! "f-4"
expectMsg("f-4-done")
probe
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
.request(5)
.expectNext("f-3")
.expectComplete() // f-4 not seen
@ -186,13 +186,13 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
val probe = src.map(_.event).runWith(TestSink.probe[Any])
.request(2)
.expectNext("e-1", "e-2")
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
ref ! "e-4"
expectMsg("e-4-done")
probe
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
.request(5)
.expectNext("e-3")
.expectNext("e-4")

View file

@ -79,7 +79,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
.request(2)
.expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple"))
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana"))
.expectNoMsg(500.millis)
.expectNoMessage(500.millis)
.request(2)
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
.expectComplete()
@ -99,13 +99,13 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
.request(2)
.expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple"))
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana"))
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
c ! "a green cucumber"
expectMsg(s"a green cucumber-done")
probe
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
.request(5)
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
.expectComplete() // green cucumber not seen
@ -130,7 +130,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
val probe = blackSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car"))
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
d ! "a black dog"
expectMsg(s"a black dog-done")
@ -139,7 +139,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
probe
.expectNext(EventEnvelope(Sequence(2L), "d", 1L, "a black dog"))
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
.request(10)
.expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night"))
}
@ -151,7 +151,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
// note that banana is not included, since exclusive offset
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
.expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber"))
.expectNoMsg(100.millis)
.expectNoMessage(100.millis)
}
}

View file

@ -1,4 +1,10 @@
# #23841 Remove backward compatibility on some akka-persistence internal API
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Persistence.journalConfigFor")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Persistence.journalFor")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Persistence.snapshotStoreFor")
# #24508 Adding defer method to PersistentActor
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentActor.defer")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalDefer")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.AbstractPersistentActorLike.defer")

View file

@ -1,2 +1,2 @@
# Missed out in previous releases
ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.AbstractPersistentActorWithTimers")
ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.AbstractPersistentActorWithTimers")

View file

@ -12,6 +12,7 @@ import akka.annotation.InternalApi
import akka.dispatch.Envelope
import akka.event.{ Logging, LoggingAdapter }
import akka.util.Helpers.ConfigOps
import com.typesafe.config.ConfigFactory
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
@ -26,8 +27,10 @@ private[persistence] object Eventsourced {
/** INTERNAL API */
private[akka] sealed trait PendingHandlerInvocation {
def evt: Any
def handler: Any Unit
}
/** INTERNAL API: forces actor to stash incoming commands until all these invocations are handled */
private[akka] final case class StashingHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** INTERNAL API: does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */
@ -58,15 +61,34 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
private val extension = Persistence(context.system)
private[persistence] lazy val journal = extension.journalFor(journalPluginId)
private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId)
private[persistence] lazy val journal = {
val journalPluginConfig = this match {
case c: RuntimePluginConfig c.journalPluginConfig
case _ ConfigFactory.empty
}
extension.journalFor(journalPluginId, journalPluginConfig)
}
private[persistence] lazy val snapshotStore = {
val snapshotPluginConfig = this match {
case c: RuntimePluginConfig c.snapshotPluginConfig
case _ ConfigFactory.empty
}
extension.snapshotStoreFor(snapshotPluginId, snapshotPluginConfig)
}
private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement()
private val writerUuid = UUID.randomUUID.toString
private var journalBatch = Vector.empty[PersistentEnvelope]
// no longer used, but kept for binary compatibility
private val maxMessageBatchSize = extension.journalConfigFor(journalPluginId).getInt("max-message-batch-size")
private val maxMessageBatchSize = {
val journalPluginConfig = this match {
case c: RuntimePluginConfig c.journalPluginConfig
case _ ConfigFactory.empty
}
extension.journalConfigFor(journalPluginId, journalPluginConfig).getInt("max-message-batch-size")
}
private var writeInProgress = false
private var sequenceNr: Long = 0L
private var _lastSequenceNr: Long = 0L
@ -77,7 +99,8 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
// Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands
private var pendingStashingPersistInvocations: Long = 0
// Holds user-supplied callbacks for persist/persistAsync calls
private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]()
// we only append / isEmpty / get(0) on it
private var eventBatch: List[PersistentEnvelope] = Nil
private val internalStash = createStash()
@ -120,7 +143,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
*
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception
* was thrown there
* was thrown there
*/
protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
event match {
@ -183,7 +206,14 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
if (all) internalStash.unstashAll() else internalStash.unstash()
private def startRecovery(recovery: Recovery): Unit = {
changeState(recoveryStarted(recovery.replayMax))
val timeout = {
val journalPluginConfig = this match {
case c: RuntimePluginConfig c.journalPluginConfig
case _ ConfigFactory.empty
}
extension.journalConfigFor(journalPluginId, journalPluginConfig).getMillisDuration("recovery-event-timeout")
}
changeState(recoveryStarted(recovery.replayMax, timeout))
loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr)
}
@ -197,7 +227,8 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
require(persistenceId.trim.nonEmpty, s"persistenceId cannot be empty for PersistentActor [${self.path}]")
// Fail fast on missing plugins.
val j = journal; val s = snapshotStore
val j = journal;
val s = snapshotStore
requestRecoveryPermit()
super.aroundPreStart()
}
@ -431,6 +462,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
private trait State {
def stateReceive(receive: Receive, message: Any): Unit
def recoveryRunning: Boolean
}
@ -445,6 +477,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
private def waitingRecoveryPermit(recovery: Recovery) = new State {
override def toString: String = s"waiting for recovery permit"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
@ -463,11 +496,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* All incoming messages are stashed.
*
* @param replayMax maximum number of messages to replay.
* @param timeout recovery event timeout
*/
private def recoveryStarted(replayMax: Long) = new State {
private def recoveryStarted(replayMax: Long, timeout: FiniteDuration) = new State {
// protect against snapshot stalling forever because of journal overloaded and such
val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
val timeoutCancellable = {
import context.dispatcher
context.system.scheduler.scheduleOnce(timeout, self, RecoveryTick(snapshot = true))
@ -494,6 +526,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
}
override def toString: String = s"recovery started (replayMax = [$replayMax])"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = try message match {
@ -651,7 +684,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
try {
peekApplyHandler(p.payload)
onWriteMessageComplete(err = false)
} catch { case NonFatal(e) onWriteMessageComplete(err = true); throw e }
} catch {
case NonFatal(e) onWriteMessageComplete(err = true); throw e
}
}
case WriteMessageRejected(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
@ -675,7 +710,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
try {
peekApplyHandler(l)
onWriteMessageComplete(err = false)
} catch { case NonFatal(e) onWriteMessageComplete(err = true); throw e }
} catch {
case NonFatal(e) onWriteMessageComplete(err = true); throw e
}
}
case WriteMessagesSuccessful
writeInProgress = false
@ -705,7 +742,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
else try {
Eventsourced.super.aroundReceive(receive, message)
aroundReceiveComplete(err = false)
} catch { case NonFatal(e) aroundReceiveComplete(err = true); throw e }
} catch {
case NonFatal(e) aroundReceiveComplete(err = true); throw e
}
private def aroundReceiveComplete(err: Boolean): Unit = {
if (eventBatch.nonEmpty) flushBatch()

View file

@ -12,7 +12,7 @@ import akka.event.{ Logging, LoggingAdapter }
import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters }
import akka.util.Collections.EmptyImmutableSeq
import akka.util.Helpers.ConfigOps
import com.typesafe.config.Config
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.tailrec
import scala.concurrent.duration._
@ -68,6 +68,7 @@ final class PersistenceSettings(config: Config) {
}
}
}
/**
@ -82,7 +83,7 @@ trait PersistenceIdentity {
def persistenceId: String
/**
* Configuration id of the journal plugin servicing this persistent actor or view.
* Configuration id of the journal plugin servicing this persistent actor.
* When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path.
* When configured, uses `journalPluginId` as absolute path to the journal configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
@ -90,14 +91,14 @@ trait PersistenceIdentity {
def journalPluginId: String = ""
/**
* Configuration id of the snapshot plugin servicing this persistent actor or view.
* Configuration id of the snapshot plugin servicing this persistent actor.
* When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path.
* When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/
def snapshotPluginId: String = ""
}
//#persistence-identity
trait PersistenceRecovery {
@ -110,6 +111,7 @@ trait PersistenceRecovery {
* To skip recovery completely return `Recovery.none`.
*/
def recovery: Recovery = Recovery()
//#persistence-recovery
}
@ -122,14 +124,41 @@ trait PersistenceStash extends Stash with StashFactory {
Persistence(context.system).defaultInternalStashOverflowStrategy
}
trait RuntimePluginConfig {
/**
* Additional configuration of the journal plugin servicing this persistent actor.
* When empty, the whole configuration of the journal plugin will be taken from the [[Config]] loaded into the
* [[ActorSystem]].
* When configured, the journal plugin configuration will be taken from this [[Config]] merged with the [[Config]]
* loaded into the [[ActorSystem]].
*
* @return an additional configuration used to configure the journal plugin.
*/
def journalPluginConfig: Config
/**
* Additional configuration of the snapshot plugin servicing this persistent actor.
* When empty, the whole configuration of the snapshot plugin will be taken from the [[Config]] loaded into the
* [[ActorSystem]].
* When configured, the snapshot plugin configuration will be taken from this [[Config]] merged with the [[Config]]
* loaded into the [[ActorSystem]].
*
* @return an additional configuration used to configure the snapshot plugin.
*/
def snapshotPluginConfig: Config
}
/**
* Persistence extension provider.
*/
object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
/** Java API. */
override def get(system: ActorSystem): Persistence = super.get(system)
def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system)
def lookup() = Persistence
/** INTERNAL API. */
private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters, config: Config)
extends Extension
@ -145,6 +174,7 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
* Persistence extension.
*/
class Persistence(val system: ExtendedActorSystem) extends Extension {
import Persistence._
private def log: LoggingAdapter = Logging(system, getClass.getName)
@ -215,8 +245,19 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
* adapter for each class, otherwise the most specific adapter matching a given class will be returned.
*/
final def adaptersFor(journalPluginId: String): EventAdapters = {
adaptersFor(journalPluginId: String, ConfigFactory.empty)
}
/**
* Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters.
* If no adapters are registered for a given journal the EventAdapters object will simply return the identity
* adapter for each class, otherwise the most specific adapter matching a given class will be returned.
*
* The provided journalPluginConfig will be used to configure the plugin instead of the actor system config.
*/
final def adaptersFor(journalPluginId: String, journalPluginConfig: Config): EventAdapters = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
pluginHolderFor(configPath, JournalFallbackConfigPath).adapters
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).adapters
}
/**
@ -238,9 +279,9 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
* When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path.
* When configured, uses `journalPluginId` as absolute path to the journal configuration entry.
*/
private[akka] final def journalConfigFor(journalPluginId: String): Config = {
private[akka] final def journalConfigFor(journalPluginId: String, journalPluginConfig: Config = ConfigFactory.empty): Config = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
pluginHolderFor(configPath, JournalFallbackConfigPath).config
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).config
}
/**
@ -262,9 +303,9 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
* When configured, uses `journalPluginId` as absolute path to the journal configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/
private[akka] final def journalFor(journalPluginId: String): ActorRef = {
private[akka] final def journalFor(journalPluginId: String, journalPluginConfig: Config = ConfigFactory.empty): ActorRef = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
pluginHolderFor(configPath, JournalFallbackConfigPath).actor
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).actor
}
/**
@ -275,20 +316,20 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
* When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/
private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = {
private[akka] final def snapshotStoreFor(snapshotPluginId: String, snapshotPluginConfig: Config = ConfigFactory.empty): ActorRef = {
val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId
pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath).actor
pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath, snapshotPluginConfig).actor
}
@tailrec private def pluginHolderFor(configPath: String, fallbackPath: String): PluginHolder = {
@tailrec private def pluginHolderFor(configPath: String, fallbackPath: String, additionalConfig: Config): PluginHolder = {
val extensionIdMap = pluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system)
case None
val extensionId = new PluginHolderExtensionId(configPath, fallbackPath)
val extensionId = new PluginHolderExtensionId(configPath, fallbackPath, additionalConfig)
pluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
pluginHolderFor(configPath, fallbackPath) // Recursive invocation.
pluginHolderFor(configPath, fallbackPath, additionalConfig) // Recursive invocation.
}
}
@ -310,14 +351,16 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
try {
Reflect.findConstructor(pluginClass, List(pluginConfig)) // will throw if not found
List(pluginConfig)
} catch { case NonFatal(_) Nil } // otherwise use empty constructor
} catch {
case NonFatal(_) Nil
} // otherwise use empty constructor
}
val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs)
system.systemActorOf(pluginActorProps, pluginActorName)
}
private def createAdapters(configPath: String): EventAdapters = {
val pluginConfig = system.settings.config.getConfig(configPath)
private def createAdapters(configPath: String, additionalConfig: Config): EventAdapters = {
val pluginConfig = additionalConfig.withFallback(system.settings.config).getConfig(configPath)
EventAdapters(system, pluginConfig)
}
@ -326,15 +369,18 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
private def id(ref: ActorRef) = ref.path.toStringWithoutAddress
private class PluginHolderExtensionId(configPath: String, fallbackPath: String) extends ExtensionId[PluginHolder] {
private class PluginHolderExtensionId(configPath: String, fallbackPath: String, additionalConfig: Config) extends ExtensionId[PluginHolder] {
def this(configPath: String, fallbackPath: String) = this(configPath, fallbackPath, ConfigFactory.empty)
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val mergedConfig = additionalConfig.withFallback(system.settings.config)
require(
!isEmpty(configPath) && system.settings.config.hasPath(configPath),
!isEmpty(configPath) && mergedConfig.hasPath(configPath),
s"'reference.conf' is missing persistence plugin config path: '$configPath'")
val config: Config = system.settings.config.getConfig(configPath)
.withFallback(system.settings.config.getConfig(fallbackPath))
val config: Config = mergedConfig.getConfig(configPath)
.withFallback(mergedConfig.getConfig(fallbackPath))
val plugin: ActorRef = createPlugin(configPath, config)
val adapters: EventAdapters = createAdapters(configPath)
val adapters: EventAdapters = createAdapters(configPath, mergedConfig)
PluginHolder(plugin, adapters, config)
}

View file

@ -44,6 +44,12 @@ abstract class PersistenceSpec(config: Config) extends AkkaSpec(config) with Bef
def namedPersistentActor[T <: NamedPersistentActor: ClassTag] =
system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name))
/**
* Creates a persistent actor with current name as constructor argument, plus a custom [[Config]]
*/
def namedPersistentActorWithProvidedConfig[T <: NamedPersistentActor: ClassTag](providedConfig: Config) =
system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name, providedConfig))
override protected def beforeEach() {
_name = s"${namePrefix}-${counter.incrementAndGet()}"
}