Step two in the cluster message manifest change #13654

This commit is contained in:
Johan Andrén 2020-04-03 10:36:31 +02:00 committed by GitHub
parent b4677f58d8
commit 7e900bab1b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 159 deletions

View file

@ -31,19 +31,15 @@ import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions }
@InternalApi
@ccompatUsedUntil213
private[akka] object ClusterMessageSerializer {
// Kept for one version iteration from 2.6.2 to allow rolling migration to short manifests
// will be removed in 2.6.3
// needs to be full class names for backwards compatibility
// Kept for one version iteration from 2.6.4 to allow rolling migration to short manifests
// can be removed in 2.6.6 or later.
val OldJoinManifest = s"akka.cluster.InternalClusterAction$$Join"
val OldWelcomeManifest = s"akka.cluster.InternalClusterAction$$Welcome"
val OldLeaveManifest = s"akka.cluster.ClusterUserAction$$Leave"
val OldDownManifest = s"akka.cluster.ClusterUserAction$$Down"
// #24622 wire compatibility
// we need to use this object name rather than classname to be able to join a 2.5.9 cluster during rolling upgrades
val OldInitJoinManifest = s"akka.cluster.InternalClusterAction$$InitJoin$$"
val OldInitJoinAckManifest = s"akka.cluster.InternalClusterAction$$InitJoinAck"
val OldInitJoinNackManifest = s"akka.cluster.InternalClusterAction$$InitJoinNack"
// FIXME, remove in a later version (2.6?) and make 2.5.24+ a mandatory step for rolling upgrade
val HeartBeatManifestPre2523 = s"akka.cluster.ClusterHeartbeatSender$$Heartbeat"
val HeartBeatRspManifest2523 = s"akka.cluster.ClusterHeartbeatSender$$HeartbeatRsp"
val OldExitingConfirmedManifest = s"akka.cluster.InternalClusterAction$$ExitingConfirmed"
@ -82,26 +78,26 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
private lazy val GossipTimeToLive = Cluster(system).settings.GossipTimeToLive
def manifest(o: AnyRef): String = o match {
case _: InternalClusterAction.Join => OldJoinManifest
case _: InternalClusterAction.Welcome => OldWelcomeManifest
case _: ClusterUserAction.Leave => OldLeaveManifest
case _: ClusterUserAction.Down => OldDownManifest
case _: InternalClusterAction.InitJoin => OldInitJoinManifest
case _: InternalClusterAction.InitJoinAck => OldInitJoinAckManifest
case _: InternalClusterAction.InitJoinNack => OldInitJoinNackManifest
case _: ClusterHeartbeatSender.Heartbeat => HeartBeatManifestPre2523
case _: ClusterHeartbeatSender.HeartbeatRsp => HeartBeatRspManifest2523
case _: ExitingConfirmed => OldExitingConfirmedManifest
case _: GossipStatus => OldGossipStatusManifest
case _: GossipEnvelope => OldGossipEnvelopeManifest
case _: ClusterRouterPool => OldClusterRouterPoolManifest
case _: InternalClusterAction.Join => JoinManifest
case _: InternalClusterAction.Welcome => WelcomeManifest
case _: ClusterUserAction.Leave => LeaveManifest
case _: ClusterUserAction.Down => DownManifest
case _: InternalClusterAction.InitJoin => InitJoinManifest
case _: InternalClusterAction.InitJoinAck => InitJoinAckManifest
case _: InternalClusterAction.InitJoinNack => InitJoinNackManifest
case _: ClusterHeartbeatSender.Heartbeat => HeartbeatManifest
case _: ClusterHeartbeatSender.HeartbeatRsp => HeartbeatRspManifest
case _: ExitingConfirmed => ExitingConfirmedManifest
case _: GossipStatus => GossipStatusManifest
case _: GossipEnvelope => GossipEnvelopeManifest
case _: ClusterRouterPool => ClusterRouterPoolManifest
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case ClusterHeartbeatSender.Heartbeat(from, _, _) => addressToProtoByteArray(from)
case ClusterHeartbeatSender.HeartbeatRsp(from, _, _) => uniqueAddressToProtoByteArray(from)
case hb: ClusterHeartbeatSender.Heartbeat => heartbeatToProtoByteArray(hb)
case hbr: ClusterHeartbeatSender.HeartbeatRsp => heartbeatRspToProtoByteArray(hbr)
case m: GossipEnvelope => gossipEnvelopeToProto(m).toByteArray
case m: GossipStatus => gossipStatusToProto(m).toByteArray
case InternalClusterAction.Join(node, roles) => joinToProto(node, roles).toByteArray
@ -118,10 +114,22 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case HeartbeatManifest => deserializeHeartBeat(bytes)
case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes)
case GossipStatusManifest => deserializeGossipStatus(bytes)
case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
case InitJoinManifest => deserializeInitJoin(bytes)
case InitJoinAckManifest => deserializeInitJoinAck(bytes)
case InitJoinNackManifest => deserializeInitJoinNack(bytes)
case JoinManifest => deserializeJoin(bytes)
case WelcomeManifest => deserializeWelcome(bytes)
case LeaveManifest => deserializeLeave(bytes)
case DownManifest => deserializeDown(bytes)
case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes)
case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes)
// needs to stay in 2.6.5 to be able to talk to a 2.5.{3,4} node during rolling upgrade
case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes)
case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes)
case HeartbeatManifest => deserializeHeartBeat(bytes)
case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes)
case OldGossipStatusManifest => deserializeGossipStatus(bytes)
case OldGossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
case OldInitJoinManifest => deserializeInitJoin(bytes)
@ -133,17 +141,6 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
case OldDownManifest => deserializeDown(bytes)
case OldExitingConfirmedManifest => deserializeExitingConfirmed(bytes)
case OldClusterRouterPoolManifest => deserializeClusterRouterPool(bytes)
case GossipStatusManifest => deserializeGossipStatus(bytes)
case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
case InitJoinManifest => deserializeInitJoin(bytes)
case InitJoinAckManifest => deserializeInitJoinAck(bytes)
case InitJoinNackManifest => deserializeInitJoinNack(bytes)
case JoinManifest => deserializeJoin(bytes)
case WelcomeManifest => deserializeWelcome(bytes)
case LeaveManifest => deserializeLeave(bytes)
case DownManifest => deserializeDown(bytes)
case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes)
case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes)
case _ => throw new IllegalArgumentException(s"Unknown manifest [${manifest}]")
}
@ -172,6 +169,26 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
out.toByteArray
}
private def heartbeatToProtoByteArray(hb: ClusterHeartbeatSender.Heartbeat): Array[Byte] = {
cm.Heartbeat
.newBuilder()
.setFrom(addressToProto(hb.from))
.setSequenceNr(hb.sequenceNr)
.setCreationTime(hb.creationTimeNanos)
.build
.toByteArray
}
private def heartbeatRspToProtoByteArray(hbr: ClusterHeartbeatSender.HeartbeatRsp): Array[Byte] = {
cm.HeartBeatResponse
.newBuilder()
.setFrom(uniqueAddressToProto(hbr.from))
.setSequenceNr(hbr.sequenceNr)
.setCreationTime(hbr.creationTimeNanos)
.build
.toByteArray
}
private def addressFromBinary(bytes: Array[Byte]): Address =
addressFromProto(cm.Address.parseFrom(bytes))

