Support for rolling upgrade, #23703
This commit is contained in:
parent
f754705c9c
commit
b72312d428
1 changed files with 59 additions and 9 deletions
|
|
@ -28,6 +28,7 @@ import java.io.NotSerializableException
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage
|
import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage
|
||||||
import akka.serialization.Serialization
|
import akka.serialization.Serialization
|
||||||
|
import akka.util.Helpers.toRootLowerCase
|
||||||
|
|
||||||
private object ReplicatedDataSerializer {
|
private object ReplicatedDataSerializer {
|
||||||
/*
|
/*
|
||||||
|
|
@ -167,6 +168,16 @@ private object ReplicatedDataSerializer {
|
||||||
override def getValue(entry: rd.ORMapDeltaGroup.MapEntry): dm.OtherMessage = entry.getValue
|
override def getValue(entry: rd.ORMapDeltaGroup.MapEntry): dm.OtherMessage = entry.getValue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Optimized serializer for ORSet[ActorRef] was added in 2.5.14.
|
||||||
|
// To support rolling upgrades from Akka 2.5.13 to 2.5.14 and then to 2.5.15 those elements
|
||||||
|
// are by default sent as both old and new elements in 2.5.14.
|
||||||
|
// The old is used by 2.5.13 receivers. The new is used by 2.5.14 and 2.5.15 receivers.
|
||||||
|
// FIXME Remove this in 2.5.15
|
||||||
|
private sealed trait ActorRefFormat
|
||||||
|
private case object OldActorRefFormat extends ActorRefFormat
|
||||||
|
private case object BothOldAndNewActorRefFormat extends ActorRefFormat
|
||||||
|
private case object NewActorRefFormat extends ActorRefFormat
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -177,6 +188,20 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
|
||||||
|
|
||||||
import ReplicatedDataSerializer._
|
import ReplicatedDataSerializer._
|
||||||
|
|
||||||
|
private val actorRefFormat: ActorRefFormat = {
|
||||||
|
val conf = system.settings.config
|
||||||
|
val confKey = "akka.cluster.distributed-data.actor-ref-format"
|
||||||
|
// this config is not in reference.conf because it's only temporary for the 2.5.14 release
|
||||||
|
if (conf.hasPath(confKey)) {
|
||||||
|
toRootLowerCase(conf.getString(confKey)) match {
|
||||||
|
case "old" ⇒ OldActorRefFormat
|
||||||
|
case "both" ⇒ BothOldAndNewActorRefFormat
|
||||||
|
case "new" ⇒ NewActorRefFormat
|
||||||
|
}
|
||||||
|
} else
|
||||||
|
BothOldAndNewActorRefFormat
|
||||||
|
}
|
||||||
|
|
||||||
private val DeletedDataManifest = "A"
|
private val DeletedDataManifest = "A"
|
||||||
private val GSetManifest = "B"
|
private val GSetManifest = "B"
|
||||||
private val GSetKeyManifest = "b"
|
private val GSetKeyManifest = "b"
|
||||||
|
|
@ -325,11 +350,20 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
|
||||||
val otherElements = new ArrayList[dm.OtherMessage]
|
val otherElements = new ArrayList[dm.OtherMessage]
|
||||||
val actorRefElements = new ArrayList[String]
|
val actorRefElements = new ArrayList[String]
|
||||||
gset.elements.foreach {
|
gset.elements.foreach {
|
||||||
case s: String ⇒ stringElements.add(s)
|
case s: String ⇒ stringElements.add(s)
|
||||||
case i: Int ⇒ intElements.add(i)
|
case i: Int ⇒ intElements.add(i)
|
||||||
case l: Long ⇒ longElements.add(l)
|
case l: Long ⇒ longElements.add(l)
|
||||||
case ref: ActorRef ⇒ actorRefElements.add(Serialization.serializedActorPath(ref))
|
case ref: ActorRef ⇒
|
||||||
case other ⇒ otherElements.add(otherMessageToProto(other))
|
actorRefFormat match {
|
||||||
|
case BothOldAndNewActorRefFormat ⇒
|
||||||
|
actorRefElements.add(Serialization.serializedActorPath(ref))
|
||||||
|
otherElements.add(otherMessageToProto(ref))
|
||||||
|
case OldActorRefFormat ⇒
|
||||||
|
otherElements.add(otherMessageToProto(ref))
|
||||||
|
case NewActorRefFormat ⇒
|
||||||
|
actorRefElements.add(Serialization.serializedActorPath(ref))
|
||||||
|
}
|
||||||
|
case other ⇒ otherElements.add(otherMessageToProto(other))
|
||||||
}
|
}
|
||||||
if (!stringElements.isEmpty) {
|
if (!stringElements.isEmpty) {
|
||||||
Collections.sort(stringElements)
|
Collections.sort(stringElements)
|
||||||
|
|
@ -381,10 +415,26 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
|
||||||
var otherElementsMap = Map.empty[dm.OtherMessage, Any]
|
var otherElementsMap = Map.empty[dm.OtherMessage, Any]
|
||||||
val actorRefElements = new ArrayList[ActorRef]
|
val actorRefElements = new ArrayList[ActorRef]
|
||||||
orset.elementsMap.keysIterator.foreach {
|
orset.elementsMap.keysIterator.foreach {
|
||||||
case s: String ⇒ stringElements.add(s)
|
case s: String ⇒ stringElements.add(s)
|
||||||
case i: Int ⇒ intElements.add(i)
|
case i: Int ⇒ intElements.add(i)
|
||||||
case l: Long ⇒ longElements.add(l)
|
case l: Long ⇒ longElements.add(l)
|
||||||
case ref: ActorRef ⇒ actorRefElements.add(ref)
|
case ref: ActorRef ⇒
|
||||||
|
actorRefFormat match {
|
||||||
|
case BothOldAndNewActorRefFormat ⇒
|
||||||
|
actorRefElements.add(ref)
|
||||||
|
val enclosedMsg = otherMessageToProto(ref)
|
||||||
|
otherElements.add(enclosedMsg)
|
||||||
|
// need the mapping back to the `other` when adding dots
|
||||||
|
otherElementsMap = otherElementsMap.updated(enclosedMsg, ref)
|
||||||
|
case OldActorRefFormat ⇒
|
||||||
|
otherElements.add(otherMessageToProto(ref))
|
||||||
|
val enclosedMsg = otherMessageToProto(ref)
|
||||||
|
otherElements.add(enclosedMsg)
|
||||||
|
// need the mapping back to the `other` when adding dots
|
||||||
|
otherElementsMap = otherElementsMap.updated(enclosedMsg, ref)
|
||||||
|
case NewActorRefFormat ⇒
|
||||||
|
actorRefElements.add(ref)
|
||||||
|
}
|
||||||
case other ⇒
|
case other ⇒
|
||||||
val enclosedMsg = otherMessageToProto(other)
|
val enclosedMsg = otherMessageToProto(other)
|
||||||
otherElements.add(enclosedMsg)
|
otherElements.add(enclosedMsg)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue