Allow reuse of journal loading logic (#24990)

* Allow reuse of journal loading logic

Probably can be generialised even more. At present i've tested this with
the akka persistence updater extension.

* Make the implicit explicit

* Added mima filter for removing internal class

* Review feedback
This commit is contained in:
Christopher Batey 2018-04-26 14:00:15 +01:00 committed by Konrad `ktoso` Malawski
parent 90c2ce9f13
commit 9051e2fcda
5 changed files with 121 additions and 70 deletions

View file

@ -178,6 +178,7 @@ PR_TARGET_BRANCH=origin/example sbt validatePullRequest
``` ```
## Binary compatibility ## Binary compatibility
Binary compatibility rules and guarantees are described in depth in the [Binary Compatibility Rules Binary compatibility rules and guarantees are described in depth in the [Binary Compatibility Rules
](http://doc.akka.io/docs/akka/snapshot/common/binary-compatibility-rules.html) section of the documentation. ](http://doc.akka.io/docs/akka/snapshot/common/binary-compatibility-rules.html) section of the documentation.

View file

@ -0,0 +1,3 @@
# Internal API changes
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.query.PersistenceQuery$PluginHolder")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.query.PersistenceQuery$PluginHolder$")

View file

@ -4,15 +4,14 @@
package akka.persistence.query package akka.persistence.query
import java.util.concurrent.atomic.AtomicReference
import akka.actor._ import akka.actor._
import akka.event.Logging import akka.annotation.InternalApi
import akka.persistence.query.scaladsl.ReadJournal
import scala.annotation.tailrec import akka.persistence.{ PersistencePlugin, PluginProvider }
import scala.util.Failure
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import scala.reflect.ClassTag
/** /**
* Persistence extension for queries. * Persistence extension for queries.
*/ */
@ -24,23 +23,20 @@ object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdPr
def createExtension(system: ExtendedActorSystem): PersistenceQuery = new PersistenceQuery(system) def createExtension(system: ExtendedActorSystem): PersistenceQuery = new PersistenceQuery(system)
def lookup() = PersistenceQuery def lookup(): PersistenceQuery.type = PersistenceQuery
/** INTERNAL API. */ @InternalApi
private[persistence] case class PluginHolder( private[akka] val pluginProvider: PluginProvider[ReadJournalProvider, ReadJournal, javadsl.ReadJournal] =
scaladslPlugin: scaladsl.ReadJournal, javadslPlugin: akka.persistence.query.javadsl.ReadJournal) new PluginProvider[ReadJournalProvider, scaladsl.ReadJournal, javadsl.ReadJournal] {
extends Extension override def scalaDsl(t: ReadJournalProvider): ReadJournal = t.scaladslReadJournal()
override def javaDsl(t: ReadJournalProvider): javadsl.ReadJournal = t.javadslReadJournal()
}
} }
class PersistenceQuery(system: ExtendedActorSystem) extends Extension { class PersistenceQuery(system: ExtendedActorSystem)
import PersistenceQuery._ extends PersistencePlugin[scaladsl.ReadJournal, javadsl.ReadJournal, ReadJournalProvider](system)(ClassTag(classOf[ReadJournalProvider]), PersistenceQuery.pluginProvider)
with Extension {
private val log = Logging(system, getClass)
/** Discovered query plugins. */
private val readJournalPluginExtensionIds = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
/** /**
* Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given * Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given
* read journal configuration entry. * read journal configuration entry.
@ -49,7 +45,7 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
* config. * config.
*/ */
final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T = final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T =
readJournalPluginFor(readJournalPluginId, readJournalPluginConfig).scaladslPlugin.asInstanceOf[T] pluginFor(readJournalPluginId, readJournalPluginConfig).scaladslPlugin.asInstanceOf[T]
/** /**
* Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given * Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given
@ -63,57 +59,9 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
* read journal configuration entry. * read journal configuration entry.
*/ */
final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T = final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T =
readJournalPluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T] pluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T]
final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T = getReadJournalFor[T](clazz, readJournalPluginId, ConfigFactory.empty()) final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T = getReadJournalFor[T](clazz, readJournalPluginId, ConfigFactory.empty())
@tailrec private def readJournalPluginFor(readJournalPluginId: String, readJournalPluginConfig: Config): PluginHolder = {
val configPath = readJournalPluginId
val extensionIdMap = readJournalPluginExtensionIds.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system)
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val provider = createPlugin(configPath, readJournalPluginConfig)
PluginHolder(provider.scaladslReadJournal(), provider.javadslReadJournal())
}
}
readJournalPluginExtensionIds.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
readJournalPluginFor(readJournalPluginId, readJournalPluginConfig) // Recursive invocation.
}
}
private def createPlugin(configPath: String, readJournalPluginConfig: Config): ReadJournalProvider = {
val mergedConfig = readJournalPluginConfig.withFallback(system.settings.config)
require(
!isEmpty(configPath) && mergedConfig.hasPath(configPath),
s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'")
val pluginConfig = mergedConfig.getConfig(configPath)
val pluginClassName = pluginConfig.getString("class")
log.debug(s"Create plugin: ${configPath} ${pluginClassName}")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
def instantiate(args: collection.immutable.Seq[(Class[_], AnyRef)]) =
system.dynamicAccess.createInstanceFor[ReadJournalProvider](pluginClass, args)
instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) ::
(classOf[String], configPath) :: Nil)
.recoverWith {
case x: NoSuchMethodException instantiate(
(classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil)
}
.recoverWith { case x: NoSuchMethodException instantiate((classOf[ExtendedActorSystem], system) :: Nil) }
.recoverWith { case x: NoSuchMethodException instantiate(Nil) }
.recoverWith {
case ex: Exception Failure.apply(
new IllegalArgumentException("Unable to create read journal plugin instance for path " +
s"[$configPath], class [$pluginClassName]!", ex))
}.get
}
/** Check for default or missing identity. */
private def isEmpty(text: String) = text == null || text.length == 0
} }

