optionally pass plugin conf path to persistence plugins, #19822

This commit is contained in:
Patrik Nordwall 2016-12-13 19:37:03 +01:00
parent 5b5cf4fc7b
commit 57e59c8496
9 changed files with 126 additions and 36 deletions

View file

@ -870,15 +870,21 @@ A journal plugin can be activated with the following minimal configuration:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
The journal plugin instance is an actor so the methods corresponding to requests from persistent actors The journal plugin instance is an actor so the methods corresponding to requests from persistent actors
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
actors to achive parallelism. actors to achive parallelism.
The journal plugin class must have a constructor without parameters or a constructor with one ``com.typesafe.config.Config`` The journal plugin class must have a constructor with one of these signatures:
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path
* constructor with one ``com.typesafe.config.Config`` parameter
* constructor without parameters
The plugin section of the actor system's config will be passed in the config constructor parameter. The config path
of the plugin is passed in the ``String`` parameter.
The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to
``akka.persistence.dispatchers.default-plugin-dispatcher``.
Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks.
@ -893,15 +899,21 @@ A snapshot store plugin can be activated with the following minimal configuratio
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
The snapshot store instance is an actor so the methods corresponding to requests from persistent actors The snapshot store instance is an actor so the methods corresponding to requests from persistent actors
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
actors to achive parallelism. actors to achive parallelism.
The snapshot store plugin class must have a constructor without parameters or constructor with one ``com.typesafe.config.Config`` The snapshot store plugin class must have a constructor with one of these signatures:
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path
* constructor with one ``com.typesafe.config.Config`` parameter
* constructor without parameters
The plugin section of the actor system's config will be passed in the config constructor parameter. The config path
of the plugin is passed in the ``String`` parameter.
The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to
``akka.persistence.dispatchers.default-plugin-dispatcher``.
Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks.

View file

@ -255,6 +255,16 @@ And the ``EventsByTag`` could be backed by such an Actor for example:
.. includecode:: code/docs/persistence/query/MyEventsByTagJavaPublisher.java#events-by-tag-publisher .. includecode:: code/docs/persistence/query/MyEventsByTagJavaPublisher.java#events-by-tag-publisher
The ``ReadJournalProvider`` class must have a constructor with one of these signatures:
* constructor with a ``ExtendedActorSystem`` parameter, a ``com.typesafe.config.Config`` parameter, and a ``String`` parameter for the config path
* constructor with a ``ExtendedActorSystem`` parameter, and a ``com.typesafe.config.Config`` parameter
* constructor with one ``ExtendedActorSystem`` parameter
* constructor without parameters
The plugin section of the actor system's config will be passed in the config constructor parameter. The config path
of the plugin is passed in the ``String`` parameter.
If the underlying datastore only supports queries that are completed when they reach the If the underlying datastore only supports queries that are completed when they reach the
end of the "result set", the journal has to submit new queries after a while in order end of the "result set", the journal has to submit new queries after a while in order
to support "infinite" event streams that include events stored after the initial query to support "infinite" event streams that include events stored after the initial query

View file

@ -874,8 +874,14 @@ The journal plugin instance is an actor so the methods corresponding to requests
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
actors to achive parallelism. actors to achive parallelism.
The journal plugin class must have a constructor without parameters or a constructor with one ``com.typesafe.config.Config`` The journal plugin class must have a constructor with one of these signatures:
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path
* constructor with one ``com.typesafe.config.Config`` parameter
* constructor without parameters
The plugin section of the actor system's config will be passed in the config constructor parameter. The config path
of the plugin is passed in the ``String`` parameter.
Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks.
@ -890,15 +896,21 @@ A snapshot store plugin can be activated with the following minimal configuratio
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
The snapshot store instance is an actor so the methods corresponding to requests from persistent actors The snapshot store instance is an actor so the methods corresponding to requests from persistent actors
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
actors to achive parallelism. actors to achive parallelism.
The snapshot store plugin class must have a constructor without parameters or constructor with one ``com.typesafe.config.Config`` The snapshot store plugin class must have a constructor with one of these signatures:
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path
* constructor with one ``com.typesafe.config.Config`` parameter
* constructor without parameters
The plugin section of the actor system's config will be passed in the config constructor parameter. The config path
of the plugin is passed in the ``String`` parameter.
The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to
``akka.persistence.dispatchers.default-plugin-dispatcher``.
Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks.

View file

@ -250,6 +250,16 @@ And the ``eventsByTag`` could be backed by such an Actor for example:
.. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher .. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher
The ``ReadJournalProvider`` class must have a constructor with one of these signatures:
* constructor with a ``ExtendedActorSystem`` parameter, a ``com.typesafe.config.Config`` parameter, and a ``String`` parameter for the config path
* constructor with a ``ExtendedActorSystem`` parameter, and a ``com.typesafe.config.Config`` parameter
* constructor with one ``ExtendedActorSystem`` parameter
* constructor without parameters
The plugin section of the actor system's config will be passed in the config constructor parameter. The config path
of the plugin is passed in the ``String`` parameter.
If the underlying datastore only supports queries that are completed when they reach the If the underlying datastore only supports queries that are completed when they reach the
end of the "result set", the journal has to submit new queries after a while in order end of the "result set", the journal has to submit new queries after a while in order
to support "infinite" event streams that include events stored after the initial query to support "infinite" event streams that include events stored after the initial query

View file

