Join cluster check adjusted to support akka (#1866)
* update cluster initjoin check to support akka * add test * extra test
This commit is contained in:
parent
980430a399
commit
9227912ea6
4 changed files with 79 additions and 10 deletions
|
|
@ -607,7 +607,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
case other => super.unhandled(other)
|
||||
}
|
||||
|
||||
def initJoin(joiningNodeConfig: Config): Unit = {
|
||||
private lazy val supportsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
|
||||
context.system.settings.config)
|
||||
|
||||
def initJoin(inputConfig: Config): Unit = {
|
||||
val joiningNodeConfig = if (supportsAkkaConfig && !inputConfig.hasPath("pekko")) {
|
||||
ConfigUtil.changeAkkaToPekkoConfig(inputConfig)
|
||||
} else {
|
||||
inputConfig
|
||||
}
|
||||
val joiningNodeVersion =
|
||||
if (joiningNodeConfig.hasPath("pekko.version")) joiningNodeConfig.getString("pekko.version")
|
||||
else "unknown"
|
||||
|
|
|
|||
|
|
@ -23,13 +23,16 @@ import scala.annotation.nowarn
|
|||
|
||||
private[cluster] object ConfigUtil {
|
||||
|
||||
private val PekkoPrefix = "org.apache.pekko"
|
||||
private val AkkaPrefix = "akka"
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
def addAkkaConfig(cfg: Config, akkaVersion: String): Config = {
|
||||
import org.apache.pekko.util.ccompat.JavaConverters._
|
||||
val innerSet = cfg.entrySet().asScala
|
||||
.filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT)
|
||||
.map { entry =>
|
||||
entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue)
|
||||
entry.getKey.replace("pekko", "akka") -> adjustPackageNameToAkkaIfNecessary(entry.getValue)
|
||||
}
|
||||
var newConfig = cfg
|
||||
innerSet.foreach { case (key, value) =>
|
||||
|
|
@ -38,11 +41,45 @@ private[cluster] object ConfigUtil {
|
|||
newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion))
|
||||
}
|
||||
|
||||
private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = {
|
||||
@nowarn("msg=deprecated")
|
||||
def changeAkkaToPekkoConfig(cfg: Config): Config = {
|
||||
import org.apache.pekko.util.ccompat.JavaConverters._
|
||||
val innerSet = cfg.entrySet().asScala
|
||||
.filter(e => e.getKey.startsWith("akka.") && e.getValue.valueType() != ConfigValueType.OBJECT)
|
||||
.map { entry =>
|
||||
entry.getKey.replace("akka", "pekko") -> adjustPackageNameToPekkoIfNecessary(entry.getValue)
|
||||
}
|
||||
var newConfig = cfg
|
||||
innerSet.foreach { case (key, value) =>
|
||||
newConfig = newConfig.withValue(key, value)
|
||||
}
|
||||
newConfig
|
||||
}
|
||||
|
||||
def supportsAkkaConfig(cfg: Config): Boolean = {
|
||||
cfg
|
||||
.getStringList("pekko.remote.accept-protocol-names")
|
||||
.contains("akka")
|
||||
}
|
||||
|
||||
private def adjustPackageNameToAkkaIfNecessary(cv: ConfigValue): ConfigValue = {
|
||||
if (cv.valueType() == ConfigValueType.STRING) {
|
||||
val str = cv.unwrapped().toString
|
||||
if (str.startsWith("org.apache.pekko")) {
|
||||
ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka"))
|
||||
if (str.startsWith(PekkoPrefix)) {
|
||||
ConfigValueFactory.fromAnyRef(str.replace(PekkoPrefix, AkkaPrefix))
|
||||
} else {
|
||||
cv
|
||||
}
|
||||
} else {
|
||||
cv
|
||||
}
|
||||
}
|
||||
|
||||
private def adjustPackageNameToPekkoIfNecessary(cv: ConfigValue): ConfigValue = {
|
||||
if (cv.valueType() == ConfigValueType.STRING) {
|
||||
val str = cv.unwrapped().toString
|
||||
if (str.startsWith(AkkaPrefix)) {
|
||||
ConfigValueFactory.fromAnyRef(str.replace(AkkaPrefix, PekkoPrefix))
|
||||
} else {
|
||||
cv
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,11 +47,8 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
|
|||
"Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
|
||||
"This node will be shutdown!"
|
||||
|
||||
private lazy val needsAkkaConfig: Boolean = {
|
||||
context.system.settings.config
|
||||
.getStringList("pekko.remote.accept-protocol-names")
|
||||
.contains("akka")
|
||||
}
|
||||
private lazy val needsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
|
||||
context.system.settings.config)
|
||||
|
||||
private lazy val akkaVersion: String = {
|
||||
val cfg = context.system.settings.config
|
||||
|
|
|
|||
|
|
@ -109,5 +109,32 @@ class JoinConfigCompatCheckClusterSpec extends PekkoSpec {
|
|||
checkInitJoin(oldConfig, newConfig).getClass should ===(classOf[Invalid])
|
||||
checkInitJoinAck(oldConfig, newConfig).getClass should ===(classOf[Invalid])
|
||||
}
|
||||
|
||||
"be valid when equivalent downing-provider (akka/pekko mixed cluster)" in {
|
||||
val oldConfig =
|
||||
ConfigFactory.parseString("""
|
||||
pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
|
||||
""").withFallback(system.settings.config)
|
||||
val newConfig =
|
||||
ConfigFactory.parseString("""
|
||||
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||
akka.version = "2.6.21"
|
||||
""")
|
||||
checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)) should ===(Valid)
|
||||
}
|
||||
|
||||
"be invalid when not equivalent downing-provider (akka/pekko mixed cluster)" in {
|
||||
val oldConfig =
|
||||
ConfigFactory.parseString("""
|
||||
pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.testkit.AutoDowning"
|
||||
""").withFallback(system.settings.config)
|
||||
val newConfig =
|
||||
ConfigFactory.parseString("""
|
||||
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||
akka.version = "2.6.21"
|
||||
""")
|
||||
checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)).getClass should ===(classOf[Invalid])
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue