From dbeb07a3ebd81aae9599103b543e29bcda668761 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Aug 2015 17:26:08 +0200 Subject: [PATCH] =per #17844 Clarify docs around journal dispatchers and never run plugin actor on default dispatcher --- akka-docs/rst/java/lambda-persistence.rst | 15 +++++++++++++-- akka-docs/rst/java/persistence.rst | 15 +++++++++++++-- akka-docs/rst/scala/persistence.rst | 15 +++++++++++++-- .../src/main/resources/reference.conf | 4 +++- .../main/scala/akka/persistence/Persistence.scala | 12 +++--------- 5 files changed, 45 insertions(+), 16 deletions(-) diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 4be673daae..60c09fc16f 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -688,8 +688,13 @@ A journal plugin can be activated with the following minimal configuration: .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher -used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher`` -for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins. +used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. + +The journal plugin instance is an actor so the methods corresponding to requests from persistent actors +are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other +actors to achive parallelism. + +Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. Snapshot store plugin API ------------------------- @@ -705,6 +710,12 @@ A snapshot store plugin can be activated with the following minimal configuratio The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. +The snapshot store instance is an actor so the methods corresponding to requests from persistent actors +are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other +actors to achive parallelism. + +Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. + Pre-packaged plugins ==================== diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index e2e7dfba93..bdbe0f58b7 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -693,8 +693,13 @@ A journal plugin can be activated with the following minimal configuration: .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher -used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher`` -for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins. +used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. + +The journal plugin instance is an actor so the methods corresponding to requests from persistent actors +are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other +actors to achive parallelism. + +Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. Snapshot store plugin API ------------------------- @@ -710,6 +715,12 @@ A snapshot store plugin can be activated with the following minimal configuratio The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. +The snapshot store instance is an actor so the methods corresponding to requests from persistent actors +are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other +actors to achive parallelism. + +Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. + Plugin TCK ---------- In order to help developers build correct and high quality storage plugins, we provide an Technology Compatibility Kit (`TCK `_ for short). diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 565a0f636b..946c81c8a0 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -745,8 +745,13 @@ A journal plugin can be activated with the following minimal configuration: .. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher -used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher`` -for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins. +used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. + +The journal plugin instance is an actor so the methods corresponding to requests from persistent actors +are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other +actors to achive parallelism. + +Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks. Snapshot store plugin API ------------------------- @@ -762,6 +767,12 @@ A snapshot store plugin can be activated with the following minimal configuratio The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. +The snapshot store instance is an actor so the methods corresponding to requests from persistent actors +are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other +actors to achive parallelism. + +Don't run snapshot store tasks/futures on the system default dispatcher, since that might starve other tasks. + Plugin TCK ---------- In order to help developers build correct and high quality storage plugins, we provide an Technology Compatibility Kit (`TCK `_ for short). diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index fade70fa4d..2c32a1aa1c 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -63,6 +63,7 @@ akka.persistence { type = PinnedDispatcher executor = "thread-pool-executor" } + # Default dispatcher for message replay. default-replay-dispatcher { type = Dispatcher executor = "fork-join-executor" @@ -71,6 +72,7 @@ akka.persistence { parallelism-max = 8 } } + # Default dispatcher for streaming snapshot IO default-stream-dispatcher { type = Dispatcher executor = "fork-join-executor" @@ -176,7 +178,7 @@ akka.persistence.journal.leveldb-shared { # Dispatcher for shared store actor. store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Dispatcher for message replay. - replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" # Storage location of LevelDB files. dir = "journal" # Use fsync on write. diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index effd6fbd3b..3a383f75e5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -161,12 +161,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { val settings = new PersistenceSettings(config) - private def journalDispatchSelector(klaz: Class[_]): String = - if (classOf[AsyncWriteJournal].isAssignableFrom(klaz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId // TODO sure this is not inverted? - - private def snapshotDispatchSelector(klaz: Class[_]): String = - DefaultPluginDispatcherId - /** Check for default or missing identity. */ private def isEmpty(text: String) = text == null || text.length == 0 @@ -247,7 +241,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { } } - private def createPlugin(configPath: String)(dispatcherSelector: Class[_] ⇒ String) = { + private def createPlugin(configPath: String): ActorRef = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), s"'reference.conf' is missing persistence plugin config path: '$configPath'") val pluginActorName = configPath @@ -256,7 +250,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { log.debug(s"Create plugin: $pluginActorName $pluginClassName") val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false - val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass) + val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else DefaultPluginDispatcherId val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs) system.systemActorOf(pluginActorProps, pluginActorName) @@ -274,7 +268,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private class PluginHolderExtensionId(configPath: String) extends ExtensionId[PluginHolder] { override def createExtension(system: ExtendedActorSystem): PluginHolder = { - val plugin = createPlugin(configPath)(journalDispatchSelector) + val plugin = createPlugin(configPath) val adapters = createAdapters(configPath) PluginHolder(plugin, adapters) }