Merge pull request #28598 from akka/wip-PluginHolder-patriknw
Don't start journal when accessing event adapters, #28597
This commit is contained in:
commit
b02ac89caa
4 changed files with 38 additions and 15 deletions
|
|
@ -0,0 +1,2 @@
|
||||||
|
# #28597 Don't start journal when accessing event adapters
|
||||||
|
ProblemFilters.exclude[Problem]("akka.persistence.Persistence#PluginHolder*")
|
||||||
|
|
@ -151,8 +151,11 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
|
||||||
def lookup() = Persistence
|
def lookup() = Persistence
|
||||||
|
|
||||||
/** INTERNAL API. */
|
/** INTERNAL API. */
|
||||||
private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters, config: Config)
|
private[persistence] case class PluginHolder(actorFactory: () => ActorRef, adapters: EventAdapters, config: Config)
|
||||||
extends Extension
|
extends Extension {
|
||||||
|
// lazy creation of actor so that it's not started when only looking up adapters
|
||||||
|
lazy val actor: ActorRef = actorFactory()
|
||||||
|
}
|
||||||
|
|
||||||
/** Config path to fall-back to if a setting is not defined in a specific plugin's config section */
|
/** Config path to fall-back to if a setting is not defined in a specific plugin's config section */
|
||||||
val JournalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
|
val JournalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
|
||||||
|
|
@ -428,10 +431,10 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||||
!isEmpty(configPath) && mergedConfig.hasPath(configPath),
|
!isEmpty(configPath) && mergedConfig.hasPath(configPath),
|
||||||
s"'reference.conf' is missing persistence plugin config path: '$configPath'")
|
s"'reference.conf' is missing persistence plugin config path: '$configPath'")
|
||||||
val config: Config = mergedConfig.getConfig(configPath).withFallback(mergedConfig.getConfig(fallbackPath))
|
val config: Config = mergedConfig.getConfig(configPath).withFallback(mergedConfig.getConfig(fallbackPath))
|
||||||
val plugin: ActorRef = createPlugin(configPath, config)
|
val pluginActorFactory = () => createPlugin(configPath, config)
|
||||||
val adapters: EventAdapters = createAdapters(configPath, mergedConfig)
|
val adapters: EventAdapters = createAdapters(configPath, mergedConfig)
|
||||||
|
|
||||||
PluginHolder(plugin, adapters, config)
|
PluginHolder(pluginActorFactory, adapters, config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import java.io.File
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.persistence.journal.{ EventAdapter, EventSeq }
|
import akka.persistence.journal.{ EventAdapter, EventSeq }
|
||||||
import akka.testkit.{ EventFilter, TestProbe }
|
import akka.testkit.TestProbe
|
||||||
import akka.util.unused
|
import akka.util.unused
|
||||||
import com.typesafe.config.{ Config, ConfigFactory }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
import org.apache.commons.io.FileUtils
|
import org.apache.commons.io.FileUtils
|
||||||
|
|
@ -233,7 +233,6 @@ abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Conf
|
||||||
s"""$journalPath.event-adapter-bindings."${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A"""")
|
s"""$journalPath.event-adapter-bindings."${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A"""")
|
||||||
|
|
||||||
withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 =>
|
withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 =>
|
||||||
EventFilter[ActorInitializationException](occurrences = 1, pattern = ".*undefined event-adapter.*").intercept {
|
|
||||||
intercept[IllegalArgumentException] {
|
intercept[IllegalArgumentException] {
|
||||||
Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String])
|
Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String])
|
||||||
}.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)")
|
}.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)")
|
||||||
|
|
@ -241,7 +240,6 @@ abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Conf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// needs persistence between actor systems, thus not running with the inmem journal
|
// needs persistence between actor systems, thus not running with the inmem journal
|
||||||
class LeveldbEndToEndEventAdapterSpec
|
class LeveldbEndToEndEventAdapterSpec
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,9 @@ import akka.persistence.journal.inmem.InmemJournal
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.testkit.ImplicitSender
|
import akka.testkit.ImplicitSender
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
|
import akka.util.unused
|
||||||
|
|
||||||
object LoadJournalSpec {
|
object LoadPluginSpec {
|
||||||
|
|
||||||
case object GetConfig
|
case object GetConfig
|
||||||
|
|
||||||
|
|
@ -18,25 +19,44 @@ object LoadJournalSpec {
|
||||||
case GetConfig => sender() ! config
|
case GetConfig => sender() ! config
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object JournalWithStartupNotification {
|
||||||
|
final case class Started(configPath: String)
|
||||||
|
}
|
||||||
|
class JournalWithStartupNotification(@unused config: Config, configPath: String) extends InmemJournal {
|
||||||
|
context.system.eventStream.publish(JournalWithStartupNotification.Started(configPath))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class LoadJournalSpec
|
class LoadPluginSpec
|
||||||
extends PersistenceSpec(
|
extends PersistenceSpec(
|
||||||
PersistenceSpec.config(
|
PersistenceSpec.config(
|
||||||
"inmem",
|
"inmem",
|
||||||
"LoadJournalSpec",
|
"LoadJournalSpec",
|
||||||
extraConfig = Some("""
|
extraConfig = Some("""
|
||||||
akka.persistence.journal.inmem.class = "akka.persistence.LoadJournalSpec$JournalWithConfig"
|
akka.persistence.journal.inmem.class = "akka.persistence.LoadPluginSpec$JournalWithConfig"
|
||||||
akka.persistence.journal.inmem.extra-property = 17
|
akka.persistence.journal.inmem.extra-property = 17
|
||||||
|
|
||||||
|
test-plugin {
|
||||||
|
class = "akka.persistence.LoadPluginSpec$JournalWithStartupNotification"
|
||||||
|
}
|
||||||
""")))
|
""")))
|
||||||
with ImplicitSender {
|
with ImplicitSender {
|
||||||
import LoadJournalSpec._
|
import LoadPluginSpec._
|
||||||
|
|
||||||
"A journal with config parameter" must {
|
"A journal" must {
|
||||||
"be created with plugin config" in {
|
"be created with plugin config" in {
|
||||||
val journalRef = Persistence(system).journalFor("akka.persistence.journal.inmem")
|
val journalRef = Persistence(system).journalFor("akka.persistence.journal.inmem")
|
||||||
journalRef ! GetConfig
|
journalRef ! GetConfig
|
||||||
expectMsgType[Config].getInt("extra-property") should be(17)
|
expectMsgType[Config].getInt("extra-property") should be(17)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not be created via eventAdapter" in {
|
||||||
|
system.eventStream.subscribe(testActor, classOf[JournalWithStartupNotification.Started])
|
||||||
|
Persistence(system).adaptersFor("test-plugin")
|
||||||
|
expectNoMessage()
|
||||||
|
Persistence(system).journalFor("test-plugin")
|
||||||
|
expectMsg(JournalWithStartupNotification.Started("test-plugin"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue