From b80cd745eba86cb344c33e447f788a282d09ec1e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 10 Jun 2020 12:47:16 +0200 Subject: [PATCH] Allow update from Lightbend SBR in ClusterReceptionistConfigCompatChecker (#29209) * and don't allow different strategies --- .../JoinConfigCompatCheckCluster.scala | 35 +++++- .../cluster/JoinConfigCompatChecker.scala | 39 +++--- .../JoinConfigCompatCheckClusterSpec.scala | 117 ++++++++++++++++++ 3 files changed, 171 insertions(+), 20 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/JoinConfigCompatCheckClusterSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatCheckCluster.scala b/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatCheckCluster.scala index f7f79c75f9..f9f7bda409 100644 --- a/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatCheckCluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatCheckCluster.scala @@ -9,15 +9,44 @@ import scala.collection.{ immutable => im } import com.typesafe.config.Config import akka.annotation.InternalApi +import akka.cluster.sbr.SplitBrainResolverProvider + +/** + * INTERNAL API + */ +@InternalApi private[akka] object JoinConfigCompatCheckCluster { + private val DowningProviderPath = "akka.cluster.downing-provider-class" + private val SbrStrategyPath = "akka.cluster.split-brain-resolver.active-strategy" + + private val AkkaSbrProviderClass = classOf[SplitBrainResolverProvider].getName + private val LightbendSbrProviderClass = "com.lightbend.akka.sbr.SplitBrainResolverProvider" +} /** * INTERNAL API */ @InternalApi final class JoinConfigCompatCheckCluster extends JoinConfigCompatChecker { + import JoinConfigCompatCheckCluster._ - override def requiredKeys = im.Seq("akka.cluster.downing-provider-class") + override def requiredKeys: im.Seq[String] = List(DowningProviderPath, SbrStrategyPath) - override def check(toCheck: Config, actualConfig: Config): ConfigValidation = - JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig) + override def check(toCheck: Config, actualConfig: Config): ConfigValidation = { + val toCheckDowningProvider = toCheck.getString(DowningProviderPath) + val actualDowningProvider = actualConfig.getString(DowningProviderPath) + val downingProviderResult = + if (toCheckDowningProvider == actualDowningProvider || Set(toCheckDowningProvider, actualDowningProvider) == Set( + AkkaSbrProviderClass, + LightbendSbrProviderClass)) + Valid + else + JoinConfigCompatChecker.checkEquality(List(DowningProviderPath), toCheck, actualConfig) + + val sbrStrategyResult = + if (toCheck.hasPath(SbrStrategyPath) && actualConfig.hasPath(SbrStrategyPath)) + JoinConfigCompatChecker.checkEquality(List(SbrStrategyPath), toCheck, actualConfig) + else Valid + + downingProviderResult ++ sbrStrategyResult + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatChecker.scala b/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatChecker.scala index bf8f6090c0..712814dcb6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatChecker.scala +++ b/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatChecker.scala @@ -62,27 +62,32 @@ object JoinConfigCompatChecker { * @param actualConfig - the Config instance containing the expected values */ def fullMatch(requiredKeys: im.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation = { + exists(requiredKeys, toCheck) ++ checkEquality(requiredKeys, toCheck, actualConfig) + } - def checkEquality = { + /** + * INTERNAL API + */ + @InternalApi private[akka] def checkEquality( + keys: im.Seq[String], + toCheck: Config, + actualConfig: Config): ConfigValidation = { - def checkCompat(entry: util.Map.Entry[String, ConfigValue]) = { - val key = entry.getKey - actualConfig.hasPath(key) && actualConfig.getValue(key) == entry.getValue - } - - // retrieve all incompatible keys - // NOTE: we only check the key if effectively required - // because config may contain more keys than required for this checker - val incompatibleKeys = - toCheck.entrySet().asScala.collect { - case entry if requiredKeys.contains(entry.getKey) && !checkCompat(entry) => s"${entry.getKey} is incompatible" - } - - if (incompatibleKeys.isEmpty) Valid - else Invalid(incompatibleKeys.to(im.Seq)) + def checkCompat(entry: util.Map.Entry[String, ConfigValue]) = { + val key = entry.getKey + actualConfig.hasPath(key) && actualConfig.getValue(key) == entry.getValue } - exists(requiredKeys, toCheck) ++ checkEquality + // retrieve all incompatible keys + // NOTE: we only check the key if effectively required + // because config may contain more keys than required for this checker + val incompatibleKeys = + toCheck.entrySet().asScala.collect { + case entry if keys.contains(entry.getKey) && !checkCompat(entry) => s"${entry.getKey} is incompatible" + } + + if (incompatibleKeys.isEmpty) Valid + else Invalid(incompatibleKeys.to(im.Seq)) } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/JoinConfigCompatCheckClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/JoinConfigCompatCheckClusterSpec.scala new file mode 100644 index 0000000000..8fc4c779b8 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/JoinConfigCompatCheckClusterSpec.scala @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +import akka.actor.ExtendedActorSystem +import akka.testkit.AkkaSpec + +class JoinConfigCompatCheckClusterSpec extends AkkaSpec { + + private val extSystem = system.asInstanceOf[ExtendedActorSystem] + private val clusterSettings = new ClusterSettings(system.settings.config, system.name) + private val joinConfigCompatChecker: JoinConfigCompatChecker = + JoinConfigCompatChecker.load(extSystem, clusterSettings) + + // Corresponding to the check of InitJoin + def checkInitJoin(oldConfig: Config, newConfig: Config): ConfigValidation = { + // joiningNodeConfig only contains the keys that are required according to JoinConfigCompatChecker on this node + val joiningNodeConfig = { + val requiredNonSensitiveKeys = + JoinConfigCompatChecker.removeSensitiveKeys(joinConfigCompatChecker.requiredKeys, clusterSettings) + JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, newConfig) + } + + val configWithoutSensitiveKeys = { + val allowedConfigPaths = + JoinConfigCompatChecker.removeSensitiveKeys(oldConfig, clusterSettings) + // build a stripped down config instead where sensitive config paths are removed + // we don't want any check to happen on those keys + JoinConfigCompatChecker.filterWithKeys(allowedConfigPaths, oldConfig) + } + + joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys) + } + + // Corresponding to the check of InitJoinAck in SeedNodeProcess + def checkInitJoinAck(oldConfig: Config, newConfig: Config): ConfigValidation = { + // validates config coming from cluster against this node config + val configCheckReply = { + val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(newConfig, clusterSettings) + // Send back to joining node a subset of current configuration + // containing the keys initially sent by the joining node minus + // any sensitive keys as defined by this node configuration + JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, oldConfig) + } + joinConfigCompatChecker.check(configCheckReply, newConfig) + } + + "JoinConfigCompatCheckCluster" must { + "be valid when no downing-provider" in { + val oldConfig = ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "" + """).withFallback(system.settings.config) + val newConfig = ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "" + """).withFallback(system.settings.config) + checkInitJoin(oldConfig, newConfig) should ===(Valid) + } + + "be valid when same downing-provider" in { + val oldConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + """).withFallback(system.settings.config) + val newConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + """).withFallback(system.settings.config) + checkInitJoin(oldConfig, newConfig) should ===(Valid) + } + + "be valid when updating from Lightbend sbr" in { + val oldConfig = + ConfigFactory + .parseString(""" + akka.cluster.downing-provider-class = "com.lightbend.akka.sbr.SplitBrainResolverProvider" + """) + .withFallback(system.settings.config) + val newConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + """).withFallback(system.settings.config) + checkInitJoin(oldConfig, newConfig) should ===(Valid) + } + + "be invalid when different downing-provider" in { + val oldConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.testkit.AutoDowning" + """).withFallback(system.settings.config) + val newConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + """).withFallback(system.settings.config) + checkInitJoin(oldConfig, newConfig).getClass should ===(classOf[Invalid]) + } + + "be invalid when different sbr strategy" in { + val oldConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + akka.cluster.split-brain-resolver.active-strategy = keep-majority + """).withFallback(system.settings.config) + val newConfig = + ConfigFactory.parseString(""" + akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + akka.cluster.split-brain-resolver.active-strategy = keep-oldest + """).withFallback(system.settings.config) + checkInitJoin(oldConfig, newConfig).getClass should ===(classOf[Invalid]) + checkInitJoinAck(oldConfig, newConfig).getClass should ===(classOf[Invalid]) + } + } +}