View file

@ -5,14 +5,13 @@
package akka.cluster.protobuf
import akka.cluster._
import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
import akka.actor.{ Address, ExtendedActorSystem }
import akka.cluster.InternalClusterAction.CompatibleConfig
import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
import akka.routing.RoundRobinPool
import akka.cluster.protobuf.msg.{ ClusterMessages => cm }
import collection.immutable.SortedSet
import akka.testkit.{ AkkaSpec, TestKit }
import akka.testkit.AkkaSpec
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory
@ -105,38 +104,28 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
}
// can be removed in 2.6.3 only checks deserialization with new not yet in effect manifests for 2.6.2
"be serializable with new manifests for 2.6.3" in {
"be de-serializable with class manifests from 2.6.4 and earlier nodes" in {
val address = Address("akka", "system", "some.host.org", 4711)
val uniqueAddress = UniqueAddress(address, 17L)
val address2 = Address("akka", "system", "other.host.org", 4711)
val uniqueAddress2 = UniqueAddress(address2, 18L)
checkDeserializationWithManifest(
InternalClusterAction.Join(uniqueAddress, Set("foo", "bar", "dc-A")),
ClusterMessageSerializer.JoinManifest)
ClusterMessageSerializer.OldJoinManifest)
checkDeserializationWithManifest(ClusterUserAction.Leave(address), ClusterMessageSerializer.LeaveManifest)
checkDeserializationWithManifest(ClusterUserAction.Down(address), ClusterMessageSerializer.DownManifest)
checkDeserializationWithManifest(
InternalClusterAction.InitJoin(ConfigFactory.empty),
ClusterMessageSerializer.InitJoinManifest)
ClusterMessageSerializer.OldInitJoinManifest)
checkDeserializationWithManifest(
InternalClusterAction.InitJoinAck(address, CompatibleConfig(ConfigFactory.empty)),
ClusterMessageSerializer.InitJoinAckManifest)
ClusterMessageSerializer.OldInitJoinAckManifest)
checkDeserializationWithManifest(
InternalClusterAction.InitJoinNack(address),
ClusterMessageSerializer.InitJoinNackManifest)
/* this has changed in 2.5.23 but it seems we forgot to add the next step in 2.5.24
so we can't do two-way like this, the new manifest actually expects an address + timestamp + seqnr only when the new manifest is used
see test below.
checkDeserializationWithManifest(
ClusterHeartbeatSender.Heartbeat(address, -1, -1),
ClusterMessageSerializer.HeartbeatManifest)
checkDeserializationWithManifest(
ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1),
ClusterMessageSerializer.HeartbeatRspManifest)
*/
ClusterMessageSerializer.OldInitJoinNackManifest)
checkDeserializationWithManifest(
InternalClusterAction.ExitingConfirmed(uniqueAddress),
ClusterMessageSerializer.ExitingConfirmedManifest)
ClusterMessageSerializer.OldExitingConfirmedManifest)
val node1 = VectorClock.Node("node1")
val node2 = VectorClock.Node("node2")
@ -149,85 +138,15 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
.unreachable(b1.uniqueAddress, e1.uniqueAddress)
checkDeserializationWithManifest(
GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g1),
ClusterMessageSerializer.GossipEnvelopeManifest)
ClusterMessageSerializer.OldGossipEnvelopeManifest)
checkDeserializationWithManifest(
GossipStatus(a1.uniqueAddress, g1.version),
ClusterMessageSerializer.GossipStatusManifest)
ClusterMessageSerializer.OldGossipStatusManifest)
checkDeserializationWithManifest(
InternalClusterAction.Welcome(uniqueAddress, g2),
ClusterMessageSerializer.WelcomeManifest)
}
"be compatible with wire format of version 2.5.9 (using InitJoin singleton instead of class)" in {
// we must use the old singleton class name so that the other side will see an InitJoin
// but discard the config as it does not know about the config check
val oldClassName = "akka.cluster.InternalClusterAction$InitJoin$"
serializer.manifest(InternalClusterAction.InitJoin(ConfigFactory.empty())) should ===(oldClassName)
// in 2.5.9 and earlier, it was an object and serialized to empty byte array
// and we should accept that
val deserialized = serializer.fromBinary(Array.emptyByteArray, oldClassName)
deserialized shouldBe an[InternalClusterAction.InitJoin]
}
"deserialize from wire format of version 2.5.9 (using serialized address for InitJoinAck)" in {
// we must use the old singleton class name so that the other side will see an InitJoin
// but discard the config as it does not know about the config check
val initJoinAck = InternalClusterAction.InitJoinAck(
Address("akka", "cluster", "127.0.0.1", 2552),
InternalClusterAction.UncheckedConfig)
val serializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray
val deserialized =
serializer.fromBinary(serializedInitJoinAckPre2510, ClusterMessageSerializer.OldInitJoinAckManifest)
deserialized shouldEqual initJoinAck
}
"serialize to wire format of version 2.5.9 (using serialized address for InitJoinAck)" in {
val initJoinAck = InternalClusterAction.InitJoinAck(
Address("akka", "cluster", "127.0.0.1", 2552),
InternalClusterAction.ConfigCheckUnsupportedByJoiningNode)
val bytes = serializer.toBinary(initJoinAck)
val expectedSerializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray
bytes.toList should ===(expectedSerializedInitJoinAckPre2510.toList)
}
"be compatible with wire format of version 2.5.3 (using use-role instead of use-roles)" in {
val system = ActorSystem("ClusterMessageSerializer-old-wire-format")
try {
val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
// the oldSnapshot was created with the version of ClusterRouterPoolSettings in Akka 2.5.3. See issue #23257.
// It was created with:
/*
import org.apache.commons.codec.binary.Hex.encodeHex
val bytes = serializer.toBinary(
ClusterRouterPool(RoundRobinPool(nrOfInstances = 4), ClusterRouterPoolSettings(123, 345, true, Some("role ABC"))))
println(String.valueOf(encodeHex(bytes)))
*/
val oldBytesHex = "0a0f08101205524f5252501a04080418001211087b10d90218012208726f6c6520414243"
import org.apache.commons.codec.binary.Hex.decodeHex
val oldBytes = decodeHex(oldBytesHex.toCharArray)
val result = serializer.fromBinary(oldBytes, classOf[ClusterRouterPool])
result match {
case pool: ClusterRouterPool =>
pool.settings.totalInstances should ===(123)
pool.settings.maxInstancesPerNode should ===(345)
pool.settings.allowLocalRoutees should ===(true)
pool.settings.useRole should ===(Some("role ABC"))
pool.settings.useRoles should ===(Set("role ABC"))
}
} finally {
TestKit.shutdownActorSystem(system)
}
ClusterMessageSerializer.OldWelcomeManifest)
}
"add a default data center role to gossip if none is present" in {
@ -242,43 +161,20 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
}
}
// support for deserializing a new format with a string based manifest was added in 2.5.23 but the next step
// was never done, meaning that 2.6.4 still emits the old format
"Rolling upgrades for heart beat message changes in 2.5.23" must {
// FIXME, add issue for serializing this as the new message type
"serialize heart beats as Address to support versions prior or 2.5.23" in {
serializer.manifest(ClusterHeartbeatSender.Heartbeat(a1.address, -1, -1)) should ===(
ClusterMessageSerializer.HeartBeatManifestPre2523)
"deserialize heart beats represented by just an address Address to support versions prior or 2.6.5" in {
val serialized = serializer.addressToProto(a1.address).build().toByteArray
val deserialized = serializer.fromBinary(serialized, ClusterMessageSerializer.HeartBeatManifestPre2523)
deserialized should ===(ClusterHeartbeatSender.Heartbeat(a1.address, -1, -1))
}
"serialize heart beat responses as UniqueAddress to support versions prior to 2.5.23" in {
serializer.manifest(ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, -1, -1)) should ===(
ClusterMessageSerializer.HeartBeatRspManifest2523)
}
"be able to deserialize HeartBeat protobuf message" in {
val hbProtobuf = cm.Heartbeat
.newBuilder()
.setFrom(serializer.addressToProto(a1.address))
.setSequenceNr(1)
.setCreationTime(2)
.build()
.toByteArray
serializer.fromBinary(hbProtobuf, ClusterMessageSerializer.HeartbeatManifest) should ===(
ClusterHeartbeatSender.Heartbeat(a1.address, 1, 2))
}
"be able to deserialize HeartBeatRsp probuf message" in {
val hbrProtobuf = cm.HeartBeatResponse
.newBuilder()
.setFrom(serializer.uniqueAddressToProto(a1.uniqueAddress))
.setSequenceNr(1)
.setCreationTime(2)
.build()
.toByteArray
serializer.fromBinary(hbrProtobuf, ClusterMessageSerializer.HeartbeatRspManifest) should ===(
ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, 1, 2))
"deserialize heart beat responses as UniqueAddress to support versions prior to 2.5.23" in {
val serialized = serializer.uniqueAddressToProto(a1.uniqueAddress).build().toByteArray
val deserialized = serializer.fromBinary(serialized, ClusterMessageSerializer.HeartBeatRspManifest2523)
deserialized should ===(ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, -1, -1))
}
}