diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 43e0f5f049..1c2e2d8ded 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -31,19 +31,15 @@ import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions } @InternalApi @ccompatUsedUntil213 private[akka] object ClusterMessageSerializer { - // Kept for one version iteration from 2.6.2 to allow rolling migration to short manifests - // will be removed in 2.6.3 - // needs to be full class names for backwards compatibility + // Kept for one version iteration from 2.6.4 to allow rolling migration to short manifests + // can be removed in 2.6.6 or later. val OldJoinManifest = s"akka.cluster.InternalClusterAction$$Join" val OldWelcomeManifest = s"akka.cluster.InternalClusterAction$$Welcome" val OldLeaveManifest = s"akka.cluster.ClusterUserAction$$Leave" val OldDownManifest = s"akka.cluster.ClusterUserAction$$Down" - // #24622 wire compatibility - // we need to use this object name rather than classname to be able to join a 2.5.9 cluster during rolling upgrades val OldInitJoinManifest = s"akka.cluster.InternalClusterAction$$InitJoin$$" val OldInitJoinAckManifest = s"akka.cluster.InternalClusterAction$$InitJoinAck" val OldInitJoinNackManifest = s"akka.cluster.InternalClusterAction$$InitJoinNack" - // FIXME, remove in a later version (2.6?) and make 2.5.24+ a mandatory step for rolling upgrade val HeartBeatManifestPre2523 = s"akka.cluster.ClusterHeartbeatSender$$Heartbeat" val HeartBeatRspManifest2523 = s"akka.cluster.ClusterHeartbeatSender$$HeartbeatRsp" val OldExitingConfirmedManifest = s"akka.cluster.InternalClusterAction$$ExitingConfirmed" @@ -82,26 +78,26 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) private lazy val GossipTimeToLive = Cluster(system).settings.GossipTimeToLive def manifest(o: AnyRef): String = o match { - case _: InternalClusterAction.Join => OldJoinManifest - case _: InternalClusterAction.Welcome => OldWelcomeManifest - case _: ClusterUserAction.Leave => OldLeaveManifest - case _: ClusterUserAction.Down => OldDownManifest - case _: InternalClusterAction.InitJoin => OldInitJoinManifest - case _: InternalClusterAction.InitJoinAck => OldInitJoinAckManifest - case _: InternalClusterAction.InitJoinNack => OldInitJoinNackManifest - case _: ClusterHeartbeatSender.Heartbeat => HeartBeatManifestPre2523 - case _: ClusterHeartbeatSender.HeartbeatRsp => HeartBeatRspManifest2523 - case _: ExitingConfirmed => OldExitingConfirmedManifest - case _: GossipStatus => OldGossipStatusManifest - case _: GossipEnvelope => OldGossipEnvelopeManifest - case _: ClusterRouterPool => OldClusterRouterPoolManifest + case _: InternalClusterAction.Join => JoinManifest + case _: InternalClusterAction.Welcome => WelcomeManifest + case _: ClusterUserAction.Leave => LeaveManifest + case _: ClusterUserAction.Down => DownManifest + case _: InternalClusterAction.InitJoin => InitJoinManifest + case _: InternalClusterAction.InitJoinAck => InitJoinAckManifest + case _: InternalClusterAction.InitJoinNack => InitJoinNackManifest + case _: ClusterHeartbeatSender.Heartbeat => HeartbeatManifest + case _: ClusterHeartbeatSender.HeartbeatRsp => HeartbeatRspManifest + case _: ExitingConfirmed => ExitingConfirmedManifest + case _: GossipStatus => GossipStatusManifest + case _: GossipEnvelope => GossipEnvelopeManifest + case _: ClusterRouterPool => ClusterRouterPoolManifest case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") } def toBinary(obj: AnyRef): Array[Byte] = obj match { - case ClusterHeartbeatSender.Heartbeat(from, _, _) => addressToProtoByteArray(from) - case ClusterHeartbeatSender.HeartbeatRsp(from, _, _) => uniqueAddressToProtoByteArray(from) + case hb: ClusterHeartbeatSender.Heartbeat => heartbeatToProtoByteArray(hb) + case hbr: ClusterHeartbeatSender.HeartbeatRsp => heartbeatRspToProtoByteArray(hbr) case m: GossipEnvelope => gossipEnvelopeToProto(m).toByteArray case m: GossipStatus => gossipStatusToProto(m).toByteArray case InternalClusterAction.Join(node, roles) => joinToProto(node, roles).toByteArray @@ -118,10 +114,22 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) } def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case HeartbeatManifest => deserializeHeartBeat(bytes) + case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes) + case GossipStatusManifest => deserializeGossipStatus(bytes) + case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes) + case InitJoinManifest => deserializeInitJoin(bytes) + case InitJoinAckManifest => deserializeInitJoinAck(bytes) + case InitJoinNackManifest => deserializeInitJoinNack(bytes) + case JoinManifest => deserializeJoin(bytes) + case WelcomeManifest => deserializeWelcome(bytes) + case LeaveManifest => deserializeLeave(bytes) + case DownManifest => deserializeDown(bytes) + case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes) + case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) + // needs to stay in 2.6.5 to be able to talk to a 2.5.{3,4} node during rolling upgrade case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes) case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes) - case HeartbeatManifest => deserializeHeartBeat(bytes) - case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes) case OldGossipStatusManifest => deserializeGossipStatus(bytes) case OldGossipEnvelopeManifest => deserializeGossipEnvelope(bytes) case OldInitJoinManifest => deserializeInitJoin(bytes) @@ -133,17 +141,6 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) case OldDownManifest => deserializeDown(bytes) case OldExitingConfirmedManifest => deserializeExitingConfirmed(bytes) case OldClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) - case GossipStatusManifest => deserializeGossipStatus(bytes) - case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes) - case InitJoinManifest => deserializeInitJoin(bytes) - case InitJoinAckManifest => deserializeInitJoinAck(bytes) - case InitJoinNackManifest => deserializeInitJoinNack(bytes) - case JoinManifest => deserializeJoin(bytes) - case WelcomeManifest => deserializeWelcome(bytes) - case LeaveManifest => deserializeLeave(bytes) - case DownManifest => deserializeDown(bytes) - case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes) - case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) case _ => throw new IllegalArgumentException(s"Unknown manifest [${manifest}]") } @@ -172,6 +169,26 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) out.toByteArray } + private def heartbeatToProtoByteArray(hb: ClusterHeartbeatSender.Heartbeat): Array[Byte] = { + cm.Heartbeat + .newBuilder() + .setFrom(addressToProto(hb.from)) + .setSequenceNr(hb.sequenceNr) + .setCreationTime(hb.creationTimeNanos) + .build + .toByteArray + } + + private def heartbeatRspToProtoByteArray(hbr: ClusterHeartbeatSender.HeartbeatRsp): Array[Byte] = { + cm.HeartBeatResponse + .newBuilder() + .setFrom(uniqueAddressToProto(hbr.from)) + .setSequenceNr(hbr.sequenceNr) + .setCreationTime(hbr.creationTimeNanos) + .build + .toByteArray + } + private def addressFromBinary(bytes: Array[Byte]): Address = addressFromProto(cm.Address.parseFrom(bytes)) diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index 925d733abc..957dee214c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -5,14 +5,13 @@ package akka.cluster.protobuf import akka.cluster._ -import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } +import akka.actor.{ Address, ExtendedActorSystem } import akka.cluster.InternalClusterAction.CompatibleConfig import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings } import akka.routing.RoundRobinPool -import akka.cluster.protobuf.msg.{ ClusterMessages => cm } import collection.immutable.SortedSet -import akka.testkit.{ AkkaSpec, TestKit } +import akka.testkit.AkkaSpec import com.github.ghik.silencer.silent import com.typesafe.config.ConfigFactory @@ -105,38 +104,28 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust } // can be removed in 2.6.3 only checks deserialization with new not yet in effect manifests for 2.6.2 - "be serializable with new manifests for 2.6.3" in { + "be de-serializable with class manifests from 2.6.4 and earlier nodes" in { val address = Address("akka", "system", "some.host.org", 4711) val uniqueAddress = UniqueAddress(address, 17L) val address2 = Address("akka", "system", "other.host.org", 4711) val uniqueAddress2 = UniqueAddress(address2, 18L) checkDeserializationWithManifest( InternalClusterAction.Join(uniqueAddress, Set("foo", "bar", "dc-A")), - ClusterMessageSerializer.JoinManifest) + ClusterMessageSerializer.OldJoinManifest) checkDeserializationWithManifest(ClusterUserAction.Leave(address), ClusterMessageSerializer.LeaveManifest) checkDeserializationWithManifest(ClusterUserAction.Down(address), ClusterMessageSerializer.DownManifest) checkDeserializationWithManifest( InternalClusterAction.InitJoin(ConfigFactory.empty), - ClusterMessageSerializer.InitJoinManifest) + ClusterMessageSerializer.OldInitJoinManifest) checkDeserializationWithManifest( InternalClusterAction.InitJoinAck(address, CompatibleConfig(ConfigFactory.empty)), - ClusterMessageSerializer.InitJoinAckManifest) + ClusterMessageSerializer.OldInitJoinAckManifest) checkDeserializationWithManifest( InternalClusterAction.InitJoinNack(address), - ClusterMessageSerializer.InitJoinNackManifest) - /* this has changed in 2.5.23 but it seems we forgot to add the next step in 2.5.24 - so we can't do two-way like this, the new manifest actually expects an address + timestamp + seqnr only when the new manifest is used - see test below. - checkDeserializationWithManifest( - ClusterHeartbeatSender.Heartbeat(address, -1, -1), - ClusterMessageSerializer.HeartbeatManifest) - checkDeserializationWithManifest( - ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1), - ClusterMessageSerializer.HeartbeatRspManifest) - */ + ClusterMessageSerializer.OldInitJoinNackManifest) checkDeserializationWithManifest( InternalClusterAction.ExitingConfirmed(uniqueAddress), - ClusterMessageSerializer.ExitingConfirmedManifest) + ClusterMessageSerializer.OldExitingConfirmedManifest) val node1 = VectorClock.Node("node1") val node2 = VectorClock.Node("node2") @@ -149,85 +138,15 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust .unreachable(b1.uniqueAddress, e1.uniqueAddress) checkDeserializationWithManifest( GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g1), - ClusterMessageSerializer.GossipEnvelopeManifest) + ClusterMessageSerializer.OldGossipEnvelopeManifest) checkDeserializationWithManifest( GossipStatus(a1.uniqueAddress, g1.version), - ClusterMessageSerializer.GossipStatusManifest) + ClusterMessageSerializer.OldGossipStatusManifest) checkDeserializationWithManifest( InternalClusterAction.Welcome(uniqueAddress, g2), - ClusterMessageSerializer.WelcomeManifest) - } - - "be compatible with wire format of version 2.5.9 (using InitJoin singleton instead of class)" in { - // we must use the old singleton class name so that the other side will see an InitJoin - // but discard the config as it does not know about the config check - val oldClassName = "akka.cluster.InternalClusterAction$InitJoin$" - serializer.manifest(InternalClusterAction.InitJoin(ConfigFactory.empty())) should ===(oldClassName) - - // in 2.5.9 and earlier, it was an object and serialized to empty byte array - // and we should accept that - val deserialized = serializer.fromBinary(Array.emptyByteArray, oldClassName) - deserialized shouldBe an[InternalClusterAction.InitJoin] - } - - "deserialize from wire format of version 2.5.9 (using serialized address for InitJoinAck)" in { - // we must use the old singleton class name so that the other side will see an InitJoin - // but discard the config as it does not know about the config check - val initJoinAck = InternalClusterAction.InitJoinAck( - Address("akka", "cluster", "127.0.0.1", 2552), - InternalClusterAction.UncheckedConfig) - val serializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray - - val deserialized = - serializer.fromBinary(serializedInitJoinAckPre2510, ClusterMessageSerializer.OldInitJoinAckManifest) - deserialized shouldEqual initJoinAck - } - - "serialize to wire format of version 2.5.9 (using serialized address for InitJoinAck)" in { - val initJoinAck = InternalClusterAction.InitJoinAck( - Address("akka", "cluster", "127.0.0.1", 2552), - InternalClusterAction.ConfigCheckUnsupportedByJoiningNode) - val bytes = serializer.toBinary(initJoinAck) - - val expectedSerializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray - bytes.toList should ===(expectedSerializedInitJoinAckPre2510.toList) - } - - "be compatible with wire format of version 2.5.3 (using use-role instead of use-roles)" in { - val system = ActorSystem("ClusterMessageSerializer-old-wire-format") - - try { - val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) - - // the oldSnapshot was created with the version of ClusterRouterPoolSettings in Akka 2.5.3. See issue #23257. - // It was created with: - /* - import org.apache.commons.codec.binary.Hex.encodeHex - val bytes = serializer.toBinary( - ClusterRouterPool(RoundRobinPool(nrOfInstances = 4), ClusterRouterPoolSettings(123, 345, true, Some("role ABC")))) - println(String.valueOf(encodeHex(bytes))) - */ - - val oldBytesHex = "0a0f08101205524f5252501a04080418001211087b10d90218012208726f6c6520414243" - - import org.apache.commons.codec.binary.Hex.decodeHex - val oldBytes = decodeHex(oldBytesHex.toCharArray) - val result = serializer.fromBinary(oldBytes, classOf[ClusterRouterPool]) - - result match { - case pool: ClusterRouterPool => - pool.settings.totalInstances should ===(123) - pool.settings.maxInstancesPerNode should ===(345) - pool.settings.allowLocalRoutees should ===(true) - pool.settings.useRole should ===(Some("role ABC")) - pool.settings.useRoles should ===(Set("role ABC")) - } - } finally { - TestKit.shutdownActorSystem(system) - } - + ClusterMessageSerializer.OldWelcomeManifest) } "add a default data center role to gossip if none is present" in { @@ -242,43 +161,20 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust } } + // support for deserializing a new format with a string based manifest was added in 2.5.23 but the next step + // was never done, meaning that 2.6.4 still emits the old format "Rolling upgrades for heart beat message changes in 2.5.23" must { - // FIXME, add issue for serializing this as the new message type - "serialize heart beats as Address to support versions prior or 2.5.23" in { - serializer.manifest(ClusterHeartbeatSender.Heartbeat(a1.address, -1, -1)) should ===( - ClusterMessageSerializer.HeartBeatManifestPre2523) + "deserialize heart beats represented by just an address Address to support versions prior or 2.6.5" in { + val serialized = serializer.addressToProto(a1.address).build().toByteArray + val deserialized = serializer.fromBinary(serialized, ClusterMessageSerializer.HeartBeatManifestPre2523) + deserialized should ===(ClusterHeartbeatSender.Heartbeat(a1.address, -1, -1)) } - "serialize heart beat responses as UniqueAddress to support versions prior to 2.5.23" in { - serializer.manifest(ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, -1, -1)) should ===( - ClusterMessageSerializer.HeartBeatRspManifest2523) - } - - "be able to deserialize HeartBeat protobuf message" in { - val hbProtobuf = cm.Heartbeat - .newBuilder() - .setFrom(serializer.addressToProto(a1.address)) - .setSequenceNr(1) - .setCreationTime(2) - .build() - .toByteArray - - serializer.fromBinary(hbProtobuf, ClusterMessageSerializer.HeartbeatManifest) should ===( - ClusterHeartbeatSender.Heartbeat(a1.address, 1, 2)) - } - - "be able to deserialize HeartBeatRsp probuf message" in { - val hbrProtobuf = cm.HeartBeatResponse - .newBuilder() - .setFrom(serializer.uniqueAddressToProto(a1.uniqueAddress)) - .setSequenceNr(1) - .setCreationTime(2) - .build() - .toByteArray - - serializer.fromBinary(hbrProtobuf, ClusterMessageSerializer.HeartbeatRspManifest) should ===( - ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, 1, 2)) + "deserialize heart beat responses as UniqueAddress to support versions prior to 2.5.23" in { + val serialized = serializer.uniqueAddressToProto(a1.uniqueAddress).build().toByteArray + val deserialized = serializer.fromBinary(serialized, ClusterMessageSerializer.HeartBeatRspManifest2523) + deserialized should ===(ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, -1, -1)) } }