=per #19221 Eager initialization of persistence plugins
* Initialization is performed during Persistence initialization, so for this to work the persistence extension should be specified under akka.extensions
This commit is contained in:
parent
ae5a7484b4
commit
70130549df
6 changed files with 37 additions and 4 deletions
|
|
@ -16,6 +16,8 @@ akka.persistence {
|
|||
# Persistent actor or view can override `journalPluginId` method
|
||||
# in order to rely on a different journal plugin.
|
||||
plugin = ""
|
||||
# List of journal plugins to start automatically. Use "" for the default journal plugin.
|
||||
auto-start-journals = []
|
||||
}
|
||||
snapshot-store {
|
||||
# Absolute path to the snapshot plugin configuration entry used by
|
||||
|
|
@ -27,6 +29,8 @@ akka.persistence {
|
|||
# Note that Cluster Sharding is using snapshots, so if you
|
||||
# use Cluster Sharding you need to define a snapshot store plugin.
|
||||
plugin = ""
|
||||
# List of snapshot stores to start automatically. Use "" for the default snapshot store.
|
||||
auto-start-snapshot-stores = []
|
||||
}
|
||||
# used as default-snapshot store if no plugin configured
|
||||
# (see `akka.persistence.snapshot-store`)
|
||||
|
|
|
|||
|
|
@ -5,15 +5,14 @@
|
|||
package akka.persistence
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.function.Consumer
|
||||
import akka.actor._
|
||||
import akka.dispatch.Dispatchers
|
||||
import akka.event.{ Logging, LoggingAdapter }
|
||||
import akka.persistence.journal.{ AsyncWriteJournal, EventAdapters, IdentityEventAdapters, ReplayFilter }
|
||||
import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters }
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import com.typesafe.config.Config
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.duration._
|
||||
import java.util.Locale
|
||||
import akka.util.Reflect
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
|
|
@ -168,6 +167,20 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
|
||||
private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback"
|
||||
|
||||
|
||||
config.getStringList("journal.auto-start-journals").forEach(new Consumer[String] {
|
||||
override def accept(id: String): Unit = {
|
||||
log.info(s"Auto-starting journal plugin `$id`")
|
||||
journalFor(id)
|
||||
}
|
||||
})
|
||||
config.getStringList("snapshot-store.auto-start-snapshot-stores").forEach(new Consumer[String] {
|
||||
override def accept(id: String): Unit = {
|
||||
log.info(s"Auto-starting snapshot store `$id`")
|
||||
snapshotStoreFor(id)
|
||||
}
|
||||
})
|
||||
|
||||
/**
|
||||
* Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters.
|
||||
* If no adapters are registered for a given journal the EventAdapters object will simply return the identity
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import akka.persistence._
|
|||
import akka.persistence.journal.PersistencePluginProxy
|
||||
import akka.testkit.{ TestProbe, AkkaSpec }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object PersistencePluginProxySpec {
|
||||
lazy val config = ConfigFactory.parseString(
|
||||
|
|
@ -54,6 +53,8 @@ object PersistencePluginProxySpec {
|
|||
|
||||
def targetAddressConfig(system: ActorSystem) = ConfigFactory.parseString(
|
||||
s"""
|
||||
|akka.extensions = ["akka.persistence.Persistence"]
|
||||
|akka.persistence.journal.auto-start-journals = [""]
|
||||
|akka.persistence.journal.proxy.target-journal-address = "${system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}"
|
||||
|akka.persistence.snapshot-store.proxy.target-snapshot-store-address = "${system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}"
|
||||
""".stripMargin)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue