diff --git a/akka-cluster/src/main/mima-filters/2.6.1.backwards.excludes/issue-13654-no-class-manifests-for-cluster-serializer-step1.excludes b/akka-cluster/src/main/mima-filters/2.6.1.backwards.excludes/issue-13654-no-class-manifests-for-cluster-serializer-step1.excludes new file mode 100644 index 0000000000..8051d12e49 --- /dev/null +++ b/akka-cluster/src/main/mima-filters/2.6.1.backwards.excludes/issue-13654-no-class-manifests-for-cluster-serializer-step1.excludes @@ -0,0 +1,5 @@ +# internals changed +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.protobuf.ClusterMessageSerializer.HeartBeatRspManifest") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.protobuf.ClusterMessageSerializer.HeartBeatManifest") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.protobuf.ClusterMessageSerializer.HeartBeatManifest") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.protobuf.ClusterMessageSerializer.HeartBeatRspManifest") \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 022b807dba..43e0f5f049 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -31,27 +31,40 @@ import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions } @InternalApi @ccompatUsedUntil213 private[akka] object ClusterMessageSerializer { - // FIXME use short manifests when we can break wire compatibility + // Kept for one version iteration from 2.6.2 to allow rolling migration to short manifests + // will be removed in 2.6.3 // needs to be full class names for backwards compatibility - val JoinManifest = s"akka.cluster.InternalClusterAction$$Join" - val WelcomeManifest = s"akka.cluster.InternalClusterAction$$Welcome" - val LeaveManifest = s"akka.cluster.ClusterUserAction$$Leave" - val DownManifest = s"akka.cluster.ClusterUserAction$$Down" + val OldJoinManifest = s"akka.cluster.InternalClusterAction$$Join" + val OldWelcomeManifest = s"akka.cluster.InternalClusterAction$$Welcome" + val OldLeaveManifest = s"akka.cluster.ClusterUserAction$$Leave" + val OldDownManifest = s"akka.cluster.ClusterUserAction$$Down" // #24622 wire compatibility // we need to use this object name rather than classname to be able to join a 2.5.9 cluster during rolling upgrades - val InitJoinManifest = s"akka.cluster.InternalClusterAction$$InitJoin$$" - val InitJoinAckManifest = s"akka.cluster.InternalClusterAction$$InitJoinAck" - val InitJoinNackManifest = s"akka.cluster.InternalClusterAction$$InitJoinNack" + val OldInitJoinManifest = s"akka.cluster.InternalClusterAction$$InitJoin$$" + val OldInitJoinAckManifest = s"akka.cluster.InternalClusterAction$$InitJoinAck" + val OldInitJoinNackManifest = s"akka.cluster.InternalClusterAction$$InitJoinNack" // FIXME, remove in a later version (2.6?) and make 2.5.24+ a mandatory step for rolling upgrade val HeartBeatManifestPre2523 = s"akka.cluster.ClusterHeartbeatSender$$Heartbeat" val HeartBeatRspManifest2523 = s"akka.cluster.ClusterHeartbeatSender$$HeartbeatRsp" + val OldExitingConfirmedManifest = s"akka.cluster.InternalClusterAction$$ExitingConfirmed" + val OldGossipStatusManifest = "akka.cluster.GossipStatus" + val OldGossipEnvelopeManifest = "akka.cluster.GossipEnvelope" + val OldClusterRouterPoolManifest = "akka.cluster.routing.ClusterRouterPool" - val HeartBeatManifest = "HB" - val HeartBeatRspManifest = "HBR" - val ExitingConfirmedManifest = s"akka.cluster.InternalClusterAction$$ExitingConfirmed" - val GossipStatusManifest = "akka.cluster.GossipStatus" - val GossipEnvelopeManifest = "akka.cluster.GossipEnvelope" - val ClusterRouterPoolManifest = "akka.cluster.routing.ClusterRouterPool" + // is handled on the deserializing side in 2.6.2 and then on the serializing side in 2.6.3 + val JoinManifest = "J" + val WelcomeManifest = "W" + val LeaveManifest = "L" + val DownManifest = "D" + val InitJoinManifest = "IJ" + val InitJoinAckManifest = "IJA" + val InitJoinNackManifest = "IJN" + val HeartbeatManifest = "HB" + val HeartbeatRspManifest = "HBR" + val ExitingConfirmedManifest = "EC" + val GossipStatusManifest = "GS" + val GossipEnvelopeManifest = "GE" + val ClusterRouterPoolManifest = "CRP" private final val BufferSize = 1024 * 4 } @@ -69,19 +82,19 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) private lazy val GossipTimeToLive = Cluster(system).settings.GossipTimeToLive def manifest(o: AnyRef): String = o match { - case _: InternalClusterAction.Join => JoinManifest - case _: InternalClusterAction.Welcome => WelcomeManifest - case _: ClusterUserAction.Leave => LeaveManifest - case _: ClusterUserAction.Down => DownManifest - case _: InternalClusterAction.InitJoin => InitJoinManifest - case _: InternalClusterAction.InitJoinAck => InitJoinAckManifest - case _: InternalClusterAction.InitJoinNack => InitJoinNackManifest + case _: InternalClusterAction.Join => OldJoinManifest + case _: InternalClusterAction.Welcome => OldWelcomeManifest + case _: ClusterUserAction.Leave => OldLeaveManifest + case _: ClusterUserAction.Down => OldDownManifest + case _: InternalClusterAction.InitJoin => OldInitJoinManifest + case _: InternalClusterAction.InitJoinAck => OldInitJoinAckManifest + case _: InternalClusterAction.InitJoinNack => OldInitJoinNackManifest case _: ClusterHeartbeatSender.Heartbeat => HeartBeatManifestPre2523 case _: ClusterHeartbeatSender.HeartbeatRsp => HeartBeatRspManifest2523 - case _: ExitingConfirmed => ExitingConfirmedManifest - case _: GossipStatus => GossipStatusManifest - case _: GossipEnvelope => GossipEnvelopeManifest - case _: ClusterRouterPool => ClusterRouterPoolManifest + case _: ExitingConfirmed => OldExitingConfirmedManifest + case _: GossipStatus => OldGossipStatusManifest + case _: GossipEnvelope => OldGossipEnvelopeManifest + case _: ClusterRouterPool => OldClusterRouterPoolManifest case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") } @@ -105,22 +118,33 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) } def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { - case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes) - case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes) - case HeartBeatManifest => deserializeHeartBeat(bytes) - case HeartBeatRspManifest => deserializeHeartBeatResponse(bytes) - case GossipStatusManifest => deserializeGossipStatus(bytes) - case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes) - case InitJoinManifest => deserializeInitJoin(bytes) - case InitJoinAckManifest => deserializeInitJoinAck(bytes) - case InitJoinNackManifest => deserializeInitJoinNack(bytes) - case JoinManifest => deserializeJoin(bytes) - case WelcomeManifest => deserializeWelcome(bytes) - case LeaveManifest => deserializeLeave(bytes) - case DownManifest => deserializeDown(bytes) - case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes) - case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) - case _ => throw new IllegalArgumentException(s"Unknown manifest [${manifest}]") + case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes) + case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes) + case HeartbeatManifest => deserializeHeartBeat(bytes) + case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes) + case OldGossipStatusManifest => deserializeGossipStatus(bytes) + case OldGossipEnvelopeManifest => deserializeGossipEnvelope(bytes) + case OldInitJoinManifest => deserializeInitJoin(bytes) + case OldInitJoinAckManifest => deserializeInitJoinAck(bytes) + case OldInitJoinNackManifest => deserializeInitJoinNack(bytes) + case OldJoinManifest => deserializeJoin(bytes) + case OldWelcomeManifest => deserializeWelcome(bytes) + case OldLeaveManifest => deserializeLeave(bytes) + case OldDownManifest => deserializeDown(bytes) + case OldExitingConfirmedManifest => deserializeExitingConfirmed(bytes) + case OldClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) + case GossipStatusManifest => deserializeGossipStatus(bytes) + case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes) + case InitJoinManifest => deserializeInitJoin(bytes) + case InitJoinAckManifest => deserializeInitJoinAck(bytes) + case InitJoinNackManifest => deserializeInitJoinNack(bytes) + case JoinManifest => deserializeJoin(bytes) + case WelcomeManifest => deserializeWelcome(bytes) + case LeaveManifest => deserializeLeave(bytes) + case DownManifest => deserializeDown(bytes) + case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes) + case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) + case _ => throw new IllegalArgumentException(s"Unknown manifest [${manifest}]") } def compress(msg: MessageLite): Array[Byte] = { diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index 5bce194048..925d733abc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -36,7 +36,22 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust case (_, ref) => ref should ===(obj) } + } + private def roundtripWithManifest[T <: AnyRef](obj: T, manifest: String): T = { + val blob = serializer.toBinary(obj) + serializer.fromBinary(blob, manifest).asInstanceOf[T] + } + + private def checkDeserializationWithManifest(obj: AnyRef, deserializationManifest: String): Unit = { + (obj, roundtripWithManifest(obj, deserializationManifest)) match { + case (env: GossipEnvelope, env2: GossipEnvelope) => + env2.from should ===(env.from) + env2.to should ===(env.to) + env2.gossip should ===(env.gossip) + case (_, ref) => + ref should ===(obj) + } } import MemberStatus._ @@ -89,6 +104,62 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2)) } + // can be removed in 2.6.3 only checks deserialization with new not yet in effect manifests for 2.6.2 + "be serializable with new manifests for 2.6.3" in { + val address = Address("akka", "system", "some.host.org", 4711) + val uniqueAddress = UniqueAddress(address, 17L) + val address2 = Address("akka", "system", "other.host.org", 4711) + val uniqueAddress2 = UniqueAddress(address2, 18L) + checkDeserializationWithManifest( + InternalClusterAction.Join(uniqueAddress, Set("foo", "bar", "dc-A")), + ClusterMessageSerializer.JoinManifest) + checkDeserializationWithManifest(ClusterUserAction.Leave(address), ClusterMessageSerializer.LeaveManifest) + checkDeserializationWithManifest(ClusterUserAction.Down(address), ClusterMessageSerializer.DownManifest) + checkDeserializationWithManifest( + InternalClusterAction.InitJoin(ConfigFactory.empty), + ClusterMessageSerializer.InitJoinManifest) + checkDeserializationWithManifest( + InternalClusterAction.InitJoinAck(address, CompatibleConfig(ConfigFactory.empty)), + ClusterMessageSerializer.InitJoinAckManifest) + checkDeserializationWithManifest( + InternalClusterAction.InitJoinNack(address), + ClusterMessageSerializer.InitJoinNackManifest) + /* this has changed in 2.5.23 but it seems we forgot to add the next step in 2.5.24 + so we can't do two-way like this, the new manifest actually expects an address + timestamp + seqnr only when the new manifest is used + see test below. + checkDeserializationWithManifest( + ClusterHeartbeatSender.Heartbeat(address, -1, -1), + ClusterMessageSerializer.HeartbeatManifest) + checkDeserializationWithManifest( + ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1), + ClusterMessageSerializer.HeartbeatRspManifest) + */ + checkDeserializationWithManifest( + InternalClusterAction.ExitingConfirmed(uniqueAddress), + ClusterMessageSerializer.ExitingConfirmedManifest) + + val node1 = VectorClock.Node("node1") + val node2 = VectorClock.Node("node2") + val node3 = VectorClock.Node("node3") + val node4 = VectorClock.Node("node4") + val g1 = (Gossip(SortedSet(a1, b1, c1, d1)) :+ node1 :+ node2).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + val g2 = (g1 :+ node3 :+ node4).seen(a1.uniqueAddress).seen(c1.uniqueAddress) + val reachability3 = Reachability.empty + .unreachable(a1.uniqueAddress, e1.uniqueAddress) + .unreachable(b1.uniqueAddress, e1.uniqueAddress) + checkDeserializationWithManifest( + GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g1), + ClusterMessageSerializer.GossipEnvelopeManifest) + + checkDeserializationWithManifest( + GossipStatus(a1.uniqueAddress, g1.version), + ClusterMessageSerializer.GossipStatusManifest) + + checkDeserializationWithManifest( + InternalClusterAction.Welcome(uniqueAddress, g2), + ClusterMessageSerializer.WelcomeManifest) + } + "be compatible with wire format of version 2.5.9 (using InitJoin singleton instead of class)" in { // we must use the old singleton class name so that the other side will see an InitJoin // but discard the config as it does not know about the config check @@ -110,7 +181,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust val serializedInitJoinAckPre2510 = serializer.addressToProto(initJoinAck.address).build().toByteArray val deserialized = - serializer.fromBinary(serializedInitJoinAckPre2510, ClusterMessageSerializer.InitJoinAckManifest) + serializer.fromBinary(serializedInitJoinAckPre2510, ClusterMessageSerializer.OldInitJoinAckManifest) deserialized shouldEqual initJoinAck } @@ -193,7 +264,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust .build() .toByteArray - serializer.fromBinary(hbProtobuf, ClusterMessageSerializer.HeartBeatManifest) should ===( + serializer.fromBinary(hbProtobuf, ClusterMessageSerializer.HeartbeatManifest) should ===( ClusterHeartbeatSender.Heartbeat(a1.address, 1, 2)) } @@ -206,7 +277,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust .build() .toByteArray - serializer.fromBinary(hbrProtobuf, ClusterMessageSerializer.HeartBeatRspManifest) should ===( + serializer.fromBinary(hbrProtobuf, ClusterMessageSerializer.HeartbeatRspManifest) should ===( ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, 1, 2)) } } diff --git a/akka-docs/src/main/paradox/project/rolling-update.md b/akka-docs/src/main/paradox/project/rolling-update.md index 502cb169b8..ab7a067ccf 100644 --- a/akka-docs/src/main/paradox/project/rolling-update.md +++ b/akka-docs/src/main/paradox/project/rolling-update.md @@ -73,3 +73,16 @@ versions 2.5.18, 2.5.19, 2.5.20 or 2.5.21. ### 2.6.0 Several changes in minor release See @ref:[migration guide](migration-guide-2.5.x-2.6.x.md) when updating from 2.5.x to 2.6.x. + +### 2.6.2 ClusterMessageSerializer manifests change + +Issue: [#23654](https://github.com/akka/akka/issues/13654) + +In preparation of switching away from class based manifests to more efficient letter codes the `ClusterMessageSerializer` +has been prepared to accept those shorter forms but still emits the old long manifests. + +* 2.6.2 - shorter manifests accepted +* 2.6.3 - shorter manifests emitted (not released yet) + +This means that a rolling upgrade will have to go through 2.6.2 and 2.6.3 when upgrading to 2.6.3 or higher or else +cluster nodes will not be able to communicate during the rolling upgrade. \ No newline at end of file