diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 27c550f9d4..e1334f3a03 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -369,17 +369,10 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR // Using null as the cause to avoid a large serialized message and JDK differences verify( Create(Some(null)), - if (scala.util.Properties.versionNumberString.startsWith("2.10.")) { - "aced00057372001b616b6b612e64697370617463682e7379736d73672e4372656174650000000000" + - "0000010200014c00076661696c75726574000e4c7363616c612f4f7074696f6e3b78707372000a73" + - "63616c612e536f6d65e2a09f87fc0836ae0200014c0001787400124c6a6176612f6c616e672f4f62" + - "6a6563743b7872000c7363616c612e4f7074696f6ee36024a8328a45e9020000787070" - } else { - "aced00057372001b616b6b612e64697370617463682e7379736d73672e4372656174650000000000" + - "0000010200014c00076661696c75726574000e4c7363616c612f4f7074696f6e3b78707372000a73" + - "63616c612e536f6d651122f2695ea18b740200014c0001787400124c6a6176612f6c616e672f4f62" + - "6a6563743b7872000c7363616c612e4f7074696f6efe6937fddb0e6674020000787070" - }) + "aced00057372001b616b6b612e64697370617463682e7379736d73672e4372656174650000000000" + + "0000010200014c00076661696c75726574000e4c7363616c612f4f7074696f6e3b78707372000a73" + + "63616c612e536f6d651122f2695ea18b740200014c0001787400124c6a6176612f6c616e672f4f62" + + "6a6563743b7872000c7363616c612e4f7074696f6efe6937fddb0e6674020000787070") } "be preserved for the Recreate SystemMessage" in { verify( diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java index 008195c557..f7b9e901ce 100644 --- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java +++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java @@ -2196,17 +2196,17 @@ public final class ClusterMessages { */ akka.cluster.protobuf.msg.ClusterMessages.AddressOrBuilder getAddressOrBuilder(); - // optional .ConfigCheck configCheck = 2; + // required .ConfigCheck configCheck = 2; /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ boolean hasConfigCheck(); /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck getConfigCheck(); /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ akka.cluster.protobuf.msg.ClusterMessages.ConfigCheckOrBuilder getConfigCheckOrBuilder(); } @@ -2354,23 +2354,23 @@ public final class ClusterMessages { return address_; } - // optional .ConfigCheck configCheck = 2; + // required .ConfigCheck configCheck = 2; public static final int CONFIGCHECK_FIELD_NUMBER = 2; private akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck configCheck_; /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public boolean hasConfigCheck() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck getConfigCheck() { return configCheck_; } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public akka.cluster.protobuf.msg.ClusterMessages.ConfigCheckOrBuilder getConfigCheckOrBuilder() { return configCheck_; @@ -2389,15 +2389,17 @@ public final class ClusterMessages { memoizedIsInitialized = 0; return false; } + if (!hasConfigCheck()) { + memoizedIsInitialized = 0; + return false; + } if (!getAddress().isInitialized()) { memoizedIsInitialized = 0; return false; } - if (hasConfigCheck()) { - if (!getConfigCheck().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } + if (!getConfigCheck().isInitialized()) { + memoizedIsInitialized = 0; + return false; } memoizedIsInitialized = 1; return true; @@ -2639,15 +2641,17 @@ public final class ClusterMessages { return false; } + if (!hasConfigCheck()) { + + return false; + } if (!getAddress().isInitialized()) { return false; } - if (hasConfigCheck()) { - if (!getConfigCheck().isInitialized()) { - - return false; - } + if (!getConfigCheck().isInitialized()) { + + return false; } return true; } @@ -2788,18 +2792,18 @@ public final class ClusterMessages { return addressBuilder_; } - // optional .ConfigCheck configCheck = 2; + // required .ConfigCheck configCheck = 2; private akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck configCheck_ = akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck.getDefaultInstance(); private akka.protobuf.SingleFieldBuilder< akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck, akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck.Builder, akka.cluster.protobuf.msg.ClusterMessages.ConfigCheckOrBuilder> configCheckBuilder_; /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public boolean hasConfigCheck() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck getConfigCheck() { if (configCheckBuilder_ == null) { @@ -2809,7 +2813,7 @@ public final class ClusterMessages { } } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public Builder setConfigCheck(akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck value) { if (configCheckBuilder_ == null) { @@ -2825,7 +2829,7 @@ public final class ClusterMessages { return this; } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public Builder setConfigCheck( akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck.Builder builderForValue) { @@ -2839,7 +2843,7 @@ public final class ClusterMessages { return this; } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public Builder mergeConfigCheck(akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck value) { if (configCheckBuilder_ == null) { @@ -2858,7 +2862,7 @@ public final class ClusterMessages { return this; } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public Builder clearConfigCheck() { if (configCheckBuilder_ == null) { @@ -2871,7 +2875,7 @@ public final class ClusterMessages { return this; } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck.Builder getConfigCheckBuilder() { bitField0_ |= 0x00000002; @@ -2879,7 +2883,7 @@ public final class ClusterMessages { return getConfigCheckFieldBuilder().getBuilder(); } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ public akka.cluster.protobuf.msg.ClusterMessages.ConfigCheckOrBuilder getConfigCheckOrBuilder() { if (configCheckBuilder_ != null) { @@ -2889,7 +2893,7 @@ public final class ClusterMessages { } } /** - * optional .ConfigCheck configCheck = 2; + * required .ConfigCheck configCheck = 2; */ private akka.protobuf.SingleFieldBuilder< akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck, akka.cluster.protobuf.msg.ClusterMessages.ConfigCheck.Builder, akka.cluster.protobuf.msg.ClusterMessages.ConfigCheckOrBuilder> @@ -18058,7 +18062,7 @@ public final class ClusterMessages { "Welcome\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\027\n" + "\006gossip\030\002 \002(\0132\007.Gossip\"!\n\010InitJoin\022\025\n\rcu" + "rrentConfig\030\001 \001(\t\"K\n\013InitJoinAck\022\031\n\007addr" + - "ess\030\001 \002(\0132\010.Address\022!\n\013configCheck\030\002 \001(\013" + + "ess\030\001 \002(\0132\010.Address\022!\n\013configCheck\030\002 \002(\013" + "2\014.ConfigCheck\"\226\001\n\013ConfigCheck\022\037\n\004type\030\001" + " \002(\0162\021.ConfigCheck.Type\022\025\n\rclusterConfig" + "\030\002 \001(\t\"I\n\004Type\022\023\n\017UncheckedConfig\020\001\022\026\n\022I" + diff --git a/akka-cluster/src/main/mima-filters/2.5.10.backwards.excludes b/akka-cluster/src/main/mima-filters/2.5.10.backwards.excludes new file mode 100644 index 0000000000..16d6ad1b3c --- /dev/null +++ b/akka-cluster/src/main/mima-filters/2.5.10.backwards.excludes @@ -0,0 +1,6 @@ +# #24622 wire compat regression +ProblemFilters.exclude[FinalClassProblem]("akka.cluster.protobuf.ClusterMessageSerializer") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.protobuf.ClusterMessageSerializer.clusterRouterPoolFromBinary") +# these are needed because of bug #196 in MiMa +ProblemFilters.exclude[FinalMethodProblem]("akka.serialization.SerializerWithStringManifest.includeManifest") +ProblemFilters.exclude[FinalMethodProblem]("akka.serialization.SerializerWithStringManifest.fromBinary") diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 57f735d6b3..2f3ad27c22 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -51,7 +51,7 @@ message InitJoin { */ message InitJoinAck { required Address address = 1; - optional ConfigCheck configCheck = 2; + required ConfigCheck configCheck = 2; } message ConfigCheck { diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 01161f9523..7128052c24 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -16,74 +16,66 @@ import scala.annotation.tailrec import scala.collection.immutable import scala.collection.JavaConverters._ import scala.concurrent.duration.Deadline -import java.io.NotSerializableException +import akka.annotation.InternalApi import akka.cluster.InternalClusterAction._ import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings } import akka.routing.Pool import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions } /** - * Protobuf serializer of cluster messages. + * INTERNAL API */ -class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { - - private lazy val serialization = SerializationExtension(system) +@InternalApi +private[akka] object ClusterMessageSerializer { + // FIXME use short manifests when we can break wire compatibility + // needs to be full class names for backwards compatibility + val JoinManifest = s"akka.cluster.InternalClusterAction$$Join" + val WelcomeManifest = s"akka.cluster.InternalClusterAction$$Welcome" + val LeaveManifest = s"akka.cluster.ClusterUserAction$$Leave" + val DownManifest = s"akka.cluster.ClusterUserAction$$Down" + // #24622 wire compatibility + // we need to use this object name rather than classname to be able to join a 2.5.9 cluster during rolling upgrades + val InitJoinManifest = s"akka.cluster.InternalClusterAction$$InitJoin$$" + val InitJoinAckManifest = s"akka.cluster.InternalClusterAction$$InitJoinAck" + val InitJoinNackManifest = s"akka.cluster.InternalClusterAction$$InitJoinNack" + val HeartBeatManifest = s"akka.cluster.ClusterHeartbeatSender$$Heartbeat" + val HeartBeatRspManifest = s"akka.cluster.ClusterHeartbeatSender$$HeartbeatRsp" + val ExitingConfirmedManifest = s"akka.cluster.InternalClusterAction$$ExitingConfirmed" + val GossipStatusManifest = "akka.cluster.GossipStatus" + val GossipEnvelopeManifest = "akka.cluster.GossipEnvelope" + val ClusterRouterPoolManifest = "akka.cluster.routing.ClusterRouterPool" private final val BufferSize = 1024 * 4 +} + +/** + * Protobuf serializer of cluster messages. + */ +final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { + import ClusterMessageSerializer._ + private lazy val serialization = SerializationExtension(system) + // must be lazy because serializer is initialized from Cluster extension constructor private lazy val GossipTimeToLive = Cluster(system).settings.GossipTimeToLive - private val fromBinaryMap = collection.immutable.HashMap[Class[_], Array[Byte] ⇒ AnyRef]( - classOf[InternalClusterAction.Join] → { - case bytes ⇒ - val m = cm.Join.parseFrom(bytes) - val roles = Set.empty[String] ++ m.getRolesList.asScala - InternalClusterAction.Join( - uniqueAddressFromProto(m.getNode), - if (roles.exists(_.startsWith(ClusterSettings.DcRolePrefix))) roles - else roles + (ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter) - ) - }, - classOf[InternalClusterAction.Welcome] → { - case bytes ⇒ - val m = cm.Welcome.parseFrom(decompress(bytes)) - InternalClusterAction.Welcome(uniqueAddressFromProto(m.getFrom), gossipFromProto(m.getGossip)) - }, - classOf[ClusterUserAction.Leave] → (bytes ⇒ ClusterUserAction.Leave(addressFromBinary(bytes))), - classOf[ClusterUserAction.Down] → (bytes ⇒ ClusterUserAction.Down(addressFromBinary(bytes))), - classOf[InternalClusterAction.InitJoin] → { - case bytes ⇒ - val m = cm.InitJoin.parseFrom(bytes) - if (m.hasCurrentConfig) - InternalClusterAction.InitJoin(ConfigFactory.parseString(m.getCurrentConfig)) - else - InternalClusterAction.InitJoin(ConfigFactory.empty) - }, - classOf[InternalClusterAction.InitJoinAck] → { - case bytes ⇒ - val i = cm.InitJoinAck.parseFrom(bytes) - val configCheck = - if (i.hasConfigCheck) { - i.getConfigCheck.getType match { - case cm.ConfigCheck.Type.CompatibleConfig ⇒ CompatibleConfig(ConfigFactory.parseString(i.getConfigCheck.getClusterConfig)) - case cm.ConfigCheck.Type.IncompatibleConfig ⇒ IncompatibleConfig - case cm.ConfigCheck.Type.UncheckedConfig ⇒ UncheckedConfig - } - } else UncheckedConfig - - InternalClusterAction.InitJoinAck(addressFromProto(i.getAddress), configCheck) - }, - classOf[InternalClusterAction.InitJoinNack] → (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))), - classOf[ClusterHeartbeatSender.Heartbeat] → (bytes ⇒ ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes))), - classOf[ClusterHeartbeatSender.HeartbeatRsp] → (bytes ⇒ ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes))), - classOf[ExitingConfirmed] → (bytes ⇒ InternalClusterAction.ExitingConfirmed(uniqueAddressFromBinary(bytes))), - classOf[GossipStatus] → gossipStatusFromBinary, - classOf[GossipEnvelope] → gossipEnvelopeFromBinary, - classOf[ClusterRouterPool] → clusterRouterPoolFromBinary - ) - - def includeManifest: Boolean = true + def manifest(o: AnyRef): String = o match { + case _: InternalClusterAction.Join ⇒ JoinManifest + case _: InternalClusterAction.Welcome ⇒ WelcomeManifest + case _: ClusterUserAction.Leave ⇒ LeaveManifest + case _: ClusterUserAction.Down ⇒ DownManifest + case _: InternalClusterAction.InitJoin ⇒ InitJoinManifest + case _: InternalClusterAction.InitJoinAck ⇒ InitJoinAckManifest + case _: InternalClusterAction.InitJoinNack ⇒ InitJoinNackManifest + case _: ClusterHeartbeatSender.Heartbeat ⇒ HeartBeatManifest + case _: ClusterHeartbeatSender.HeartbeatRsp ⇒ HeartBeatRspManifest + case _: ExitingConfirmed ⇒ ExitingConfirmedManifest + case _: GossipStatus ⇒ GossipStatusManifest + case _: GossipEnvelope ⇒ GossipEnvelopeManifest + case _: ClusterRouterPool ⇒ ClusterRouterPoolManifest + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") + } def toBinary(obj: AnyRef): Array[Byte] = obj match { case ClusterHeartbeatSender.Heartbeat(from) ⇒ addressToProtoByteArray(from) @@ -100,7 +92,24 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri case InternalClusterAction.ExitingConfirmed(node) ⇒ uniqueAddressToProtoByteArray(node) case rp: ClusterRouterPool ⇒ clusterRouterPoolToProtoByteArray(rp) case _ ⇒ - throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") + } + + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case HeartBeatManifest ⇒ deserializeHeartBeat(bytes) + case HeartBeatRspManifest ⇒ deserializeHeartBeatRsp(bytes) + case GossipStatusManifest ⇒ deserializeGossipStatus(bytes) + case GossipEnvelopeManifest ⇒ deserializeGossipEnvelope(bytes) + case InitJoinManifest ⇒ deserializeInitJoin(bytes) + case InitJoinAckManifest ⇒ deserializeInitJoinAck(bytes) + case InitJoinNackManifest ⇒ deserializeInitJoinNack(bytes) + case JoinManifest ⇒ deserializeJoin(bytes) + case WelcomeManifest ⇒ deserializeWelcome(bytes) + case LeaveManifest ⇒ deserializeLeave(bytes) + case DownManifest ⇒ deserializeDown(bytes) + case ExitingConfirmedManifest ⇒ deserializeExitingConfirmed(bytes) + case ClusterRouterPoolManifest ⇒ deserializeClusterRouterPool(bytes) + case _ ⇒ throw new IllegalArgumentException(s"Unknown manifest [${manifest}]") } def compress(msg: MessageLite): Array[Byte] = { @@ -128,22 +137,13 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri out.toByteArray } - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match { - case Some(c) ⇒ - fromBinaryMap.get(c.asInstanceOf[Class[ClusterMessage]]) match { - case Some(f) ⇒ f(bytes) - case None ⇒ throw new NotSerializableException(s"Unimplemented deserialization of message class $c in ClusterSerializer") - } - case _ ⇒ throw new IllegalArgumentException("Need a cluster message class to be able to deserialize bytes in ClusterSerializer") - } - private def addressFromBinary(bytes: Array[Byte]): Address = addressFromProto(cm.Address.parseFrom(bytes)) private def uniqueAddressFromBinary(bytes: Array[Byte]): UniqueAddress = uniqueAddressFromProto(cm.UniqueAddress.parseFrom(bytes)) - private def addressToProto(address: Address): cm.Address.Builder = address match { + private[akka] def addressToProto(address: Address): cm.Address.Builder = address match { case Address(protocol, actorSystem, Some(host), Some(port)) ⇒ cm.Address.newBuilder().setSystem(actorSystem).setHostname(host).setPort(port).setProtocol(protocol) case _ ⇒ throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.") @@ -224,6 +224,71 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri } } + private def deserializeJoin(bytes: Array[Byte]): InternalClusterAction.Join = { + val m = cm.Join.parseFrom(bytes) + val roles = Set.empty[String] ++ m.getRolesList.asScala + InternalClusterAction.Join( + uniqueAddressFromProto(m.getNode), + if (roles.exists(_.startsWith(ClusterSettings.DcRolePrefix))) roles + else roles + (ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter) + ) + } + + private def deserializeWelcome(bytes: Array[Byte]): InternalClusterAction.Welcome = { + val m = cm.Welcome.parseFrom(decompress(bytes)) + InternalClusterAction.Welcome(uniqueAddressFromProto(m.getFrom), gossipFromProto(m.getGossip)) + } + + private def deserializeLeave(bytes: Array[Byte]): ClusterUserAction.Leave = { + ClusterUserAction.Leave(addressFromBinary(bytes)) + } + + private def deserializeDown(bytes: Array[Byte]): ClusterUserAction.Down = { + ClusterUserAction.Down(addressFromBinary(bytes)) + } + + private def deserializeInitJoin(bytes: Array[Byte]): InternalClusterAction.InitJoin = { + val m = cm.InitJoin.parseFrom(bytes) + if (m.hasCurrentConfig) + InternalClusterAction.InitJoin(ConfigFactory.parseString(m.getCurrentConfig)) + else + InternalClusterAction.InitJoin(ConfigFactory.empty) + } + + private def deserializeInitJoinAck(bytes: Array[Byte]): InternalClusterAction.InitJoinAck = { + try { + val i = cm.InitJoinAck.parseFrom(bytes) + val configCheck = + i.getConfigCheck.getType match { + case cm.ConfigCheck.Type.CompatibleConfig ⇒ CompatibleConfig(ConfigFactory.parseString(i.getConfigCheck.getClusterConfig)) + case cm.ConfigCheck.Type.IncompatibleConfig ⇒ IncompatibleConfig + case cm.ConfigCheck.Type.UncheckedConfig ⇒ UncheckedConfig + } + + InternalClusterAction.InitJoinAck(addressFromProto(i.getAddress), configCheck) + } catch { + case ex: akka.protobuf.InvalidProtocolBufferException ⇒ + // nodes previous to 2.5.9 sends just an address + InternalClusterAction.InitJoinAck(addressFromBinary(bytes), UncheckedConfig) + } + } + + private def deserializeExitingConfirmed(bytes: Array[Byte]): InternalClusterAction.ExitingConfirmed = { + InternalClusterAction.ExitingConfirmed(uniqueAddressFromBinary(bytes)) + } + + private def deserializeHeartBeatRsp(bytes: Array[Byte]): ClusterHeartbeatSender.HeartbeatRsp = { + ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes)) + } + + private def deserializeHeartBeat(bytes: Array[Byte]): ClusterHeartbeatSender.Heartbeat = { + ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes)) + } + + private def deserializeInitJoinNack(bytes: Array[Byte]): InternalClusterAction.InitJoinNack = { + InternalClusterAction.InitJoinNack(addressFromBinary(bytes)) + } + private def addressFromProto(address: cm.Address): Address = Address(getProtocol(address), getSystem(address), address.getHostname, address.getPort) @@ -370,10 +435,10 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri setVersion(vectorClockToProto(status.version, hashMapping)).build() } - private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = + private def deserializeGossipEnvelope(bytes: Array[Byte]): GossipEnvelope = gossipEnvelopeFromProto(cm.GossipEnvelope.parseFrom(bytes)) - private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus = + private def deserializeGossipStatus(bytes: Array[Byte]): GossipStatus = gossipStatusFromProto(cm.GossipStatus.parseFrom(bytes)) private def gossipFromProto(gossip: cm.Gossip): Gossip = { @@ -449,7 +514,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri status.getVersion, status.getAllHashesList.asScala.toVector)) - def clusterRouterPoolFromBinary(bytes: Array[Byte]): ClusterRouterPool = { + def deserializeClusterRouterPool(bytes: Array[Byte]): ClusterRouterPool = { val crp = cm.ClusterRouterPool.parseFrom(bytes) ClusterRouterPool( diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index bd5371cf45..a8a192f51e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -19,8 +19,9 @@ class ClusterMessageSerializerSpec extends AkkaSpec( val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) def roundtrip[T <: AnyRef](obj: T): T = { + val manifest = serializer.manifest(obj) val blob = serializer.toBinary(obj) - serializer.fromBinary(blob, obj.getClass).asInstanceOf[T] + serializer.fromBinary(blob, manifest).asInstanceOf[T] } def checkSerialization(obj: AnyRef): Unit = { @@ -82,6 +83,30 @@ class ClusterMessageSerializerSpec extends AkkaSpec( checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2)) } + "be compatible with wire format of version 2.5.9 (using InitJoin singleton instead of class)" in { + // we must use the old singleton class name so that the other side will see an InitJoin + // but discard the config as it does not know about the config check + val oldClassName = "akka.cluster.InternalClusterAction$InitJoin$" + serializer.manifest(InternalClusterAction.InitJoin(ConfigFactory.empty())) should ===(oldClassName) + + // in 2.5.9 and earlier, it was an object and serialized to empty byte array + // and we should accept that + val deserialized = serializer.fromBinary(Array.emptyByteArray, oldClassName) + deserialized shouldBe an[InternalClusterAction.InitJoin] + } + + "be compatible with wire format of version 2.5.9 (using serialized address for InitJoinAck)" in { + // we must use the old singleton class name so that the other side will see an InitJoin + // but discard the config as it does not know about the config check + val initJoinAck = InternalClusterAction.InitJoinAck( + Address("akka.tcp", "cluster", "127.0.0.1", 2552), + InternalClusterAction.UncheckedConfig) + val serializedinInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray + + val deserialized = serializer.fromBinary(serializedinInitJoinAckPre2510, ClusterMessageSerializer.InitJoinAckManifest) + deserialized shouldEqual initJoinAck + } + "be compatible with wire format of version 2.5.3 (using use-role instead of use-roles)" in { val system = ActorSystem("ClusterMessageSerializer-old-wire-format") diff --git a/project/MiMa.scala b/project/MiMa.scala index 7b847bee6d..9bf0e2e898 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -10,7 +10,7 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.autoImport._ object MiMa extends AutoPlugin { - private val latestMinorOf25 = 9 + private val latestMinorOf25 = 10 private val latestMinorOf24 = 20 override def requires = MimaPlugin