diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index b289f370ca..df5a9b71ce 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -610,9 +610,21 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh private lazy val supportsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig( context.system.settings.config) + private lazy val strictAkkaConfig: Boolean = ConfigUtil.isStrictAkkaConfig( + context.system.settings.config) + + private lazy val akkaVersion: String = ConfigUtil.getAkkaVersion(context.system.settings.config) + def initJoin(inputConfig: Config): Unit = { - val joiningNodeConfig = if (supportsAkkaConfig && !inputConfig.hasPath("pekko")) { - ConfigUtil.changeAkkaToPekkoConfig(inputConfig) + val joiningNodeConfig = if (supportsAkkaConfig) { + if (inputConfig.hasPath("pekko")) { + if (strictAkkaConfig) + ConfigUtil.adaptAkkaToPekkoConfig(inputConfig.withoutPath("pekko")) + else + inputConfig + } else { + ConfigUtil.adaptAkkaToPekkoConfig(inputConfig) + } } else { inputConfig } @@ -661,7 +673,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh // any sensitive keys as defined by this node configuration val clusterConfig = JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config) - CompatibleConfig(clusterConfig) + val adjustedConfig = if (supportsAkkaConfig) { + ConfigUtil.addAkkaConfig(clusterConfig, akkaVersion) + } else clusterConfig + CompatibleConfig(adjustedConfig) } case Invalid(messages) => // messages are only logged on the cluster side diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala index 62fdd0e325..553f4d5737 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala @@ -19,15 +19,21 @@ package org.apache.pekko.cluster import com.typesafe.config.{ Config, ConfigValue, ConfigValueFactory, ConfigValueType } -import scala.annotation.nowarn +import org.apache.pekko +import pekko.annotation.InternalApi +@InternalApi private[cluster] object ConfigUtil { private val PekkoPrefix = "org.apache.pekko" private val AkkaPrefix = "akka" - @nowarn("msg=deprecated") def addAkkaConfig(cfg: Config, akkaVersion: String): Config = { + val newConfig = adaptPekkoToAkkaConfig(cfg) + newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion)) + } + + def adaptPekkoToAkkaConfig(cfg: Config): Config = { import org.apache.pekko.util.ccompat.JavaConverters._ val innerSet = cfg.entrySet().asScala .filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT) @@ -38,11 +44,10 @@ private[cluster] object ConfigUtil { innerSet.foreach { case (key, value) => newConfig = newConfig.withValue(key, value) } - newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion)) + newConfig } - @nowarn("msg=deprecated") - def changeAkkaToPekkoConfig(cfg: Config): Config = { + def adaptAkkaToPekkoConfig(cfg: Config): Config = { import org.apache.pekko.util.ccompat.JavaConverters._ val innerSet = cfg.entrySet().asScala .filter(e => e.getKey.startsWith("akka.") && e.getValue.valueType() != ConfigValueType.OBJECT) @@ -62,6 +67,19 @@ private[cluster] object ConfigUtil { .contains("akka") } + def getAkkaVersion(cfg: Config): String = { + if (cfg.hasPath("akka.version")) { + cfg.getString("akka.version") + } else { + cfg.getString("pekko.remote.akka.version") + } + } + + def isStrictAkkaConfig(cfg: Config): Boolean = { + cfg.getString("pekko.remote.protocol-name") == "akka" && + cfg.getBoolean("pekko.remote.enforce-strict-config-prefix-check-on-join") + } + private def adjustPackageNameToAkkaIfNecessary(cv: ConfigValue): ConfigValue = { if (cv.valueType() == ConfigValueType.STRING) { val str = cv.unwrapped().toString diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala index 985875dba2..d2627da586 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala @@ -47,17 +47,13 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " + "This node will be shutdown!" - private lazy val needsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig( + private lazy val supportsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig( context.system.settings.config) - private lazy val akkaVersion: String = { - val cfg = context.system.settings.config - if (cfg.hasPath("akka.version")) { - cfg.getString("akka.version") - } else { - cfg.getString("pekko.remote.akka.version") - } - } + private lazy val strictAkkaConfig: Boolean = ConfigUtil.isStrictAkkaConfig( + context.system.settings.config) + + private lazy val akkaVersion: String = ConfigUtil.getAkkaVersion(context.system.settings.config) private def stopOrBecome(behavior: Option[Actor.Receive]): Unit = behavior match { @@ -77,7 +73,7 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon val configToValidate = JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config) - val adjustedConfig = if (needsAkkaConfig) + val adjustedConfig = if (supportsAkkaConfig) ConfigUtil.addAkkaConfig(configToValidate, akkaVersion) else configToValidate @@ -128,7 +124,19 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon logInitJoinAckReceived(origin) // validates config coming from cluster against this node config - joinConfigCompatChecker.check(configCheck.clusterConfig, context.system.settings.config) match { + val toCheck = if (supportsAkkaConfig) { + if (configCheck.clusterConfig.hasPath("pekko")) { + if (strictAkkaConfig) + ConfigUtil.adaptAkkaToPekkoConfig(configCheck.clusterConfig.withoutPath("pekko")) + else + configCheck.clusterConfig + } else { + ConfigUtil.adaptAkkaToPekkoConfig(configCheck.clusterConfig) + } + } else { + configCheck.clusterConfig + } + joinConfigCompatChecker.check(toCheck, context.system.settings.config) match { case Valid => // first InitJoinAck reply context.parent ! JoinTo(joinTo) diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/ConfigUtilSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/ConfigUtilSpec.scala new file mode 100644 index 0000000000..04c1f4b0d4 --- /dev/null +++ b/cluster/src/test/scala/org/apache/pekko/cluster/ConfigUtilSpec.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.cluster + +import com.typesafe.config.ConfigFactory + +import org.apache.pekko +import pekko.testkit.PekkoSpec + +class ConfigUtilSpec extends PekkoSpec { + + val pekkoSbrClass = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider" + val akkaSbrClass = "akka.cluster.sbr.SplitBrainResolverProvider" + + "ConfigUtil" must { + "support adaptAkkaToPekkoConfig" in { + val akkaConfig = ConfigFactory.parseString(s""" + akka.cluster.downing-provider-class = "$akkaSbrClass" + akka.cluster.split-brain-resolver.active-strategy = keep-majority + akka.version = "2.6.21" + """) + val pekkoConfig = ConfigUtil.adaptAkkaToPekkoConfig(akkaConfig) + pekkoConfig.getString("pekko.cluster.downing-provider-class") should ===(pekkoSbrClass) + pekkoConfig.getString("pekko.cluster.split-brain-resolver.active-strategy") should ===("keep-majority") + pekkoConfig.getString("pekko.version") should ===("2.6.21") + } + "support adaptPekkoToAkkaConfig" in { + val akkaConfig = ConfigFactory.parseString(s""" + pekko.cluster.downing-provider-class = "$pekkoSbrClass" + pekko.cluster.split-brain-resolver.active-strategy = keep-majority + pekko.version = "1.2.3" + """) + val pekkoConfig = ConfigUtil.adaptPekkoToAkkaConfig(akkaConfig) + pekkoConfig.getString("akka.cluster.downing-provider-class") should ===(akkaSbrClass) + pekkoConfig.getString("akka.cluster.split-brain-resolver.active-strategy") should ===("keep-majority") + pekkoConfig.getString("akka.version") should ===("1.2.3") + } + "support addAkkaConfig" in { + val akkaConfig = ConfigFactory.parseString(s""" + pekko.cluster.downing-provider-class = "$pekkoSbrClass" + pekko.cluster.split-brain-resolver.active-strategy = keep-majority + pekko.version = "1.2.3" + """) + val pekkoConfig = ConfigUtil.addAkkaConfig(akkaConfig, "2.6.21") + pekkoConfig.getString("akka.cluster.downing-provider-class") should ===(akkaSbrClass) + pekkoConfig.getString("akka.cluster.split-brain-resolver.active-strategy") should ===("keep-majority") + pekkoConfig.getString("akka.version") should ===("2.6.21") + } + } +} diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala index dc45d94956..d84c646708 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala @@ -120,7 +120,7 @@ class JoinConfigCompatCheckClusterSpec extends PekkoSpec { akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" akka.version = "2.6.21" """) - checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)) should ===(Valid) + checkInitJoin(oldConfig, ConfigUtil.adaptAkkaToPekkoConfig(newConfig)) should ===(Valid) } "be invalid when not equivalent downing-provider (akka/pekko mixed cluster)" in { @@ -133,7 +133,7 @@ class JoinConfigCompatCheckClusterSpec extends PekkoSpec { akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" akka.version = "2.6.21" """) - checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)).getClass should ===(classOf[Invalid]) + checkInitJoin(oldConfig, ConfigUtil.adaptAkkaToPekkoConfig(newConfig)).getClass should ===(classOf[Invalid]) } } diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala index 0ac533b38b..bbf9712651 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -35,7 +35,10 @@ object MixedProtocolClusterSpec { pekko.remote.classic.netty.tcp.port = 0 pekko.remote.artery.advanced.aeron.idle-cpu-level = 3 pekko.remote.accept-protocol-names = ["pekko", "akka"] + pekko.remote.enforce-strict-config-prefix-check-on-join = on + pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider" + pekko.cluster.split-brain-resolver.active-strategy = keep-majority pekko.cluster.jmx.multi-mbeans-in-same-jvm = on""") val configWithUdp: Config = diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index 1338d9a8b7..3c598e8085 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -177,7 +177,7 @@ pekko { # When receiving requests from other remote actors, what are the valid # prefixes to check against. Useful for when dealing with rolling cluster - # migrations with compatible systems such as Lightbend's Akka. + # migrations with compatible systems such as Akka. # By default, we only support "pekko" protocol. # If you want to also support Akka, change this config to: # pekko.remote.accept-protocol-names = ["pekko", "akka"] @@ -188,7 +188,7 @@ pekko { # The protocol name to use when sending requests to other remote actors. # Useful when dealing with rolling migration, i.e. temporarily change # the protocol name to match another compatible actor implementation - # such as Lightbend's "akka" (whilst making sure accept-protocol-names + # such as "akka" (whilst making sure accept-protocol-names # contains "akka") so that you can gracefully migrate all nodes to Apache # Pekko and then change the protocol-name back to "pekko" once all # nodes have been are running on Apache Pekko. @@ -196,6 +196,13 @@ pekko { # set to "pekko" or "akka". protocol-name = "pekko" + # When receiving join requests or related acks from other remote nodes, + # should we ensure the config details that are passed in these messages + # are prefixed with the protocol name (i.e. "pekko" or "akka")? + # The default is "off" as the accept-protocol-names check should be sufficient. + # This setting was created to enable edge case tests and users should not change it. + enforce-strict-config-prefix-check-on-join = off + # When pekko.remote.accept-protocol-names contains "akka", then we # need to know the Akka version. If you include the Akka jars on the classpath, # we can use the akka.version from their configuration. This configuration