From 9051e2fcda8c68feaf5edb98392acf62a4a727e2 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 26 Apr 2018 14:00:15 +0100 Subject: [PATCH] Allow reuse of journal loading logic (#24990) * Allow reuse of journal loading logic Probably can be generialised even more. At present i've tested this with the akka persistence updater extension. * Make the implicit explicit * Added mima filter for removing internal class * Review feedback --- CONTRIBUTING.md | 1 + .../mima-filters/2.5.12.backwards.excludes | 3 + .../persistence/query/PersistenceQuery.scala | 86 ++++------------ .../query/PersistenceQuerySpec.scala | 2 +- .../akka/persistence/PersistencePlugin.scala | 99 +++++++++++++++++++ 5 files changed, 121 insertions(+), 70 deletions(-) create mode 100644 akka-persistence-query/src/main/mima-filters/2.5.12.backwards.excludes create mode 100644 akka-persistence/src/main/scala/akka/persistence/PersistencePlugin.scala diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index cca85e2228..b437f2a6a1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -178,6 +178,7 @@ PR_TARGET_BRANCH=origin/example sbt validatePullRequest ``` ## Binary compatibility + Binary compatibility rules and guarantees are described in depth in the [Binary Compatibility Rules ](http://doc.akka.io/docs/akka/snapshot/common/binary-compatibility-rules.html) section of the documentation. diff --git a/akka-persistence-query/src/main/mima-filters/2.5.12.backwards.excludes b/akka-persistence-query/src/main/mima-filters/2.5.12.backwards.excludes new file mode 100644 index 0000000000..5c32756433 --- /dev/null +++ b/akka-persistence-query/src/main/mima-filters/2.5.12.backwards.excludes @@ -0,0 +1,3 @@ +# Internal API changes +ProblemFilters.exclude[MissingClassProblem]("akka.persistence.query.PersistenceQuery$PluginHolder") +ProblemFilters.exclude[MissingClassProblem]("akka.persistence.query.PersistenceQuery$PluginHolder$") diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index 0077e657ae..8280d08ea4 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -4,15 +4,14 @@ package akka.persistence.query -import java.util.concurrent.atomic.AtomicReference - import akka.actor._ -import akka.event.Logging - -import scala.annotation.tailrec -import scala.util.Failure +import akka.annotation.InternalApi +import akka.persistence.query.scaladsl.ReadJournal +import akka.persistence.{ PersistencePlugin, PluginProvider } import com.typesafe.config.{ Config, ConfigFactory } +import scala.reflect.ClassTag + /** * Persistence extension for queries. */ @@ -24,23 +23,20 @@ object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdPr def createExtension(system: ExtendedActorSystem): PersistenceQuery = new PersistenceQuery(system) - def lookup() = PersistenceQuery + def lookup(): PersistenceQuery.type = PersistenceQuery - /** INTERNAL API. */ - private[persistence] case class PluginHolder( - scaladslPlugin: scaladsl.ReadJournal, javadslPlugin: akka.persistence.query.javadsl.ReadJournal) - extends Extension + @InternalApi + private[akka] val pluginProvider: PluginProvider[ReadJournalProvider, ReadJournal, javadsl.ReadJournal] = + new PluginProvider[ReadJournalProvider, scaladsl.ReadJournal, javadsl.ReadJournal] { + override def scalaDsl(t: ReadJournalProvider): ReadJournal = t.scaladslReadJournal() + override def javaDsl(t: ReadJournalProvider): javadsl.ReadJournal = t.javadslReadJournal() + } } -class PersistenceQuery(system: ExtendedActorSystem) extends Extension { - import PersistenceQuery._ - - private val log = Logging(system, getClass) - - /** Discovered query plugins. */ - private val readJournalPluginExtensionIds = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) - +class PersistenceQuery(system: ExtendedActorSystem) + extends PersistencePlugin[scaladsl.ReadJournal, javadsl.ReadJournal, ReadJournalProvider](system)(ClassTag(classOf[ReadJournalProvider]), PersistenceQuery.pluginProvider) + with Extension { /** * Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given * read journal configuration entry. @@ -49,7 +45,7 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { * config. */ final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T = - readJournalPluginFor(readJournalPluginId, readJournalPluginConfig).scaladslPlugin.asInstanceOf[T] + pluginFor(readJournalPluginId, readJournalPluginConfig).scaladslPlugin.asInstanceOf[T] /** * Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given @@ -63,57 +59,9 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { * read journal configuration entry. */ final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T = - readJournalPluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T] + pluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T] final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T = getReadJournalFor[T](clazz, readJournalPluginId, ConfigFactory.empty()) - @tailrec private def readJournalPluginFor(readJournalPluginId: String, readJournalPluginConfig: Config): PluginHolder = { - val configPath = readJournalPluginId - val extensionIdMap = readJournalPluginExtensionIds.get - extensionIdMap.get(configPath) match { - case Some(extensionId) ⇒ - extensionId(system) - case None ⇒ - val extensionId = new ExtensionId[PluginHolder] { - override def createExtension(system: ExtendedActorSystem): PluginHolder = { - val provider = createPlugin(configPath, readJournalPluginConfig) - PluginHolder(provider.scaladslReadJournal(), provider.javadslReadJournal()) - } - } - readJournalPluginExtensionIds.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) - readJournalPluginFor(readJournalPluginId, readJournalPluginConfig) // Recursive invocation. - } - } - - private def createPlugin(configPath: String, readJournalPluginConfig: Config): ReadJournalProvider = { - val mergedConfig = readJournalPluginConfig.withFallback(system.settings.config) - require( - !isEmpty(configPath) && mergedConfig.hasPath(configPath), - s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'") - val pluginConfig = mergedConfig.getConfig(configPath) - val pluginClassName = pluginConfig.getString("class") - log.debug(s"Create plugin: ${configPath} ${pluginClassName}") - val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get - - def instantiate(args: collection.immutable.Seq[(Class[_], AnyRef)]) = - system.dynamicAccess.createInstanceFor[ReadJournalProvider](pluginClass, args) - - instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: - (classOf[String], configPath) :: Nil) - .recoverWith { - case x: NoSuchMethodException ⇒ instantiate( - (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) - } - .recoverWith { case x: NoSuchMethodException ⇒ instantiate((classOf[ExtendedActorSystem], system) :: Nil) } - .recoverWith { case x: NoSuchMethodException ⇒ instantiate(Nil) } - .recoverWith { - case ex: Exception ⇒ Failure.apply( - new IllegalArgumentException("Unable to create read journal plugin instance for path " + - s"[$configPath], class [$pluginClassName]!", ex)) - }.get - } - - /** Check for default or missing identity. */ - private def isEmpty(text: String) = text == null || text.length == 0 } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala index cae8379228..7bea826633 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -64,7 +64,7 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte withActorSystem() { system ⇒ intercept[IllegalArgumentException] { PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "-unknown") - }.getMessage should include("missing persistence read journal") + }.getMessage should include("missing persistence plugin") } } diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistencePlugin.scala b/akka-persistence/src/main/scala/akka/persistence/PersistencePlugin.scala new file mode 100644 index 0000000000..c19994a6be --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/PersistencePlugin.scala @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence + +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence.PersistencePlugin.PluginHolder +import com.typesafe.config.Config + +import scala.annotation.tailrec +import scala.reflect.ClassTag +import scala.util.Failure + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object PersistencePlugin { + final private[persistence] case class PluginHolder[ScalaDsl, JavaDsl]( + scaladslPlugin: ScalaDsl, javadslPlugin: JavaDsl) + extends Extension +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] trait PluginProvider[T, ScalaDsl, JavaDsl] { + def scalaDsl(t: T): ScalaDsl + def javaDsl(t: T): JavaDsl +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] abstract class PersistencePlugin[ScalaDsl, JavaDsl, T: ClassTag](system: ExtendedActorSystem)(implicit ev: PluginProvider[T, ScalaDsl, JavaDsl]) { + + private val plugins = new AtomicReference[Map[String, ExtensionId[PluginHolder[ScalaDsl, JavaDsl]]]](Map.empty) + private val log = Logging(system, getClass) + + @tailrec + final protected def pluginFor(pluginId: String, readJournalPluginConfig: Config): PluginHolder[ScalaDsl, JavaDsl] = { + val configPath = pluginId + val extensionIdMap = plugins.get + extensionIdMap.get(configPath) match { + case Some(extensionId) ⇒ + extensionId(system) + case None ⇒ + val extensionId = new ExtensionId[PluginHolder[ScalaDsl, JavaDsl]] { + override def createExtension(system: ExtendedActorSystem): PluginHolder[ScalaDsl, JavaDsl] = { + val provider = createPlugin(configPath, readJournalPluginConfig) + PluginHolder( + ev.scalaDsl(provider), + ev.javaDsl(provider) + ) + } + } + plugins.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + pluginFor(pluginId, readJournalPluginConfig) + } + } + + private def createPlugin(configPath: String, readJournalPluginConfig: Config): T = { + val mergedConfig = readJournalPluginConfig.withFallback(system.settings.config) + require( + !isEmpty(configPath) && mergedConfig.hasPath(configPath), + s"'reference.conf' is missing persistence plugin config path: '$configPath'") + val pluginConfig = mergedConfig.getConfig(configPath) + val pluginClassName = pluginConfig.getString("class") + log.debug(s"Create plugin: $configPath $pluginClassName") + val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get + + def instantiate(args: collection.immutable.Seq[(Class[_], AnyRef)]) = + system.dynamicAccess.createInstanceFor[T](pluginClass, args) + + instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: + (classOf[String], configPath) :: Nil) + .recoverWith { + case x: NoSuchMethodException ⇒ instantiate( + (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) + } + .recoverWith { case x: NoSuchMethodException ⇒ instantiate((classOf[ExtendedActorSystem], system) :: Nil) } + .recoverWith { case x: NoSuchMethodException ⇒ instantiate(Nil) } + .recoverWith { + case ex: Exception ⇒ Failure.apply( + new IllegalArgumentException("Unable to create read journal plugin instance for path " + + s"[$configPath], class [$pluginClassName]!", ex)) + }.get + } + + /** Check for default or missing identity. */ + private def isEmpty(text: String): Boolean = text == null || text.length == 0 +}