From 9227912ea666f277d7ebc9dfedb53e6b2e768fa4 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 27 May 2025 19:49:57 +0100 Subject: [PATCH] Join cluster check adjusted to support akka (#1866) * update cluster initjoin check to support akka * add test * extra test --- .../apache/pekko/cluster/ClusterDaemon.scala | 10 ++++- .../org/apache/pekko/cluster/ConfigUtil.scala | 45 +++++++++++++++++-- .../pekko/cluster/SeedNodeProcess.scala | 7 +-- .../JoinConfigCompatCheckClusterSpec.scala | 27 +++++++++++ 4 files changed, 79 insertions(+), 10 deletions(-) diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index 746c1ceace..b289f370ca 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -607,7 +607,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh case other => super.unhandled(other) } - def initJoin(joiningNodeConfig: Config): Unit = { + private lazy val supportsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig( + context.system.settings.config) + + def initJoin(inputConfig: Config): Unit = { + val joiningNodeConfig = if (supportsAkkaConfig && !inputConfig.hasPath("pekko")) { + ConfigUtil.changeAkkaToPekkoConfig(inputConfig) + } else { + inputConfig + } val joiningNodeVersion = if (joiningNodeConfig.hasPath("pekko.version")) joiningNodeConfig.getString("pekko.version") else "unknown" diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala index 96a34a7d67..62fdd0e325 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala @@ -23,13 +23,16 @@ import scala.annotation.nowarn private[cluster] object ConfigUtil { + private val PekkoPrefix = "org.apache.pekko" + private val AkkaPrefix = "akka" + @nowarn("msg=deprecated") def addAkkaConfig(cfg: Config, akkaVersion: String): Config = { import org.apache.pekko.util.ccompat.JavaConverters._ val innerSet = cfg.entrySet().asScala .filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT) .map { entry => - entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue) + entry.getKey.replace("pekko", "akka") -> adjustPackageNameToAkkaIfNecessary(entry.getValue) } var newConfig = cfg innerSet.foreach { case (key, value) => @@ -38,11 +41,45 @@ private[cluster] object ConfigUtil { newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion)) } - private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = { + @nowarn("msg=deprecated") + def changeAkkaToPekkoConfig(cfg: Config): Config = { + import org.apache.pekko.util.ccompat.JavaConverters._ + val innerSet = cfg.entrySet().asScala + .filter(e => e.getKey.startsWith("akka.") && e.getValue.valueType() != ConfigValueType.OBJECT) + .map { entry => + entry.getKey.replace("akka", "pekko") -> adjustPackageNameToPekkoIfNecessary(entry.getValue) + } + var newConfig = cfg + innerSet.foreach { case (key, value) => + newConfig = newConfig.withValue(key, value) + } + newConfig + } + + def supportsAkkaConfig(cfg: Config): Boolean = { + cfg + .getStringList("pekko.remote.accept-protocol-names") + .contains("akka") + } + + private def adjustPackageNameToAkkaIfNecessary(cv: ConfigValue): ConfigValue = { if (cv.valueType() == ConfigValueType.STRING) { val str = cv.unwrapped().toString - if (str.startsWith("org.apache.pekko")) { - ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka")) + if (str.startsWith(PekkoPrefix)) { + ConfigValueFactory.fromAnyRef(str.replace(PekkoPrefix, AkkaPrefix)) + } else { + cv + } + } else { + cv + } + } + + private def adjustPackageNameToPekkoIfNecessary(cv: ConfigValue): ConfigValue = { + if (cv.valueType() == ConfigValueType.STRING) { + val str = cv.unwrapped().toString + if (str.startsWith(AkkaPrefix)) { + ConfigValueFactory.fromAnyRef(str.replace(AkkaPrefix, PekkoPrefix)) } else { cv } diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala index 67a60b8189..985875dba2 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala @@ -47,11 +47,8 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " + "This node will be shutdown!" - private lazy val needsAkkaConfig: Boolean = { - context.system.settings.config - .getStringList("pekko.remote.accept-protocol-names") - .contains("akka") - } + private lazy val needsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig( + context.system.settings.config) private lazy val akkaVersion: String = { val cfg = context.system.settings.config diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala index 76962b1492..dc45d94956 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala @@ -109,5 +109,32 @@ class JoinConfigCompatCheckClusterSpec extends PekkoSpec { checkInitJoin(oldConfig, newConfig).getClass should ===(classOf[Invalid]) checkInitJoinAck(oldConfig, newConfig).getClass should ===(classOf[Invalid]) } + + "be valid when equivalent downing-provider (akka/pekko mixed cluster)" in { + val oldConfig = + ConfigFactory.parseString(""" + pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider" + """).withFallback(system.settings.config) + val newConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + akka.version = "2.6.21" + """) + checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)) should ===(Valid) + } + + "be invalid when not equivalent downing-provider (akka/pekko mixed cluster)" in { + val oldConfig = + ConfigFactory.parseString(""" + pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.testkit.AutoDowning" + """).withFallback(system.settings.config) + val newConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + akka.version = "2.6.21" + """) + checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)).getClass should ===(classOf[Invalid]) + } + } }