@ -937,15 +937,21 @@ A journal plugin can be activated with the following minimal configuration:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config .. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
The journal plugin instance is an actor so the methods corresponding to requests from persistent actors The journal plugin instance is an actor so the methods corresponding to requests from persistent actors
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
actors to achive parallelism. actors to achive parallelism.
The journal plugin class must have a constructor without parameters or a constructor with one ``com.typesafe.config.Config`` The journal plugin class must have a constructor with one of these signatures:
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path
* constructor with one ``com.typesafe.config.Config`` parameter
* constructor without parameters
The plugin section of the actor system's config will be passed in the config constructor parameter. The config path
of the plugin is passed in the ``String`` parameter.
The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to
``akka.persistence.dispatchers.default-plugin-dispatcher``.
Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks.
@ -960,15 +966,21 @@ A snapshot store plugin can be activated with the following minimal configuratio
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config .. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
The snapshot store instance is an actor so the methods corresponding to requests from persistent actors The snapshot store instance is an actor so the methods corresponding to requests from persistent actors
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
actors to achive parallelism. actors to achive parallelism.
The snapshot store plugin class must have a constructor without parameters or a constructor with one ``com.typesafe.config.Config`` The snapshot store plugin class must have a constructor with one of these signatures:
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path
* constructor with one ``com.typesafe.config.Config`` parameter
* constructor without parameters
The plugin section of the actor system's config will be passed in the config constructor parameter. The config path
of the plugin is passed in the ``String`` parameter.
The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to
``akka.persistence.dispatchers.default-plugin-dispatcher``.
Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks.

View file

@ -82,11 +82,18 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
def instantiate(args: collection.immutable.Seq[(Class[_], AnyRef)]) = def instantiate(args: collection.immutable.Seq[(Class[_], AnyRef)]) =
system.dynamicAccess.createInstanceFor[ReadJournalProvider](pluginClass, args) system.dynamicAccess.createInstanceFor[ReadJournalProvider](pluginClass, args)
instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) ::
(classOf[String], configPath) :: Nil)
.recoverWith {
case x: NoSuchMethodException instantiate(
(classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil)
}
.recoverWith { case x: NoSuchMethodException instantiate((classOf[ExtendedActorSystem], system) :: Nil) } .recoverWith { case x: NoSuchMethodException instantiate((classOf[ExtendedActorSystem], system) :: Nil) }
.recoverWith { case x: NoSuchMethodException instantiate(Nil) } .recoverWith { case x: NoSuchMethodException instantiate(Nil) }
.recoverWith { .recoverWith {
case ex: Exception Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) case ex: Exception Failure.apply(
new IllegalArgumentException("Unable to create read journal plugin instance for path " +
s"[$configPath], class [$pluginClassName]!", ex))
}.get }.get
} }

View file

@ -7,6 +7,7 @@ package akka.persistence.query
import akka.NotUsed import akka.NotUsed
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.ExtendedActorSystem
/** /**
* Use for tests only! * Use for tests only!
@ -29,10 +30,19 @@ class DummyReadJournalForJava(readJournal: DummyReadJournal) extends javadsl.Rea
object DummyReadJournalProvider { object DummyReadJournalProvider {
final val config: Config = ConfigFactory.parseString( final val config: Config = ConfigFactory.parseString(
s""" s"""
|${DummyReadJournal.Identifier} { ${DummyReadJournal.Identifier} {
| class = "${classOf[DummyReadJournalProvider].getCanonicalName}" class = "${classOf[DummyReadJournalProvider].getCanonicalName}"
|} }
""".stripMargin) ${DummyReadJournal.Identifier}2 {
class = "${classOf[DummyReadJournalProvider2].getCanonicalName}"
}
${DummyReadJournal.Identifier}3 {
class = "${classOf[DummyReadJournalProvider3].getCanonicalName}"
}
${DummyReadJournal.Identifier}4 {
class = "${classOf[DummyReadJournalProvider4].getCanonicalName}"
}
""")
} }
class DummyReadJournalProvider extends ReadJournalProvider { class DummyReadJournalProvider extends ReadJournalProvider {
@ -43,3 +53,10 @@ class DummyReadJournalProvider extends ReadJournalProvider {
override val javadslReadJournal: DummyReadJournalForJava = override val javadslReadJournal: DummyReadJournalForJava =
new DummyReadJournalForJava(scaladslReadJournal) new DummyReadJournalForJava(scaladslReadJournal)
} }
class DummyReadJournalProvider2(sys: ExtendedActorSystem) extends DummyReadJournalProvider
class DummyReadJournalProvider3(sys: ExtendedActorSystem, conf: Config) extends DummyReadJournalProvider
class DummyReadJournalProvider4(sys: ExtendedActorSystem, conf: Config, confPath: String) extends DummyReadJournalProvider

View file

@ -28,6 +28,10 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte
"be found by full config key" in { "be found by full config key" in {
withActorSystem() { system withActorSystem() { system
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier) PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier)
// other combinations of constructor parameters
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "2")
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "3")
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "4")
} }
} }

View file

@ -289,9 +289,15 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get
val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher") val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher")
val pluginActorArgs = try { val pluginActorArgs = try {
Reflect.findConstructor(pluginClass, List(pluginConfig)) // will throw if not found Reflect.findConstructor(pluginClass, List(pluginConfig, configPath)) // will throw if not found
List(pluginConfig) List(pluginConfig, configPath)
} catch { case NonFatal(_) Nil } // otherwise use empty constructor } catch {
case NonFatal(_)
try {
Reflect.findConstructor(pluginClass, List(pluginConfig)) // will throw if not found
List(pluginConfig)
} catch { case NonFatal(_) Nil } // otherwise use empty constructor
}
val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs) val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs)
system.systemActorOf(pluginActorProps, pluginActorName) system.systemActorOf(pluginActorProps, pluginActorName)
} }