diff --git a/akka-docs/src/main/paradox/persistence.md b/akka-docs/src/main/paradox/persistence.md index af3cc288e0..3d8ed629d3 100644 --- a/akka-docs/src/main/paradox/persistence.md +++ b/akka-docs/src/main/paradox/persistence.md @@ -1434,3 +1434,17 @@ Note that `journalPluginId` and `snapshotPluginId` must refer to properly config plugin entries with a standard `class` property as well as settings which are specific for those plugins, i.e.: @@snip [PersistenceMultiDocSpec.scala]($code$/scala/docs/persistence/PersistenceMultiDocSpec.scala) { #override-config } + +## Give persistence plugin configurations at runtime + +By default, a persistent actor will use the configuration loaded at `ActorSystem` creation time to create journal and snapshot store plugins. + +When the persistent actor overrides the `journalPluginConfig` and `snapshotPluginConfig` methods, +the actor will use the declared `Config` objects with a fallback on the default configuration. +It allows a dynamic configuration of the journal and the snapshot store at runtime: + +Scala +: @@snip [PersistenceMultiDocSpec.scala]($code$/scala/docs/persistence/PersistenceMultiDocSpec.scala) { #runtime-config } + +Java +: @@snip [PersistenceMultiDocTest.java]($code$/java/jdocs/persistence/PersistenceMultiDocTest.java) { #runtime-config } diff --git a/akka-docs/src/test/java/jdocs/persistence/PersistenceMultiDocTest.java b/akka-docs/src/test/java/jdocs/persistence/PersistenceMultiDocTest.java index d399e080f3..5cd2258464 100644 --- a/akka-docs/src/test/java/jdocs/persistence/PersistenceMultiDocTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/PersistenceMultiDocTest.java @@ -4,28 +4,85 @@ package jdocs.persistence; +import akka.persistence.AbstractPersistentActor; +import akka.persistence.RuntimePluginConfig; import akka.persistence.UntypedPersistentActor; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; public class PersistenceMultiDocTest { //#default-plugins - abstract class ActorWithDefaultPlugins extends UntypedPersistentActor { + abstract class AbstractPersistentActorWithDefaultPlugins extends AbstractPersistentActor { @Override - public String persistenceId() { return "123"; } + public String persistenceId() { + return "123"; + } } //#default-plugins //#override-plugins - abstract class ActorWithOverridePlugins extends UntypedPersistentActor { + abstract class AbstractPersistentActorWithOverridePlugins extends AbstractPersistentActor { @Override - public String persistenceId() { return "123"; } + public String persistenceId() { + return "123"; + } + // Absolute path to the journal plugin configuration entry in the `reference.conf` @Override - public String journalPluginId() { return "akka.persistence.chronicle.journal"; } + public String journalPluginId() { + return "akka.persistence.chronicle.journal"; + } + // Absolute path to the snapshot store plugin configuration entry in the `reference.conf` @Override - public String snapshotPluginId() { return "akka.persistence.chronicle.snapshot-store"; } + public String snapshotPluginId() { + return "akka.persistence.chronicle.snapshot-store"; + } } //#override-plugins + //#runtime-config + abstract class AbstractPersistentActorWithRuntimePluginConfig extends AbstractPersistentActor implements RuntimePluginConfig { + // Variable that is retrieved at runtime, from an external service for instance. + String runtimeDistinction = "foo"; + + @Override + public String persistenceId() { + return "123"; + } + + // Absolute path to the journal plugin configuration entry in the `reference.conf` + @Override + public String journalPluginId() { + return "journal-plugin-" + runtimeDistinction; + } + + // Absolute path to the snapshot store plugin configuration entry in the `reference.conf` + @Override + public String snapshotPluginId() { + return "snapshot-store-plugin-" + runtimeDistinction; + } + + // Configuration which contains the journal plugin id defined above + @Override + public Config journalPluginConfig() { + return ConfigFactory.empty().withValue( + "journal-plugin-" + runtimeDistinction, + getContext().getSystem().settings().config().getValue("journal-plugin") // or a very different configuration coming from an external service. + ); + } + + // Configuration which contains the snapshot store plugin id defined above + @Override + public Config snapshotPluginConfig() { + return ConfigFactory.empty().withValue( + "snapshot-plugin-" + runtimeDistinction, + getContext().getSystem().settings().config().getValue("snapshot-store-plugin") // or a very different configuration coming from an external service. + ); + } + + } + //#runtime-config + } diff --git a/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java b/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java index 3873786bf2..792be88a6c 100644 --- a/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java @@ -20,6 +20,7 @@ import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.Timeout; +import com.typesafe.config.ConfigFactory; import docs.persistence.query.MyEventsByTagPublisher; import org.reactivestreams.Subscriber; import scala.concurrent.duration.FiniteDuration; diff --git a/akka-docs/src/test/scala/docs/persistence/PersistenceMultiDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/PersistenceMultiDocSpec.scala index e5dff71716..5692e5f3fe 100644 --- a/akka-docs/src/test/scala/docs/persistence/PersistenceMultiDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/PersistenceMultiDocSpec.scala @@ -2,11 +2,13 @@ * Copyright (C) 2009-2018 Lightbend Inc. */ -import akka.persistence.PersistentActor +import akka.persistence.{ RuntimePluginConfig, PersistentActor } +import com.typesafe.config.ConfigFactory object PersistenceMultiDocSpec { - val DefaultConfig = """ + val DefaultConfig = + """ //#default-config # Absolute path to the default journal plugin configuration entry. akka.persistence.journal.plugin = "akka.persistence.journal.inmem" @@ -19,9 +21,11 @@ object PersistenceMultiDocSpec { trait ActorWithDefaultPlugins extends PersistentActor { override def persistenceId = "123" } + //#default-plugins - val OverrideConfig = s""" + val OverrideConfig = + s""" //#override-config # Configuration entry for the custom journal plugin, see `journalPluginId`. akka.persistence.chronicle.journal { @@ -43,11 +47,42 @@ object PersistenceMultiDocSpec { //#override-plugins trait ActorWithOverridePlugins extends PersistentActor { override def persistenceId = "123" + // Absolute path to the journal plugin configuration entry in the `reference.conf`. override def journalPluginId = "akka.persistence.chronicle.journal" + // Absolute path to the snapshot store plugin configuration entry in the `reference.conf`. override def snapshotPluginId = "akka.persistence.chronicle.snapshot-store" } + //#override-plugins + //#runtime-config + trait ActorWithRuntimePluginConfig extends PersistentActor with RuntimePluginConfig { + // Variable that is retrieved at runtime, from an external service for instance. + val runtimeDistinction = "foo" + + override def persistenceId = "123" + + // Absolute path to the journal plugin configuration entry, not defined in the `reference.conf`. + override def journalPluginId = s"journal-plugin-$runtimeDistinction" + + // Absolute path to the snapshot store plugin configuration entry, not defined in the `reference.conf`. + override def snapshotPluginId = s"snapshot-store-plugin-$runtimeDistinction" + + // Configuration which contains the journal plugin id defined above + override def journalPluginConfig = ConfigFactory.empty().withValue( + s"journal-plugin-$runtimeDistinction", + context.system.settings.config.getValue("journal-plugin") // or a very different configuration coming from an external service. + ) + + // Configuration which contains the snapshot store plugin id defined above + override def snapshotPluginConfig = ConfigFactory.empty().withValue( + s"snapshot-plugin-$runtimeDistinction", + context.system.settings.config.getValue("snapshot-store-plugin") // or a very different configuration coming from an external service. + ) + + } + + //#runtime-config } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index e7be21de5d..0077e657ae 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -5,11 +5,13 @@ package akka.persistence.query import java.util.concurrent.atomic.AtomicReference + import akka.actor._ import akka.event.Logging + import scala.annotation.tailrec import scala.util.Failure -import com.typesafe.config.Config +import com.typesafe.config.{ Config, ConfigFactory } /** * Persistence extension for queries. @@ -39,21 +41,33 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { /** Discovered query plugins. */ private val readJournalPluginExtensionIds = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) + /** + * Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given + * read journal configuration entry. + * + * The provided readJournalPluginConfig will be used to configure the journal plugin instead of the actor system + * config. + */ + final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T = + readJournalPluginFor(readJournalPluginId, readJournalPluginConfig).scaladslPlugin.asInstanceOf[T] + /** * Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given * read journal configuration entry. */ final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T = - readJournalPluginFor(readJournalPluginId).scaladslPlugin.asInstanceOf[T] + readJournalFor(readJournalPluginId, ConfigFactory.empty) /** * Java API: Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given * read journal configuration entry. */ - final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T = - readJournalPluginFor(readJournalPluginId).javadslPlugin.asInstanceOf[T] + final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T = + readJournalPluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T] - @tailrec private def readJournalPluginFor(readJournalPluginId: String): PluginHolder = { + final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T = getReadJournalFor[T](clazz, readJournalPluginId, ConfigFactory.empty()) + + @tailrec private def readJournalPluginFor(readJournalPluginId: String, readJournalPluginConfig: Config): PluginHolder = { val configPath = readJournalPluginId val extensionIdMap = readJournalPluginExtensionIds.get extensionIdMap.get(configPath) match { @@ -62,20 +76,21 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { case None ⇒ val extensionId = new ExtensionId[PluginHolder] { override def createExtension(system: ExtendedActorSystem): PluginHolder = { - val provider = createPlugin(configPath) + val provider = createPlugin(configPath, readJournalPluginConfig) PluginHolder(provider.scaladslReadJournal(), provider.javadslReadJournal()) } } readJournalPluginExtensionIds.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) - readJournalPluginFor(readJournalPluginId) // Recursive invocation. + readJournalPluginFor(readJournalPluginId, readJournalPluginConfig) // Recursive invocation. } } - private def createPlugin(configPath: String): ReadJournalProvider = { + private def createPlugin(configPath: String, readJournalPluginConfig: Config): ReadJournalProvider = { + val mergedConfig = readJournalPluginConfig.withFallback(system.settings.config) require( - !isEmpty(configPath) && system.settings.config.hasPath(configPath), + !isEmpty(configPath) && mergedConfig.hasPath(configPath), s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'") - val pluginConfig = system.settings.config.getConfig(configPath) + val pluginConfig = mergedConfig.getConfig(configPath) val pluginClassName = pluginConfig.getString("class") log.debug(s"Create plugin: ${configPath} ${pluginClassName}") val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java index 140cdbd47b..31a9b438a1 100644 --- a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java +++ b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java @@ -7,6 +7,7 @@ package akka.persistence.query; import akka.NotUsed; import akka.actor.ActorSystem; import akka.testkit.AkkaJUnitActorSystemResource; +import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala index a7e8359919..fb91d5f798 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala @@ -13,7 +13,7 @@ import akka.actor.ExtendedActorSystem * Use for tests only! * Emits infinite stream of strings (representing queried for events). */ -class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.PersistenceIdsQuery { +class DummyReadJournal(val dummyValue: String) extends scaladsl.ReadJournal with scaladsl.PersistenceIdsQuery { override def persistenceIds(): Source[String, NotUsed] = Source.fromIterator(() ⇒ Iterator.from(0)).map(_.toString) } @@ -42,13 +42,19 @@ object DummyReadJournalProvider { ${DummyReadJournal.Identifier}4 { class = "${classOf[DummyReadJournalProvider4].getCanonicalName}" } + ${DummyReadJournal.Identifier}5 { + class = "${classOf[DummyReadJournalProvider5].getCanonicalName}" + } """) } -class DummyReadJournalProvider extends ReadJournalProvider { +class DummyReadJournalProvider(dummyValue: String) extends ReadJournalProvider { + + // mandatory zero-arg constructor + def this() = this("dummy") override val scaladslReadJournal: DummyReadJournal = - new DummyReadJournal + new DummyReadJournal(dummyValue) override val javadslReadJournal: DummyReadJournalForJava = new DummyReadJournalForJava(scaladslReadJournal) @@ -60,3 +66,7 @@ class DummyReadJournalProvider3(sys: ExtendedActorSystem, conf: Config) extends class DummyReadJournalProvider4(sys: ExtendedActorSystem, conf: Config, confPath: String) extends DummyReadJournalProvider +class DummyReadJournalProvider5(sys: ExtendedActorSystem) extends DummyReadJournalProvider + +class CustomDummyReadJournalProvider5(sys: ExtendedActorSystem) extends DummyReadJournalProvider("custom") + diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala index f803bdf680..cae8379228 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -5,11 +5,12 @@ package akka.persistence.query import java.util.concurrent.atomic.AtomicInteger + import akka.actor.ActorSystem -import akka.persistence.journal.EventSeq -import akka.persistence.journal.ReadEventAdapter -import com.typesafe.config.ConfigFactory +import akka.persistence.journal.{ EventSeq, ReadEventAdapter } +import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + import scala.concurrent.Await import scala.concurrent.duration._ @@ -17,21 +18,45 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte val eventAdaptersConfig = s""" - |akka.persistence.query.journal.dummy { - | event-adapters { - | adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName} - | } - |} + |akka.persistence.query.journal.dummy { + | event-adapters { + | adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName} + | } + |} """.stripMargin + val customReadJournalPluginConfig = + s""" + |${DummyReadJournal.Identifier}5 { + | class = "${classOf[CustomDummyReadJournalProvider5].getCanonicalName}" + |} + |${DummyReadJournal.Identifier}6 { + | class = "${classOf[DummyReadJournalProvider].getCanonicalName}" + |} + """.stripMargin + "ReadJournal" must { "be found by full config key" in { withActorSystem() { system ⇒ - PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier) + val readJournalPluginConfig: Config = ConfigFactory.parseString(customReadJournalPluginConfig) + PersistenceQuery.get(system).readJournalFor[DummyReadJournal]( + DummyReadJournal.Identifier, readJournalPluginConfig) // other combinations of constructor parameters - PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "2") - PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "3") - PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "4") + PersistenceQuery.get(system).readJournalFor[DummyReadJournal]( + DummyReadJournal.Identifier + "2", readJournalPluginConfig) + PersistenceQuery.get(system).readJournalFor[DummyReadJournal]( + DummyReadJournal.Identifier + "3", readJournalPluginConfig) + PersistenceQuery.get(system).readJournalFor[DummyReadJournal]( + DummyReadJournal.Identifier + "4", readJournalPluginConfig) + // config key existing within both the provided readJournalPluginConfig + // and the actorSystem config. The journal must be created from the provided config then. + val dummyReadJournal5 = PersistenceQuery.get(system).readJournalFor[DummyReadJournal]( + DummyReadJournal.Identifier + "5", readJournalPluginConfig) + dummyReadJournal5.dummyValue should equal("custom") + // config key directly coming from the provided readJournalPluginConfig, + // and does not exist within the actorSystem config + PersistenceQuery.get(system).readJournalFor[DummyReadJournal]( + DummyReadJournal.Identifier + "6", readJournalPluginConfig) } } @@ -46,6 +71,7 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte } private val systemCounter = new AtomicInteger() + private def withActorSystem(conf: String = "")(block: ActorSystem ⇒ Unit): Unit = { val config = DummyReadJournalProvider.config @@ -60,8 +86,13 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte } object ExampleQueryModels { - case class OldModel(value: String) { def promote = NewModel(value) } + + case class OldModel(value: String) { + def promote = NewModel(value) + } + case class NewModel(value: String) + } class PrefixStringWithPAdapter extends ReadEventAdapter { diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index 179c10c1fb..80e11112a6 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -60,7 +60,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi src.map(_.event).runWith(TestSink.probe[Any]) .request(2) .expectNext("a-1", "a-2") - .expectNoMsg(500.millis) + .expectNoMessage(500.millis) .request(2) .expectNext("a-3") .expectComplete() @@ -81,13 +81,13 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi val probe = src.map(_.event).runWith(TestSink.probe[Any]) .request(2) .expectNext("f-1", "f-2") - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) ref ! "f-4" expectMsg("f-4-done") probe - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) .request(5) .expectNext("f-3") .expectComplete() // f-4 not seen @@ -186,13 +186,13 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi val probe = src.map(_.event).runWith(TestSink.probe[Any]) .request(2) .expectNext("e-1", "e-2") - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) ref ! "e-4" expectMsg("e-4-done") probe - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) .request(5) .expectNext("e-3") .expectNext("e-4") diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index 626cda80bf..84b89b6454 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -79,7 +79,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) .request(2) .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple")) .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) - .expectNoMsg(500.millis) + .expectNoMessage(500.millis) .request(2) .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) .expectComplete() @@ -99,13 +99,13 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) .request(2) .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple")) .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) c ! "a green cucumber" expectMsg(s"a green cucumber-done") probe - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) .request(5) .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) .expectComplete() // green cucumber not seen @@ -130,7 +130,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) val probe = blackSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car")) - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) d ! "a black dog" expectMsg(s"a black dog-done") @@ -139,7 +139,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) probe .expectNext(EventEnvelope(Sequence(2L), "d", 1L, "a black dog")) - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) .request(10) .expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night")) } @@ -151,7 +151,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) // note that banana is not included, since exclusive offset .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber")) - .expectNoMsg(100.millis) + .expectNoMessage(100.millis) } } diff --git a/akka-persistence/src/main/mima-filters/2.5.11.backwards.excludes b/akka-persistence/src/main/mima-filters/2.5.11.backwards.excludes index 38469c2f05..421915f686 100644 --- a/akka-persistence/src/main/mima-filters/2.5.11.backwards.excludes +++ b/akka-persistence/src/main/mima-filters/2.5.11.backwards.excludes @@ -1,4 +1,10 @@ +# #23841 Remove backward compatibility on some akka-persistence internal API +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Persistence.journalConfigFor") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Persistence.journalFor") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Persistence.snapshotStoreFor") + # #24508 Adding defer method to PersistentActor ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentActor.defer") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalDefer") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.AbstractPersistentActorLike.defer") + diff --git a/akka-persistence/src/main/mima-filters/2.5.8.backwards.excludes b/akka-persistence/src/main/mima-filters/2.5.8.backwards.excludes index 775cd9ff84..e3de60161f 100644 --- a/akka-persistence/src/main/mima-filters/2.5.8.backwards.excludes +++ b/akka-persistence/src/main/mima-filters/2.5.8.backwards.excludes @@ -1,2 +1,2 @@ # Missed out in previous releases -ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.AbstractPersistentActorWithTimers") \ No newline at end of file +ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.AbstractPersistentActorWithTimers") diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 7d05c75cf2..1ada334605 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -12,6 +12,7 @@ import akka.annotation.InternalApi import akka.dispatch.Envelope import akka.event.{ Logging, LoggingAdapter } import akka.util.Helpers.ConfigOps +import com.typesafe.config.ConfigFactory import scala.collection.immutable import scala.concurrent.duration.FiniteDuration @@ -26,8 +27,10 @@ private[persistence] object Eventsourced { /** INTERNAL API */ private[akka] sealed trait PendingHandlerInvocation { def evt: Any + def handler: Any ⇒ Unit } + /** INTERNAL API: forces actor to stash incoming commands until all these invocations are handled */ private[akka] final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation /** INTERNAL API: does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */ @@ -58,15 +61,34 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas private val extension = Persistence(context.system) - private[persistence] lazy val journal = extension.journalFor(journalPluginId) - private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId) + private[persistence] lazy val journal = { + val journalPluginConfig = this match { + case c: RuntimePluginConfig ⇒ c.journalPluginConfig + case _ ⇒ ConfigFactory.empty + } + extension.journalFor(journalPluginId, journalPluginConfig) + } + + private[persistence] lazy val snapshotStore = { + val snapshotPluginConfig = this match { + case c: RuntimePluginConfig ⇒ c.snapshotPluginConfig + case _ ⇒ ConfigFactory.empty + } + extension.snapshotStoreFor(snapshotPluginId, snapshotPluginConfig) + } private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement() private val writerUuid = UUID.randomUUID.toString private var journalBatch = Vector.empty[PersistentEnvelope] // no longer used, but kept for binary compatibility - private val maxMessageBatchSize = extension.journalConfigFor(journalPluginId).getInt("max-message-batch-size") + private val maxMessageBatchSize = { + val journalPluginConfig = this match { + case c: RuntimePluginConfig ⇒ c.journalPluginConfig + case _ ⇒ ConfigFactory.empty + } + extension.journalConfigFor(journalPluginId, journalPluginConfig).getInt("max-message-batch-size") + } private var writeInProgress = false private var sequenceNr: Long = 0L private var _lastSequenceNr: Long = 0L @@ -77,7 +99,8 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas // Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands private var pendingStashingPersistInvocations: Long = 0 // Holds user-supplied callbacks for persist/persistAsync calls - private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it + private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() + // we only append / isEmpty / get(0) on it private var eventBatch: List[PersistentEnvelope] = Nil private val internalStash = createStash() @@ -120,7 +143,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * * @param cause failure cause. * @param event the event that was processed in `receiveRecover`, if the exception - * was thrown there + * was thrown there */ protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = event match { @@ -183,7 +206,14 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas if (all) internalStash.unstashAll() else internalStash.unstash() private def startRecovery(recovery: Recovery): Unit = { - changeState(recoveryStarted(recovery.replayMax)) + val timeout = { + val journalPluginConfig = this match { + case c: RuntimePluginConfig ⇒ c.journalPluginConfig + case _ ⇒ ConfigFactory.empty + } + extension.journalConfigFor(journalPluginId, journalPluginConfig).getMillisDuration("recovery-event-timeout") + } + changeState(recoveryStarted(recovery.replayMax, timeout)) loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) } @@ -197,7 +227,8 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas require(persistenceId.trim.nonEmpty, s"persistenceId cannot be empty for PersistentActor [${self.path}]") // Fail fast on missing plugins. - val j = journal; val s = snapshotStore + val j = journal; + val s = snapshotStore requestRecoveryPermit() super.aroundPreStart() } @@ -431,6 +462,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas private trait State { def stateReceive(receive: Receive, message: Any): Unit + def recoveryRunning: Boolean } @@ -445,6 +477,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas private def waitingRecoveryPermit(recovery: Recovery) = new State { override def toString: String = s"waiting for recovery permit" + override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { @@ -463,11 +496,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * All incoming messages are stashed. * * @param replayMax maximum number of messages to replay. + * @param timeout recovery event timeout */ - private def recoveryStarted(replayMax: Long) = new State { + private def recoveryStarted(replayMax: Long, timeout: FiniteDuration) = new State { - // protect against snapshot stalling forever because of journal overloaded and such - val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout") val timeoutCancellable = { import context.dispatcher context.system.scheduler.scheduleOnce(timeout, self, RecoveryTick(snapshot = true)) @@ -494,6 +526,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas } override def toString: String = s"recovery started (replayMax = [$replayMax])" + override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = try message match { @@ -651,7 +684,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas try { peekApplyHandler(p.payload) onWriteMessageComplete(err = false) - } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } + } catch { + case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e + } } case WriteMessageRejected(p, cause, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart @@ -675,7 +710,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas try { peekApplyHandler(l) onWriteMessageComplete(err = false) - } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } + } catch { + case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e + } } case WriteMessagesSuccessful ⇒ writeInProgress = false @@ -705,7 +742,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas else try { Eventsourced.super.aroundReceive(receive, message) aroundReceiveComplete(err = false) - } catch { case NonFatal(e) ⇒ aroundReceiveComplete(err = true); throw e } + } catch { + case NonFatal(e) ⇒ aroundReceiveComplete(err = true); throw e + } private def aroundReceiveComplete(err: Boolean): Unit = { if (eventBatch.nonEmpty) flushBatch() diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 510713d88e..f35611eba5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -12,7 +12,7 @@ import akka.event.{ Logging, LoggingAdapter } import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters } import akka.util.Collections.EmptyImmutableSeq import akka.util.Helpers.ConfigOps -import com.typesafe.config.Config +import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec import scala.concurrent.duration._ @@ -68,6 +68,7 @@ final class PersistenceSettings(config: Config) { } } + } /** @@ -82,7 +83,7 @@ trait PersistenceIdentity { def persistenceId: String /** - * Configuration id of the journal plugin servicing this persistent actor or view. + * Configuration id of the journal plugin servicing this persistent actor. * When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path. * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. @@ -90,14 +91,14 @@ trait PersistenceIdentity { def journalPluginId: String = "" /** - * Configuration id of the snapshot plugin servicing this persistent actor or view. + * Configuration id of the snapshot plugin servicing this persistent actor. * When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path. * When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ def snapshotPluginId: String = "" - } + //#persistence-identity trait PersistenceRecovery { @@ -110,6 +111,7 @@ trait PersistenceRecovery { * To skip recovery completely return `Recovery.none`. */ def recovery: Recovery = Recovery() + //#persistence-recovery } @@ -122,14 +124,41 @@ trait PersistenceStash extends Stash with StashFactory { Persistence(context.system).defaultInternalStashOverflowStrategy } +trait RuntimePluginConfig { + /** + * Additional configuration of the journal plugin servicing this persistent actor. + * When empty, the whole configuration of the journal plugin will be taken from the [[Config]] loaded into the + * [[ActorSystem]]. + * When configured, the journal plugin configuration will be taken from this [[Config]] merged with the [[Config]] + * loaded into the [[ActorSystem]]. + * + * @return an additional configuration used to configure the journal plugin. + */ + def journalPluginConfig: Config + + /** + * Additional configuration of the snapshot plugin servicing this persistent actor. + * When empty, the whole configuration of the snapshot plugin will be taken from the [[Config]] loaded into the + * [[ActorSystem]]. + * When configured, the snapshot plugin configuration will be taken from this [[Config]] merged with the [[Config]] + * loaded into the [[ActorSystem]]. + * + * @return an additional configuration used to configure the snapshot plugin. + */ + def snapshotPluginConfig: Config +} + /** * Persistence extension provider. */ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { /** Java API. */ override def get(system: ActorSystem): Persistence = super.get(system) + def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system) + def lookup() = Persistence + /** INTERNAL API. */ private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters, config: Config) extends Extension @@ -145,6 +174,7 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { * Persistence extension. */ class Persistence(val system: ExtendedActorSystem) extends Extension { + import Persistence._ private def log: LoggingAdapter = Logging(system, getClass.getName) @@ -215,8 +245,19 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * adapter for each class, otherwise the most specific adapter matching a given class will be returned. */ final def adaptersFor(journalPluginId: String): EventAdapters = { + adaptersFor(journalPluginId: String, ConfigFactory.empty) + } + + /** + * Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters. + * If no adapters are registered for a given journal the EventAdapters object will simply return the identity + * adapter for each class, otherwise the most specific adapter matching a given class will be returned. + * + * The provided journalPluginConfig will be used to configure the plugin instead of the actor system config. + */ + final def adaptersFor(journalPluginId: String, journalPluginConfig: Config): EventAdapters = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId - pluginHolderFor(configPath, JournalFallbackConfigPath).adapters + pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).adapters } /** @@ -238,9 +279,9 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path. * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. */ - private[akka] final def journalConfigFor(journalPluginId: String): Config = { + private[akka] final def journalConfigFor(journalPluginId: String, journalPluginConfig: Config = ConfigFactory.empty): Config = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId - pluginHolderFor(configPath, JournalFallbackConfigPath).config + pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).config } /** @@ -262,9 +303,9 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - private[akka] final def journalFor(journalPluginId: String): ActorRef = { + private[akka] final def journalFor(journalPluginId: String, journalPluginConfig: Config = ConfigFactory.empty): ActorRef = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId - pluginHolderFor(configPath, JournalFallbackConfigPath).actor + pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).actor } /** @@ -275,20 +316,20 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { + private[akka] final def snapshotStoreFor(snapshotPluginId: String, snapshotPluginConfig: Config = ConfigFactory.empty): ActorRef = { val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId - pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath).actor + pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath, snapshotPluginConfig).actor } - @tailrec private def pluginHolderFor(configPath: String, fallbackPath: String): PluginHolder = { + @tailrec private def pluginHolderFor(configPath: String, fallbackPath: String, additionalConfig: Config): PluginHolder = { val extensionIdMap = pluginExtensionId.get extensionIdMap.get(configPath) match { case Some(extensionId) ⇒ extensionId(system) case None ⇒ - val extensionId = new PluginHolderExtensionId(configPath, fallbackPath) + val extensionId = new PluginHolderExtensionId(configPath, fallbackPath, additionalConfig) pluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) - pluginHolderFor(configPath, fallbackPath) // Recursive invocation. + pluginHolderFor(configPath, fallbackPath, additionalConfig) // Recursive invocation. } } @@ -310,14 +351,16 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { try { Reflect.findConstructor(pluginClass, List(pluginConfig)) // will throw if not found List(pluginConfig) - } catch { case NonFatal(_) ⇒ Nil } // otherwise use empty constructor + } catch { + case NonFatal(_) ⇒ Nil + } // otherwise use empty constructor } val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs) system.systemActorOf(pluginActorProps, pluginActorName) } - private def createAdapters(configPath: String): EventAdapters = { - val pluginConfig = system.settings.config.getConfig(configPath) + private def createAdapters(configPath: String, additionalConfig: Config): EventAdapters = { + val pluginConfig = additionalConfig.withFallback(system.settings.config).getConfig(configPath) EventAdapters(system, pluginConfig) } @@ -326,15 +369,18 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def id(ref: ActorRef) = ref.path.toStringWithoutAddress - private class PluginHolderExtensionId(configPath: String, fallbackPath: String) extends ExtensionId[PluginHolder] { + private class PluginHolderExtensionId(configPath: String, fallbackPath: String, additionalConfig: Config) extends ExtensionId[PluginHolder] { + def this(configPath: String, fallbackPath: String) = this(configPath, fallbackPath, ConfigFactory.empty) + override def createExtension(system: ExtendedActorSystem): PluginHolder = { + val mergedConfig = additionalConfig.withFallback(system.settings.config) require( - !isEmpty(configPath) && system.settings.config.hasPath(configPath), + !isEmpty(configPath) && mergedConfig.hasPath(configPath), s"'reference.conf' is missing persistence plugin config path: '$configPath'") - val config: Config = system.settings.config.getConfig(configPath) - .withFallback(system.settings.config.getConfig(fallbackPath)) + val config: Config = mergedConfig.getConfig(configPath) + .withFallback(mergedConfig.getConfig(fallbackPath)) val plugin: ActorRef = createPlugin(configPath, config) - val adapters: EventAdapters = createAdapters(configPath) + val adapters: EventAdapters = createAdapters(configPath, mergedConfig) PluginHolder(plugin, adapters, config) } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 440462aea6..00f7cd55e2 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -44,6 +44,12 @@ abstract class PersistenceSpec(config: Config) extends AkkaSpec(config) with Bef def namedPersistentActor[T <: NamedPersistentActor: ClassTag] = system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name)) + /** + * Creates a persistent actor with current name as constructor argument, plus a custom [[Config]] + */ + def namedPersistentActorWithProvidedConfig[T <: NamedPersistentActor: ClassTag](providedConfig: Config) = + system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name, providedConfig)) + override protected def beforeEach() { _name = s"${namePrefix}-${counter.incrementAndGet()}" } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index c4ab313729..32561869aa 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -7,8 +7,9 @@ package akka.persistence import java.util.concurrent.atomic.AtomicInteger import akka.actor._ +import akka.persistence.PersistentActorSpec._ import akka.testkit.{ EventFilter, ImplicitSender, TestLatch, TestProbe } -import com.typesafe.config.Config +import com.typesafe.config.{ Config, ConfigFactory } import scala.collection.immutable.Seq import scala.concurrent.Await @@ -17,9 +18,13 @@ import scala.util.Random import scala.util.control.NoStackTrace object PersistentActorSpec { + final case class Cmd(data: Any) + final case class Evt(data: Any) + final case class LatchCmd(latch: TestLatch, data: Any) extends NoSerializationVerificationNeeded + final case class Delete(toSequenceNr: Long) abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) { @@ -42,6 +47,30 @@ object PersistentActorSpec { def receiveRecover = updateState } + trait LevelDbRuntimePluginConfig extends PersistenceIdentity with RuntimePluginConfig { + val providedConfig: Config + + override def journalPluginId: String = s"custom.persistence.journal.leveldb" + + override def snapshotPluginId: String = "custom.persistence.snapshot-store.local" + + override def journalPluginConfig: Config = providedConfig + + override def snapshotPluginConfig: Config = providedConfig + } + + trait InmemRuntimePluginConfig extends PersistenceIdentity with RuntimePluginConfig { + val providedConfig: Config + + override def journalPluginId: String = s"custom.persistence.journal.inmem" + + override def snapshotPluginId: String = "custom.persistence.snapshot-store.local" + + override def journalPluginConfig: Config = providedConfig + + override def snapshotPluginConfig: Config = providedConfig + } + class Behavior1PersistentActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ @@ -56,12 +85,17 @@ object PersistentActorSpec { case Evt(data) ⇒ sender() ! s"Rejected: $data" case _ ⇒ super.onPersistRejected(cause, event, seqNr) } + override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = event match { case Evt(data) ⇒ sender() ! s"Failure: $data" case _ ⇒ super.onPersistFailure(cause, event, seqNr) } } + class Behavior1PersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends Behavior1PersistentActor(name) with LevelDbRuntimePluginConfig + class Behavior1PersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends Behavior1PersistentActor(name) with InmemRuntimePluginConfig class Behavior2PersistentActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { @@ -70,6 +104,10 @@ object PersistentActorSpec { persistAll(Seq(Evt(s"${data}-3"), Evt(s"${data}-4")))(updateState) } } + class Behavior2PersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends Behavior2PersistentActor(name) with LevelDbRuntimePluginConfig + class Behavior2PersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends Behavior2PersistentActor(name) with InmemRuntimePluginConfig class Behavior3PersistentActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { @@ -78,6 +116,10 @@ object PersistentActorSpec { updateState(Evt(s"${data}-10")) } } + class Behavior3PersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends Behavior3PersistentActor(name) with LevelDbRuntimePluginConfig + class Behavior3PersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends Behavior3PersistentActor(name) with InmemRuntimePluginConfig class ChangeBehaviorInLastEventHandlerPersistentActor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { @@ -97,6 +139,10 @@ object PersistentActorSpec { } } } + class ChangeBehaviorInLastEventHandlerPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends ChangeBehaviorInLastEventHandlerPersistentActor(name) with LevelDbRuntimePluginConfig + class ChangeBehaviorInLastEventHandlerPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends ChangeBehaviorInLastEventHandlerPersistentActor(name) with InmemRuntimePluginConfig class ChangeBehaviorInFirstEventHandlerPersistentActor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { @@ -116,6 +162,10 @@ object PersistentActorSpec { } } } + class ChangeBehaviorInFirstEventHandlerPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends ChangeBehaviorInFirstEventHandlerPersistentActor(name) with LevelDbRuntimePluginConfig + class ChangeBehaviorInFirstEventHandlerPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends ChangeBehaviorInFirstEventHandlerPersistentActor(name) with InmemRuntimePluginConfig class ChangeBehaviorInCommandHandlerFirstPersistentActor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { @@ -131,6 +181,10 @@ object PersistentActorSpec { persist(Evt(s"${data}-0"))(updateState) } } + class ChangeBehaviorInCommandHandlerFirstPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends ChangeBehaviorInCommandHandlerFirstPersistentActor(name) with LevelDbRuntimePluginConfig + class ChangeBehaviorInCommandHandlerFirstPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends ChangeBehaviorInCommandHandlerFirstPersistentActor(name) with InmemRuntimePluginConfig class ChangeBehaviorInCommandHandlerLastPersistentActor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { @@ -146,6 +200,10 @@ object PersistentActorSpec { context.become(newBehavior) } } + class ChangeBehaviorInCommandHandlerLastPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends ChangeBehaviorInCommandHandlerLastPersistentActor(name) with LevelDbRuntimePluginConfig + class ChangeBehaviorInCommandHandlerLastPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends ChangeBehaviorInCommandHandlerLastPersistentActor(name) with InmemRuntimePluginConfig class SnapshottingPersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { override def receiveRecover = super.receiveRecover orElse { @@ -164,6 +222,10 @@ object PersistentActorSpec { case "snap" ⇒ saveSnapshot(events) } } + class SnapshottingPersistentActorWithLevelDbRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends SnapshottingPersistentActor(name, probe) with LevelDbRuntimePluginConfig + class SnapshottingPersistentActorWithInmemRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends SnapshottingPersistentActor(name, probe) with InmemRuntimePluginConfig class SnapshottingBecomingPersistentActor(name: String, probe: ActorRef) extends SnapshottingPersistentActor(name, probe) { val becomingRecover: Receive = { @@ -181,12 +243,20 @@ object PersistentActorSpec { case "It's changing me" ⇒ probe ! "I am becoming" } } + class SnapshottingBecomingPersistentActorWithLevelDbRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends SnapshottingBecomingPersistentActor(name, probe) with LevelDbRuntimePluginConfig + class SnapshottingBecomingPersistentActorWithInmemRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends SnapshottingBecomingPersistentActor(name, probe) with InmemRuntimePluginConfig class ReplyInEventHandlerPersistentActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd("a") ⇒ persist(Evt("a"))(evt ⇒ sender() ! evt.data) } } + class ReplyInEventHandlerPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends ReplyInEventHandlerPersistentActor(name) with LevelDbRuntimePluginConfig + class ReplyInEventHandlerPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends ReplyInEventHandlerPersistentActor(name) with InmemRuntimePluginConfig class AsyncPersistPersistentActor(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -211,6 +281,11 @@ object PersistentActorSpec { } } + class AsyncPersistPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistPersistentActor(name) with LevelDbRuntimePluginConfig + class AsyncPersistPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistPersistentActor(name) with InmemRuntimePluginConfig + class AsyncPersistThreeTimesPersistentActor(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -230,6 +305,11 @@ object PersistentActorSpec { counter } } + class AsyncPersistThreeTimesPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistThreeTimesPersistentActor(name) with LevelDbRuntimePluginConfig + class AsyncPersistThreeTimesPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistThreeTimesPersistentActor(name) with InmemRuntimePluginConfig + class AsyncPersistSameEventTwicePersistentActor(name: String) extends ExamplePersistentActor(name) { // atomic because used from inside the *async* callbacks @@ -248,6 +328,11 @@ object PersistentActorSpec { persistAsync(event) { evt ⇒ sender() ! s"${evt.data}-b-${sendMsgCounter.incrementAndGet()}" } } } + class AsyncPersistSameEventTwicePersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistSameEventTwicePersistentActor(name) with LevelDbRuntimePluginConfig + class AsyncPersistSameEventTwicePersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistSameEventTwicePersistentActor(name) with InmemRuntimePluginConfig + class PersistAllNilPersistentActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { @@ -264,6 +349,11 @@ object PersistentActorSpec { sender() ! data } } + class PersistAllNilPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends PersistAllNilPersistentActor(name) with LevelDbRuntimePluginConfig + class PersistAllNilPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends PersistAllNilPersistentActor(name) with InmemRuntimePluginConfig + class AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -291,6 +381,11 @@ object PersistentActorSpec { counter } } + class AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor(name) with LevelDbRuntimePluginConfig + class AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor(name) with InmemRuntimePluginConfig + class AsyncPersistAndPersistMixedSyncAsyncPersistentActor(name: String) extends ExamplePersistentActor(name) { var sendMsgCounter = 0 @@ -313,6 +408,10 @@ object PersistentActorSpec { sendMsgCounter } } + class AsyncPersistAndPersistMixedSyncAsyncPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistAndPersistMixedSyncAsyncPersistentActor(name) with LevelDbRuntimePluginConfig + class AsyncPersistAndPersistMixedSyncAsyncPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistAndPersistMixedSyncAsyncPersistentActor(name) with InmemRuntimePluginConfig class AsyncPersistHandlerCorrelationCheck(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -332,12 +431,20 @@ object PersistentActorSpec { counter } } + class AsyncPersistHandlerCorrelationCheckWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistHandlerCorrelationCheck(name) with LevelDbRuntimePluginConfig + class AsyncPersistHandlerCorrelationCheckWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends AsyncPersistHandlerCorrelationCheck(name) with InmemRuntimePluginConfig class AnyValEventPersistentActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd("a") ⇒ persist(5)(evt ⇒ sender() ! evt) } } + class AnyValEventPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends AnyValEventPersistentActor(name) with LevelDbRuntimePluginConfig + class AnyValEventPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends AnyValEventPersistentActor(name) with InmemRuntimePluginConfig class HandleRecoveryFinishedEventPersistentActor(name: String, probe: ActorRef) extends SnapshottingPersistentActor(name, probe) { val sendingRecover: Receive = { @@ -359,6 +466,11 @@ object PersistentActorSpec { } } + class HandleRecoveryFinishedEventPersistentActorWithLevelDbRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends HandleRecoveryFinishedEventPersistentActor(name, probe) with LevelDbRuntimePluginConfig + class HandleRecoveryFinishedEventPersistentActorWithInmemRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends HandleRecoveryFinishedEventPersistentActor(name, probe) with InmemRuntimePluginConfig + trait DeferActor extends PersistentActor { def doDefer[A](event: A)(handler: A ⇒ Unit): Unit } @@ -381,6 +493,15 @@ object PersistentActorSpec { } class DeferringAsyncWithPersistActor(name: String) extends DeferringWithPersistActor(name) with DeferAsync class DeferringSyncWithPersistActor(name: String) extends DeferringWithPersistActor(name) with DeferSync + class DeferringAsyncWithPersistActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncWithPersistActor(name) with LevelDbRuntimePluginConfig + class DeferringSyncWithPersistActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncWithPersistActor(name) with LevelDbRuntimePluginConfig + class DeferringAsyncWithPersistActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncWithPersistActor(name) with InmemRuntimePluginConfig + class DeferringSyncWithPersistActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncWithPersistActor(name) with InmemRuntimePluginConfig + abstract class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor { val receiveCommand: Receive = { case Cmd(data) ⇒ @@ -392,6 +513,15 @@ object PersistentActorSpec { } class DeferringAsyncWithAsyncPersistActor(name: String) extends DeferringWithAsyncPersistActor(name) with DeferAsync class DeferringSyncWithAsyncPersistActor(name: String) extends DeferringWithAsyncPersistActor(name) with DeferSync + class DeferringAsyncWithAsyncPersistActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncWithAsyncPersistActor(name) with LevelDbRuntimePluginConfig + class DeferringSyncWithAsyncPersistActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncWithAsyncPersistActor(name) with LevelDbRuntimePluginConfig + class DeferringAsyncWithAsyncPersistActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncWithAsyncPersistActor(name) with InmemRuntimePluginConfig + class DeferringSyncWithAsyncPersistActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncWithAsyncPersistActor(name) with InmemRuntimePluginConfig + abstract class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor { val receiveCommand: Receive = { case Cmd(data) ⇒ @@ -405,6 +535,15 @@ object PersistentActorSpec { } class DeferringAsyncMixedCallsPPADDPADPersistActor(name: String) extends DeferringMixedCallsPPADDPADPersistActor(name) with DeferAsync class DeferringSyncMixedCallsPPADDPADPersistActor(name: String) extends DeferringMixedCallsPPADDPADPersistActor(name) with DeferSync + class DeferringAsyncMixedCallsPPADDPADPersistActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncMixedCallsPPADDPADPersistActor(name) with LevelDbRuntimePluginConfig + class DeferringSyncMixedCallsPPADDPADPersistActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncMixedCallsPPADDPADPersistActor(name) with LevelDbRuntimePluginConfig + class DeferringAsyncMixedCallsPPADDPADPersistActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncMixedCallsPPADDPADPersistActor(name) with InmemRuntimePluginConfig + class DeferringSyncMixedCallsPPADDPADPersistActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncMixedCallsPPADDPADPersistActor(name) with InmemRuntimePluginConfig + abstract class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor { val receiveCommand: Receive = { case Cmd(_) ⇒ @@ -415,6 +554,15 @@ object PersistentActorSpec { } class DeferringAsyncWithNoPersistCallsPersistActor(name: String) extends DeferringWithNoPersistCallsPersistActor(name) with DeferAsync class DeferringSyncWithNoPersistCallsPersistActor(name: String) extends DeferringWithNoPersistCallsPersistActor(name) with DeferSync + class DeferringAsyncWithNoPersistCallsPersistActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncWithNoPersistCallsPersistActor(name) with LevelDbRuntimePluginConfig + class DeferringSyncWithNoPersistCallsPersistActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncWithNoPersistCallsPersistActor(name) with LevelDbRuntimePluginConfig + class DeferringAsyncWithNoPersistCallsPersistActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncWithNoPersistCallsPersistActor(name) with InmemRuntimePluginConfig + class DeferringSyncWithNoPersistCallsPersistActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncWithNoPersistCallsPersistActor(name) with InmemRuntimePluginConfig + abstract class DeferringActor(name: String) extends ExamplePersistentActor(name) with DeferActor { val receiveCommand: Receive = { case Cmd(data) ⇒ @@ -427,6 +575,14 @@ object PersistentActorSpec { } class DeferringAsyncActor(name: String) extends DeferringActor(name) with DeferAsync class DeferringSyncActor(name: String) extends DeferringActor(name) with DeferSync + class DeferringAsyncActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncActor(name) with LevelDbRuntimePluginConfig + class DeferringSyncActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncActor(name) with LevelDbRuntimePluginConfig + class DeferringAsyncActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringAsyncActor(name) with InmemRuntimePluginConfig + class DeferringSyncActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends DeferringSyncActor(name) with InmemRuntimePluginConfig class StressOrdering(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { @@ -441,6 +597,10 @@ object PersistentActorSpec { sender() ! s } } + class StressOrderingWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends StressOrdering(name) with LevelDbRuntimePluginConfig + class StressOrderingWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends StressOrdering(name) with InmemRuntimePluginConfig class RecoverMessageCausedRestart(name: String) extends NamedPersistentActor(name) { var master: ActorRef = _ @@ -463,6 +623,10 @@ object PersistentActorSpec { } } + class RecoverMessageCausedRestartWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends RecoverMessageCausedRestart(name) with LevelDbRuntimePluginConfig + class RecoverMessageCausedRestartWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends RecoverMessageCausedRestart(name) with InmemRuntimePluginConfig class MultipleAndNestedPersists(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { @@ -478,6 +642,11 @@ object PersistentActorSpec { } } } + class MultipleAndNestedPersistsWithLevelDbRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends MultipleAndNestedPersists(name, probe) with LevelDbRuntimePluginConfig + class MultipleAndNestedPersistsWithInmemRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends MultipleAndNestedPersists(name, probe) with InmemRuntimePluginConfig + class MultipleAndNestedPersistAsyncs(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case s: String ⇒ @@ -492,6 +661,11 @@ object PersistentActorSpec { } } } + class MultipleAndNestedPersistAsyncsWithLevelDbRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends MultipleAndNestedPersistAsyncs(name, probe) with LevelDbRuntimePluginConfig + class MultipleAndNestedPersistAsyncsWithInmemRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends MultipleAndNestedPersistAsyncs(name, probe) with InmemRuntimePluginConfig + class DeeplyNestedPersistAsyncs(name: String, maxDepth: Int, probe: ActorRef) extends ExamplePersistentActor(name) { var currentDepths = Map.empty[String, Int].withDefaultValue(1) @@ -513,6 +687,10 @@ object PersistentActorSpec { persistAsync(s + "-" + 1)(weMustGoDeeper) } } + class DeeplyNestedPersistAsyncsWithLevelDbRuntimePluginConfig(name: String, maxDepth: Int, probe: ActorRef, val providedConfig: Config) + extends DeeplyNestedPersistAsyncs(name, maxDepth, probe) with LevelDbRuntimePluginConfig + class DeeplyNestedPersistAsyncsWithInmemRuntimePluginConfig(name: String, maxDepth: Int, probe: ActorRef, val providedConfig: Config) + extends DeeplyNestedPersistAsyncs(name, maxDepth, probe) with InmemRuntimePluginConfig class NestedPersistNormalAndAsyncs(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { @@ -532,6 +710,11 @@ object PersistentActorSpec { } } } + class NestedPersistNormalAndAsyncsWithLevelDbRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends NestedPersistNormalAndAsyncs(name, probe) with LevelDbRuntimePluginConfig + class NestedPersistNormalAndAsyncsWithInmemRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends NestedPersistNormalAndAsyncs(name, probe) with InmemRuntimePluginConfig + class NestedPersistAsyncsAndNormal(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case s: String ⇒ @@ -550,6 +733,11 @@ object PersistentActorSpec { } } } + class NestedPersistAsyncsAndNormalWithLevelDbRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends NestedPersistAsyncsAndNormal(name, probe) with LevelDbRuntimePluginConfig + class NestedPersistAsyncsAndNormalWithInmemRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends NestedPersistAsyncsAndNormal(name, probe) with InmemRuntimePluginConfig + class NestedPersistInAsyncEnforcesStashing(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case s: String ⇒ @@ -565,6 +753,10 @@ object PersistentActorSpec { } } } + class NestedPersistInAsyncEnforcesStashingWithLevelDbRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends NestedPersistInAsyncEnforcesStashing(name, probe) with LevelDbRuntimePluginConfig + class NestedPersistInAsyncEnforcesStashingWithInmemRuntimePluginConfig(name: String, probe: ActorRef, val providedConfig: Config) + extends NestedPersistInAsyncEnforcesStashing(name, probe) with InmemRuntimePluginConfig class DeeplyNestedPersists(name: String, maxDepth: Int, probe: ActorRef) extends ExamplePersistentActor(name) { var currentDepths = Map.empty[String, Int].withDefaultValue(1) @@ -587,12 +779,18 @@ object PersistentActorSpec { persist(s + "-" + 1)(weMustGoDeeper) } } + class DeeplyNestedPersistsWithLevelDbRuntimePluginConfig(name: String, maxDepth: Int, probe: ActorRef, val providedConfig: Config) + extends DeeplyNestedPersists(name, maxDepth, probe) with LevelDbRuntimePluginConfig + class DeeplyNestedPersistsWithInmemRuntimePluginConfig(name: String, maxDepth: Int, probe: ActorRef, val providedConfig: Config) + extends DeeplyNestedPersists(name, maxDepth, probe) with InmemRuntimePluginConfig class StackableTestPersistentActor(val probe: ActorRef) extends StackableTestPersistentActor.BaseActor with PersistentActor with StackableTestPersistentActor.MixinActor { override def persistenceId: String = "StackableTestPersistentActor" def receiveCommand = { - case "restart" ⇒ throw new Exception("triggering restart") with NoStackTrace { override def toString = "Boom!" } + case "restart" ⇒ throw new Exception("triggering restart") with NoStackTrace { + override def toString = "Boom!" + } } def receiveRecover = { @@ -620,9 +818,15 @@ object PersistentActorSpec { } } + class StackableTestPersistentActorWithLevelDbRuntimePluginConfig(probe: ActorRef, val providedConfig: Config) + extends StackableTestPersistentActor(probe) with LevelDbRuntimePluginConfig + class StackableTestPersistentActorWithInmemRuntimePluginConfig(probe: ActorRef, val providedConfig: Config) + extends StackableTestPersistentActor(probe) with InmemRuntimePluginConfig object StackableTestPersistentActor { - trait BaseActor extends Actor { this: StackableTestPersistentActor ⇒ + + trait BaseActor extends Actor { + this: StackableTestPersistentActor ⇒ override protected[akka] def aroundPreStart() = { probe ! "base aroundPreStart" super.aroundPreStart() @@ -644,12 +848,15 @@ object PersistentActorSpec { } override protected[akka] def aroundReceive(receive: Receive, message: Any) = { - if (message == "restart" && recoveryFinished) { probe ! s"base aroundReceive $message" } + if (message == "restart" && recoveryFinished) { + probe ! s"base aroundReceive $message" + } super.aroundReceive(receive, message) } } - trait MixinActor extends Actor { this: StackableTestPersistentActor ⇒ + trait MixinActor extends Actor { + this: StackableTestPersistentActor ⇒ override protected[akka] def aroundPreStart() = { probe ! "mixin aroundPreStart" super.aroundPreStart() @@ -671,10 +878,13 @@ object PersistentActorSpec { } override protected[akka] def aroundReceive(receive: Receive, message: Any) = { - if (message == "restart" && recoveryFinished) { probe ! s"mixin aroundReceive $message" } + if (message == "restart" && recoveryFinished) { + probe ! s"mixin aroundReceive $message" + } super.aroundReceive(receive, message) } } + } class PersistInRecovery(name: String) extends ExamplePersistentActor(name) { @@ -694,6 +904,10 @@ object PersistentActorSpec { case Cmd(d) ⇒ persist(Evt(d))(updateState) } } + class PersistInRecoveryWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) + extends PersistInRecovery(name) with LevelDbRuntimePluginConfig + class PersistInRecoveryWithInmemRuntimePluginConfig(name: String, val providedConfig: Config) + extends PersistInRecovery(name) with InmemRuntimePluginConfig class ExceptionActor(name: String) extends ExamplePersistentActor(name) { override def receiveCommand = commonBehavior @@ -702,17 +916,106 @@ object PersistentActorSpec { } abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { + import PersistentActorSpec._ override protected def beforeEach() { super.beforeEach() - val persistentActor = namedPersistentActor[Behavior1PersistentActor] + val persistentActor = behavior1PersistentActor persistentActor ! Cmd("a") persistentActor ! GetState expectMsg(List("a-1", "a-2")) } + protected def behavior1PersistentActor: ActorRef = namedPersistentActor[Behavior1PersistentActor] + + protected def behavior2PersistentActor: ActorRef = namedPersistentActor[Behavior2PersistentActor] + + protected def behavior3PersistentActor: ActorRef = namedPersistentActor[Behavior3PersistentActor] + + protected def changeBehaviorInFirstEventHandlerPersistentActor: ActorRef = namedPersistentActor[ChangeBehaviorInFirstEventHandlerPersistentActor] + + protected def changeBehaviorInLastEventHandlerPersistentActor: ActorRef = namedPersistentActor[ChangeBehaviorInLastEventHandlerPersistentActor] + + protected def changeBehaviorInCommandHandlerFirstPersistentActor: ActorRef = namedPersistentActor[ChangeBehaviorInCommandHandlerFirstPersistentActor] + + protected def changeBehaviorInCommandHandlerLastPersistentActor: ActorRef = namedPersistentActor[ChangeBehaviorInCommandHandlerLastPersistentActor] + + protected def snapshottingPersistentActor: ActorRef = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) + + protected def snapshottingBecomingPersistentActor: ActorRef = system.actorOf(Props(classOf[SnapshottingBecomingPersistentActor], name, testActor)) + + protected def replyInEventHandlerPersistentActor: ActorRef = namedPersistentActor[ReplyInEventHandlerPersistentActor] + + protected def anyValEventPersistentActor: ActorRef = namedPersistentActor[AnyValEventPersistentActor] + + protected def asyncPersistPersistentActor: ActorRef = namedPersistentActor[AsyncPersistPersistentActor] + + protected def asyncPersistThreeTimesPersistentActor: ActorRef = namedPersistentActor[AsyncPersistThreeTimesPersistentActor] + + protected def asyncPersistSameEventTwicePersistentActor: ActorRef = namedPersistentActor[AsyncPersistSameEventTwicePersistentActor] + + protected def persistAllNilPersistentActor: ActorRef = namedPersistentActor[PersistAllNilPersistentActor] + + protected def asyncPersistAndPersistMixedSyncAsyncSyncPersistentActor: ActorRef = namedPersistentActor[AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor] + + protected def asyncPersistAndPersistMixedSyncAsyncPersistentActor: ActorRef = namedPersistentActor[AsyncPersistAndPersistMixedSyncAsyncPersistentActor] + + protected def asyncPersistHandlerCorrelationCheck: ActorRef = namedPersistentActor[AsyncPersistHandlerCorrelationCheck] + + protected def deferringWithPersistActor: ActorRef = namedPersistentActor[DeferringWithPersistActor] + + protected def deferringWithAsyncPersistActor: ActorRef = namedPersistentActor[DeferringWithAsyncPersistActor] + + protected def deferringMixedCallsPPADDPADPersistActor: ActorRef = namedPersistentActor[DeferringMixedCallsPPADDPADPersistActor] + + protected def deferringWithNoPersistCallsPersistActor: ActorRef = namedPersistentActor[DeferringWithNoPersistCallsPersistActor] + + protected def handleRecoveryFinishedEventPersistentActor: ActorRef = system.actorOf(Props(classOf[HandleRecoveryFinishedEventPersistentActor], name, testActor)) + + protected def stressOrdering: ActorRef = namedPersistentActor[StressOrdering] + + protected def stackableTestPersistentActor: ActorRef = system.actorOf(Props(classOf[StackableTestPersistentActor], testActor)) + + protected def multipleAndNestedPersists: ActorRef = system.actorOf(Props(classOf[MultipleAndNestedPersists], name, testActor)) + + protected def multipleAndNestedPersistAsyncs: ActorRef = system.actorOf(Props(classOf[MultipleAndNestedPersistAsyncs], name, testActor)) + + protected def deeplyNestedPersists(nestedPersists: Int): ActorRef = system.actorOf(Props(classOf[DeeplyNestedPersists], name, nestedPersists, testActor)) + + protected def deeplyNestedPersistAsyncs(nestedPersistAsyncs: Int): ActorRef = system.actorOf(Props(classOf[DeeplyNestedPersistAsyncs], name, nestedPersistAsyncs, testActor)) + + protected def nestedPersistNormalAndAsyncs: ActorRef = system.actorOf(Props(classOf[NestedPersistNormalAndAsyncs], name, testActor)) + + protected def nestedPersistAsyncsAndNormal: ActorRef = system.actorOf(Props(classOf[NestedPersistAsyncsAndNormal], name, testActor)) + + protected def nestedPersistInAsyncEnforcesStashing: ActorRef = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashing], name, testActor)) + + protected def persistInRecovery: ActorRef = namedPersistentActor[PersistInRecovery] + + protected def recoverMessageCausedRestart: ActorRef = namedPersistentActor[RecoverMessageCausedRestart] + + protected def deferringAsyncWithPersistActor: ActorRef = namedPersistentActor[DeferringAsyncWithPersistActor] + + protected def deferringSyncWithPersistActor: ActorRef = namedPersistentActor[DeferringSyncWithPersistActor] + + protected def deferringAsyncWithAsyncPersistActor: ActorRef = namedPersistentActor[DeferringAsyncWithAsyncPersistActor] + + protected def deferringSyncWithAsyncPersistActor: ActorRef = namedPersistentActor[DeferringSyncWithAsyncPersistActor] + + protected def deferringAsyncMixedCallsPPADDPADPersistActor: ActorRef = namedPersistentActor[DeferringAsyncMixedCallsPPADDPADPersistActor] + + protected def deferringSyncMixedCallsPPADDPADPersistActor: ActorRef = namedPersistentActor[DeferringSyncMixedCallsPPADDPADPersistActor] + + protected def deferringAsyncWithNoPersistCallsPersistActor: ActorRef = namedPersistentActor[DeferringAsyncWithNoPersistCallsPersistActor] + + protected def deferringSyncWithNoPersistCallsPersistActor: ActorRef = namedPersistentActor[DeferringSyncWithNoPersistCallsPersistActor] + + protected def deferringAsyncActor: ActorRef = namedPersistentActor[DeferringAsyncActor] + + protected def deferringSyncActor: ActorRef = namedPersistentActor[DeferringSyncActor] + "A persistent actor" must { "fail fast if persistenceId is null" in { import akka.testkit.filterEvents @@ -743,31 +1046,31 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi } } "recover from persisted events" in { - val persistentActor = namedPersistentActor[Behavior1PersistentActor] + val persistentActor = behavior1PersistentActor persistentActor ! GetState expectMsg(List("a-1", "a-2")) } "handle multiple emitted events in correct order (for a single persist call)" in { - val persistentActor = namedPersistentActor[Behavior1PersistentActor] + val persistentActor = behavior1PersistentActor persistentActor ! Cmd("b") persistentActor ! GetState expectMsg(List("a-1", "a-2", "b-1", "b-2")) } "handle multiple emitted events in correct order (for multiple persist calls)" in { - val persistentActor = namedPersistentActor[Behavior2PersistentActor] + val persistentActor = behavior2PersistentActor persistentActor ! Cmd("b") persistentActor ! GetState expectMsg(List("a-1", "a-2", "b-1", "b-2", "b-3", "b-4")) } "receive emitted events immediately after command" in { - val persistentActor = namedPersistentActor[Behavior3PersistentActor] + val persistentActor = behavior3PersistentActor persistentActor ! Cmd("b") persistentActor ! Cmd("c") persistentActor ! GetState expectMsg(List("a-1", "a-2", "b-10", "b-11", "b-12", "c-10", "c-11", "c-12")) } "recover on command failure" in { - val persistentActor = namedPersistentActor[Behavior3PersistentActor] + val persistentActor = behavior3PersistentActor persistentActor ! Cmd("b") persistentActor ! "boom" persistentActor ! Cmd("c") @@ -776,7 +1079,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("a-1", "a-2", "b-11", "b-12", "c-10", "c-11", "c-12")) } "allow behavior changes in event handler (when handling first event)" in { - val persistentActor = namedPersistentActor[ChangeBehaviorInFirstEventHandlerPersistentActor] + val persistentActor = changeBehaviorInFirstEventHandlerPersistentActor persistentActor ! Cmd("b") persistentActor ! Cmd("c") persistentActor ! Cmd("d") @@ -785,7 +1088,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("a-1", "a-2", "b-0", "c-21", "c-22", "d-0", "e-21", "e-22")) } "allow behavior changes in event handler (when handling last event)" in { - val persistentActor = namedPersistentActor[ChangeBehaviorInLastEventHandlerPersistentActor] + val persistentActor = changeBehaviorInLastEventHandlerPersistentActor persistentActor ! Cmd("b") persistentActor ! Cmd("c") persistentActor ! Cmd("d") @@ -794,7 +1097,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("a-1", "a-2", "b-0", "c-21", "c-22", "d-0", "e-21", "e-22")) } "allow behavior changes in command handler (as first action)" in { - val persistentActor = namedPersistentActor[ChangeBehaviorInCommandHandlerFirstPersistentActor] + val persistentActor = changeBehaviorInCommandHandlerFirstPersistentActor persistentActor ! Cmd("b") persistentActor ! Cmd("c") persistentActor ! Cmd("d") @@ -803,7 +1106,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32")) } "allow behavior changes in command handler (as last action)" in { - val persistentActor = namedPersistentActor[ChangeBehaviorInCommandHandlerLastPersistentActor] + val persistentActor = changeBehaviorInCommandHandlerLastPersistentActor persistentActor ! Cmd("b") persistentActor ! Cmd("c") persistentActor ! Cmd("d") @@ -812,7 +1115,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32")) } "support snapshotting" in { - val persistentActor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) + val persistentActor1 = snapshottingPersistentActor persistentActor1 ! Cmd("b") persistentActor1 ! "snap" persistentActor1 ! Cmd("c") @@ -820,13 +1123,13 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi persistentActor1 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) - val persistentActor2 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) + val persistentActor2 = snapshottingPersistentActor expectMsg("offered") persistentActor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) } "support context.become during recovery" in { - val persistentActor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) + val persistentActor1 = snapshottingPersistentActor persistentActor1 ! Cmd("b") persistentActor1 ! "snap" persistentActor1 ! Cmd("c") @@ -834,24 +1137,24 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi persistentActor1 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) - val persistentActor2 = system.actorOf(Props(classOf[SnapshottingBecomingPersistentActor], name, testActor)) + val persistentActor2 = snapshottingBecomingPersistentActor expectMsg("offered") expectMsg("I am becoming") persistentActor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) } "be able to reply within an event handler" in { - val persistentActor = namedPersistentActor[ReplyInEventHandlerPersistentActor] + val persistentActor = replyInEventHandlerPersistentActor persistentActor ! Cmd("a") expectMsg("a") } "be able to persist events that extend AnyVal" in { - val persistentActor = namedPersistentActor[AnyValEventPersistentActor] + val persistentActor = anyValEventPersistentActor persistentActor ! Cmd("a") expectMsg(5) } "be able to opt-out from stashing messages until all events have been processed" in { - val persistentActor = namedPersistentActor[AsyncPersistPersistentActor] + val persistentActor = asyncPersistPersistentActor persistentActor ! Cmd("x") persistentActor ! Cmd("y") expectMsg("x") @@ -860,7 +1163,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("y-2") } "support multiple persistAsync calls for one command, and execute them 'when possible', not hindering command processing" in { - val persistentActor = namedPersistentActor[AsyncPersistThreeTimesPersistentActor] + val persistentActor = asyncPersistThreeTimesPersistentActor val commands = 1 to 10 map { i ⇒ Cmd(s"c-$i") } commands foreach { i ⇒ @@ -880,7 +1183,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi "reply to the original sender() of a command, even when using persistAsync" in { // sanity check, the setting of sender() for PersistentRepl is handled by PersistentActor currently // but as we want to remove it soon, keeping the explicit test here. - val persistentActor = namedPersistentActor[AsyncPersistThreeTimesPersistentActor] + val persistentActor = asyncPersistThreeTimesPersistentActor val commands = 1 to 10 map { i ⇒ Cmd(s"c-$i") } val probes = Vector.fill(10)(TestProbe()) @@ -892,20 +1195,22 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi val ackClass = classOf[String] within(3.seconds) { - probes foreach { _.expectMsgAllClassOf(ackClass, ackClass, ackClass) } + probes foreach { + _.expectMsgAllClassOf(ackClass, ackClass, ackClass) + } } } "support the same event being asyncPersist'ed multiple times" in { - val persistentActor = namedPersistentActor[AsyncPersistSameEventTwicePersistentActor] + val persistentActor = asyncPersistSameEventTwicePersistentActor persistentActor ! Cmd("x") expectMsg("x") expectMsg("x-a-1") expectMsg("x-b-2") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } "support calling persistAll with Nil" in { - val persistentActor = namedPersistentActor[PersistAllNilPersistentActor] + val persistentActor = persistAllNilPersistentActor persistentActor ! Cmd("defer-x") expectMsg("before-nil") expectMsg("after-nil") @@ -916,7 +1221,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("after-nil") } "support a mix of persist calls (sync, async, sync) and persist calls in expected order" in { - val persistentActor = namedPersistentActor[AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor] + val persistentActor = asyncPersistAndPersistMixedSyncAsyncSyncPersistentActor persistentActor ! Cmd("a") persistentActor ! Cmd("b") persistentActor ! Cmd("c") @@ -933,10 +1238,10 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("c-ea2-8") expectMsg("c-e3-9") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } "support a mix of persist calls (sync, async) and persist calls" in { - val persistentActor = namedPersistentActor[AsyncPersistAndPersistMixedSyncAsyncPersistentActor] + val persistentActor = asyncPersistAndPersistMixedSyncAsyncPersistentActor persistentActor ! Cmd("a") persistentActor ! Cmd("b") persistentActor ! Cmd("c") @@ -956,10 +1261,10 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("c-e1-5") expectMsg("c-ea2-6") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } "correlate persistAsync handlers after restart" in { - val persistentActor = namedPersistentActor[AsyncPersistHandlerCorrelationCheck] + val persistentActor = asyncPersistHandlerCorrelationCheck for (n ← 1 to 100) persistentActor ! Cmd(n) persistentActor ! "boom" for (n ← 1 to 20) persistentActor ! Cmd(n) @@ -976,8 +1281,8 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectNoMsg(100.millis) } - test(namedPersistentActor[DeferringAsyncWithPersistActor]) - test(namedPersistentActor[DeferringSyncWithPersistActor]) + test(deferringAsyncWithPersistActor) + test(deferringSyncWithPersistActor) } "allow deferring handlers in order to provide ordered processing in respect to asyncPersist handlers" in { def test(actor: ActorRef): Unit = { @@ -989,8 +1294,8 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectNoMsg(100.millis) } - test(namedPersistentActor[DeferringAsyncWithAsyncPersistActor]) - test(namedPersistentActor[DeferringSyncWithAsyncPersistActor]) + test(deferringAsyncWithAsyncPersistActor) + test(deferringSyncWithAsyncPersistActor) } "invoke deferred handlers, in presence of mixed a long series persist / persistAsync calls" in { def test(actor: ActorRef): Unit = { @@ -1015,8 +1320,8 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectNoMsg(100.millis) } - test(namedPersistentActor[DeferringAsyncMixedCallsPPADDPADPersistActor]) - test(namedPersistentActor[DeferringSyncMixedCallsPPADDPADPersistActor]) + test(deferringAsyncMixedCallsPPADDPADPersistActor) + test(deferringSyncMixedCallsPPADDPADPersistActor) } "invoke deferred handlers right away, if there are no pending persist handlers registered" in { def test(actor: ActorRef): Unit = { @@ -1027,8 +1332,8 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectNoMsg(100.millis) } - test(namedPersistentActor[DeferringAsyncWithNoPersistCallsPersistActor]) - test(namedPersistentActor[DeferringSyncWithNoPersistCallsPersistActor]) + test(deferringAsyncWithNoPersistCallsPersistActor) + test(deferringSyncWithNoPersistCallsPersistActor) } "invoke deferred handlers, preserving the original sender references" in { def test(actor: ActorRef): Unit = { @@ -1048,11 +1353,11 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectNoMsg(100.millis) } - test(namedPersistentActor[DeferringAsyncWithAsyncPersistActor]) - test(namedPersistentActor[DeferringSyncWithAsyncPersistActor]) + test(deferringAsyncWithAsyncPersistActor) + test(deferringSyncWithAsyncPersistActor) } "handle new messages before deferAsync handler is called" in { - val persistentActor = namedPersistentActor[DeferringAsyncActor] + val persistentActor = deferringAsyncActor persistentActor ! Cmd("x") persistentActor ! Cmd("y") expectMsg("x") @@ -1061,7 +1366,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("y-defer") } "handle defer sequentially" in { - val persistentActor = namedPersistentActor[DeferringSyncActor] + val persistentActor = deferringSyncActor persistentActor ! Cmd("x") persistentActor ! Cmd("y") expectMsg("x") @@ -1070,7 +1375,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("y-defer") } "receive RecoveryFinished if it is handled after all events have been replayed" in { - val persistentActor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) + val persistentActor1 = snapshottingPersistentActor persistentActor1 ! Cmd("b") persistentActor1 ! "snap" persistentActor1 ! Cmd("c") @@ -1078,7 +1383,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi persistentActor1 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) - val persistentActor2 = system.actorOf(Props(classOf[HandleRecoveryFinishedEventPersistentActor], name, testActor)) + val persistentActor2 = handleRecoveryFinishedEventPersistentActor expectMsg("offered") expectMsg(RecoveryCompleted) expectMsg("I am the stashed") @@ -1087,7 +1392,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42", RecoveryCompleted)) } "preserve order of incoming messages" in { - val persistentActor = namedPersistentActor[StressOrdering] + val persistentActor = stressOrdering persistentActor ! Cmd("a") val latch = TestLatch(1) persistentActor ! LatchCmd(latch, "b") @@ -1100,7 +1405,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("d") } "be used as a stackable modification" in { - val persistentActor = system.actorOf(Props(classOf[StackableTestPersistentActor], testActor)) + val persistentActor = stackableTestPersistentActor expectMsg("mixin aroundPreStart") expectMsg("base aroundPreStart") expectMsg("preStart") @@ -1124,10 +1429,10 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("base aroundPostStop") expectMsg("postStop") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } "allow multiple persists with nested persist calls" in { - val persistentActor = system.actorOf(Props(classOf[MultipleAndNestedPersists], name, testActor)) + val persistentActor = multipleAndNestedPersists persistentActor ! "a" persistentActor ! "b" @@ -1144,7 +1449,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("b-inner-2") } "allow multiple persistAsyncs with nested persistAsync calls" in { - val persistentActor = system.actorOf(Props(classOf[MultipleAndNestedPersistAsyncs], name, testActor)) + val persistentActor = multipleAndNestedPersistAsyncs persistentActor ! "a" persistentActor ! "b" @@ -1157,7 +1462,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi "allow deeply nested persist calls" in { val nestedPersists = 6 - val persistentActor = system.actorOf(Props(classOf[DeeplyNestedPersists], name, nestedPersists, testActor)) + val persistentActor = deeplyNestedPersists(nestedPersists) persistentActor ! "a" persistentActor ! "b" @@ -1170,7 +1475,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi "allow deeply nested persistAsync calls" in { val nestedPersistAsyncs = 6 - val persistentActor = system.actorOf(Props(classOf[DeeplyNestedPersistAsyncs], name, nestedPersistAsyncs, testActor)) + val persistentActor = deeplyNestedPersistAsyncs(nestedPersistAsyncs) persistentActor ! "a" expectMsg("a") @@ -1183,21 +1488,21 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi receiveN(expectedReplies).map(_.toString) should beIndependentlyOrdered("b-", "c-") } "allow mixed nesting of persistAsync in persist calls" in { - val persistentActor = system.actorOf(Props(classOf[NestedPersistNormalAndAsyncs], name, testActor)) + val persistentActor = nestedPersistNormalAndAsyncs persistentActor ! "a" expectMsg("a") receiveN(4) should equal(List("a-outer-1", "a-outer-2", "a-inner-async-1", "a-inner-async-2")) } "allow mixed nesting of persist in persistAsync calls" in { - val persistentActor = system.actorOf(Props(classOf[NestedPersistAsyncsAndNormal], name, testActor)) + val persistentActor = nestedPersistAsyncsAndNormal persistentActor ! "a" expectMsg("a") receiveN(4) should equal(List("a-outer-async-1", "a-outer-async-2", "a-inner-1", "a-inner-2")) } "make sure persist retains promised semantics when nested in persistAsync callback" in { - val persistentActor = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashing], name, testActor)) + val persistentActor = nestedPersistInAsyncEnforcesStashing persistentActor ! "a" expectMsg("a") @@ -1210,7 +1515,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi } "be able to delete events" in { - val persistentActor = namedPersistentActor[Behavior1PersistentActor] + val persistentActor = behavior1PersistentActor persistentActor ! Cmd("b") persistentActor ! GetState expectMsg(List("a-1", "a-2", "b-1", "b-2")) @@ -1222,7 +1527,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi } "be able to delete all events" in { - val persistentActor = namedPersistentActor[Behavior1PersistentActor] + val persistentActor = behavior1PersistentActor persistentActor ! Cmd("b") persistentActor ! GetState expectMsg(List("a-1", "a-2", "b-1", "b-2")) @@ -1234,13 +1539,13 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi } "recover the message which caused the restart" in { - val persistentActor = namedPersistentActor[RecoverMessageCausedRestart] + val persistentActor = recoverMessageCausedRestart persistentActor ! "Boom" expectMsg("failed with TestException while processing Boom") } "be able to persist events that happen during recovery" in { - val persistentActor = namedPersistentActor[PersistInRecovery] + val persistentActor = persistInRecovery persistentActor ! GetState expectMsgAnyOf(List("a-1", "a-2", "rc-1", "rc-2"), List("a-1", "a-2", "rc-1", "rc-2", "rc-3")) persistentActor ! Cmd("invalid") @@ -1261,4 +1566,212 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi } class LeveldbPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentActorSpec")) + class InmemPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("inmem", "InmemPersistentActorSpec")) + +/** + * Same test suite as [[LeveldbPersistentActorSpec]], the only difference is that all persistent actors are using the + * provided [[Config]] instead of the [[Config]] coming from the [[ActorSystem]]. + */ +class LeveldbPersistentActorWithRuntimePluginConfigSpec extends PersistentActorSpec( + PersistenceSpec.config("leveldb", "LeveldbPersistentActorWithRuntimePluginConfigSpec") +) { + + val providedActorConfig: Config = { + ConfigFactory.parseString( + s""" + | custom.persistence.journal.leveldb.dir = target/journal-LeveldbPersistentActorWithRuntimePluginConfigSpec + | custom.persistence.snapshot-store.local.dir = target/snapshots-LeveldbPersistentActorWithRuntimePluginConfigSpec/ + """.stripMargin + ).withValue( + s"custom.persistence.journal.leveldb", + system.settings.config.getValue(s"akka.persistence.journal.leveldb") + ).withValue( + "custom.persistence.snapshot-store.local", + system.settings.config.getValue("akka.persistence.snapshot-store.local") + ) + } + + override protected def behavior1PersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[Behavior1PersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def behavior2PersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[Behavior2PersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def behavior3PersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[Behavior3PersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def changeBehaviorInFirstEventHandlerPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ChangeBehaviorInFirstEventHandlerPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def changeBehaviorInLastEventHandlerPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ChangeBehaviorInLastEventHandlerPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def changeBehaviorInCommandHandlerFirstPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ChangeBehaviorInCommandHandlerFirstPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def changeBehaviorInCommandHandlerLastPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ChangeBehaviorInCommandHandlerLastPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def snapshottingPersistentActor: ActorRef = system.actorOf(Props(classOf[SnapshottingPersistentActorWithLevelDbRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def snapshottingBecomingPersistentActor: ActorRef = system.actorOf(Props(classOf[SnapshottingBecomingPersistentActorWithLevelDbRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def replyInEventHandlerPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ReplyInEventHandlerPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def anyValEventPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AnyValEventPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistThreeTimesPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistThreeTimesPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistSameEventTwicePersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistSameEventTwicePersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def persistAllNilPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[PersistAllNilPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistAndPersistMixedSyncAsyncSyncPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistAndPersistMixedSyncAsyncPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistAndPersistMixedSyncAsyncPersistentActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistHandlerCorrelationCheck: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistHandlerCorrelationCheckWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def handleRecoveryFinishedEventPersistentActor: ActorRef = system.actorOf(Props(classOf[HandleRecoveryFinishedEventPersistentActorWithLevelDbRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def stressOrdering: ActorRef = namedPersistentActorWithProvidedConfig[StressOrderingWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def stackableTestPersistentActor: ActorRef = system.actorOf(Props(classOf[StackableTestPersistentActorWithLevelDbRuntimePluginConfig], testActor, providedActorConfig)) + + override protected def multipleAndNestedPersists: ActorRef = system.actorOf(Props(classOf[MultipleAndNestedPersistsWithLevelDbRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def multipleAndNestedPersistAsyncs: ActorRef = system.actorOf(Props(classOf[MultipleAndNestedPersistAsyncsWithLevelDbRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def deeplyNestedPersists(nestedPersists: Int): ActorRef = system.actorOf(Props(classOf[DeeplyNestedPersistsWithLevelDbRuntimePluginConfig], name, nestedPersists, testActor, providedActorConfig)) + + override protected def deeplyNestedPersistAsyncs(nestedPersistAsyncs: Int): ActorRef = system.actorOf(Props(classOf[DeeplyNestedPersistAsyncsWithLevelDbRuntimePluginConfig], name, nestedPersistAsyncs, testActor, providedActorConfig)) + + override protected def nestedPersistNormalAndAsyncs: ActorRef = system.actorOf(Props(classOf[NestedPersistNormalAndAsyncsWithLevelDbRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def nestedPersistAsyncsAndNormal: ActorRef = system.actorOf(Props(classOf[NestedPersistAsyncsAndNormalWithLevelDbRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def nestedPersistInAsyncEnforcesStashing: ActorRef = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashingWithLevelDbRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def persistInRecovery: ActorRef = namedPersistentActorWithProvidedConfig[PersistInRecoveryWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def recoverMessageCausedRestart: ActorRef = namedPersistentActorWithProvidedConfig[RecoverMessageCausedRestartWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncWithPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncWithPersistActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncWithPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncWithPersistActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncWithAsyncPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncWithAsyncPersistActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncWithAsyncPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncWithAsyncPersistActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncMixedCallsPPADDPADPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncMixedCallsPPADDPADPersistActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncMixedCallsPPADDPADPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncMixedCallsPPADDPADPersistActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncWithNoPersistCallsPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncWithNoPersistCallsPersistActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncWithNoPersistCallsPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncWithNoPersistCallsPersistActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncActorWithLevelDbRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncActorWithLevelDbRuntimePluginConfig](providedActorConfig) +} + +/** + * Same test suite as [[InmemPersistentActorSpec]], the only difference is that all persistent actors are using the + * provided [[Config]] instead of the [[Config]] coming from the [[ActorSystem]]. + */ +class InmemPersistentActorWithRuntimePluginConfigSpec extends PersistentActorSpec( + PersistenceSpec.config("inmem", "InmemPersistentActorWithRuntimePluginConfigSpec") +) { + + val providedActorConfig: Config = { + ConfigFactory.parseString( + s""" + | custom.persistence.snapshot-store.local.dir = target/snapshots-InmemPersistentActorWithRuntimePluginConfigSpec/ + """.stripMargin + ).withValue( + s"custom.persistence.journal.inmem", + system.settings.config.getValue(s"akka.persistence.journal.inmem") + ).withValue( + "custom.persistence.snapshot-store.local", + system.settings.config.getValue("akka.persistence.snapshot-store.local") + ) + } + + override protected def behavior1PersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[Behavior1PersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def behavior2PersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[Behavior2PersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def behavior3PersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[Behavior3PersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def changeBehaviorInFirstEventHandlerPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ChangeBehaviorInFirstEventHandlerPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def changeBehaviorInLastEventHandlerPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ChangeBehaviorInLastEventHandlerPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def changeBehaviorInCommandHandlerFirstPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ChangeBehaviorInCommandHandlerFirstPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def changeBehaviorInCommandHandlerLastPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ChangeBehaviorInCommandHandlerLastPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def snapshottingPersistentActor: ActorRef = system.actorOf(Props(classOf[SnapshottingPersistentActorWithInmemRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def snapshottingBecomingPersistentActor: ActorRef = system.actorOf(Props(classOf[SnapshottingBecomingPersistentActorWithInmemRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def replyInEventHandlerPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[ReplyInEventHandlerPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def anyValEventPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AnyValEventPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistThreeTimesPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistThreeTimesPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistSameEventTwicePersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistSameEventTwicePersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def persistAllNilPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[PersistAllNilPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistAndPersistMixedSyncAsyncSyncPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistAndPersistMixedSyncAsyncPersistentActor: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistAndPersistMixedSyncAsyncPersistentActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def asyncPersistHandlerCorrelationCheck: ActorRef = namedPersistentActorWithProvidedConfig[AsyncPersistHandlerCorrelationCheckWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def handleRecoveryFinishedEventPersistentActor: ActorRef = system.actorOf(Props(classOf[HandleRecoveryFinishedEventPersistentActorWithInmemRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def stressOrdering: ActorRef = namedPersistentActorWithProvidedConfig[StressOrderingWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def stackableTestPersistentActor: ActorRef = system.actorOf(Props(classOf[StackableTestPersistentActorWithInmemRuntimePluginConfig], testActor, providedActorConfig)) + + override protected def multipleAndNestedPersists: ActorRef = system.actorOf(Props(classOf[MultipleAndNestedPersistsWithInmemRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def multipleAndNestedPersistAsyncs: ActorRef = system.actorOf(Props(classOf[MultipleAndNestedPersistAsyncsWithInmemRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def deeplyNestedPersists(nestedPersists: Int): ActorRef = system.actorOf(Props(classOf[DeeplyNestedPersistsWithInmemRuntimePluginConfig], name, nestedPersists, testActor, providedActorConfig)) + + override protected def deeplyNestedPersistAsyncs(nestedPersistAsyncs: Int): ActorRef = system.actorOf(Props(classOf[DeeplyNestedPersistAsyncsWithInmemRuntimePluginConfig], name, nestedPersistAsyncs, testActor, providedActorConfig)) + + override protected def nestedPersistNormalAndAsyncs: ActorRef = system.actorOf(Props(classOf[NestedPersistNormalAndAsyncsWithInmemRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def nestedPersistAsyncsAndNormal: ActorRef = system.actorOf(Props(classOf[NestedPersistAsyncsAndNormalWithInmemRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def nestedPersistInAsyncEnforcesStashing: ActorRef = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashingWithInmemRuntimePluginConfig], name, testActor, providedActorConfig)) + + override protected def persistInRecovery: ActorRef = namedPersistentActorWithProvidedConfig[PersistInRecoveryWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def recoverMessageCausedRestart: ActorRef = namedPersistentActorWithProvidedConfig[RecoverMessageCausedRestartWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncWithPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncWithPersistActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncWithPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncWithPersistActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncWithAsyncPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncWithAsyncPersistActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncWithAsyncPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncWithAsyncPersistActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncMixedCallsPPADDPADPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncMixedCallsPPADDPADPersistActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncMixedCallsPPADDPADPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncMixedCallsPPADDPADPersistActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncWithNoPersistCallsPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncWithNoPersistCallsPersistActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncWithNoPersistCallsPersistActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncWithNoPersistCallsPersistActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringAsyncActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringAsyncActorWithInmemRuntimePluginConfig](providedActorConfig) + + override protected def deferringSyncActor: ActorRef = namedPersistentActorWithProvidedConfig[DeferringSyncActorWithInmemRuntimePluginConfig](providedActorConfig) +}