From b72312d428223156c10130e19f921e8ae5f336b6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 5 Jun 2018 14:57:45 +0200 Subject: [PATCH] Support for rolling upgrade, #23703 --- .../protobuf/ReplicatedDataSerializer.scala | 68 ++++++++++++++++--- 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index 56b27344ea..cb31928f96 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -28,6 +28,7 @@ import java.io.NotSerializableException import akka.actor.ActorRef import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage import akka.serialization.Serialization +import akka.util.Helpers.toRootLowerCase private object ReplicatedDataSerializer { /* @@ -167,6 +168,16 @@ private object ReplicatedDataSerializer { override def getValue(entry: rd.ORMapDeltaGroup.MapEntry): dm.OtherMessage = entry.getValue } + // Optimized serializer for ORSet[ActorRef] was added in 2.5.14. + // To support rolling upgrades from Akka 2.5.13 to 2.5.14 and then to 2.5.15 those elements + // are by default sent as both old and new elements in 2.5.14. + // The old is used by 2.5.13 receivers. The new is used by 2.5.14 and 2.5.15 receivers. + // FIXME Remove this in 2.5.15 + private sealed trait ActorRefFormat + private case object OldActorRefFormat extends ActorRefFormat + private case object BothOldAndNewActorRefFormat extends ActorRefFormat + private case object NewActorRefFormat extends ActorRefFormat + } /** @@ -177,6 +188,20 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) import ReplicatedDataSerializer._ + private val actorRefFormat: ActorRefFormat = { + val conf = system.settings.config + val confKey = "akka.cluster.distributed-data.actor-ref-format" + // this config is not in reference.conf because it's only temporary for the 2.5.14 release + if (conf.hasPath(confKey)) { + toRootLowerCase(conf.getString(confKey)) match { + case "old" ⇒ OldActorRefFormat + case "both" ⇒ BothOldAndNewActorRefFormat + case "new" ⇒ NewActorRefFormat + } + } else + BothOldAndNewActorRefFormat + } + private val DeletedDataManifest = "A" private val GSetManifest = "B" private val GSetKeyManifest = "b" @@ -325,11 +350,20 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) val otherElements = new ArrayList[dm.OtherMessage] val actorRefElements = new ArrayList[String] gset.elements.foreach { - case s: String ⇒ stringElements.add(s) - case i: Int ⇒ intElements.add(i) - case l: Long ⇒ longElements.add(l) - case ref: ActorRef ⇒ actorRefElements.add(Serialization.serializedActorPath(ref)) - case other ⇒ otherElements.add(otherMessageToProto(other)) + case s: String ⇒ stringElements.add(s) + case i: Int ⇒ intElements.add(i) + case l: Long ⇒ longElements.add(l) + case ref: ActorRef ⇒ + actorRefFormat match { + case BothOldAndNewActorRefFormat ⇒ + actorRefElements.add(Serialization.serializedActorPath(ref)) + otherElements.add(otherMessageToProto(ref)) + case OldActorRefFormat ⇒ + otherElements.add(otherMessageToProto(ref)) + case NewActorRefFormat ⇒ + actorRefElements.add(Serialization.serializedActorPath(ref)) + } + case other ⇒ otherElements.add(otherMessageToProto(other)) } if (!stringElements.isEmpty) { Collections.sort(stringElements) @@ -381,10 +415,26 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) var otherElementsMap = Map.empty[dm.OtherMessage, Any] val actorRefElements = new ArrayList[ActorRef] orset.elementsMap.keysIterator.foreach { - case s: String ⇒ stringElements.add(s) - case i: Int ⇒ intElements.add(i) - case l: Long ⇒ longElements.add(l) - case ref: ActorRef ⇒ actorRefElements.add(ref) + case s: String ⇒ stringElements.add(s) + case i: Int ⇒ intElements.add(i) + case l: Long ⇒ longElements.add(l) + case ref: ActorRef ⇒ + actorRefFormat match { + case BothOldAndNewActorRefFormat ⇒ + actorRefElements.add(ref) + val enclosedMsg = otherMessageToProto(ref) + otherElements.add(enclosedMsg) + // need the mapping back to the `other` when adding dots + otherElementsMap = otherElementsMap.updated(enclosedMsg, ref) + case OldActorRefFormat ⇒ + otherElements.add(otherMessageToProto(ref)) + val enclosedMsg = otherMessageToProto(ref) + otherElements.add(enclosedMsg) + // need the mapping back to the `other` when adding dots + otherElementsMap = otherElementsMap.updated(enclosedMsg, ref) + case NewActorRefFormat ⇒ + actorRefElements.add(ref) + } case other ⇒ val enclosedMsg = otherMessageToProto(other) otherElements.add(enclosedMsg)