View file

@ -64,7 +64,7 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte
withActorSystem() { system withActorSystem() { system
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "-unknown") PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "-unknown")
}.getMessage should include("missing persistence read journal") }.getMessage should include("missing persistence plugin")
} }
} }

View file

@ -0,0 +1,99 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence
import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.PersistencePlugin.PluginHolder
import com.typesafe.config.Config
import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.util.Failure
/**
* INTERNAL API
*/
@InternalApi
private[akka] object PersistencePlugin {
final private[persistence] case class PluginHolder[ScalaDsl, JavaDsl](
scaladslPlugin: ScalaDsl, javadslPlugin: JavaDsl)
extends Extension
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] trait PluginProvider[T, ScalaDsl, JavaDsl] {
def scalaDsl(t: T): ScalaDsl
def javaDsl(t: T): JavaDsl
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] abstract class PersistencePlugin[ScalaDsl, JavaDsl, T: ClassTag](system: ExtendedActorSystem)(implicit ev: PluginProvider[T, ScalaDsl, JavaDsl]) {
private val plugins = new AtomicReference[Map[String, ExtensionId[PluginHolder[ScalaDsl, JavaDsl]]]](Map.empty)
private val log = Logging(system, getClass)
@tailrec
final protected def pluginFor(pluginId: String, readJournalPluginConfig: Config): PluginHolder[ScalaDsl, JavaDsl] = {
val configPath = pluginId
val extensionIdMap = plugins.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system)
case None
val extensionId = new ExtensionId[PluginHolder[ScalaDsl, JavaDsl]] {
override def createExtension(system: ExtendedActorSystem): PluginHolder[ScalaDsl, JavaDsl] = {
val provider = createPlugin(configPath, readJournalPluginConfig)
PluginHolder(
ev.scalaDsl(provider),
ev.javaDsl(provider)
)
}
}
plugins.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
pluginFor(pluginId, readJournalPluginConfig)
}
}
private def createPlugin(configPath: String, readJournalPluginConfig: Config): T = {
val mergedConfig = readJournalPluginConfig.withFallback(system.settings.config)
require(
!isEmpty(configPath) && mergedConfig.hasPath(configPath),
s"'reference.conf' is missing persistence plugin config path: '$configPath'")
val pluginConfig = mergedConfig.getConfig(configPath)
val pluginClassName = pluginConfig.getString("class")
log.debug(s"Create plugin: $configPath $pluginClassName")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
def instantiate(args: collection.immutable.Seq[(Class[_], AnyRef)]) =
system.dynamicAccess.createInstanceFor[T](pluginClass, args)
instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) ::
(classOf[String], configPath) :: Nil)
.recoverWith {
case x: NoSuchMethodException instantiate(
(classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil)
}
.recoverWith { case x: NoSuchMethodException instantiate((classOf[ExtendedActorSystem], system) :: Nil) }
.recoverWith { case x: NoSuchMethodException instantiate(Nil) }
.recoverWith {
case ex: Exception Failure.apply(
new IllegalArgumentException("Unable to create read journal plugin instance for path " +
s"[$configPath], class [$pluginClassName]!", ex))
}.get
}
/** Check for default or missing identity. */
private def isEmpty(text: String): Boolean = text == null || text.length == 0
}