Support joining 2.5.9 or earlier, compat InitJoinAck, #25491

* Detect that joining node is 2.5.9 or earlier by empty ConfigCheck
  config in InitJoin message. Then send back Address, which was the
  old representation of InitJoinAck
* Include akka.version in logging to facilitate troubleshooting
This commit is contained in:
Patrik Nordwall 2018-08-23 20:37:42 +02:00 committed by GitHub
parent 336400832b
commit 25079cb568
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 22 deletions

View file

@ -99,7 +99,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
// ClusterJmx is initialized as the last thing in the constructor
private var clusterJmx: Option[ClusterJmx] = None
logInfo("Starting up...")
logInfo("Starting up, Akka version [{}] ...", system.settings.ConfigVersion)
val failureDetector: FailureDetectorRegistry[Address] = {
val createFailureDetector = ()

View file

@ -99,6 +99,13 @@ private[cluster] object InternalClusterAction {
sealed trait ConfigCheck
case object UncheckedConfig extends ConfigCheck
case object IncompatibleConfig extends ConfigCheck
/**
* Node with version 2.5.9 or earlier is joining. The serialized
* representation of `InitJoinAck` must be a plain `Address` for
* such a joining node.
*/
case object ConfigCheckUnsupportedByJoiningNode extends ConfigCheck
final case class CompatibleConfig(clusterConfig: Config) extends ConfigCheck
/**
@ -526,13 +533,22 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
}
def initJoin(joiningNodeConfig: Config): Unit = {
val joiningNodeVersion =
if (joiningNodeConfig.hasPath("akka.version")) joiningNodeConfig.getString("akka.version")
else "unknown"
// When joiningNodeConfig is empty the joining node has version 2.5.9 or earlier.
val configCheckUnsupportedByJoiningNode = joiningNodeConfig.isEmpty
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (removeUnreachableWithMemberStatus.contains(selfStatus)) {
// prevents a Down and Exiting node from being used for joining
logInfo("Sending InitJoinNack message from node [{}] to [{}]", selfAddress, sender())
logInfo("Sending InitJoinNack message from node [{}] to [{}] (version [{}])", selfAddress, sender(),
joiningNodeVersion)
sender() ! InitJoinNack(selfAddress)
} else {
logInfo("Sending InitJoinAck message from node [{}] to [{}]", selfAddress, sender())
logInfo("Sending InitJoinAck message from node [{}] to [{}] (version [{}])", selfAddress, sender(),
joiningNodeVersion)
// run config compatibility check using config provided by
// joining node and current (full) config on cluster side
@ -543,19 +559,33 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
JoinConfigCompatChecker.filterWithKeys(allowedConfigPaths, context.system.settings.config)
}
joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys) match {
case Valid
val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joiningNodeConfig, cluster.settings)
// Send back to joining node a subset of current configuration
// containing the keys initially sent by the joining node minus
// any sensitive keys as defined by this node configuration
val clusterConfig = JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config)
sender() ! InitJoinAck(selfAddress, CompatibleConfig(clusterConfig))
case Invalid(messages)
// messages are only logged on the cluster side
log.warning("Found incompatible settings when [{}] tried to join: {}", sender().path.address, messages.mkString(", "))
sender() ! InitJoinAck(selfAddress, IncompatibleConfig)
}
val configCheckReply =
joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys) match {
case Valid
if (configCheckUnsupportedByJoiningNode)
ConfigCheckUnsupportedByJoiningNode
else {
val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joiningNodeConfig, cluster.settings)
// Send back to joining node a subset of current configuration
// containing the keys initially sent by the joining node minus
// any sensitive keys as defined by this node configuration
val clusterConfig = JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config)
CompatibleConfig(clusterConfig)
}
case Invalid(messages)
// messages are only logged on the cluster side
log.warning(
"Found incompatible settings when [{}] tried to join: {}. " +
"Self version [{}], Joining version [{}].",
sender().path.address, messages.mkString(", "),
context.system.settings.ConfigVersion, joiningNodeVersion)
if (configCheckUnsupportedByJoiningNode)
ConfigCheckUnsupportedByJoiningNode
else
IncompatibleConfig
}
sender() ! InitJoinAck(selfAddress, configCheckReply)
}
}

View file

@ -145,7 +145,10 @@ object JoinConfigCompatChecker {
// composite checker
new JoinConfigCompatChecker {
override val requiredKeys: im.Seq[String] = checkers.flatMap(_.requiredKeys).to[im.Seq]
override val requiredKeys: im.Seq[String] = {
// Always include akka.version (used in join logging)
"akka.version" +: checkers.flatMap(_.requiredKeys).to[im.Seq]
}
override def check(toValidate: Config, clusterConfig: Config): ConfigValidation =
checkers.foldLeft(Valid: ConfigValidation) { (acc, checker)
acc ++ checker.check(toValidate, clusterConfig)

View file

@ -12,11 +12,11 @@ import akka.cluster._
import akka.cluster.protobuf.msg.{ ClusterMessages cm }
import akka.serialization._
import akka.protobuf.{ ByteString, MessageLite }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.concurrent.duration.Deadline
import akka.annotation.InternalApi
import akka.cluster.InternalClusterAction._
import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
@ -87,7 +87,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se
case ClusterUserAction.Leave(address) addressToProtoByteArray(address)
case ClusterUserAction.Down(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoin(config) initJoinToProto(config).toByteArray
case InternalClusterAction.InitJoinAck(address, configCheck) initJoinAckToProto(address, configCheck).toByteArray
case InternalClusterAction.InitJoinAck(address, configCheck) initJoinAckToByteArray(address, configCheck)
case InternalClusterAction.InitJoinNack(address) addressToProtoByteArray(address)
case InternalClusterAction.ExitingConfirmed(node) uniqueAddressToProtoByteArray(node)
case rp: ClusterRouterPool clusterRouterPoolToProtoByteArray(rp)
@ -330,6 +330,13 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se
.build()
}
private def initJoinAckToByteArray(address: Address, configCheck: ConfigCheck): Array[Byte] = {
if (configCheck == ConfigCheckUnsupportedByJoiningNode)
addressToProtoByteArray(address) // plain Address in 2.5.9 or earlier
else
initJoinAckToProto(address, configCheck).toByteArray
}
private def initJoinAckToProto(address: Address, configCheck: ConfigCheck): cm.InitJoinAck = {
val configCheckBuilder = cm.ConfigCheck.newBuilder()
@ -344,7 +351,12 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se
configCheckBuilder
.setType(cm.ConfigCheck.Type.CompatibleConfig)
.setClusterConfig(conf.root.render(ConfigRenderOptions.concise))
case ConfigCheckUnsupportedByJoiningNode
// handled as Address in initJoinAckToByteArray
throw new IllegalStateException("Unexpected ConfigCheckUnsupportedByJoiningNode")
}
cm.InitJoinAck.newBuilder().
setAddress(addressToProto(address)).
setConfigCheck(configCheckBuilder.build()).

View file

@ -96,18 +96,28 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
deserialized shouldBe an[InternalClusterAction.InitJoin]
}
"be compatible with wire format of version 2.5.9 (using serialized address for InitJoinAck)" in {
"deserialize from wire format of version 2.5.9 (using serialized address for InitJoinAck)" in {
// we must use the old singleton class name so that the other side will see an InitJoin
// but discard the config as it does not know about the config check
val initJoinAck = InternalClusterAction.InitJoinAck(
Address("akka.tcp", "cluster", "127.0.0.1", 2552),
InternalClusterAction.UncheckedConfig)
val serializedinInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray
val serializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray
val deserialized = serializer.fromBinary(serializedinInitJoinAckPre2510, ClusterMessageSerializer.InitJoinAckManifest)
val deserialized = serializer.fromBinary(serializedInitJoinAckPre2510, ClusterMessageSerializer.InitJoinAckManifest)
deserialized shouldEqual initJoinAck
}
"serialize to wire format of version 2.5.9 (using serialized address for InitJoinAck)" in {
val initJoinAck = InternalClusterAction.InitJoinAck(
Address("akka.tcp", "cluster", "127.0.0.1", 2552),
InternalClusterAction.ConfigCheckUnsupportedByJoiningNode)
val bytes = serializer.toBinary(initJoinAck)
val expectedSerializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray
bytes.toList should ===(expectedSerializedInitJoinAckPre2510.toList)
}
"be compatible with wire format of version 2.5.3 (using use-role instead of use-roles)" in {
val system = ActorSystem("ClusterMessageSerializer-old-wire-format")