From 9d74ad11e42c04e2c6e8a22f953734f7fc537ddf Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 15 Mar 2019 12:25:08 +0100 Subject: [PATCH] add tests for missing journal and snapshot plugin config, #24687 * extract validation of persistence plugin config * The reason for the duplicate check in EventSourcedSettings is for better user experience. Fail fast before the actor is started. * The checks must still also be done when plugin is loaded, since that is what is used by untyped. * Extracted into static utility methods in Persistence --- .../typed/internal/EventSourcedSettings.scala | 13 ++- .../scaladsl/EventSourcedBehaviorSpec.scala | 85 +++++++++++++++++++ .../scala/akka/persistence/Persistence.scala | 63 +++++++++++--- 3 files changed, 147 insertions(+), 14 deletions(-) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala index e76280154c..05327fd4fb 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala @@ -11,6 +11,7 @@ import scala.concurrent.duration._ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.Persistence +import akka.persistence.Persistence.verifyPluginConfigIsDefined import com.typesafe.config.Config /** @@ -40,6 +41,8 @@ import com.typesafe.config.Config val recoveryEventTimeout: FiniteDuration = journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis + Persistence.verifyPluginConfigExists(config, snapshotPluginId, "Snapshot store") + EventSourcedSettings( stashCapacity = stashCapacity, stashOverflowStrategy, @@ -49,9 +52,15 @@ import com.typesafe.config.Config snapshotPluginId) } - private[akka] final def journalConfigFor(config: Config, journalPluginId: String): Config = { - val defaultJournalPluginId = config.getString("akka.persistence.journal.plugin") + private def journalConfigFor(config: Config, journalPluginId: String): Config = { + def defaultJournalPluginId = { + val configPath = config.getString("akka.persistence.journal.plugin") + Persistence.verifyPluginConfigIsDefined(configPath, "Default journal") + configPath + } + val configPath = if (journalPluginId == "") defaultJournalPluginId else journalPluginId + Persistence.verifyPluginConfigExists(config, configPath, "Journal") config.getConfig(configPath).withFallback(config.getConfig(Persistence.JournalFallbackConfigPath)) } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index 7825604689..e2d6d5a57a 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -878,6 +878,91 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh } } + "fail fast if default journal plugin is not defined" in { + // new ActorSystem without persistence config + val testkit2 = ActorTestKit( + ActorTestKitBase.testNameFromCallStack(), + ConfigFactory.parseString(""" + akka.loggers = [akka.testkit.TestEventListener] + """)) + try { + EventFilter[ActorInitializationException](start = "Default journal plugin is not configured", occurrences = 1) + .intercept { + val ref = testkit2.spawn(Behaviors.setup[Command](counter(_, nextPid()))) + val probe = testkit2.createTestProbe() + probe.expectTerminated(ref) + }(testkit2.system.toUntyped) + } finally { + testkit2.shutdownTestKit() + } + } + + "fail fast if given journal plugin is not defined" in { + // new ActorSystem without persistence config + val testkit2 = ActorTestKit( + ActorTestKitBase.testNameFromCallStack(), + ConfigFactory.parseString(""" + akka.loggers = [akka.testkit.TestEventListener] + """)) + try { + EventFilter[ActorInitializationException]( + start = "Journal plugin [missing] configuration doesn't exist", + occurrences = 1).intercept { + val ref = testkit2.spawn(Behaviors.setup[Command](counter(_, nextPid()).withJournalPluginId("missing"))) + val probe = testkit2.createTestProbe() + probe.expectTerminated(ref) + }(testkit2.system.toUntyped) + } finally { + testkit2.shutdownTestKit() + } + } + + "warn if default snapshot plugin is not defined" in { + // new ActorSystem without snapshot plugin config + val testkit2 = ActorTestKit( + ActorTestKitBase.testNameFromCallStack(), + ConfigFactory.parseString(s""" + akka.loggers = [akka.testkit.TestEventListener] + akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + """)) + try { + EventFilter + .warning(start = "No default snapshot store configured", occurrences = 1) + .intercept { + val ref = testkit2.spawn(Behaviors.setup[Command](counter(_, nextPid()))) + val probe = testkit2.createTestProbe[State]() + // verify that it's not terminated + ref ! GetValue(probe.ref) + probe.expectMessage(State(0, Vector.empty)) + }(testkit2.system.toUntyped) + } finally { + testkit2.shutdownTestKit() + } + } + + "fail fast if given snapshot plugin is not defined" in { + // new ActorSystem without snapshot plugin config + val testkit2 = ActorTestKit( + ActorTestKitBase.testNameFromCallStack(), + ConfigFactory.parseString(s""" + akka.loggers = [akka.testkit.TestEventListener] + akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + """)) + try { + EventFilter[ActorInitializationException]( + start = "Snapshot store plugin [missing] configuration doesn't exist", + occurrences = 1).intercept { + val ref = testkit2.spawn(Behaviors.setup[Command](counter(_, nextPid()).withSnapshotPluginId("missing"))) + val probe = testkit2.createTestProbe() + probe.expectTerminated(ref) + }(testkit2.system.toUntyped) + } finally { + testkit2.shutdownTestKit() + } + } + def watcher(toWatch: ActorRef[_]): TestProbe[String] = { val probe = TestProbe[String]() val w = Behaviors.setup[Any] { ctx => diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index ca5fa27e5a..4c313db05e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -171,6 +171,32 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { /** Config path to fall-back to if a setting is not defined in a specific snapshot plugin's config section */ val SnapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback" + + /** + * INTERNAL API + * @throws IllegalArgumentException if config path for the `pluginId` doesn't exist + */ + @InternalApi private[akka] def verifyPluginConfigExists( + config: Config, + pluginId: String, + pluginType: String): Unit = { + if (!isEmpty(pluginId) && !config.hasPath(pluginId)) + throw new IllegalArgumentException(s"$pluginType plugin [$pluginId] configuration doesn't exist.") + } + + /** + * INTERNAL API + * @throws IllegalArgumentException if `pluginId` is empty (undefined) + */ + @InternalApi private[akka] def verifyPluginConfigIsDefined(pluginId: String, pluginType: String): Unit = { + if (isEmpty(pluginId)) + throw new IllegalArgumentException(s"$pluginType plugin is not configured, see 'reference.conf'") + } + + /** Check for default or missing identity. */ + private def isEmpty(text: String) = { + text == null || text.length == 0 + } } /** @@ -199,7 +225,8 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { // Lazy, so user is not forced to configure defaults when she is not using them. private lazy val defaultJournalPluginId = { val configPath = config.getString("journal.plugin") - require(!isEmpty(configPath), "default journal plugin is not configured, see 'reference.conf'") + verifyPluginConfigIsDefined(configPath, "Default journal") + verifyJournalPluginConfigExists(ConfigFactory.empty, configPath) configPath } @@ -213,7 +240,10 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { "To configure a default snapshot-store plugin set the `akka.persistence.snapshot-store.plugin` key. " + "For details see 'reference.conf'") NoSnapshotStorePluginId - } else configPath + } else { + verifySnapshotPluginConfigExists(ConfigFactory.empty, configPath) + configPath + } } // Lazy, so user is not forced to configure defaults when she is not using them. @@ -227,9 +257,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { val settings = new PersistenceSettings(config) - /** Check for default or missing identity. */ - private def isEmpty(text: String) = text == null || text.length == 0 - /** Discovered persistence journal and snapshot store plugins. */ private val pluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) @@ -250,6 +277,18 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { } }) + /** + * @throws IllegalArgumentException if `configPath` doesn't exist + */ + private def verifyJournalPluginConfigExists(pluginConfig: Config, configPath: String): Unit = + verifyPluginConfigExists(pluginConfig.withFallback(system.settings.config), configPath, "Journal") + + /** + * @throws IllegalArgumentException if `configPath` doesn't exist + */ + private def verifySnapshotPluginConfigExists(pluginConfig: Config, configPath: String): Unit = + verifyPluginConfigExists(pluginConfig.withFallback(system.settings.config), configPath, "Snapshot store") + /** * Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters. * If no adapters are registered for a given journal the EventAdapters object will simply return the identity @@ -268,6 +307,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { */ final def adaptersFor(journalPluginId: String, journalPluginConfig: Config): EventAdapters = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId + verifyJournalPluginConfigExists(journalPluginConfig, configPath) pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).adapters } @@ -294,6 +334,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { journalPluginId: String, journalPluginConfig: Config = ConfigFactory.empty): Config = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId + verifyJournalPluginConfigExists(journalPluginConfig, configPath) pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).config } @@ -320,6 +361,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { journalPluginId: String, journalPluginConfig: Config = ConfigFactory.empty): ActorRef = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId + verifyJournalPluginConfigExists(journalPluginConfig, configPath) pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).actor } @@ -335,6 +377,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { snapshotPluginId: String, snapshotPluginConfig: Config = ConfigFactory.empty): ActorRef = { val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId + verifySnapshotPluginConfigExists(snapshotPluginConfig, configPath) pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath, snapshotPluginConfig).actor } @@ -355,13 +398,9 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def createPlugin(configPath: String, pluginConfig: Config): ActorRef = { val pluginActorName = configPath - val pluginClassName = pluginConfig.getString("class") match { - case "" => - throw new IllegalArgumentException( - "Plugin class name must be defined in config property " + - s"[$configPath.class]") - case className => className - } + val pluginClassName = pluginConfig.getString("class") + if (isEmpty(pluginClassName)) + throw new IllegalArgumentException(s"Plugin class name must be defined in config property [$configPath.class]") log.debug(s"Create plugin: $pluginActorName $pluginClassName") val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher")