support akka configs in InitJoinAck (#1877)
* support akka configs in InitJoinAck * Update SeedNodeProcess.scala * add tests * add akka version * Init join ack test (#4) * modify mixed protocol test to be strict Update ClusterDaemon.scala * Update SeedNodeProcess.scala * refactor * rename methods (review comment)
This commit is contained in:
parent
de12867f94
commit
d7f0d51028
7 changed files with 139 additions and 23 deletions
|
|
@ -610,9 +610,21 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
private lazy val supportsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
|
||||
context.system.settings.config)
|
||||
|
||||
private lazy val strictAkkaConfig: Boolean = ConfigUtil.isStrictAkkaConfig(
|
||||
context.system.settings.config)
|
||||
|
||||
private lazy val akkaVersion: String = ConfigUtil.getAkkaVersion(context.system.settings.config)
|
||||
|
||||
def initJoin(inputConfig: Config): Unit = {
|
||||
val joiningNodeConfig = if (supportsAkkaConfig && !inputConfig.hasPath("pekko")) {
|
||||
ConfigUtil.changeAkkaToPekkoConfig(inputConfig)
|
||||
val joiningNodeConfig = if (supportsAkkaConfig) {
|
||||
if (inputConfig.hasPath("pekko")) {
|
||||
if (strictAkkaConfig)
|
||||
ConfigUtil.adaptAkkaToPekkoConfig(inputConfig.withoutPath("pekko"))
|
||||
else
|
||||
inputConfig
|
||||
} else {
|
||||
ConfigUtil.adaptAkkaToPekkoConfig(inputConfig)
|
||||
}
|
||||
} else {
|
||||
inputConfig
|
||||
}
|
||||
|
|
@ -661,7 +673,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
// any sensitive keys as defined by this node configuration
|
||||
val clusterConfig =
|
||||
JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config)
|
||||
CompatibleConfig(clusterConfig)
|
||||
val adjustedConfig = if (supportsAkkaConfig) {
|
||||
ConfigUtil.addAkkaConfig(clusterConfig, akkaVersion)
|
||||
} else clusterConfig
|
||||
CompatibleConfig(adjustedConfig)
|
||||
}
|
||||
case Invalid(messages) =>
|
||||
// messages are only logged on the cluster side
|
||||
|
|
|
|||
|
|
@ -19,15 +19,21 @@ package org.apache.pekko.cluster
|
|||
|
||||
import com.typesafe.config.{ Config, ConfigValue, ConfigValueFactory, ConfigValueType }
|
||||
|
||||
import scala.annotation.nowarn
|
||||
import org.apache.pekko
|
||||
import pekko.annotation.InternalApi
|
||||
|
||||
@InternalApi
|
||||
private[cluster] object ConfigUtil {
|
||||
|
||||
private val PekkoPrefix = "org.apache.pekko"
|
||||
private val AkkaPrefix = "akka"
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
def addAkkaConfig(cfg: Config, akkaVersion: String): Config = {
|
||||
val newConfig = adaptPekkoToAkkaConfig(cfg)
|
||||
newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion))
|
||||
}
|
||||
|
||||
def adaptPekkoToAkkaConfig(cfg: Config): Config = {
|
||||
import org.apache.pekko.util.ccompat.JavaConverters._
|
||||
val innerSet = cfg.entrySet().asScala
|
||||
.filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT)
|
||||
|
|
@ -38,11 +44,10 @@ private[cluster] object ConfigUtil {
|
|||
innerSet.foreach { case (key, value) =>
|
||||
newConfig = newConfig.withValue(key, value)
|
||||
}
|
||||
newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion))
|
||||
newConfig
|
||||
}
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
def changeAkkaToPekkoConfig(cfg: Config): Config = {
|
||||
def adaptAkkaToPekkoConfig(cfg: Config): Config = {
|
||||
import org.apache.pekko.util.ccompat.JavaConverters._
|
||||
val innerSet = cfg.entrySet().asScala
|
||||
.filter(e => e.getKey.startsWith("akka.") && e.getValue.valueType() != ConfigValueType.OBJECT)
|
||||
|
|
@ -62,6 +67,19 @@ private[cluster] object ConfigUtil {
|
|||
.contains("akka")
|
||||
}
|
||||
|
||||
def getAkkaVersion(cfg: Config): String = {
|
||||
if (cfg.hasPath("akka.version")) {
|
||||
cfg.getString("akka.version")
|
||||
} else {
|
||||
cfg.getString("pekko.remote.akka.version")
|
||||
}
|
||||
}
|
||||
|
||||
def isStrictAkkaConfig(cfg: Config): Boolean = {
|
||||
cfg.getString("pekko.remote.protocol-name") == "akka" &&
|
||||
cfg.getBoolean("pekko.remote.enforce-strict-config-prefix-check-on-join")
|
||||
}
|
||||
|
||||
private def adjustPackageNameToAkkaIfNecessary(cv: ConfigValue): ConfigValue = {
|
||||
if (cv.valueType() == ConfigValueType.STRING) {
|
||||
val str = cv.unwrapped().toString
|
||||
|
|
|
|||
|
|
@ -47,17 +47,13 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
|
|||
"Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
|
||||
"This node will be shutdown!"
|
||||
|
||||
private lazy val needsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
|
||||
private lazy val supportsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
|
||||
context.system.settings.config)
|
||||
|
||||
private lazy val akkaVersion: String = {
|
||||
val cfg = context.system.settings.config
|
||||
if (cfg.hasPath("akka.version")) {
|
||||
cfg.getString("akka.version")
|
||||
} else {
|
||||
cfg.getString("pekko.remote.akka.version")
|
||||
}
|
||||
}
|
||||
private lazy val strictAkkaConfig: Boolean = ConfigUtil.isStrictAkkaConfig(
|
||||
context.system.settings.config)
|
||||
|
||||
private lazy val akkaVersion: String = ConfigUtil.getAkkaVersion(context.system.settings.config)
|
||||
|
||||
private def stopOrBecome(behavior: Option[Actor.Receive]): Unit =
|
||||
behavior match {
|
||||
|
|
@ -77,7 +73,7 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
|
|||
val configToValidate =
|
||||
JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config)
|
||||
|
||||
val adjustedConfig = if (needsAkkaConfig)
|
||||
val adjustedConfig = if (supportsAkkaConfig)
|
||||
ConfigUtil.addAkkaConfig(configToValidate, akkaVersion)
|
||||
else configToValidate
|
||||
|
||||
|
|
@ -128,7 +124,19 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
|
|||
logInitJoinAckReceived(origin)
|
||||
|
||||
// validates config coming from cluster against this node config
|
||||
joinConfigCompatChecker.check(configCheck.clusterConfig, context.system.settings.config) match {
|
||||
val toCheck = if (supportsAkkaConfig) {
|
||||
if (configCheck.clusterConfig.hasPath("pekko")) {
|
||||
if (strictAkkaConfig)
|
||||
ConfigUtil.adaptAkkaToPekkoConfig(configCheck.clusterConfig.withoutPath("pekko"))
|
||||
else
|
||||
configCheck.clusterConfig
|
||||
} else {
|
||||
ConfigUtil.adaptAkkaToPekkoConfig(configCheck.clusterConfig)
|
||||
}
|
||||
} else {
|
||||
configCheck.clusterConfig
|
||||
}
|
||||
joinConfigCompatChecker.check(toCheck, context.system.settings.config) match {
|
||||
case Valid =>
|
||||
// first InitJoinAck reply
|
||||
context.parent ! JoinTo(joinTo)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.pekko.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import org.apache.pekko
|
||||
import pekko.testkit.PekkoSpec
|
||||
|
||||
class ConfigUtilSpec extends PekkoSpec {
|
||||
|
||||
val pekkoSbrClass = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
|
||||
val akkaSbrClass = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||
|
||||
"ConfigUtil" must {
|
||||
"support adaptAkkaToPekkoConfig" in {
|
||||
val akkaConfig = ConfigFactory.parseString(s"""
|
||||
akka.cluster.downing-provider-class = "$akkaSbrClass"
|
||||
akka.cluster.split-brain-resolver.active-strategy = keep-majority
|
||||
akka.version = "2.6.21"
|
||||
""")
|
||||
val pekkoConfig = ConfigUtil.adaptAkkaToPekkoConfig(akkaConfig)
|
||||
pekkoConfig.getString("pekko.cluster.downing-provider-class") should ===(pekkoSbrClass)
|
||||
pekkoConfig.getString("pekko.cluster.split-brain-resolver.active-strategy") should ===("keep-majority")
|
||||
pekkoConfig.getString("pekko.version") should ===("2.6.21")
|
||||
}
|
||||
"support adaptPekkoToAkkaConfig" in {
|
||||
val akkaConfig = ConfigFactory.parseString(s"""
|
||||
pekko.cluster.downing-provider-class = "$pekkoSbrClass"
|
||||
pekko.cluster.split-brain-resolver.active-strategy = keep-majority
|
||||
pekko.version = "1.2.3"
|
||||
""")
|
||||
val pekkoConfig = ConfigUtil.adaptPekkoToAkkaConfig(akkaConfig)
|
||||
pekkoConfig.getString("akka.cluster.downing-provider-class") should ===(akkaSbrClass)
|
||||
pekkoConfig.getString("akka.cluster.split-brain-resolver.active-strategy") should ===("keep-majority")
|
||||
pekkoConfig.getString("akka.version") should ===("1.2.3")
|
||||
}
|
||||
"support addAkkaConfig" in {
|
||||
val akkaConfig = ConfigFactory.parseString(s"""
|
||||
pekko.cluster.downing-provider-class = "$pekkoSbrClass"
|
||||
pekko.cluster.split-brain-resolver.active-strategy = keep-majority
|
||||
pekko.version = "1.2.3"
|
||||
""")
|
||||
val pekkoConfig = ConfigUtil.addAkkaConfig(akkaConfig, "2.6.21")
|
||||
pekkoConfig.getString("akka.cluster.downing-provider-class") should ===(akkaSbrClass)
|
||||
pekkoConfig.getString("akka.cluster.split-brain-resolver.active-strategy") should ===("keep-majority")
|
||||
pekkoConfig.getString("akka.version") should ===("2.6.21")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -120,7 +120,7 @@ class JoinConfigCompatCheckClusterSpec extends PekkoSpec {
|
|||
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||
akka.version = "2.6.21"
|
||||
""")
|
||||
checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)) should ===(Valid)
|
||||
checkInitJoin(oldConfig, ConfigUtil.adaptAkkaToPekkoConfig(newConfig)) should ===(Valid)
|
||||
}
|
||||
|
||||
"be invalid when not equivalent downing-provider (akka/pekko mixed cluster)" in {
|
||||
|
|
@ -133,7 +133,7 @@ class JoinConfigCompatCheckClusterSpec extends PekkoSpec {
|
|||
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||
akka.version = "2.6.21"
|
||||
""")
|
||||
checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)).getClass should ===(classOf[Invalid])
|
||||
checkInitJoin(oldConfig, ConfigUtil.adaptAkkaToPekkoConfig(newConfig)).getClass should ===(classOf[Invalid])
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,7 +35,10 @@ object MixedProtocolClusterSpec {
|
|||
pekko.remote.classic.netty.tcp.port = 0
|
||||
pekko.remote.artery.advanced.aeron.idle-cpu-level = 3
|
||||
pekko.remote.accept-protocol-names = ["pekko", "akka"]
|
||||
pekko.remote.enforce-strict-config-prefix-check-on-join = on
|
||||
|
||||
pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
|
||||
pekko.cluster.split-brain-resolver.active-strategy = keep-majority
|
||||
pekko.cluster.jmx.multi-mbeans-in-same-jvm = on""")
|
||||
|
||||
val configWithUdp: Config =
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue