add tests for missing journal and snapshot plugin config, #24687
* extract validation of persistence plugin config
* The reason for the duplicate check in EventSourcedSettings
is for better user experience. Fail fast before the actor is
started.
* The checks must still also be done when plugin is loaded, since
that is what is used by untyped.
* Extracted into static utility methods in Persistence
This commit is contained in:
parent
1a90f22fb4
commit
9d74ad11e4
3 changed files with 147 additions and 14 deletions
|
|
@ -11,6 +11,7 @@ import scala.concurrent.duration._
|
||||||
import akka.actor.typed.ActorSystem
|
import akka.actor.typed.ActorSystem
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.persistence.Persistence
|
import akka.persistence.Persistence
|
||||||
|
import akka.persistence.Persistence.verifyPluginConfigIsDefined
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -40,6 +41,8 @@ import com.typesafe.config.Config
|
||||||
val recoveryEventTimeout: FiniteDuration =
|
val recoveryEventTimeout: FiniteDuration =
|
||||||
journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis
|
journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis
|
||||||
|
|
||||||
|
Persistence.verifyPluginConfigExists(config, snapshotPluginId, "Snapshot store")
|
||||||
|
|
||||||
EventSourcedSettings(
|
EventSourcedSettings(
|
||||||
stashCapacity = stashCapacity,
|
stashCapacity = stashCapacity,
|
||||||
stashOverflowStrategy,
|
stashOverflowStrategy,
|
||||||
|
|
@ -49,9 +52,15 @@ import com.typesafe.config.Config
|
||||||
snapshotPluginId)
|
snapshotPluginId)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] final def journalConfigFor(config: Config, journalPluginId: String): Config = {
|
private def journalConfigFor(config: Config, journalPluginId: String): Config = {
|
||||||
val defaultJournalPluginId = config.getString("akka.persistence.journal.plugin")
|
def defaultJournalPluginId = {
|
||||||
|
val configPath = config.getString("akka.persistence.journal.plugin")
|
||||||
|
Persistence.verifyPluginConfigIsDefined(configPath, "Default journal")
|
||||||
|
configPath
|
||||||
|
}
|
||||||
|
|
||||||
val configPath = if (journalPluginId == "") defaultJournalPluginId else journalPluginId
|
val configPath = if (journalPluginId == "") defaultJournalPluginId else journalPluginId
|
||||||
|
Persistence.verifyPluginConfigExists(config, configPath, "Journal")
|
||||||
config.getConfig(configPath).withFallback(config.getConfig(Persistence.JournalFallbackConfigPath))
|
config.getConfig(configPath).withFallback(config.getConfig(Persistence.JournalFallbackConfigPath))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -878,6 +878,91 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"fail fast if default journal plugin is not defined" in {
|
||||||
|
// new ActorSystem without persistence config
|
||||||
|
val testkit2 = ActorTestKit(
|
||||||
|
ActorTestKitBase.testNameFromCallStack(),
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.loggers = [akka.testkit.TestEventListener]
|
||||||
|
"""))
|
||||||
|
try {
|
||||||
|
EventFilter[ActorInitializationException](start = "Default journal plugin is not configured", occurrences = 1)
|
||||||
|
.intercept {
|
||||||
|
val ref = testkit2.spawn(Behaviors.setup[Command](counter(_, nextPid())))
|
||||||
|
val probe = testkit2.createTestProbe()
|
||||||
|
probe.expectTerminated(ref)
|
||||||
|
}(testkit2.system.toUntyped)
|
||||||
|
} finally {
|
||||||
|
testkit2.shutdownTestKit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail fast if given journal plugin is not defined" in {
|
||||||
|
// new ActorSystem without persistence config
|
||||||
|
val testkit2 = ActorTestKit(
|
||||||
|
ActorTestKitBase.testNameFromCallStack(),
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.loggers = [akka.testkit.TestEventListener]
|
||||||
|
"""))
|
||||||
|
try {
|
||||||
|
EventFilter[ActorInitializationException](
|
||||||
|
start = "Journal plugin [missing] configuration doesn't exist",
|
||||||
|
occurrences = 1).intercept {
|
||||||
|
val ref = testkit2.spawn(Behaviors.setup[Command](counter(_, nextPid()).withJournalPluginId("missing")))
|
||||||
|
val probe = testkit2.createTestProbe()
|
||||||
|
probe.expectTerminated(ref)
|
||||||
|
}(testkit2.system.toUntyped)
|
||||||
|
} finally {
|
||||||
|
testkit2.shutdownTestKit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"warn if default snapshot plugin is not defined" in {
|
||||||
|
// new ActorSystem without snapshot plugin config
|
||||||
|
val testkit2 = ActorTestKit(
|
||||||
|
ActorTestKitBase.testNameFromCallStack(),
|
||||||
|
ConfigFactory.parseString(s"""
|
||||||
|
akka.loggers = [akka.testkit.TestEventListener]
|
||||||
|
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
|
||||||
|
"""))
|
||||||
|
try {
|
||||||
|
EventFilter
|
||||||
|
.warning(start = "No default snapshot store configured", occurrences = 1)
|
||||||
|
.intercept {
|
||||||
|
val ref = testkit2.spawn(Behaviors.setup[Command](counter(_, nextPid())))
|
||||||
|
val probe = testkit2.createTestProbe[State]()
|
||||||
|
// verify that it's not terminated
|
||||||
|
ref ! GetValue(probe.ref)
|
||||||
|
probe.expectMessage(State(0, Vector.empty))
|
||||||
|
}(testkit2.system.toUntyped)
|
||||||
|
} finally {
|
||||||
|
testkit2.shutdownTestKit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail fast if given snapshot plugin is not defined" in {
|
||||||
|
// new ActorSystem without snapshot plugin config
|
||||||
|
val testkit2 = ActorTestKit(
|
||||||
|
ActorTestKitBase.testNameFromCallStack(),
|
||||||
|
ConfigFactory.parseString(s"""
|
||||||
|
akka.loggers = [akka.testkit.TestEventListener]
|
||||||
|
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
|
||||||
|
"""))
|
||||||
|
try {
|
||||||
|
EventFilter[ActorInitializationException](
|
||||||
|
start = "Snapshot store plugin [missing] configuration doesn't exist",
|
||||||
|
occurrences = 1).intercept {
|
||||||
|
val ref = testkit2.spawn(Behaviors.setup[Command](counter(_, nextPid()).withSnapshotPluginId("missing")))
|
||||||
|
val probe = testkit2.createTestProbe()
|
||||||
|
probe.expectTerminated(ref)
|
||||||
|
}(testkit2.system.toUntyped)
|
||||||
|
} finally {
|
||||||
|
testkit2.shutdownTestKit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
|
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
|
||||||
val probe = TestProbe[String]()
|
val probe = TestProbe[String]()
|
||||||
val w = Behaviors.setup[Any] { ctx =>
|
val w = Behaviors.setup[Any] { ctx =>
|
||||||
|
|
|
||||||
|
|
@ -171,6 +171,32 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
|
||||||
|
|
||||||
/** Config path to fall-back to if a setting is not defined in a specific snapshot plugin's config section */
|
/** Config path to fall-back to if a setting is not defined in a specific snapshot plugin's config section */
|
||||||
val SnapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback"
|
val SnapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
* @throws IllegalArgumentException if config path for the `pluginId` doesn't exist
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] def verifyPluginConfigExists(
|
||||||
|
config: Config,
|
||||||
|
pluginId: String,
|
||||||
|
pluginType: String): Unit = {
|
||||||
|
if (!isEmpty(pluginId) && !config.hasPath(pluginId))
|
||||||
|
throw new IllegalArgumentException(s"$pluginType plugin [$pluginId] configuration doesn't exist.")
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
* @throws IllegalArgumentException if `pluginId` is empty (undefined)
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] def verifyPluginConfigIsDefined(pluginId: String, pluginType: String): Unit = {
|
||||||
|
if (isEmpty(pluginId))
|
||||||
|
throw new IllegalArgumentException(s"$pluginType plugin is not configured, see 'reference.conf'")
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Check for default or missing identity. */
|
||||||
|
private def isEmpty(text: String) = {
|
||||||
|
text == null || text.length == 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -199,7 +225,8 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
// Lazy, so user is not forced to configure defaults when she is not using them.
|
// Lazy, so user is not forced to configure defaults when she is not using them.
|
||||||
private lazy val defaultJournalPluginId = {
|
private lazy val defaultJournalPluginId = {
|
||||||
val configPath = config.getString("journal.plugin")
|
val configPath = config.getString("journal.plugin")
|
||||||
require(!isEmpty(configPath), "default journal plugin is not configured, see 'reference.conf'")
|
verifyPluginConfigIsDefined(configPath, "Default journal")
|
||||||
|
verifyJournalPluginConfigExists(ConfigFactory.empty, configPath)
|
||||||
configPath
|
configPath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -213,7 +240,10 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
"To configure a default snapshot-store plugin set the `akka.persistence.snapshot-store.plugin` key. " +
|
"To configure a default snapshot-store plugin set the `akka.persistence.snapshot-store.plugin` key. " +
|
||||||
"For details see 'reference.conf'")
|
"For details see 'reference.conf'")
|
||||||
NoSnapshotStorePluginId
|
NoSnapshotStorePluginId
|
||||||
} else configPath
|
} else {
|
||||||
|
verifySnapshotPluginConfigExists(ConfigFactory.empty, configPath)
|
||||||
|
configPath
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lazy, so user is not forced to configure defaults when she is not using them.
|
// Lazy, so user is not forced to configure defaults when she is not using them.
|
||||||
|
|
@ -227,9 +257,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
|
|
||||||
val settings = new PersistenceSettings(config)
|
val settings = new PersistenceSettings(config)
|
||||||
|
|
||||||
/** Check for default or missing identity. */
|
|
||||||
private def isEmpty(text: String) = text == null || text.length == 0
|
|
||||||
|
|
||||||
/** Discovered persistence journal and snapshot store plugins. */
|
/** Discovered persistence journal and snapshot store plugins. */
|
||||||
private val pluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
|
private val pluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
|
||||||
|
|
||||||
|
|
@ -250,6 +277,18 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws IllegalArgumentException if `configPath` doesn't exist
|
||||||
|
*/
|
||||||
|
private def verifyJournalPluginConfigExists(pluginConfig: Config, configPath: String): Unit =
|
||||||
|
verifyPluginConfigExists(pluginConfig.withFallback(system.settings.config), configPath, "Journal")
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws IllegalArgumentException if `configPath` doesn't exist
|
||||||
|
*/
|
||||||
|
private def verifySnapshotPluginConfigExists(pluginConfig: Config, configPath: String): Unit =
|
||||||
|
verifyPluginConfigExists(pluginConfig.withFallback(system.settings.config), configPath, "Snapshot store")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters.
|
* Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters.
|
||||||
* If no adapters are registered for a given journal the EventAdapters object will simply return the identity
|
* If no adapters are registered for a given journal the EventAdapters object will simply return the identity
|
||||||
|
|
@ -268,6 +307,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
*/
|
*/
|
||||||
final def adaptersFor(journalPluginId: String, journalPluginConfig: Config): EventAdapters = {
|
final def adaptersFor(journalPluginId: String, journalPluginConfig: Config): EventAdapters = {
|
||||||
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
|
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
|
||||||
|
verifyJournalPluginConfigExists(journalPluginConfig, configPath)
|
||||||
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).adapters
|
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).adapters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -294,6 +334,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
journalPluginId: String,
|
journalPluginId: String,
|
||||||
journalPluginConfig: Config = ConfigFactory.empty): Config = {
|
journalPluginConfig: Config = ConfigFactory.empty): Config = {
|
||||||
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
|
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
|
||||||
|
verifyJournalPluginConfigExists(journalPluginConfig, configPath)
|
||||||
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).config
|
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -320,6 +361,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
journalPluginId: String,
|
journalPluginId: String,
|
||||||
journalPluginConfig: Config = ConfigFactory.empty): ActorRef = {
|
journalPluginConfig: Config = ConfigFactory.empty): ActorRef = {
|
||||||
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
|
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
|
||||||
|
verifyJournalPluginConfigExists(journalPluginConfig, configPath)
|
||||||
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).actor
|
pluginHolderFor(configPath, JournalFallbackConfigPath, journalPluginConfig).actor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -335,6 +377,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
snapshotPluginId: String,
|
snapshotPluginId: String,
|
||||||
snapshotPluginConfig: Config = ConfigFactory.empty): ActorRef = {
|
snapshotPluginConfig: Config = ConfigFactory.empty): ActorRef = {
|
||||||
val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId
|
val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId
|
||||||
|
verifySnapshotPluginConfigExists(snapshotPluginConfig, configPath)
|
||||||
pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath, snapshotPluginConfig).actor
|
pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath, snapshotPluginConfig).actor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -355,13 +398,9 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
|
|
||||||
private def createPlugin(configPath: String, pluginConfig: Config): ActorRef = {
|
private def createPlugin(configPath: String, pluginConfig: Config): ActorRef = {
|
||||||
val pluginActorName = configPath
|
val pluginActorName = configPath
|
||||||
val pluginClassName = pluginConfig.getString("class") match {
|
val pluginClassName = pluginConfig.getString("class")
|
||||||
case "" =>
|
if (isEmpty(pluginClassName))
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(s"Plugin class name must be defined in config property [$configPath.class]")
|
||||||
"Plugin class name must be defined in config property " +
|
|
||||||
s"[$configPath.class]")
|
|
||||||
case className => className
|
|
||||||
}
|
|
||||||
log.debug(s"Create plugin: $pluginActorName $pluginClassName")
|
log.debug(s"Create plugin: $pluginActorName $pluginClassName")
|
||||||
val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get
|
val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get
|
||||||
val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher")
|
val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue