Don't start journal when accessing event adapters, #28597

This commit is contained in:
Patrik Nordwall 2020-02-11 19:31:55 +01:00
parent dcdaa5a0dd
commit 01b95c50ec
3 changed files with 34 additions and 9 deletions

View file

@ -0,0 +1,2 @@
# #28597 Don't start journal when accessing event adapters
ProblemFilters.exclude[Problem]("akka.persistence.Persistence#PluginHolder*")

View file

@ -165,8 +165,11 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
def lookup() = Persistence
/** INTERNAL API. */
private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters, config: Config)
extends Extension
private[persistence] case class PluginHolder(actorFactory: () => ActorRef, adapters: EventAdapters, config: Config)
extends Extension {
// lazy creation of actor so that it's not started when only lookup up adapters
lazy val actor: ActorRef = actorFactory()
}
/** Config path to fall-back to if a setting is not defined in a specific plugin's config section */
val JournalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
@ -442,10 +445,10 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
!isEmpty(configPath) && mergedConfig.hasPath(configPath),
s"'reference.conf' is missing persistence plugin config path: '$configPath'")
val config: Config = mergedConfig.getConfig(configPath).withFallback(mergedConfig.getConfig(fallbackPath))
val plugin: ActorRef = createPlugin(configPath, config)
val pluginActorFactory = () => createPlugin(configPath, config)
val adapters: EventAdapters = createAdapters(configPath, mergedConfig)
PluginHolder(plugin, adapters, config)
PluginHolder(pluginActorFactory, adapters, config)
}
}

View file

@ -8,8 +8,9 @@ import akka.persistence.journal.inmem.InmemJournal
import com.typesafe.config.Config
import akka.testkit.ImplicitSender
import akka.actor.Actor
import akka.util.unused
object LoadJournalSpec {
object LoadPluginSpec {
case object GetConfig
@ -18,25 +19,44 @@ object LoadJournalSpec {
case GetConfig => sender() ! config
}
}
object JournalWithStartupNotification {
final case class Started(configPath: String)
}
class JournalWithStartupNotification(@unused config: Config, configPath: String) extends InmemJournal {
context.system.eventStream.publish(JournalWithStartupNotification.Started(configPath))
}
}
class LoadJournalSpec
class LoadPluginSpec
extends PersistenceSpec(
PersistenceSpec.config(
"inmem",
"LoadJournalSpec",
extraConfig = Some("""
akka.persistence.journal.inmem.class = "akka.persistence.LoadJournalSpec$JournalWithConfig"
akka.persistence.journal.inmem.class = "akka.persistence.LoadPluginSpec$JournalWithConfig"
akka.persistence.journal.inmem.extra-property = 17
test-plugin {
class = "akka.persistence.LoadPluginSpec$JournalWithStartupNotification"
}
""")))
with ImplicitSender {
import LoadJournalSpec._
import LoadPluginSpec._
"A journal with config parameter" must {
"A journal" must {
"be created with plugin config" in {
val journalRef = Persistence(system).journalFor("akka.persistence.journal.inmem")
journalRef ! GetConfig
expectMsgType[Config].getInt("extra-property") should be(17)
}
"not be created via eventAdapter" in {
system.eventStream.subscribe(testActor, classOf[JournalWithStartupNotification.Started])
Persistence(system).adaptersFor("test-plugin")
expectNoMessage()
Persistence(system).journalFor("test-plugin")
expectMsg(JournalWithStartupNotification.Started("test-plugin"))
}
}
}