From 25079cb56898f1cdbab5115de36d685bb61503a6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 23 Aug 2018 20:37:42 +0200 Subject: [PATCH] Support joining 2.5.9 or earlier, compat InitJoinAck, #25491 * Detect that joining node is 2.5.9 or earlier by empty ConfigCheck config in InitJoin message. Then send back Address, which was the old representation of InitJoinAck * Include akka.version in logging to facilitate troubleshooting --- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- .../scala/akka/cluster/ClusterDaemon.scala | 60 ++++++++++++++----- .../cluster/JoinConfigCompatChecker.scala | 5 +- .../protobuf/ClusterMessageSerializer.scala | 16 ++++- .../ClusterMessageSerializerSpec.scala | 16 ++++- 5 files changed, 77 insertions(+), 22 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c947ab5b1f..a34cf303a0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -99,7 +99,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { // ClusterJmx is initialized as the last thing in the constructor private var clusterJmx: Option[ClusterJmx] = None - logInfo("Starting up...") + logInfo("Starting up, Akka version [{}] ...", system.settings.ConfigVersion) val failureDetector: FailureDetectorRegistry[Address] = { val createFailureDetector = () ⇒ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index aa33e1b04a..f5dcc20620 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -99,6 +99,13 @@ private[cluster] object InternalClusterAction { sealed trait ConfigCheck case object UncheckedConfig extends ConfigCheck case object IncompatibleConfig extends ConfigCheck + /** + * Node with version 2.5.9 or earlier is joining. The serialized + * representation of `InitJoinAck` must be a plain `Address` for + * such a joining node. + */ + case object ConfigCheckUnsupportedByJoiningNode extends ConfigCheck + final case class CompatibleConfig(clusterConfig: Config) extends ConfigCheck /** @@ -526,13 +533,22 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh } def initJoin(joiningNodeConfig: Config): Unit = { + val joiningNodeVersion = + if (joiningNodeConfig.hasPath("akka.version")) joiningNodeConfig.getString("akka.version") + else "unknown" + // When joiningNodeConfig is empty the joining node has version 2.5.9 or earlier. + val configCheckUnsupportedByJoiningNode = joiningNodeConfig.isEmpty + val selfStatus = latestGossip.member(selfUniqueAddress).status + if (removeUnreachableWithMemberStatus.contains(selfStatus)) { // prevents a Down and Exiting node from being used for joining - logInfo("Sending InitJoinNack message from node [{}] to [{}]", selfAddress, sender()) + logInfo("Sending InitJoinNack message from node [{}] to [{}] (version [{}])", selfAddress, sender(), + joiningNodeVersion) sender() ! InitJoinNack(selfAddress) } else { - logInfo("Sending InitJoinAck message from node [{}] to [{}]", selfAddress, sender()) + logInfo("Sending InitJoinAck message from node [{}] to [{}] (version [{}])", selfAddress, sender(), + joiningNodeVersion) // run config compatibility check using config provided by // joining node and current (full) config on cluster side @@ -543,19 +559,33 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh JoinConfigCompatChecker.filterWithKeys(allowedConfigPaths, context.system.settings.config) } - joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys) match { - case Valid ⇒ - val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joiningNodeConfig, cluster.settings) - // Send back to joining node a subset of current configuration - // containing the keys initially sent by the joining node minus - // any sensitive keys as defined by this node configuration - val clusterConfig = JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config) - sender() ! InitJoinAck(selfAddress, CompatibleConfig(clusterConfig)) - case Invalid(messages) ⇒ - // messages are only logged on the cluster side - log.warning("Found incompatible settings when [{}] tried to join: {}", sender().path.address, messages.mkString(", ")) - sender() ! InitJoinAck(selfAddress, IncompatibleConfig) - } + val configCheckReply = + joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys) match { + case Valid ⇒ + if (configCheckUnsupportedByJoiningNode) + ConfigCheckUnsupportedByJoiningNode + else { + val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joiningNodeConfig, cluster.settings) + // Send back to joining node a subset of current configuration + // containing the keys initially sent by the joining node minus + // any sensitive keys as defined by this node configuration + val clusterConfig = JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config) + CompatibleConfig(clusterConfig) + } + case Invalid(messages) ⇒ + // messages are only logged on the cluster side + log.warning( + "Found incompatible settings when [{}] tried to join: {}. " + + "Self version [{}], Joining version [{}].", + sender().path.address, messages.mkString(", "), + context.system.settings.ConfigVersion, joiningNodeVersion) + if (configCheckUnsupportedByJoiningNode) + ConfigCheckUnsupportedByJoiningNode + else + IncompatibleConfig + } + + sender() ! InitJoinAck(selfAddress, configCheckReply) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatChecker.scala b/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatChecker.scala index 0bb74a34df..3bab8e163b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatChecker.scala +++ b/akka-cluster/src/main/scala/akka/cluster/JoinConfigCompatChecker.scala @@ -145,7 +145,10 @@ object JoinConfigCompatChecker { // composite checker new JoinConfigCompatChecker { - override val requiredKeys: im.Seq[String] = checkers.flatMap(_.requiredKeys).to[im.Seq] + override val requiredKeys: im.Seq[String] = { + // Always include akka.version (used in join logging) + "akka.version" +: checkers.flatMap(_.requiredKeys).to[im.Seq] + } override def check(toValidate: Config, clusterConfig: Config): ConfigValidation = checkers.foldLeft(Valid: ConfigValidation) { (acc, checker) ⇒ acc ++ checker.check(toValidate, clusterConfig) diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 1419f6409b..1ac410b06d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -12,11 +12,11 @@ import akka.cluster._ import akka.cluster.protobuf.msg.{ ClusterMessages ⇒ cm } import akka.serialization._ import akka.protobuf.{ ByteString, MessageLite } - import scala.annotation.tailrec import scala.collection.immutable import scala.collection.JavaConverters._ import scala.concurrent.duration.Deadline + import akka.annotation.InternalApi import akka.cluster.InternalClusterAction._ import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings } @@ -87,7 +87,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se case ClusterUserAction.Leave(address) ⇒ addressToProtoByteArray(address) case ClusterUserAction.Down(address) ⇒ addressToProtoByteArray(address) case InternalClusterAction.InitJoin(config) ⇒ initJoinToProto(config).toByteArray - case InternalClusterAction.InitJoinAck(address, configCheck) ⇒ initJoinAckToProto(address, configCheck).toByteArray + case InternalClusterAction.InitJoinAck(address, configCheck) ⇒ initJoinAckToByteArray(address, configCheck) case InternalClusterAction.InitJoinNack(address) ⇒ addressToProtoByteArray(address) case InternalClusterAction.ExitingConfirmed(node) ⇒ uniqueAddressToProtoByteArray(node) case rp: ClusterRouterPool ⇒ clusterRouterPoolToProtoByteArray(rp) @@ -330,6 +330,13 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se .build() } + private def initJoinAckToByteArray(address: Address, configCheck: ConfigCheck): Array[Byte] = { + if (configCheck == ConfigCheckUnsupportedByJoiningNode) + addressToProtoByteArray(address) // plain Address in 2.5.9 or earlier + else + initJoinAckToProto(address, configCheck).toByteArray + } + private def initJoinAckToProto(address: Address, configCheck: ConfigCheck): cm.InitJoinAck = { val configCheckBuilder = cm.ConfigCheck.newBuilder() @@ -344,7 +351,12 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se configCheckBuilder .setType(cm.ConfigCheck.Type.CompatibleConfig) .setClusterConfig(conf.root.render(ConfigRenderOptions.concise)) + + case ConfigCheckUnsupportedByJoiningNode ⇒ + // handled as Address in initJoinAckToByteArray + throw new IllegalStateException("Unexpected ConfigCheckUnsupportedByJoiningNode") } + cm.InitJoinAck.newBuilder(). setAddress(addressToProto(address)). setConfigCheck(configCheckBuilder.build()). diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index dde08ed672..5f913712d0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -96,18 +96,28 @@ class ClusterMessageSerializerSpec extends AkkaSpec( deserialized shouldBe an[InternalClusterAction.InitJoin] } - "be compatible with wire format of version 2.5.9 (using serialized address for InitJoinAck)" in { + "deserialize from wire format of version 2.5.9 (using serialized address for InitJoinAck)" in { // we must use the old singleton class name so that the other side will see an InitJoin // but discard the config as it does not know about the config check val initJoinAck = InternalClusterAction.InitJoinAck( Address("akka.tcp", "cluster", "127.0.0.1", 2552), InternalClusterAction.UncheckedConfig) - val serializedinInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray + val serializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray - val deserialized = serializer.fromBinary(serializedinInitJoinAckPre2510, ClusterMessageSerializer.InitJoinAckManifest) + val deserialized = serializer.fromBinary(serializedInitJoinAckPre2510, ClusterMessageSerializer.InitJoinAckManifest) deserialized shouldEqual initJoinAck } + "serialize to wire format of version 2.5.9 (using serialized address for InitJoinAck)" in { + val initJoinAck = InternalClusterAction.InitJoinAck( + Address("akka.tcp", "cluster", "127.0.0.1", 2552), + InternalClusterAction.ConfigCheckUnsupportedByJoiningNode) + val bytes = serializer.toBinary(initJoinAck) + + val expectedSerializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray + bytes.toList should ===(expectedSerializedInitJoinAckPre2510.toList) + } + "be compatible with wire format of version 2.5.3 (using use-role instead of use-roles)" in { val system = ActorSystem("ClusterMessageSerializer-old-wire-format")