Allow update from Lightbend SBR in ClusterReceptionistConfigCompatChecker (#29209)
* and don't allow different strategies
This commit is contained in:
parent
14f6befd3e
commit
b80cd745eb
3 changed files with 171 additions and 20 deletions
|
|
@ -9,15 +9,44 @@ import scala.collection.{ immutable => im }
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
|
import akka.cluster.sbr.SplitBrainResolverProvider
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] object JoinConfigCompatCheckCluster {
|
||||||
|
private val DowningProviderPath = "akka.cluster.downing-provider-class"
|
||||||
|
private val SbrStrategyPath = "akka.cluster.split-brain-resolver.active-strategy"
|
||||||
|
|
||||||
|
private val AkkaSbrProviderClass = classOf[SplitBrainResolverProvider].getName
|
||||||
|
private val LightbendSbrProviderClass = "com.lightbend.akka.sbr.SplitBrainResolverProvider"
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi
|
@InternalApi
|
||||||
final class JoinConfigCompatCheckCluster extends JoinConfigCompatChecker {
|
final class JoinConfigCompatCheckCluster extends JoinConfigCompatChecker {
|
||||||
|
import JoinConfigCompatCheckCluster._
|
||||||
|
|
||||||
override def requiredKeys = im.Seq("akka.cluster.downing-provider-class")
|
override def requiredKeys: im.Seq[String] = List(DowningProviderPath, SbrStrategyPath)
|
||||||
|
|
||||||
override def check(toCheck: Config, actualConfig: Config): ConfigValidation =
|
override def check(toCheck: Config, actualConfig: Config): ConfigValidation = {
|
||||||
JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
|
val toCheckDowningProvider = toCheck.getString(DowningProviderPath)
|
||||||
|
val actualDowningProvider = actualConfig.getString(DowningProviderPath)
|
||||||
|
val downingProviderResult =
|
||||||
|
if (toCheckDowningProvider == actualDowningProvider || Set(toCheckDowningProvider, actualDowningProvider) == Set(
|
||||||
|
AkkaSbrProviderClass,
|
||||||
|
LightbendSbrProviderClass))
|
||||||
|
Valid
|
||||||
|
else
|
||||||
|
JoinConfigCompatChecker.checkEquality(List(DowningProviderPath), toCheck, actualConfig)
|
||||||
|
|
||||||
|
val sbrStrategyResult =
|
||||||
|
if (toCheck.hasPath(SbrStrategyPath) && actualConfig.hasPath(SbrStrategyPath))
|
||||||
|
JoinConfigCompatChecker.checkEquality(List(SbrStrategyPath), toCheck, actualConfig)
|
||||||
|
else Valid
|
||||||
|
|
||||||
|
downingProviderResult ++ sbrStrategyResult
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -62,27 +62,32 @@ object JoinConfigCompatChecker {
|
||||||
* @param actualConfig - the Config instance containing the expected values
|
* @param actualConfig - the Config instance containing the expected values
|
||||||
*/
|
*/
|
||||||
def fullMatch(requiredKeys: im.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation = {
|
def fullMatch(requiredKeys: im.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation = {
|
||||||
|
exists(requiredKeys, toCheck) ++ checkEquality(requiredKeys, toCheck, actualConfig)
|
||||||
|
}
|
||||||
|
|
||||||
def checkEquality = {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] def checkEquality(
|
||||||
|
keys: im.Seq[String],
|
||||||
|
toCheck: Config,
|
||||||
|
actualConfig: Config): ConfigValidation = {
|
||||||
|
|
||||||
def checkCompat(entry: util.Map.Entry[String, ConfigValue]) = {
|
def checkCompat(entry: util.Map.Entry[String, ConfigValue]) = {
|
||||||
val key = entry.getKey
|
val key = entry.getKey
|
||||||
actualConfig.hasPath(key) && actualConfig.getValue(key) == entry.getValue
|
actualConfig.hasPath(key) && actualConfig.getValue(key) == entry.getValue
|
||||||
}
|
|
||||||
|
|
||||||
// retrieve all incompatible keys
|
|
||||||
// NOTE: we only check the key if effectively required
|
|
||||||
// because config may contain more keys than required for this checker
|
|
||||||
val incompatibleKeys =
|
|
||||||
toCheck.entrySet().asScala.collect {
|
|
||||||
case entry if requiredKeys.contains(entry.getKey) && !checkCompat(entry) => s"${entry.getKey} is incompatible"
|
|
||||||
}
|
|
||||||
|
|
||||||
if (incompatibleKeys.isEmpty) Valid
|
|
||||||
else Invalid(incompatibleKeys.to(im.Seq))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
exists(requiredKeys, toCheck) ++ checkEquality
|
// retrieve all incompatible keys
|
||||||
|
// NOTE: we only check the key if effectively required
|
||||||
|
// because config may contain more keys than required for this checker
|
||||||
|
val incompatibleKeys =
|
||||||
|
toCheck.entrySet().asScala.collect {
|
||||||
|
case entry if keys.contains(entry.getKey) && !checkCompat(entry) => s"${entry.getKey} is incompatible"
|
||||||
|
}
|
||||||
|
|
||||||
|
if (incompatibleKeys.isEmpty) Valid
|
||||||
|
else Invalid(incompatibleKeys.to(im.Seq))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,117 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
import akka.actor.ExtendedActorSystem
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
|
class JoinConfigCompatCheckClusterSpec extends AkkaSpec {
|
||||||
|
|
||||||
|
private val extSystem = system.asInstanceOf[ExtendedActorSystem]
|
||||||
|
private val clusterSettings = new ClusterSettings(system.settings.config, system.name)
|
||||||
|
private val joinConfigCompatChecker: JoinConfigCompatChecker =
|
||||||
|
JoinConfigCompatChecker.load(extSystem, clusterSettings)
|
||||||
|
|
||||||
|
// Corresponding to the check of InitJoin
|
||||||
|
def checkInitJoin(oldConfig: Config, newConfig: Config): ConfigValidation = {
|
||||||
|
// joiningNodeConfig only contains the keys that are required according to JoinConfigCompatChecker on this node
|
||||||
|
val joiningNodeConfig = {
|
||||||
|
val requiredNonSensitiveKeys =
|
||||||
|
JoinConfigCompatChecker.removeSensitiveKeys(joinConfigCompatChecker.requiredKeys, clusterSettings)
|
||||||
|
JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, newConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
val configWithoutSensitiveKeys = {
|
||||||
|
val allowedConfigPaths =
|
||||||
|
JoinConfigCompatChecker.removeSensitiveKeys(oldConfig, clusterSettings)
|
||||||
|
// build a stripped down config instead where sensitive config paths are removed
|
||||||
|
// we don't want any check to happen on those keys
|
||||||
|
JoinConfigCompatChecker.filterWithKeys(allowedConfigPaths, oldConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Corresponding to the check of InitJoinAck in SeedNodeProcess
|
||||||
|
def checkInitJoinAck(oldConfig: Config, newConfig: Config): ConfigValidation = {
|
||||||
|
// validates config coming from cluster against this node config
|
||||||
|
val configCheckReply = {
|
||||||
|
val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(newConfig, clusterSettings)
|
||||||
|
// Send back to joining node a subset of current configuration
|
||||||
|
// containing the keys initially sent by the joining node minus
|
||||||
|
// any sensitive keys as defined by this node configuration
|
||||||
|
JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, oldConfig)
|
||||||
|
}
|
||||||
|
joinConfigCompatChecker.check(configCheckReply, newConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
"JoinConfigCompatCheckCluster" must {
|
||||||
|
"be valid when no downing-provider" in {
|
||||||
|
val oldConfig = ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = ""
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
val newConfig = ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = ""
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
checkInitJoin(oldConfig, newConfig) should ===(Valid)
|
||||||
|
}
|
||||||
|
|
||||||
|
"be valid when same downing-provider" in {
|
||||||
|
val oldConfig =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
val newConfig =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
checkInitJoin(oldConfig, newConfig) should ===(Valid)
|
||||||
|
}
|
||||||
|
|
||||||
|
"be valid when updating from Lightbend sbr" in {
|
||||||
|
val oldConfig =
|
||||||
|
ConfigFactory
|
||||||
|
.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = "com.lightbend.akka.sbr.SplitBrainResolverProvider"
|
||||||
|
""")
|
||||||
|
.withFallback(system.settings.config)
|
||||||
|
val newConfig =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
checkInitJoin(oldConfig, newConfig) should ===(Valid)
|
||||||
|
}
|
||||||
|
|
||||||
|
"be invalid when different downing-provider" in {
|
||||||
|
val oldConfig =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = "akka.cluster.testkit.AutoDowning"
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
val newConfig =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
checkInitJoin(oldConfig, newConfig).getClass should ===(classOf[Invalid])
|
||||||
|
}
|
||||||
|
|
||||||
|
"be invalid when different sbr strategy" in {
|
||||||
|
val oldConfig =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||||
|
akka.cluster.split-brain-resolver.active-strategy = keep-majority
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
val newConfig =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
|
||||||
|
akka.cluster.split-brain-resolver.active-strategy = keep-oldest
|
||||||
|
""").withFallback(system.settings.config)
|
||||||
|
checkInitJoin(oldConfig, newConfig).getClass should ===(classOf[Invalid])
|
||||||
|
checkInitJoinAck(oldConfig, newConfig).getClass should ===(classOf[Invalid])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue