diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 1c5076d191..f773a65e3c 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -870,15 +870,21 @@ A journal plugin can be activated with the following minimal configuration: .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config -The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher -used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. - The journal plugin instance is an actor so the methods corresponding to requests from persistent actors are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other actors to achive parallelism. -The journal plugin class must have a constructor without parameters or a constructor with one ``com.typesafe.config.Config`` -parameter. The plugin section of the actor system's config will be passed in the config constructor parameter. +The journal plugin class must have a constructor with one of these signatures: + +* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path +* constructor with one ``com.typesafe.config.Config`` parameter +* constructor without parameters + +The plugin section of the actor system's config will be passed in the config constructor parameter. The config path +of the plugin is passed in the ``String`` parameter. + +The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to +``akka.persistence.dispatchers.default-plugin-dispatcher``. Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. @@ -893,15 +899,21 @@ A snapshot store plugin can be activated with the following minimal configuratio .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config -The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher -used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. - The snapshot store instance is an actor so the methods corresponding to requests from persistent actors are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other actors to achive parallelism. -The snapshot store plugin class must have a constructor without parameters or constructor with one ``com.typesafe.config.Config`` -parameter. The plugin section of the actor system's config will be passed in the config constructor parameter. +The snapshot store plugin class must have a constructor with one of these signatures: + +* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path +* constructor with one ``com.typesafe.config.Config`` parameter +* constructor without parameters + +The plugin section of the actor system's config will be passed in the config constructor parameter. The config path +of the plugin is passed in the ``String`` parameter. + +The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to +``akka.persistence.dispatchers.default-plugin-dispatcher``. Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst index 376ea26504..7a44b74e52 100644 --- a/akka-docs/rst/java/persistence-query.rst +++ b/akka-docs/rst/java/persistence-query.rst @@ -255,6 +255,16 @@ And the ``EventsByTag`` could be backed by such an Actor for example: .. includecode:: code/docs/persistence/query/MyEventsByTagJavaPublisher.java#events-by-tag-publisher +The ``ReadJournalProvider`` class must have a constructor with one of these signatures: + +* constructor with a ``ExtendedActorSystem`` parameter, a ``com.typesafe.config.Config`` parameter, and a ``String`` parameter for the config path +* constructor with a ``ExtendedActorSystem`` parameter, and a ``com.typesafe.config.Config`` parameter +* constructor with one ``ExtendedActorSystem`` parameter +* constructor without parameters + +The plugin section of the actor system's config will be passed in the config constructor parameter. The config path +of the plugin is passed in the ``String`` parameter. + If the underlying datastore only supports queries that are completed when they reach the end of the "result set", the journal has to submit new queries after a while in order to support "infinite" event streams that include events stored after the initial query diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index b9bef5d105..b56b07125c 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -874,8 +874,14 @@ The journal plugin instance is an actor so the methods corresponding to requests are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other actors to achive parallelism. -The journal plugin class must have a constructor without parameters or a constructor with one ``com.typesafe.config.Config`` -parameter. The plugin section of the actor system's config will be passed in the config constructor parameter. +The journal plugin class must have a constructor with one of these signatures: + +* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path +* constructor with one ``com.typesafe.config.Config`` parameter +* constructor without parameters + +The plugin section of the actor system's config will be passed in the config constructor parameter. The config path +of the plugin is passed in the ``String`` parameter. Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. @@ -890,15 +896,21 @@ A snapshot store plugin can be activated with the following minimal configuratio .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config -The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher -used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. - The snapshot store instance is an actor so the methods corresponding to requests from persistent actors are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other actors to achive parallelism. -The snapshot store plugin class must have a constructor without parameters or constructor with one ``com.typesafe.config.Config`` -parameter. The plugin section of the actor system's config will be passed in the config constructor parameter. +The snapshot store plugin class must have a constructor with one of these signatures: + +* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path +* constructor with one ``com.typesafe.config.Config`` parameter +* constructor without parameters + +The plugin section of the actor system's config will be passed in the config constructor parameter. The config path +of the plugin is passed in the ``String`` parameter. + +The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to +``akka.persistence.dispatchers.default-plugin-dispatcher``. Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst index 6026cf38f9..27c4bfb37c 100644 --- a/akka-docs/rst/scala/persistence-query.rst +++ b/akka-docs/rst/scala/persistence-query.rst @@ -250,6 +250,16 @@ And the ``eventsByTag`` could be backed by such an Actor for example: .. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher +The ``ReadJournalProvider`` class must have a constructor with one of these signatures: + +* constructor with a ``ExtendedActorSystem`` parameter, a ``com.typesafe.config.Config`` parameter, and a ``String`` parameter for the config path +* constructor with a ``ExtendedActorSystem`` parameter, and a ``com.typesafe.config.Config`` parameter +* constructor with one ``ExtendedActorSystem`` parameter +* constructor without parameters + +The plugin section of the actor system's config will be passed in the config constructor parameter. The config path +of the plugin is passed in the ``String`` parameter. + If the underlying datastore only supports queries that are completed when they reach the end of the "result set", the journal has to submit new queries after a while in order to support "infinite" event streams that include events stored after the initial query diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 5a3ea8acbc..586aec3d8d 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -937,15 +937,21 @@ A journal plugin can be activated with the following minimal configuration: .. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config -The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher -used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. - The journal plugin instance is an actor so the methods corresponding to requests from persistent actors are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other actors to achive parallelism. -The journal plugin class must have a constructor without parameters or a constructor with one ``com.typesafe.config.Config`` -parameter. The plugin section of the actor system's config will be passed in the config constructor parameter. +The journal plugin class must have a constructor with one of these signatures: + +* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path +* constructor with one ``com.typesafe.config.Config`` parameter +* constructor without parameters + +The plugin section of the actor system's config will be passed in the config constructor parameter. The config path +of the plugin is passed in the ``String`` parameter. + +The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to +``akka.persistence.dispatchers.default-plugin-dispatcher``. Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. @@ -960,15 +966,21 @@ A snapshot store plugin can be activated with the following minimal configuratio .. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config -The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher -used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. - The snapshot store instance is an actor so the methods corresponding to requests from persistent actors are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other actors to achive parallelism. -The snapshot store plugin class must have a constructor without parameters or a constructor with one ``com.typesafe.config.Config`` -parameter. The plugin section of the actor system's config will be passed in the config constructor parameter. +The snapshot store plugin class must have a constructor with one of these signatures: + +* constructor with one ``com.typesafe.config.Config`` parameter and a ``String`` parameter for the config path +* constructor with one ``com.typesafe.config.Config`` parameter +* constructor without parameters + +The plugin section of the actor system's config will be passed in the config constructor parameter. The config path +of the plugin is passed in the ``String`` parameter. + +The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to +``akka.persistence.dispatchers.default-plugin-dispatcher``. Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index 059dc8f329..6a5b8c0347 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -82,11 +82,18 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { def instantiate(args: collection.immutable.Seq[(Class[_], AnyRef)]) = system.dynamicAccess.createInstanceFor[ReadJournalProvider](pluginClass, args) - instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) + instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: + (classOf[String], configPath) :: Nil) + .recoverWith { + case x: NoSuchMethodException ⇒ instantiate( + (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) + } .recoverWith { case x: NoSuchMethodException ⇒ instantiate((classOf[ExtendedActorSystem], system) :: Nil) } .recoverWith { case x: NoSuchMethodException ⇒ instantiate(Nil) } .recoverWith { - case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + case ex: Exception ⇒ Failure.apply( + new IllegalArgumentException("Unable to create read journal plugin instance for path " + + s"[$configPath], class [$pluginClassName]!", ex)) }.get } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala index ddcb824d5b..91d614ef32 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala @@ -7,6 +7,7 @@ package akka.persistence.query import akka.NotUsed import akka.stream.scaladsl.Source import com.typesafe.config.{ Config, ConfigFactory } +import akka.actor.ExtendedActorSystem /** * Use for tests only! @@ -29,10 +30,19 @@ class DummyReadJournalForJava(readJournal: DummyReadJournal) extends javadsl.Rea object DummyReadJournalProvider { final val config: Config = ConfigFactory.parseString( s""" - |${DummyReadJournal.Identifier} { - | class = "${classOf[DummyReadJournalProvider].getCanonicalName}" - |} - """.stripMargin) + ${DummyReadJournal.Identifier} { + class = "${classOf[DummyReadJournalProvider].getCanonicalName}" + } + ${DummyReadJournal.Identifier}2 { + class = "${classOf[DummyReadJournalProvider2].getCanonicalName}" + } + ${DummyReadJournal.Identifier}3 { + class = "${classOf[DummyReadJournalProvider3].getCanonicalName}" + } + ${DummyReadJournal.Identifier}4 { + class = "${classOf[DummyReadJournalProvider4].getCanonicalName}" + } + """) } class DummyReadJournalProvider extends ReadJournalProvider { @@ -43,3 +53,10 @@ class DummyReadJournalProvider extends ReadJournalProvider { override val javadslReadJournal: DummyReadJournalForJava = new DummyReadJournalForJava(scaladslReadJournal) } + +class DummyReadJournalProvider2(sys: ExtendedActorSystem) extends DummyReadJournalProvider + +class DummyReadJournalProvider3(sys: ExtendedActorSystem, conf: Config) extends DummyReadJournalProvider + +class DummyReadJournalProvider4(sys: ExtendedActorSystem, conf: Config, confPath: String) extends DummyReadJournalProvider + diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala index ac08eeaa13..ff9ecfad40 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -28,6 +28,10 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte "be found by full config key" in { withActorSystem() { system ⇒ PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier) + // other combinations of constructor parameters + PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "2") + PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "3") + PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "4") } } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index f76893c117..5f0f9d4735 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -289,9 +289,15 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher") val pluginActorArgs = try { - Reflect.findConstructor(pluginClass, List(pluginConfig)) // will throw if not found - List(pluginConfig) - } catch { case NonFatal(_) ⇒ Nil } // otherwise use empty constructor + Reflect.findConstructor(pluginClass, List(pluginConfig, configPath)) // will throw if not found + List(pluginConfig, configPath) + } catch { + case NonFatal(_) ⇒ + try { + Reflect.findConstructor(pluginClass, List(pluginConfig)) // will throw if not found + List(pluginConfig) + } catch { case NonFatal(_) ⇒ Nil } // otherwise use empty constructor + } val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs) system.systemActorOf(pluginActorProps, pluginActorName) }