Rolling update config checker, #24009

* adds config compatibility check
* documented what happens when joining a cluster that does not support this feature
* added extra docs on sensitive paths
Renato Cavalcanti 2018-02-20 15:47:09 +01:00 committed by Patrik Nordwall
parent a4e9881a6f
commit c83e4adfea
19 changed files with 4832 additions and 123 deletions


@@ -151,6 +151,13 @@ akka.cluster.sharding {
}
# //#sharding-ext-config
akka.cluster {
configuration-compatibility-check {
checkers {
akka-cluster-sharding = "akka.cluster.sharding.JoinConfigCompatCheckSharding"
}
}
}
# Protobuf serializer for Cluster Sharding messages
akka.actor {


@@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.annotation.InternalApi
import akka.cluster.{ ConfigValidation, JoinConfigCompatChecker }
import com.typesafe.config.Config
import scala.collection.{ immutable ⇒ im }
/**
* INTERNAL API
*/
@InternalApi
final class JoinConfigCompatCheckSharding extends JoinConfigCompatChecker {
override def requiredKeys = im.Seq("akka.cluster.sharding.state-store-mode")
override def check(toCheck: Config, actualConfig: Config): ConfigValidation =
JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
}
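The checker above asserts a single sharding key. As a point of comparison, an application-defined checker following the same pattern could look like the sketch below; the config key and class name are hypothetical and not part of this commit.

import akka.cluster.{ ConfigValidation, JoinConfigCompatChecker }
import com.typesafe.config.Config
import scala.collection.{ immutable ⇒ im }

// Hypothetical application-level checker, registered under
// akka.cluster.configuration-compatibility-check.checkers like the sharding one above.
final class MyAppJoinConfigCompatChecker extends JoinConfigCompatChecker {
  // key(s) that must be identical on the joining node and in the cluster
  override def requiredKeys = im.Seq("myapp.cluster-critical-setting")
  // fullMatch requires every required key to exist on both sides with the same value
  override def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
}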


@@ -0,0 +1,76 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.ActorSystem
import akka.cluster.{ Cluster, ClusterReadView }
import akka.testkit.{ AkkaSpec, LongRunningTest }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._
import scala.collection.{ immutable ⇒ im }
class JoinConfigCompatCheckShardingSpec extends AkkaSpec() {
def initCluster(system: ActorSystem): ClusterReadView = {
val cluster = Cluster(system)
cluster.join(cluster.selfAddress)
val clusterView = cluster.readView
awaitCond(clusterView.isSingletonCluster)
clusterView
}
val baseConfig: Config =
ConfigFactory.parseString(
"""
akka.actor.provider = "cluster"
akka.coordinated-shutdown.terminate-actor-system = on
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
"""
)
"A Joining Node" must {
/** This test verifies the built-in JoinConfigCompatCheckSharding */
"NOT be allowed to join a cluster using a different value for akka.cluster.sharding.state-store-mode" taggedAs LongRunningTest in {
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# use 'persistence' for state store
sharding.state-store-mode = "persistence"
configuration-compatibility-check {
enforce-on-join = on
}
}
"""
)
val seedNode = ActorSystem(system.name, baseConfig)
val joiningNode = ActorSystem(system.name, joinNodeConfig.withFallback(baseConfig))
val clusterView = initCluster(seedNode)
val joiningNodeCluster = Cluster(joiningNode)
try {
// attempt to join with an incompatible config
joiningNodeCluster.joinSeedNodes(im.Seq(clusterView.selfAddress))
// node will shutdown after unsuccessful join attempt
within(5.seconds) {
awaitCond(joiningNodeCluster.readView.isTerminated)
}
} finally {
shutdown(seedNode)
shutdown(joiningNode)
}
}
}
}
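For contrast, the complementary positive case could be asserted along the following lines; this is a sketch, not part of the commit, reusing `baseConfig` and `initCluster` from above and assuming `akka.cluster.MemberStatus` is imported.

"be allowed to join a cluster using the same value for akka.cluster.sharding.state-store-mode" taggedAs LongRunningTest in {
  // both nodes share baseConfig, so the sharding checker finds identical values
  val seedNode = ActorSystem(system.name, baseConfig)
  val joiningNode = ActorSystem(system.name, baseConfig)
  val clusterView = initCluster(seedNode)
  val joiningNodeCluster = Cluster(joiningNode)
  try {
    joiningNodeCluster.joinSeedNodes(im.Seq(clusterView.selfAddress))
    // the join succeeds and the node becomes Up
    within(10.seconds) {
      awaitCond(joiningNodeCluster.readView.status == MemberStatus.Up)
    }
  } finally {
    shutdown(seedNode)
    shutdown(joiningNode)
  }
}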


@@ -0,0 +1,19 @@
# #24009 Rolling update config checker
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.JoinSeedNodeProcess.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreSupervisor.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.FirstSeedNodeProcess.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterDaemon.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.initJoin")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.InternalClusterAction$InitJoin$")
ProblemFilters.exclude[MissingFieldProblem]("akka.cluster.InternalClusterAction#InitJoin.serialVersionUID")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.InternalClusterAction#InitJoin.productElement")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.InternalClusterAction#InitJoin.productArity")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.InternalClusterAction#InitJoin.canEqual")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.InternalClusterAction#InitJoin.productIterator")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.InternalClusterAction#InitJoin.productPrefix")
ProblemFilters.exclude[FinalMethodProblem]("akka.cluster.InternalClusterAction#InitJoin.toString")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.InternalClusterAction$InitJoinAck$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.InternalClusterAction#InitJoinAck.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.InternalClusterAction#InitJoinAck.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.InternalClusterAction#InitJoinAck.apply")


@@ -41,13 +41,50 @@ message Welcome {
/**
 * InitJoin
 */
message InitJoin {
optional string currentConfig = 1;
}
/**
 * InitJoinAck
 */
message InitJoinAck {
required Address address = 1;
optional ConfigCheck configCheck = 2;
}
message ConfigCheck {
extensions 100 to 102;
enum Type {
UncheckedConfig = 1;
IncompatibleConfig = 2;
CompatibleConfig = 3;
}
required Type type = 1;
optional string clusterConfig = 2;
}
message UncheckedConfig {
extend ConfigCheck {
required UncheckedConfig checkConfig = 100;
}
}
message IncompatibleConfig {
extend ConfigCheck {
required IncompatibleConfig checkConfig = 101;
}
}
message CompatibleConfig {
extend ConfigCheck {
required CompatibleConfig checkConfig = 102;
}
required string clusterConfig = 2;
}
/**
 * InitJoinNack
@@ -186,7 +223,7 @@ message VectorClock {
/**
 * An empty message
 */
message Empty {
}
/**
@@ -213,18 +250,18 @@ message UniqueAddress {
 * Cluster routing
 ****************************************/
message ClusterRouterPool {
  required Pool pool = 1;
  required ClusterRouterPoolSettings settings = 2;
}
message Pool {
  required uint32 serializerId = 1;
  required string manifest = 2;
  required bytes data = 3;
}
message ClusterRouterPoolSettings {
  required uint32 totalInstances = 1;
  required uint32 maxInstancesPerNode = 2;
  required bool allowLocalRoutees = 3;

@@ -280,6 +280,40 @@ akka {
  verbose-gossip-logging = off
}
configuration-compatibility-check {
# Enforce configuration compatibility checks when joining a cluster.
# Set to off to allow joining nodes to join a cluster even when configuration incompatibilities are detected or
# when the cluster does not support this feature. Compatibility checks are always performed and warning and
# error messages are logged.
#
# This is particularly useful for rolling updates on clusters that do not support that feature. Since the old
# cluster won't be able to send the compatibility confirmation to the joining node, the joining node won't be able
# to 'know' if it's allowed to join.
enforce-on-join = on
checkers {
akka-cluster = "akka.cluster.JoinConfigCompatCheckCluster"
}
# Some configuration properties might not be appropriate to transfer between nodes
# and such properties can be excluded from the configuration compatibility check by adding
# the paths of the properties to this list. Sensitive paths are grouped by key. Modules and third-party libraries
# can define their own set of sensitive paths without clashing with each other (as long they use unique keys).
#
# All properties starting with the paths defined here are excluded, i.e. you can add the path of a whole
# section here to skip everything inside that section.
sensitive-config-paths {
akka = [
"user.home", "user.name", "user.dir",
"socksNonProxyHosts", "http.nonProxyHosts", "ftp.nonProxyHosts",
"akka.remote.secure-cookie",
"akka.remote.netty.ssl.security",
"akka.remote.artery.ssl"
]
}
}
}
actor.deployment.default.cluster {
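Putting the pieces together, an application.conf that registers an additional checker and marks an application-specific path as sensitive might look like this sketch (the checker key, class and path are hypothetical):

akka.cluster.configuration-compatibility-check {
  # keep the default: refuse to join when incompatible settings are detected
  enforce-on-join = on
  checkers {
    my-app = "com.example.MyAppJoinConfigCompatChecker"
  }
  sensitive-config-paths {
    # grouped under an application-specific key to avoid clashing with the 'akka' group
    my-app = [ "myapp.credentials" ]
  }
}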


@@ -62,6 +62,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
import InfoLogger._
import settings._
private val joinConfigCompatChecker: JoinConfigCompatChecker = JoinConfigCompatChecker.load(system, settings)
/**
 * The address including a `uid` of this cluster member.
 * The `uid` is needed to be able to distinguish different
@@ -167,7 +168,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons: ActorRef = {
  system.systemActorOf(Props(classOf[ClusterDaemon], settings, joinConfigCompatChecker).
    withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster")
}


@@ -10,9 +10,12 @@ import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.Done
import akka.actor.CoordinatedShutdown.Reason
import akka.cluster.ClusterUserAction.JoinTo
import akka.pattern.ask
import akka.remote.QuarantinedEvent
import akka.util.Timeout
import com.typesafe.config.{ Config, ConfigFactory }
import scala.collection.immutable
import scala.concurrent.duration._
@@ -32,6 +35,7 @@ trait ClusterMessage extends Serializable
 * [[akka.cluster.Cluster]] extension
 * or JMX.
 */
@InternalApi
private[cluster] object ClusterUserAction {
/**
@@ -58,6 +62,7 @@ private[cluster] object ClusterUserAction {
/**
 * INTERNAL API
 */
@InternalApi
private[cluster] object InternalClusterAction {
/**
@@ -92,17 +97,22 @@ private[cluster] object InternalClusterAction {
 */
case object JoinSeedNode extends DeadLetterSuppression
sealed trait ConfigCheck
case object UncheckedConfig extends ConfigCheck
case object IncompatibleConfig extends ConfigCheck
final case class CompatibleConfig(clusterConfig: Config) extends ConfigCheck
/**
 * see JoinSeedNode
 */
@SerialVersionUID(1L)
case class InitJoin(configOfJoiningNode: Config) extends ClusterMessage with DeadLetterSuppression
/**
 * see JoinSeedNode
 */
@SerialVersionUID(1L)
final case class InitJoinAck(address: Address, configCheck: ConfigCheck) extends ClusterMessage with DeadLetterSuppression
/**
 * see JoinSeedNode
@@ -162,7 +172,8 @@ private[cluster] object InternalClusterAction {
 *
 * Supervisor managing the different Cluster daemons.
 */
@InternalApi
private[cluster] final class ClusterDaemon(settings: ClusterSettings, joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
// Important - don't use Cluster(context.system) in constructor because that would
@@ -196,7 +207,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
}
def createChildren(): Unit = {
  coreSupervisor = Some(context.actorOf(Props(classOf[ClusterCoreSupervisor], joinConfigCompatChecker).
    withDispatcher(context.props.dispatcher), name = "core"))
  context.actorOf(Props[ClusterHeartbeatReceiver].
    withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
@@ -225,7 +236,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
 * ClusterCoreDaemon and ClusterDomainEventPublisher can't be restarted because the state
 * would be obsolete. Shutdown the member if any of those actors crashed.
 */
@InternalApi
private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
// Important - don't use Cluster(context.system) in constructor because that would
@@ -238,7 +250,7 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
def createChildren(): Unit = {
  val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
    withDispatcher(context.props.dispatcher), name = "publisher")
  coreDaemon = Some(context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher, joinConfigCompatChecker).
    withDispatcher(context.props.dispatcher), name = "daemon")))
}
@@ -274,7 +286,7 @@ private[cluster] object ClusterCoreDaemon {
 * INTERNAL API.
 */
@InternalApi
private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
import ClusterCoreDaemon._
@@ -475,9 +487,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case ReapUnreachableTick ⇒ reapUnreachableMembers()
case LeaderActionsTick ⇒ leaderActions()
case PublishStatsTick ⇒ publishInternalStats()
case InitJoin(joiningNodeConfig) ⇒
  logInfo("Received InitJoin message from [{}] to [{}]", sender(), selfAddress)
  initJoin(joiningNodeConfig)
case Join(node, roles) ⇒ joining(node, roles)
case ClusterUserAction.Down(address) ⇒ downing(address)
case ClusterUserAction.Leave(address) ⇒ leaving(address)
@@ -509,7 +521,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case other ⇒ super.unhandled(other)
}
def initJoin(joiningNodeConfig: Config): Unit = {
  val selfStatus = latestGossip.member(selfUniqueAddress).status
  if (removeUnreachableWithMemberStatus.contains(selfStatus)) {
    // prevents a Down and Exiting node from being used for joining
@@ -517,7 +529,30 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
    sender() ! InitJoinNack(selfAddress)
  } else {
    logInfo("Sending InitJoinAck message from node [{}] to [{}]", selfAddress, sender())
    // run config compatibility check using config provided by
    // joining node and current (full) config on cluster side
    val configWithoutSensitiveKeys = {
      val allowedConfigPaths = JoinConfigCompatChecker.removeSensitiveKeys(context.system.settings.config, cluster.settings)
      // build a stripped down config instead where sensitive config paths are removed
      // we don't want any check to happen on those keys
      JoinConfigCompatChecker.filterWithKeys(allowedConfigPaths, context.system.settings.config)
    }
    joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys) match {
      case Valid ⇒
        val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joiningNodeConfig, cluster.settings)
        // Send back to joining node a subset of current configuration
        // containing the keys initially sent by the joining node minus
        // any sensitive keys as defined by this node configuration
        val clusterConfig = JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config)
        sender() ! InitJoinAck(selfAddress, CompatibleConfig(clusterConfig))
      case Invalid(messages) ⇒
        // messages are only logged on the cluster side
        log.warning("Found incompatible settings when [{}] tried to join: {}", sender().path.address, messages.mkString(", "))
        sender() ! InitJoinAck(selfAddress, IncompatibleConfig)
    }
  }
}
@@ -533,10 +568,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// use unique name of this actor, stopSeedNodeProcess doesn't wait for termination
seedNodeProcessCounter += 1
if (newSeedNodes.head == selfAddress) {
  Some(context.actorOf(Props(classOf[FirstSeedNodeProcess], newSeedNodes, joinConfigCompatChecker).
    withDispatcher(UseDispatcher), name = "firstSeedNodeProcess-" + seedNodeProcessCounter))
} else {
  Some(context.actorOf(Props(classOf[JoinSeedNodeProcess], newSeedNodes, joinConfigCompatChecker).
    withDispatcher(UseDispatcher), name = "joinSeedNodeProcess-" + seedNodeProcessCounter))
}
}
@@ -949,7 +984,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (latestGossip.isMultiDc)
  latestGossip.overview.seen.count(membershipState.isInSameDc) < latestGossip.members.count(_.dataCenter == cluster.selfDataCenter) / 2
else
  latestGossip.overview.seen.size < latestGossip.members.size / 2
}
/**
@@ -1274,7 +1309,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
  versionedGossip.clearSeen()
else {
  // Nobody else has seen this gossip but us
  val seenVersionedGossip = versionedGossip onlySeen selfUniqueAddress
  // Update the state with the new gossip
  seenVersionedGossip
}
@@ -1284,7 +1319,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def assertLatestGossip(): Unit =
  if (Cluster.isAssertInvariantsEnabled && latestGossip.version.versions.size > latestGossip.members.size)
    throw new IllegalStateException(s"Too many vector clock entries in gossip state $latestGossip")
def publishMembershipState(): Unit = {
  if (cluster.settings.Debug.VerboseGossipLogging)
@@ -1303,6 +1338,11 @@
}
}
/**
* INTERNAL API.
*/
private[cluster] case object IncompatibleConfigurationDetected extends Reason
/**
 * INTERNAL API.
 *
@@ -1315,11 +1355,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
 * it will reply with InitJoinAck and then the first seed node will join
 * that other seed node to join existing cluster.
 */
@InternalApi
private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging {
import InternalClusterAction._
import ClusterUserAction.JoinTo
val cluster = Cluster(context.system)
import cluster.settings._
import cluster.InfoLogger._
def selfAddress = cluster.selfAddress
@@ -1341,8 +1383,11 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
def receive = {
  case JoinSeedNode ⇒
    if (timeout.hasTimeLeft) {
      val requiredNonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joinConfigCompatChecker.requiredKeys, cluster.settings)
      // configToValidate only contains the keys that are required according to JoinConfigCompatChecker on this node
      val configToValidate = JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config)
      // send InitJoin to remaining seed nodes (except myself)
      remainingSeedNodes foreach { a ⇒ context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) }
    } else {
      // no InitJoinAck received, initialize new cluster by joining myself
      if (log.isDebugEnabled)
@@ -1352,11 +1397,60 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
      context.parent ! JoinTo(selfAddress)
      context.stop(self)
    }
  case InitJoinAck(address, CompatibleConfig(clusterConfig)) ⇒
    logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
    // validates config coming from cluster against this node config
    joinConfigCompatChecker.check(clusterConfig, context.system.settings.config) match {
      case Valid ⇒
        // first InitJoinAck reply, join existing cluster
        context.parent ! JoinTo(address)
        context.stop(self)
      case Invalid(messages) if ByPassConfigCompatCheck ⇒
        log.warning("Cluster validated this node config, but sent back incompatible settings: {}. " +
          "Join will be performed because compatibility check is configured to not be enforced.", messages.mkString(", "))
        context.parent ! JoinTo(address)
        context.stop(self)
      case Invalid(messages) ⇒
        log.error("Cluster validated this node config, but sent back incompatible settings: {}. " +
          "It's recommended to perform a full cluster shutdown in order to deploy this new version. " +
          "If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
          "'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " +
          "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
          "This node will be shutdown!", messages.mkString(", "))
        context.stop(self)
        CoordinatedShutdown(context.system).run(IncompatibleConfigurationDetected)
    }
  case InitJoinAck(address, UncheckedConfig) ⇒
    logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
    log.warning("Joining a cluster without configuration compatibility check feature.")
    context.parent ! JoinTo(address)
    context.stop(self)
  case InitJoinAck(address, IncompatibleConfig) ⇒
    // first InitJoinAck reply, but incompatible
    if (ByPassConfigCompatCheck) {
      // only join if set to ignore config validation
      logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
      log.warning("Joining cluster with incompatible configurations. " +
        "Join will be performed because compatibility check is configured to not be enforced.")
      context.parent ! JoinTo(address)
      context.stop(self)
    } else {
      log.error(
        "Couldn't join seed nodes because of incompatible cluster configuration. " +
          "It's recommended to perform a full cluster shutdown in order to deploy this new version. " +
          "If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
          "'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " +
          "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
          "This node will be shutdown!")
      context.stop(self)
      CoordinatedShutdown(context.system).run(IncompatibleConfigurationDetected)
    }
  case InitJoinNack(address) ⇒
    logInfo("Received InitJoinNack message from [{}] to [{}]", sender(), selfAddress)
    remainingSeedNodes -= address
@@ -1393,33 +1487,90 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
 * 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2
 *
 */
@InternalApi
private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging {
import InternalClusterAction._
import ClusterUserAction.JoinTo
val cluster = Cluster(context.system)
import cluster.settings._
def selfAddress = cluster.selfAddress
if (seedNodes.isEmpty || seedNodes.head == selfAddress)
  throw new IllegalArgumentException("Join seed node should not be done")
context.setReceiveTimeout(SeedNodeTimeout)
var attempt = 0
// all seed nodes, except this one
val otherSeedNodes = seedNodes.toSet - selfAddress
override def preStart(): Unit = self ! JoinSeedNode
def receive = {
  case JoinSeedNode ⇒
    val requiredNonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joinConfigCompatChecker.requiredKeys, cluster.settings)
    // configToValidate only contains the keys that are required according to JoinConfigCompatChecker on this node
    val configToValidate = JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config)
    // send InitJoin to all seed nodes (except myself)
    attempt += 1
    otherSeedNodes.foreach { a ⇒ context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) }
  case InitJoinAck(address, CompatibleConfig(clusterConfig)) ⇒
    log.info("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
    // validates config coming from cluster against this node config
    joinConfigCompatChecker.check(clusterConfig, context.system.settings.config) match {
      case Valid ⇒
        // first InitJoinAck reply
        context.parent ! JoinTo(address)
        context.become(done)
      case Invalid(messages) if ByPassConfigCompatCheck ⇒
        log.warning("Cluster validated this node config, but sent back incompatible settings: {}. " +
          "Join will be performed because compatibility check is configured to not be enforced.", messages.mkString(", "))
        context.parent ! JoinTo(address)
        context.become(done)
      case Invalid(messages) ⇒
        log.error("Cluster validated this node config, but sent back incompatible settings: {}. " +
          "It's recommended to perform a full cluster shutdown in order to deploy this new version. " +
          "If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
          "'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " +
          "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
          "This node will be shutdown!", messages.mkString(", "))
        context.stop(self)
        CoordinatedShutdown(context.system).run(IncompatibleConfigurationDetected)
    }
  case InitJoinAck(address, UncheckedConfig) ⇒
    log.warning("Joining a cluster without configuration compatibility check feature.")
    context.parent ! JoinTo(address)
    context.become(done)
  case InitJoinAck(address, IncompatibleConfig) ⇒
    // first InitJoinAck reply, but incompatible
    if (ByPassConfigCompatCheck) {
      log.info("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
      log.warning("Joining cluster with incompatible configurations. " +
        "Join will be performed because compatibility check is configured to not be enforced.")
      // only join if set to ignore config validation
      context.parent ! JoinTo(address)
      context.become(done)
    } else {
      log.error(
        "Couldn't join seed nodes because of incompatible cluster configuration. " +
          "It's recommended to perform a full cluster shutdown in order to deploy this new version. " +
          "If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
          "'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " +
          "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
          "This node will be shutdown!")
      context.stop(self)
      CoordinatedShutdown(context.system).run(IncompatibleConfigurationDetected)
    }
  case InitJoinNack(_) ⇒ // that seed was uninitialized
  case ReceiveTimeout ⇒
    if (attempt >= 2)
      log.warning(
@@ -1430,8 +1581,8 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
}
def done: Actor.Receive = {
  case InitJoinAck(_, _) ⇒ // already received one, skip rest
  case ReceiveTimeout ⇒ context.stop(self)
}
}
@@ -1440,6 +1591,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
 *
 * The supplied callback will be run, once, when the current cluster member comes up with the same status.
 */
@InternalApi
private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: MemberStatus) extends Actor with ActorLogging {
import ClusterEvent._
private val cluster = Cluster(context.system)
@@ -1487,6 +1639,7 @@ private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status:
/**
 * INTERNAL API
 */
@InternalApi
@SerialVersionUID(1L)
private[cluster] final case class GossipStats(
  receivedGossipCount: Long = 0L,
@@ -1530,8 +1683,8 @@ private[cluster] final case class GossipStats(
/**
 * INTERNAL API
 */
@InternalApi
@SerialVersionUID(1L)
private[cluster] final case class VectorClockStats(
  versionSize: Int = 0,
  seenLatest: Int = 0)


@@ -147,9 +147,9 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val SelfDataCenter: DataCenter = cc.getString("multi-data-center.self-data-center")
val Roles: Set[String] = {
  val configuredRoles = immutableSeq(cc.getStringList("roles")).toSet requiring (
    _.forall(!_.startsWith(DcRolePrefix)),
    s"Roles must not start with '$DcRolePrefix' as that is reserved for the cluster self-data-center setting")
  configuredRoles + s"$DcRolePrefix$SelfDataCenter"
}
@@ -175,6 +175,25 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val SchedulerTickDuration: FiniteDuration = cc.getMillisDuration("scheduler.tick-duration")
val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
val ByPassConfigCompatCheck: Boolean = !cc.getBoolean("configuration-compatibility-check.enforce-on-join")
val ConfigCompatCheckers: Set[String] = {
import scala.collection.JavaConverters._
cc.getConfig("configuration-compatibility-check.checkers")
.root.unwrapped.values().asScala
.map(_.toString).toSet
}
val SensitiveConfigPaths = {
import scala.collection.JavaConverters._
val sensitiveKeys =
cc.getConfig("configuration-compatibility-check.sensitive-config-paths")
.root.unwrapped.values().asScala
.flatMap(_.asInstanceOf[java.util.List[String]].asScala)
sensitiveKeys.toSet
}
object Debug {
  val VerboseHeartbeatLogging: Boolean = cc.getBoolean("debug.verbose-heartbeat-logging")
  val VerboseGossipLogging: Boolean = cc.getBoolean("debug.verbose-gossip-logging")


@@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import akka.annotation.InternalApi
import com.typesafe.config.Config
import scala.collection.{ immutable ⇒ im }
/**
* INTERNAL API
*/
@InternalApi
final class JoinConfigCompatCheckCluster extends JoinConfigCompatChecker {
override def requiredKeys = im.Seq("akka.cluster.downing-provider-class")
override def check(toCheck: Config, actualConfig: Config): ConfigValidation =
JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
}


@@ -0,0 +1,178 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import java.util
import akka.actor.ExtendedActorSystem
import akka.annotation.{ DoNotInherit, InternalApi }
import com.typesafe.config.{ Config, ConfigFactory, ConfigValue }
import scala.collection.JavaConverters._
import scala.collection.{ immutable ⇒ im }
abstract class JoinConfigCompatChecker {
/** The configuration keys that are required for this checker */
def requiredKeys: im.Seq[String]
/**
* Runs the Config check.
*
 * Implementers are free to define what makes a Config entry compatible or not.
 * We do provide some pre-built checks though: [[JoinConfigCompatChecker.exists()]] and [[JoinConfigCompatChecker.fullMatch()]]
 *
 * @param toCheck - the Config instance to be checked
 * @param actualConfig - the Config instance containing the actual values
 * @return a [[ConfigValidation]]. Can be [[Valid]] or [[Invalid]], the latter must contain a descriptive list of error messages.
*/
def check(toCheck: Config, actualConfig: Config): ConfigValidation
}
object JoinConfigCompatChecker {
/**
* Checks that all `requiredKeys` are available in `toCheck` Config.
*
* @param requiredKeys - a Seq of required keys
* @param toCheck - the Config instance to be checked
*/
def exists(requiredKeys: im.Seq[String], toCheck: Config): ConfigValidation = {
val allKeys = toCheck.entrySet().asScala.map(_.getKey)
// return all not found required keys
val result =
requiredKeys.collect {
case requiredKey if !allKeys.contains(requiredKey) ⇒ requiredKey + " is missing"
}
if (result.isEmpty) Valid
else Invalid(result.to[im.Seq])
}
/**
 * Checks that all `requiredKeys` are available in `toCheck` Config
 * and their values match exactly the values in `actualConfig`.
*
* @param requiredKeys - a Seq of required keys
* @param toCheck - the Config instance to be checked
* @param actualConfig - the Config instance containing the expected values
*/
def fullMatch(requiredKeys: im.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation = {
def checkEquality = {
def checkCompat(entry: util.Map.Entry[String, ConfigValue]) = {
val key = entry.getKey
actualConfig.hasPath(key) && actualConfig.getValue(key) == entry.getValue
}
// retrieve all incompatible keys
// NOTE: we only check the key if effectively required
// because config may contain more keys than required for this checker
val incompatibleKeys =
toCheck.entrySet().asScala
.collect {
case entry if requiredKeys.contains(entry.getKey) && !checkCompat(entry) ⇒ s"${entry.getKey} is incompatible"
}
if (incompatibleKeys.isEmpty) Valid
else Invalid(incompatibleKeys.to[im.Seq])
}
exists(requiredKeys, toCheck) ++ checkEquality
}
/**
* INTERNAL API
* Builds a new Config object containing only the required entries defined by `requiredKeys`
*
* This method is used from the joining side to prepare the [[Config]] instance that will be sent over the wire.
* We don't send the full config to avoid unnecessary data transfer, but also to avoid leaking any sensitive
* information that users may have added to their configuration.
*/
@InternalApi
private[cluster] def filterWithKeys(requiredKeys: im.Seq[String], config: Config): Config = {
val filtered =
config.entrySet().asScala
.collect {
case e if requiredKeys.contains(e.getKey) ⇒ (e.getKey, e.getValue)
}
ConfigFactory.parseMap(filtered.toMap.asJava)
}
/**
* INTERNAL API
* Removes sensitive keys, as defined in 'akka.cluster.configuration-compatibility-check.sensitive-config-paths',
* from the passed `requiredKeys` Seq.
*/
@InternalApi
private[cluster] def removeSensitiveKeys(requiredKeys: im.Seq[String], clusterSettings: ClusterSettings): im.Seq[String] = {
requiredKeys.filter { key ⇒
!clusterSettings.SensitiveConfigPaths.exists(s key.startsWith(s))
}
}
/**
* INTERNAL API
* Builds a Seq of keys using the passed `Config` not including any sensitive keys,
* as defined in 'akka.cluster.configuration-compatibility-check.sensitive-config-paths'.
*/
@InternalApi
private[cluster] def removeSensitiveKeys(config: Config, clusterSettings: ClusterSettings): im.Seq[String] = {
val existingKeys = config.entrySet().asScala.map(_.getKey).to[im.Seq]
removeSensitiveKeys(existingKeys, clusterSettings)
}
/**
* INTERNAL API
*
* This method loads the [[JoinConfigCompatChecker]] defined in the configuration.
 * Checkers are then combined to be used whenever a joining node tries to join an existing cluster.
*/
@InternalApi
private[cluster] def load(system: ExtendedActorSystem, clusterSettings: ClusterSettings): JoinConfigCompatChecker = {
val checkers =
clusterSettings.ConfigCompatCheckers.map { fqcn ⇒
system.dynamicAccess
.createInstanceFor[JoinConfigCompatChecker](fqcn, im.Seq.empty)
.get // can't continue if we can't load it
}
// composite checker
new JoinConfigCompatChecker {
override val requiredKeys: im.Seq[String] = checkers.flatMap(_.requiredKeys).to[im.Seq]
override def check(toValidate: Config, clusterConfig: Config): ConfigValidation =
checkers.foldLeft(Valid: ConfigValidation) { (acc, checker) ⇒
acc ++ checker.check(toValidate, clusterConfig)
}
}
}
}
@DoNotInherit
sealed trait ConfigValidation {
def ++(that: ConfigValidation) = concat(that)
def concat(that: ConfigValidation) = {
(this, that) match {
case (Invalid(a), Invalid(b)) ⇒ Invalid(a ++ b)
case (_, i @ Invalid(_)) ⇒ i
case (i @ Invalid(_), _) ⇒ i
case _ ⇒ Valid
}
}
}
case object Valid extends ConfigValidation {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
final case class Invalid(errorMessages: im.Seq[String]) extends ConfigValidation
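To make the semantics of these helpers concrete, a small sketch (the key value strings are made up; `exists` only tests presence, `fullMatch` also compares values, and results combine like a monoid where any `Invalid` wins):

import com.typesafe.config.ConfigFactory
import scala.collection.{ immutable ⇒ im }

val keys = im.Seq("akka.cluster.downing-provider-class")
// hypothetical values on each side of the handshake
val clusterSide = ConfigFactory.parseString("""akka.cluster.downing-provider-class = "x.y.Downing"""")
val joiningSide = ConfigFactory.parseString("""akka.cluster.downing-provider-class = "x.y.Other"""")

JoinConfigCompatChecker.exists(keys, joiningSide) // Valid: the key is present
JoinConfigCompatChecker.fullMatch(keys, joiningSide, clusterSide) // Invalid: values differ
Valid ++ Invalid(im.Seq("a")) ++ Invalid(im.Seq("b")) // Invalid(List("a", "b"))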


@@ -18,9 +18,10 @@ import scala.collection.JavaConverters._
import scala.concurrent.duration.Deadline
import java.io.NotSerializableException
import akka.cluster.InternalClusterAction._
import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
import akka.routing.Pool
import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions }
/**
 * Protobuf serializer of cluster messages.
@@ -40,7 +41,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
val roles = Set.empty[String] ++ m.getRolesList.asScala
InternalClusterAction.Join(
  uniqueAddressFromProto(m.getNode),
  if (roles.exists(_.startsWith(ClusterSettings.DcRolePrefix))) roles
  else roles + (ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter)
)
},
@@ -51,8 +52,28 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
},
classOf[ClusterUserAction.Leave] → (bytes ⇒ ClusterUserAction.Leave(addressFromBinary(bytes))),
classOf[ClusterUserAction.Down] → (bytes ⇒ ClusterUserAction.Down(addressFromBinary(bytes))),
classOf[InternalClusterAction.InitJoin] → {
  case bytes ⇒
    val m = cm.InitJoin.parseFrom(bytes)
    if (m.hasCurrentConfig)
      InternalClusterAction.InitJoin(ConfigFactory.parseString(m.getCurrentConfig))
    else
      InternalClusterAction.InitJoin(ConfigFactory.empty)
},
classOf[InternalClusterAction.InitJoinAck] → {
  case bytes ⇒
    val i = cm.InitJoinAck.parseFrom(bytes)
    val configCheck =
      if (i.hasConfigCheck) {
        i.getConfigCheck.getType match {
          case cm.ConfigCheck.Type.CompatibleConfig ⇒ CompatibleConfig(ConfigFactory.parseString(i.getConfigCheck.getClusterConfig))
          case cm.ConfigCheck.Type.IncompatibleConfig ⇒ IncompatibleConfig
          case cm.ConfigCheck.Type.UncheckedConfig ⇒ UncheckedConfig
        }
      } else UncheckedConfig
    InternalClusterAction.InitJoinAck(addressFromProto(i.getAddress), configCheck)
},
classOf[InternalClusterAction.InitJoinNack] → (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.Heartbeat] → (bytes ⇒ ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.HeartbeatRsp] → (bytes ⇒ ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes))),
@@ -65,19 +86,19 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
def includeManifest: Boolean = true
def toBinary(obj: AnyRef): Array[Byte] = obj match {
  case ClusterHeartbeatSender.Heartbeat(from) ⇒ addressToProtoByteArray(from)
  case ClusterHeartbeatSender.HeartbeatRsp(from) ⇒ uniqueAddressToProtoByteArray(from)
  case m: GossipEnvelope ⇒ gossipEnvelopeToProto(m).toByteArray
  case m: GossipStatus ⇒ gossipStatusToProto(m).toByteArray
  case InternalClusterAction.Join(node, roles) ⇒ joinToProto(node, roles).toByteArray
  case InternalClusterAction.Welcome(from, gossip) ⇒ compress(welcomeToProto(from, gossip))
  case ClusterUserAction.Leave(address) ⇒ addressToProtoByteArray(address)
  case ClusterUserAction.Down(address) ⇒ addressToProtoByteArray(address)
  case InternalClusterAction.InitJoin(config) ⇒ initJoinToProto(config).toByteArray
  case InternalClusterAction.InitJoinAck(address, configCheck) ⇒ initJoinAckToProto(address, configCheck).toByteArray
  case InternalClusterAction.InitJoinNack(address) ⇒ addressToProtoByteArray(address)
  case InternalClusterAction.ExitingConfirmed(node) ⇒ uniqueAddressToProtoByteArray(node)
  case rp: ClusterRouterPool ⇒ clusterRouterPoolToProtoByteArray(rp)
  case _ ⇒
    throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}
@@ -245,6 +266,33 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
private def joinToProto(node: UniqueAddress, roles: Set[String]): cm.Join =
  cm.Join.newBuilder().setNode(uniqueAddressToProto(node)).addAllRoles(roles.asJava).build()
private def initJoinToProto(currentConfig: Config): cm.InitJoin = {
cm.InitJoin.newBuilder()
.setCurrentConfig(currentConfig.root.render(ConfigRenderOptions.concise))
.build()
}
private def initJoinAckToProto(address: Address, configCheck: ConfigCheck): cm.InitJoinAck = {
  val configCheckBuilder = cm.ConfigCheck.newBuilder()
  configCheck match {
    case UncheckedConfig ⇒
      configCheckBuilder.setType(cm.ConfigCheck.Type.UncheckedConfig)
    case IncompatibleConfig ⇒
      configCheckBuilder.setType(cm.ConfigCheck.Type.IncompatibleConfig)
    case CompatibleConfig(conf) ⇒
      configCheckBuilder
        .setType(cm.ConfigCheck.Type.CompatibleConfig)
        .setClusterConfig(conf.root.render(ConfigRenderOptions.concise))
  }
  cm.InitJoinAck.newBuilder().
    setAddress(addressToProto(address)).
    setConfigCheck(configCheckBuilder.build()).
    build()
}
private def welcomeToProto(from: UniqueAddress, gossip: Gossip): cm.Welcome =
  cm.Welcome.newBuilder().setFrom(uniqueAddressToProto(from)).setGossip(gossipToProto(gossip)).build()
@@ -258,6 +306,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
val hashMapping = allHashes.zipWithIndex.toMap
def mapUniqueAddress(uniqueAddress: UniqueAddress): Integer = mapWithErrorMessage(addressMapping, uniqueAddress, "address")
def mapRole(role: String): Integer = mapWithErrorMessage(roleMapping, role, "role")
def memberToProto(member: Member) =
@@ -419,7 +468,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
  totalInstances = crps.getTotalInstances,
  maxInstancesPerNode = crps.getMaxInstancesPerNode,
  allowLocalRoutees = crps.getAllowLocalRoutees,
  useRoles = if (crps.hasUseRole) {
    crps.getUseRolesList.asScala.toSet + crps.getUseRole
  } else {
    crps.getUseRolesList.asScala.toSet
  }
)
}


@@ -73,7 +73,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
  awaitAssert(clusterView.status should ===(MemberStatus.Up))
}
"publish initial state as snapshot to subscribers" in {
  try {
    cluster.subscribe(testActor, ClusterEvent.InitialStateAsSnapshot, classOf[ClusterEvent.MemberEvent])
    expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
@@ -82,7 +82,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
  }
}
"publish initial state as events to subscribers" in {
  try {
    cluster.subscribe(testActor, ClusterEvent.InitialStateAsEvents, classOf[ClusterEvent.MemberEvent])
    expectMsgClass(classOf[ClusterEvent.MemberUp])


@@ -0,0 +1,139 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import akka.actor.{ ActorSystem, Address }
import akka.cluster.MemberStatus.Removed
import akka.testkit.TestKitBase
import com.typesafe.config.{ Config, ConfigFactory }
import scala.collection.{ immutable ⇒ im }
/**
 * Builds on TestKitBase to provide some extra utilities to run cluster tests.
 *
 * All functionality is provided through the stateful ClusterTestUtil class.
*/
trait ClusterTestKit extends TestKitBase {
/**
* Cluster test util to help manage ActorSystem creation and cluster formation.
* Useful for writing single jvm, but multi ActorSystem tests.
*
* NOTE: This class is stateful and not thread safe. A new instance must be created per test method.
*/
class ClusterTestUtil(val name: String) {
private var actorSystems: List[ActorSystem] = List.empty
/**
* Register an [[ActorSystem]].
*/
def register(actorSystem: ActorSystem) = {
actorSystems = actorSystems :+ actorSystem
actorSystem
}
/**
* Register an [[ActorSystem]].
*
 * The [[ActorSystem]] will be prepended to the list and be considered the first node
*/
def registerAsFirst(actorSystem: ActorSystem) = {
actorSystems = actorSystem +: actorSystems
actorSystem
}
/**
* Creates a new [[ActorSystem]] using the passed [[Config]] and register it.
*/
def newActorSystem(config: Config): ActorSystem =
register(ActorSystem(name, config))
/**
* Creates a new [[ActorSystem]] using the passed [[Config]] and register it.
*
 * This newly created [[ActorSystem]] will be prepended to the list and be considered the first node
*/
def newActorSystemAsFirst(config: Config): ActorSystem =
registerAsFirst(ActorSystem(name, config))
/**
* Create a cluster using the registered [[ActorSystem]]s.
*
* The head of the list is considered the first node and will first create a cluster with itself.
* Other nodes will join the cluster after the first one.
*/
def formCluster(): Unit = {
require(actorSystems.nonEmpty, "Can't form a cluster with an empty list of ActorSystems")
val firstCluster = Cluster(actorSystems.head)
firstCluster.join(firstCluster.selfAddress)
val firstNode = firstCluster.readView
awaitCond(firstNode.isSingletonCluster)
// let the others join
actorSystems
.drop(1) // <- safe tail access
.foreach(joinCluster)
}
/**
 * Makes the passed [[ActorSystem]] join the cluster.
* The passed system must have been previously registered on this [[ClusterTestUtil]].
*/
def joinCluster(actorSystem: ActorSystem): Unit = {
require(isRegistered(actorSystem), "Unknown actor system")
val addresses = actorSystems.map(s ⇒ Cluster(s).selfAddress)
val joiningNodeCluster = Cluster(actorSystem)
joiningNodeCluster.joinSeedNodes(addresses)
}
private def isRegistered(actorSystem: ActorSystem): Boolean =
actorSystems.contains(actorSystem)
/** Shuts down all registered [[ActorSystem]]s */
/** Shuts down all registered [[ActorSystem]]s */
def shutdownAll(): Unit = actorSystems.foreach(sys ⇒ shutdown(sys))
/**
* Force the passed [[ActorSystem]] to quit the cluster and shutdown.
* Once original system is removed, a new [[ActorSystem]] is started using the same address.
*/
def quitAndRestart(actorSystem: ActorSystem, config: Config) = {
require(isRegistered(actorSystem), "Unknown actor system")
// is this first seed node?
val firstSeedNode = actorSystems.headOption.contains(actorSystem)
val cluster = Cluster(actorSystem)
val port = cluster.selfAddress.port.get
// remove old before starting the new one
cluster.leave(cluster.readView.selfAddress)
awaitCond(cluster.readView.status == Removed, message = s"awaiting node [${cluster.readView.selfAddress}] to be 'Removed'")
shutdown(actorSystem)
awaitCond(cluster.isTerminated)
// remove from internal list
actorSystems = actorSystems.filterNot(_ == actorSystem)
val newConfig = ConfigFactory.parseString(
s"""
akka.remote.netty.tcp.port = $port
akka.remote.artery.canonical.port = $port
"""
).withFallback(config)
if (firstSeedNode) newActorSystemAsFirst(newConfig)
else newActorSystem(newConfig)
}
}
}

View file

@ -0,0 +1,655 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import akka.actor.{ ActorSystem, Address }
import akka.cluster.InternalClusterAction.LeaderActionsTick
import akka.cluster.MemberStatus.{ Removed, Up }
import akka.testkit.{ AkkaSpec, LongRunningTest }
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ Matchers, WordSpec }
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import scala.collection.{ immutable ⇒ im }
object JoinConfigCompatCheckerSpec {
}
class JoinConfigCompatCheckerSpec extends AkkaSpec() with ClusterTestKit {
"A Joining Node" must {
"be allowed to join a cluster when its configuration is compatible" taggedAs LongRunningTest in {
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
clusterTestUtil.newActorSystem(configWithChecker)
val joiningNode = clusterTestUtil.newActorSystem(configWithChecker)
clusterTestUtil.formCluster()
try {
awaitCond(Cluster(joiningNode).readView.status == Up, message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}
"NOT be allowed to join a cluster when its configuration is incompatible" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# this config is incompatible
config-compat-test = "test2"
configuration-compatibility-check {
enforce-on-join = on
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
}
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
clusterTestUtil.newActorSystem(configWithChecker)
val joiningNode = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.formCluster()
try {
// node will shutdown after unsuccessful join attempt
within(5.seconds) {
awaitCond(Cluster(joiningNode).readView.isTerminated)
}
} finally {
clusterTestUtil.shutdownAll()
}
}
"NOT be allowed to join a cluster when one of its required properties are not available on cluster side" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config
// because there is one missing required configuration property.
// This test verifies that the cluster config is sent back and checked on the joining node as well
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# this config is not available on cluster side
config-compat-test-extra = on
configuration-compatibility-check {
enforce-on-join = on
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
akka-cluster-extra = "akka.cluster.JoinConfigCompatCheckerExtraTest"
}
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
clusterTestUtil.newActorSystem(configWithChecker)
val joiningNode = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.formCluster()
try {
// node will shutdown after unsuccessful join attempt
within(5.seconds) {
awaitCond(Cluster(joiningNode).readView.isTerminated)
}
} finally {
clusterTestUtil.shutdownAll()
}
}
"NOT be allowed to join a cluster when one of the cluster required properties are not available on the joining side" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config
// because there is one missing required configuration property.
// This test verifies that the cluster config is sent back and checked on the joining node as well
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# this config is required on cluster side
# config-compat-test = "test"
configuration-compatibility-check {
enforce-on-join = on
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
clusterTestUtil.newActorSystem(configWithChecker)
val joiningNode = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(baseConfig))
clusterTestUtil.formCluster()
try {
// node will shutdown after unsuccessful join attempt
within(5.seconds) {
awaitCond(Cluster(joiningNode).readView.isTerminated)
}
} finally {
clusterTestUtil.shutdownAll()
}
}
"be allowed to join a cluster when one of its required properties are not available on cluster side but it's configured to NOT enforce it" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config
// because there is one missing required configuration property.
// This test verifies that validation on the joining side takes 'configuration-compatibility-check.enforce-on-join' into consideration
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# this config is not available on cluster side
config-compat-test-extra = on
configuration-compatibility-check {
enforce-on-join = off
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
akka-cluster-extra = "akka.cluster.JoinConfigCompatCheckerExtraTest"
}
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
clusterTestUtil.newActorSystem(configWithChecker)
val joiningNode = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.formCluster()
try {
// join with compatible node
awaitCond(Cluster(joiningNode).readView.status == Up, message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}
"be allowed to join a cluster when its configuration is incompatible but it's configured to NOT enforce it" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config,
// but the node will ignore the config check and join anyway
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
configuration-compatibility-check {
# not enforcing config compat check
enforce-on-join = off
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
}
}
# this config is incompatible
config-compat-test = "test2"
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
clusterTestUtil.newActorSystem(configWithChecker)
val joiningNode = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.formCluster()
try {
// join with compatible node
awaitCond(Cluster(joiningNode).readView.status == Up, message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}
/** This test verifies the built-in JoinConfigCompatCheckCluster */
"NOT be allowed to join a cluster using a different value for akka.cluster.downing-provider-class" taggedAs LongRunningTest in {
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# using explicit downing provider class
downing-provider-class = "akka.cluster.AutoDowning"
configuration-compatibility-check {
enforce-on-join = on
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
clusterTestUtil.newActorSystem(baseConfig)
val joiningNode = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(baseConfig))
clusterTestUtil.formCluster()
try {
// node will shutdown after unsuccessful join attempt
within(5.seconds) {
awaitCond(Cluster(joiningNode).readView.isTerminated)
}
} finally {
clusterTestUtil.shutdownAll()
}
}
}
"A First Node" must {
"be allowed to re-join a cluster when its configuration is compatible" taggedAs LongRunningTest in {
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
val firstNode = clusterTestUtil.newActorSystem(configWithChecker)
// second node
val secondNode = clusterTestUtil.newActorSystem(configWithChecker)
clusterTestUtil.formCluster()
try {
// we must wait for the second node to join the cluster before shutting down the first node
awaitCond(Cluster(secondNode).readView.status == Up, message = "awaiting second node to be 'Up'")
val restartedNode = clusterTestUtil.quitAndRestart(firstNode, configWithChecker)
clusterTestUtil.joinCluster(restartedNode)
within(20.seconds) {
awaitCond(Cluster(restartedNode).readView.status == Up, message = "awaiting restarted first node to be 'Up'")
}
} finally {
clusterTestUtil.shutdownAll()
}
}
"NOT be allowed to re-join a cluster when its configuration is incompatible" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# this config is incompatible
config-compat-test = "test2"
configuration-compatibility-check {
enforce-on-join = on
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
}
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
val firstNode = clusterTestUtil.newActorSystem(configWithChecker)
// second node
val secondNode = clusterTestUtil.newActorSystem(configWithChecker)
clusterTestUtil.formCluster()
try {
// we must wait for the second node to join the cluster before shutting down the first node
awaitCond(Cluster(secondNode).readView.status == Up, message = "awaiting second node to be 'Up'")
val restartedNode = clusterTestUtil.quitAndRestart(firstNode, joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.joinCluster(restartedNode)
// node will shutdown after unsuccessful join attempt
within(20.seconds) {
awaitCond(Cluster(restartedNode).readView.isTerminated)
}
} finally {
clusterTestUtil.shutdownAll()
}
}
"NOT be allowed to re-join a cluster when one of its required properties are not available on cluster side" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config
// because there is one missing required configuration property.
// This test verifies that the cluster config is sent back and checked on the joining node as well
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# this config is not available on cluster side
config-compat-test-extra = on
configuration-compatibility-check {
enforce-on-join = on
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
akka-cluster-extra = "akka.cluster.JoinConfigCompatCheckerExtraTest"
}
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
val firstNode = clusterTestUtil.newActorSystem(configWithChecker)
// second node
val secondNode = clusterTestUtil.newActorSystem(configWithChecker)
clusterTestUtil.formCluster()
try {
// we must wait for the second node to join the cluster before shutting down the first node
awaitCond(Cluster(secondNode).readView.status == Up, message = "awaiting second node to be 'Up'")
val restartedNode = clusterTestUtil.quitAndRestart(firstNode, joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.joinCluster(restartedNode)
// node will shutdown after unsuccessful join attempt
within(20.seconds) {
awaitCond(Cluster(restartedNode).readView.isTerminated)
}
} finally {
clusterTestUtil.shutdownAll()
}
}
"NOT be allowed to re-join a cluster when one of the cluster required properties are not available on the joining side" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config
// because there is one missing required configuration property.
// This test verifies that the cluster config is sent back and checked on the joining node as well
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# this config is required on cluster side
# config-compat-test = "test"
configuration-compatibility-check {
enforce-on-join = on
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
val firstNode = clusterTestUtil.newActorSystem(configWithChecker)
// second node
val secondNode = clusterTestUtil.newActorSystem(configWithChecker)
clusterTestUtil.formCluster()
try {
// we must wait for the second node to join the cluster before shutting down the first node
awaitCond(Cluster(secondNode).readView.status == Up, message = "awaiting second node to be 'Up'")
val restartedNode = clusterTestUtil.quitAndRestart(firstNode, joinNodeConfig.withFallback(baseConfig))
clusterTestUtil.joinCluster(restartedNode)
// node will shutdown after unsuccessful join attempt
within(20.seconds) {
awaitCond(Cluster(restartedNode).readView.isTerminated)
}
} finally {
clusterTestUtil.shutdownAll()
}
}
"be allowed to re-join a cluster when one of its required properties are not available on cluster side but it's configured to NOT enforce it" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config
// because there is one missing required configuration property.
// This test verifies that validation on the joining side takes 'configuration-compatibility-check.enforce-on-join' into consideration
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# this config is not available on cluster side
config-compat-test-extra = on
configuration-compatibility-check {
enforce-on-join = off
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
akka-cluster-extra = "akka.cluster.JoinConfigCompatCheckerExtraTest"
}
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
val firstNode = clusterTestUtil.newActorSystem(configWithChecker)
// second node
val secondNode = clusterTestUtil.newActorSystem(configWithChecker)
clusterTestUtil.formCluster()
try {
// join with compatible node
// we must wait for the second node to join the cluster before shutting down the first node
awaitCond(Cluster(secondNode).readView.status == Up, message = "awaiting second node to be 'Up'")
val restartedNode = clusterTestUtil.quitAndRestart(firstNode, joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.joinCluster(restartedNode)
// node will have joined the cluster
within(20.seconds) {
awaitCond(Cluster(restartedNode).readView.status == Up, message = "awaiting restarted node to be 'Up'")
}
} finally {
clusterTestUtil.shutdownAll()
}
}
"be allowed to re-join a cluster when its configuration is incompatible but it's configured to NOT enforce it" taggedAs LongRunningTest in {
// this config is NOT compatible with the cluster config,
// but the node will ignore the config check and join anyway
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
configuration-compatibility-check {
# not enforcing config compat check
enforce-on-join = off
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
}
}
# this config is incompatible
config-compat-test = "test2"
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
val firstNode = clusterTestUtil.newActorSystem(configWithChecker)
// second node
val secondNode = clusterTestUtil.newActorSystem(configWithChecker)
clusterTestUtil.formCluster()
try {
// join with compatible node
// we must wait for the second node to join the cluster before shutting down the first node
awaitCond(Cluster(secondNode).readView.status == Up, message = "awaiting second node to be 'Up'")
val restartedNode = clusterTestUtil.quitAndRestart(firstNode, joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.joinCluster(restartedNode)
// node will have joined the cluster
within(20.seconds) {
awaitCond(Cluster(restartedNode).readView.status == Up, message = "awaiting restarted node to be 'Up'")
}
} finally {
clusterTestUtil.shutdownAll()
}
}
}
"A Cluster" must {
"NOT exchange sensitive config paths with joining node" taggedAs LongRunningTest in {
// this config has sensitive properties that are not compatible with the cluster.
// the cluster will ignore them because they are on the sensitive-config-paths list,
// and won't leak them back to the joining node either, which will make the join attempt fail.
val joinNodeConfig =
ConfigFactory.parseString(
"""
akka.cluster {
# these settings are compatible,
# but won't be leaked back to the joining node, which will cause it to fail to join
sensitive.properties {
username = "abc"
password = "def"
}
configuration-compatibility-check {
enforce-on-join = on
checkers {
# rogue checker that tries to trick the cluster into leaking sensitive data
rogue-checker = "akka.cluster.RogueJoinConfigCompatCheckerTest"
}
# unset the sensitive config paths:
# this allows the joining node to leak sensitive info and try to
# get back these same properties from the cluster
sensitive-config-paths {
akka = []
}
}
}
"""
)
val clusterTestUtil = new ClusterTestUtil(system.name)
// first node
clusterTestUtil.newActorSystem(configWithChecker)
val joiningNode = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(configWithChecker))
clusterTestUtil.formCluster()
try {
// node will shutdown after unsuccessful join attempt
within(5.seconds) {
awaitCond(Cluster(joiningNode).readView.isTerminated)
}
} finally {
clusterTestUtil.shutdownAll()
}
}
}
val baseConfig: Config =
ConfigFactory.parseString(
"""
akka.actor.provider = "cluster"
akka.coordinated-shutdown.terminate-actor-system = on
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
"""
)
val configWithChecker: Config =
ConfigFactory.parseString(
"""
akka.cluster {
config-compat-test = "test"
sensitive.properties {
username = "abc"
password = "def"
}
configuration-compatibility-check {
enforce-on-join = on
checkers {
akka-cluster-test = "akka.cluster.JoinConfigCompatCheckerTest"
}
sensitive-config-paths {
akka = [ "akka.cluster.sensitive.properties" ]
}
}
}
"""
).withFallback(baseConfig)
}
class JoinConfigCompatCheckerTest extends JoinConfigCompatChecker {
override def requiredKeys = im.Seq("akka.cluster.config-compat-test")
override def check(toValidate: Config, actualConfig: Config): ConfigValidation =
JoinConfigCompatChecker.fullMatch(requiredKeys, toValidate, actualConfig)
}
class JoinConfigCompatCheckerExtraTest extends JoinConfigCompatChecker {
override def requiredKeys = im.Seq("akka.cluster.config-compat-test-extra")
override def check(toValidate: Config, actualConfig: Config): ConfigValidation =
JoinConfigCompatChecker.fullMatch(requiredKeys, toValidate, actualConfig)
}
/** Rogue checker that tries to leak sensitive information */
class RogueJoinConfigCompatCheckerTest extends JoinConfigCompatChecker {
override def requiredKeys = im.Seq("akka.cluster.sensitive.properties.password", "akka.cluster.sensitive.properties.username")
/** Declares the sensitive properties as required keys, trying to make the cluster leak them back */
override def check(toValidate: Config, actualConfig: Config): ConfigValidation =
JoinConfigCompatChecker.fullMatch(requiredKeys, toValidate, actualConfig)
}

View file

@ -0,0 +1,159 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.{ immutable ⇒ im }
class JoinConfigCompatPreDefinedChecksSpec extends WordSpec with Matchers {
// Tests for some of the pre-built helpers we offer
"JoinConfigCompatChecker.exists" must {
val requiredKeys = im.Seq(
"akka.cluster.min-nr-of-members",
"akka.cluster.retry-unsuccessful-join-after",
"akka.cluster.allow-weakly-up-members"
)
"pass when all required keys are provided" in {
val result =
JoinConfigCompatChecker.exists(
requiredKeys,
config(
"""
|{
| akka.cluster.min-nr-of-members = 1
| akka.cluster.retry-unsuccessful-join-after = 10s
| akka.cluster.allow-weakly-up-members = on
|}
""".stripMargin)
)
result shouldBe Valid
}
"fail when some required keys are NOT provided" in {
val Invalid(incompatibleKeys) =
JoinConfigCompatChecker.exists(
requiredKeys,
config(
"""
|{
| akka.cluster.min-nr-of-members = 1
|}
""".stripMargin)
)
incompatibleKeys should have size 2
incompatibleKeys should contain("akka.cluster.retry-unsuccessful-join-after is missing")
incompatibleKeys should contain("akka.cluster.allow-weakly-up-members is missing")
}
}
"JoinConfigCompatChecker.fullMatch" must {
val requiredKeys = im.Seq(
"akka.cluster.min-nr-of-members",
"akka.cluster.retry-unsuccessful-join-after",
"akka.cluster.allow-weakly-up-members"
)
val clusterConfig =
config(
"""
|{
| akka.cluster.min-nr-of-members = 1
| akka.cluster.retry-unsuccessful-join-after = 10s
| akka.cluster.allow-weakly-up-members = on
|}
""".stripMargin)
"pass when all required keys are provided and all match cluster config" in {
val result =
JoinConfigCompatChecker.fullMatch(
requiredKeys,
config(
"""
|{
| akka.cluster.min-nr-of-members = 1
| akka.cluster.retry-unsuccessful-join-after = 10s
| akka.cluster.allow-weakly-up-members = on
|}
""".stripMargin),
clusterConfig
)
result shouldBe Valid
}
"fail when some required keys are NOT provided" in {
val Invalid(incompatibleKeys) =
JoinConfigCompatChecker.fullMatch(
requiredKeys,
config(
"""
|{
| akka.cluster.min-nr-of-members = 1
|}
""".stripMargin),
clusterConfig
)
incompatibleKeys should have size 2
incompatibleKeys should contain("akka.cluster.retry-unsuccessful-join-after is missing")
incompatibleKeys should contain("akka.cluster.allow-weakly-up-members is missing")
}
"fail when all required keys are passed, but some values don't match cluster config" in {
val Invalid(incompatibleKeys) =
JoinConfigCompatChecker.fullMatch(
requiredKeys,
config(
"""
|{
| akka.cluster.min-nr-of-members = 1
| akka.cluster.retry-unsuccessful-join-after = 15s
| akka.cluster.allow-weakly-up-members = off
|}
""".stripMargin),
clusterConfig
)
incompatibleKeys should have size 2
incompatibleKeys should contain("akka.cluster.retry-unsuccessful-join-after is incompatible")
incompatibleKeys should contain("akka.cluster.allow-weakly-up-members is incompatible")
}
"fail when all required keys are passed, but some are missing and others don't match cluster config" in {
val Invalid(incompatibleKeys) =
JoinConfigCompatChecker.fullMatch(
requiredKeys,
config(
"""
|{
| akka.cluster.min-nr-of-members = 1
| akka.cluster.allow-weakly-up-members = off
|}
""".stripMargin),
clusterConfig
)
incompatibleKeys should have size 2
incompatibleKeys should contain("akka.cluster.retry-unsuccessful-join-after is missing")
incompatibleKeys should contain("akka.cluster.allow-weakly-up-members is incompatible")
}
}
def config(str: String): Config = ConfigFactory.parseString(str).resolve()
}

View file

@ -5,11 +5,13 @@ package akka.cluster.protobuf
 import akka.cluster._
 import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
+import akka.cluster.InternalClusterAction.CompatibleConfig
 import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
 import akka.routing.RoundRobinPool
 import collection.immutable.SortedSet
 import akka.testkit.{ AkkaSpec, TestKit }
+import com.typesafe.config.ConfigFactory
 class ClusterMessageSerializerSpec extends AkkaSpec(
   "akka.actor.provider = cluster") {
@ -52,8 +54,8 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
 checkSerialization(InternalClusterAction.Join(uniqueAddress, Set("foo", "bar", "dc-A")))
 checkSerialization(ClusterUserAction.Leave(address))
 checkSerialization(ClusterUserAction.Down(address))
-checkSerialization(InternalClusterAction.InitJoin)
+checkSerialization(InternalClusterAction.InitJoin(ConfigFactory.empty))
-checkSerialization(InternalClusterAction.InitJoinAck(address))
+checkSerialization(InternalClusterAction.InitJoinAck(address, CompatibleConfig(ConfigFactory.empty)))
 checkSerialization(InternalClusterAction.InitJoinNack(address))
 checkSerialization(ClusterHeartbeatSender.Heartbeat(address))
 checkSerialization(ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress))

View file

@ -994,3 +994,27 @@ Related config properties: `akka.cluster.use-dispatcher = akka.cluster.cluster-d
Corresponding default values: `akka.cluster.use-dispatcher =`.
@@@
### Configuration Compatibility Check
Creating a cluster is about deploying two or more nodes and making them behave as if they were a single application. Therefore it's extremely important that all nodes in a cluster are configured with compatible settings.
The Configuration Compatibility Check feature ensures that all nodes in a cluster have a compatible configuration. Whenever a new node joins an existing cluster, a subset of its configuration settings (only those that are required to be checked) is sent to the nodes in the cluster for verification. Once the configuration is checked on the cluster side, the cluster sends back its own set of required configuration settings. The joining node will then verify that it's compliant with the cluster configuration. The joining node will only proceed if all checks pass, on both sides.
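Each check produces a `ConfigValidation`, which is either `Valid` or an `Invalid` carrying the list of problems found, as exercised by the pre-defined `exists` and `fullMatch` helpers in the specs above. A minimal sketch of calling `fullMatch` directly (the key and values here are illustrative only):
```
import akka.cluster.{ Invalid, JoinConfigCompatChecker, Valid }
import com.typesafe.config.ConfigFactory

import scala.collection.{ immutable ⇒ im }

val requiredKeys = im.Seq("akka.cluster.min-nr-of-members")
val joiningNodeConfig = ConfigFactory.parseString("akka.cluster.min-nr-of-members = 1")
val clusterConfig = ConfigFactory.parseString("akka.cluster.min-nr-of-members = 3")

// fullMatch requires every key to be present on both sides, with equal values
JoinConfigCompatChecker.fullMatch(requiredKeys, joiningNodeConfig, clusterConfig) match {
  case Valid             ⇒ println("configurations are compatible")
  case Invalid(messages) ⇒ println(s"incompatible: $messages") // "akka.cluster.min-nr-of-members is incompatible"
}
```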
New custom checkers can be added by extending `akka.cluster.JoinConfigCompatChecker` and including them in the configuration. Each checker must be associated with a unique key:
```
akka.cluster.configuration-compatibility-check.checkers {
my-custom-config = "com.company.MyCustomJoinConfigCompatChecker"
}
```
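The checker itself only needs to list its required keys and implement `check`. A minimal sketch of the hypothetical `com.company.MyCustomJoinConfigCompatChecker` registered above, modeled on the test checkers in this change (the `com.company.my-critical-setting` key is an assumption for illustration):
```
package com.company

import akka.cluster.{ ConfigValidation, JoinConfigCompatChecker }
import com.typesafe.config.Config

import scala.collection.{ immutable ⇒ im }

class MyCustomJoinConfigCompatChecker extends JoinConfigCompatChecker {

  // settings that must be defined, and equal, on both the joining node and the cluster
  override def requiredKeys: im.Seq[String] =
    im.Seq("com.company.my-critical-setting")

  // fullMatch fails the check when a required key is missing on either side,
  // or when the two sides disagree on its value
  override def check(toValidate: Config, actualConfig: Config): ConfigValidation =
    JoinConfigCompatChecker.fullMatch(requiredKeys, toValidate, actualConfig)
}
```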
@@@ note
Configuration Compatibility Check is enabled by default, but can be disabled by setting `akka.cluster.configuration-compatibility-check.enforce-on-join = off`. This is especially useful when performing rolling updates (a configuration sketch follows this note). Obviously this should only be done if a complete cluster shutdown isn't an option, since running a cluster whose nodes have different configuration settings may lead to data loss or data corruption.
This setting should only be disabled on the joining nodes. The checks are always performed on both sides, and warnings are logged. In case of incompatibilities, it is the responsibility of the joining node to decide whether the process should be interrupted or not.
If you are performing a rolling update on a cluster running Akka 2.5.9 or earlier (which does not support this feature), the checks will not be performed, because the running cluster has no means to verify the configuration sent by the joining node, nor to send back its own configuration.
@@@
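For example, on the nodes being restarted with a changed configuration during a rolling update, the relaxed mode described in the note above could be sketched as follows (whether this is safe depends entirely on the settings being changed):
```
# on the joining (updated) nodes only
akka.cluster.configuration-compatibility-check {
  # incompatibilities are still checked and logged as warnings,
  # but no longer abort the join
  enforce-on-join = off
}
```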