=clu #16667 fix Inconsistent message order in ClusterMessageSerializer & MessageSerializer

This commit is contained in:
hepin 2015-01-29 16:51:57 +08:00
parent 50d1569f37
commit 8554b8a5ab
2 changed files with 45 additions and 64 deletions

View file

@ -4,31 +4,19 @@
package akka.cluster.metrics.protobuf
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectOutputStream
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream }
import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
import java.{ lang jl }
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import scala.annotation.tailrec
import scala.collection.JavaConverters.asJavaIterableConverter
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.breakOut
import com.google.protobuf.ByteString
import com.google.protobuf.MessageLite
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.cluster.metrics.EWMA
import akka.cluster.metrics.Metric
import akka.cluster.metrics.MetricsGossip
import akka.cluster.metrics.NodeMetrics
import akka.actor.{ Address, ExtendedActorSystem }
import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages cm }
import akka.cluster.metrics.{ ClusterMetricsMessage, ClusterMetricsSettings, EWMA, Metric, MetricsGossip, MetricsGossipEnvelope, NodeMetrics }
import akka.serialization.Serializer
import akka.util.ClassLoaderObjectInputStream
import akka.cluster.metrics.ClusterMetricsMessage
import akka.cluster.metrics.MetricsGossipEnvelope
import akka.cluster.metrics.ClusterMetricsSettings
import com.google.protobuf.{ ByteString, MessageLite }
import scala.annotation.tailrec
import scala.collection.JavaConverters.{ asJavaIterableConverter, asScalaBufferConverter, setAsJavaSetConverter }
/**
* Protobuf serializer for [[ClusterMetricsMessage]] types.
@ -83,17 +71,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
case _ throw new IllegalArgumentException("Need a metrics message class to be able to deserialize bytes in metrics")
}
private def addressFromBinary(bytes: Array[Byte]): Address =
addressFromProto(cm.Address.parseFrom(bytes))
private def addressToProto(address: Address): cm.Address.Builder = address match {
case Address(protocol, system, Some(host), Some(port))
cm.Address.newBuilder().setSystem(system).setHostname(host).setPort(port).setProtocol(protocol)
case _ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")
case Address(protocol, actorSystem, Some(host), Some(port))
cm.Address.newBuilder().setSystem(actorSystem).setHostname(host).setPort(port).setProtocol(protocol)
case _ throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.")
}
private def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray
@volatile
private var protocolCache: String = null
@volatile
@ -124,16 +107,16 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
private def mapWithErrorMessage[T](map: Map[T, Int], value: T, unknown: String): Int = map.get(value) match {
case Some(x) x
case _ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message")
case _ throw new IllegalArgumentException(s"Unknown $unknown [$value] in cluster message")
}
private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): cm.MetricsGossipEnvelope = {
val mgossip = envelope.gossip
val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) s + n.address)
import scala.collection.breakOut
val allNodeMetrics = envelope.gossip.nodes
val allAddresses: Vector[Address] = allNodeMetrics.map(_.address)(breakOut)
val addressMapping = allAddresses.zipWithIndex.toMap
val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) s ++ n.metrics.iterator.map(_.name))
val allMetricNames: Vector[String] = allNodeMetrics.foldLeft(Set.empty[String])((s, n) s ++ n.metrics.iterator.map(_.name)).toVector
val metricNamesMapping = allMetricNames.zipWithIndex.toMap
def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address")
def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address")
@ -148,7 +131,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
case n: jl.Double Number.newBuilder().setType(NumberType.Double).setValue64(jl.Double.doubleToLongBits(n))
case n: jl.Long Number.newBuilder().setType(NumberType.Long).setValue64(n)
case n: jl.Float Number.newBuilder().setType(NumberType.Float).setValue32(jl.Float.floatToIntBits(n))
case n: jl.Integer Number.newBuilder().setType(NumberType.Integer) setValue32 (n)
case n: jl.Integer Number.newBuilder().setType(NumberType.Integer).setValue32(n)
case _
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
@ -160,14 +143,14 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
def metricToProto(metric: Metric): cm.NodeMetrics.Metric.Builder = {
val builder = cm.NodeMetrics.Metric.newBuilder().setNameIndex(mapName(metric.name)).setNumber(numberToProto(metric.value))
ewmaToProto(metric.average).map(builder.setEwma(_)).getOrElse(builder)
ewmaToProto(metric.average).map(builder.setEwma).getOrElse(builder)
}
def nodeMetricsToProto(nodeMetrics: NodeMetrics): cm.NodeMetrics.Builder =
cm.NodeMetrics.newBuilder().setAddressIndex(mapAddress(nodeMetrics.address)).setTimestamp(nodeMetrics.timestamp).
addAllMetrics(nodeMetrics.metrics.map(metricToProto(_).build).asJava)
val nodeMetrics: Iterable[cm.NodeMetrics] = mgossip.nodes.map(nodeMetricsToProto(_).build)
val nodeMetrics: Iterable[cm.NodeMetrics] = allNodeMetrics.map(nodeMetricsToProto(_).build)
cm.MetricsGossipEnvelope.newBuilder().setFrom(addressToProto(envelope.from)).setGossip(
cm.MetricsGossip.newBuilder().addAllAllAddresses(allAddresses.map(addressToProto(_).build()).asJava).
@ -179,6 +162,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
metricsGossipEnvelopeFromProto(cm.MetricsGossipEnvelope.parseFrom(decompress(bytes)))
private def metricsGossipEnvelopeFromProto(envelope: cm.MetricsGossipEnvelope): MetricsGossipEnvelope = {
import scala.collection.breakOut
val mgossip = envelope.getGossip
val addressMapping: Vector[Address] = mgossip.getAllAddressesList.asScala.map(addressFromProto)(breakOut)
val metricNameMapping: Vector[String] = mgossip.getAllMetricNamesList.asScala.toVector
@ -187,7 +171,6 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
Some(EWMA(ewma.getValue, ewma.getAlpha))
def numberFromProto(number: cm.NodeMetrics.Number): Number = {
import cm.NodeMetrics.Number
import cm.NodeMetrics.NumberType
number.getType.getNumber match {
case NumberType.Double_VALUE jl.Double.longBitsToDouble(number.getValue64)