+ akka-cluster-metrics: new akka module
* new akka module split from akka-cluster * provide sigar provisioning * fix ewma usage * resolve #16121 * see #16354
This commit is contained in:
parent
baca3644e2
commit
7b9f77a073
121 changed files with 10462 additions and 215 deletions
|
|
@ -0,0 +1,219 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.metrics.protobuf
|
||||
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.ObjectOutputStream
|
||||
import java.{ lang ⇒ jl }
|
||||
import java.util.zip.GZIPInputStream
|
||||
import java.util.zip.GZIPOutputStream
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.JavaConverters.asJavaIterableConverter
|
||||
import scala.collection.JavaConverters.asScalaBufferConverter
|
||||
import scala.collection.JavaConverters.setAsJavaSetConverter
|
||||
import scala.collection.breakOut
|
||||
import com.google.protobuf.ByteString
|
||||
import com.google.protobuf.MessageLite
|
||||
import akka.actor.Address
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.cluster.metrics.EWMA
|
||||
import akka.cluster.metrics.Metric
|
||||
import akka.cluster.metrics.MetricsGossip
|
||||
import akka.cluster.metrics.NodeMetrics
|
||||
import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages ⇒ cm }
|
||||
import akka.serialization.Serializer
|
||||
import akka.util.ClassLoaderObjectInputStream
|
||||
import akka.cluster.metrics.ClusterMetricsMessage
|
||||
import akka.cluster.metrics.MetricsGossipEnvelope
|
||||
import akka.cluster.metrics.ClusterMetricsSettings
|
||||
|
||||
/**
|
||||
* Protobuf serializer for [[ClusterMetricsMessage]] types.
|
||||
*/
|
||||
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
|
||||
|
||||
private final val BufferSize = 4 * 1024
|
||||
|
||||
private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMetricsMessage], Array[Byte] ⇒ AnyRef](
|
||||
classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary)
|
||||
|
||||
override val includeManifest: Boolean = true
|
||||
|
||||
override val identifier = ClusterMetricsSettings(system.settings.config).SerializerIdentifier
|
||||
|
||||
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||
case m: MetricsGossipEnvelope ⇒
|
||||
compress(metricsGossipEnvelopeToProto(m))
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
|
||||
}
|
||||
|
||||
def compress(msg: MessageLite): Array[Byte] = {
|
||||
val bos = new ByteArrayOutputStream(BufferSize)
|
||||
val zip = new GZIPOutputStream(bos)
|
||||
msg.writeTo(zip)
|
||||
zip.close()
|
||||
bos.toByteArray
|
||||
}
|
||||
|
||||
def decompress(bytes: Array[Byte]): Array[Byte] = {
|
||||
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
|
||||
val out = new ByteArrayOutputStream()
|
||||
val buffer = new Array[Byte](BufferSize)
|
||||
|
||||
@tailrec def readChunk(): Unit = in.read(buffer) match {
|
||||
case -1 ⇒ ()
|
||||
case n ⇒
|
||||
out.write(buffer, 0, n)
|
||||
readChunk()
|
||||
}
|
||||
|
||||
readChunk()
|
||||
out.toByteArray
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match {
|
||||
case Some(c) ⇒ fromBinaryMap.get(c.asInstanceOf[Class[ClusterMetricsMessage]]) match {
|
||||
case Some(f) ⇒ f(bytes)
|
||||
case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in metrics")
|
||||
}
|
||||
case _ ⇒ throw new IllegalArgumentException("Need a metrics message class to be able to deserialize bytes in metrics")
|
||||
}
|
||||
|
||||
private def addressFromBinary(bytes: Array[Byte]): Address =
|
||||
addressFromProto(cm.Address.parseFrom(bytes))
|
||||
|
||||
private def addressToProto(address: Address): cm.Address.Builder = address match {
|
||||
case Address(protocol, system, Some(host), Some(port)) ⇒
|
||||
cm.Address.newBuilder().setSystem(system).setHostname(host).setPort(port).setProtocol(protocol)
|
||||
case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")
|
||||
}
|
||||
|
||||
private def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray
|
||||
|
||||
@volatile
|
||||
private var protocolCache: String = null
|
||||
@volatile
|
||||
private var systemCache: String = null
|
||||
|
||||
private def getProtocol(address: cm.Address): String = {
|
||||
val p = address.getProtocol
|
||||
val pc = protocolCache
|
||||
if (pc == p) pc
|
||||
else {
|
||||
protocolCache = p
|
||||
p
|
||||
}
|
||||
}
|
||||
|
||||
private def getSystem(address: cm.Address): String = {
|
||||
val s = address.getSystem
|
||||
val sc = systemCache
|
||||
if (sc == s) sc
|
||||
else {
|
||||
systemCache = s
|
||||
s
|
||||
}
|
||||
}
|
||||
|
||||
private def addressFromProto(address: cm.Address): Address =
|
||||
Address(getProtocol(address), getSystem(address), address.getHostname, address.getPort)
|
||||
|
||||
private def mapWithErrorMessage[T](map: Map[T, Int], value: T, unknown: String): Int = map.get(value) match {
|
||||
case Some(x) ⇒ x
|
||||
case _ ⇒ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message")
|
||||
}
|
||||
|
||||
private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): cm.MetricsGossipEnvelope = {
|
||||
val mgossip = envelope.gossip
|
||||
val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) ⇒ s + n.address)
|
||||
val addressMapping = allAddresses.zipWithIndex.toMap
|
||||
val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) ⇒ s ++ n.metrics.iterator.map(_.name))
|
||||
val metricNamesMapping = allMetricNames.zipWithIndex.toMap
|
||||
|
||||
def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address")
|
||||
def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address")
|
||||
|
||||
def ewmaToProto(ewma: Option[EWMA]): Option[cm.NodeMetrics.EWMA.Builder] = ewma.map {
|
||||
x ⇒ cm.NodeMetrics.EWMA.newBuilder().setValue(x.value).setAlpha(x.alpha)
|
||||
}
|
||||
|
||||
def numberToProto(number: Number): cm.NodeMetrics.Number.Builder = {
|
||||
import cm.NodeMetrics.Number
|
||||
import cm.NodeMetrics.NumberType
|
||||
number match {
|
||||
case n: jl.Double ⇒ Number.newBuilder().setType(NumberType.Double).setValue64(jl.Double.doubleToLongBits(n))
|
||||
case n: jl.Long ⇒ Number.newBuilder().setType(NumberType.Long).setValue64(n)
|
||||
case n: jl.Float ⇒ Number.newBuilder().setType(NumberType.Float).setValue32(jl.Float.floatToIntBits(n))
|
||||
case n: jl.Integer ⇒ Number.newBuilder().setType(NumberType.Integer) setValue32 (n)
|
||||
case _ ⇒
|
||||
val bos = new ByteArrayOutputStream
|
||||
val out = new ObjectOutputStream(bos)
|
||||
out.writeObject(number)
|
||||
out.close()
|
||||
Number.newBuilder().setType(NumberType.Serialized).setSerialized(ByteString.copyFrom(bos.toByteArray))
|
||||
}
|
||||
}
|
||||
|
||||
def metricToProto(metric: Metric): cm.NodeMetrics.Metric.Builder = {
|
||||
val builder = cm.NodeMetrics.Metric.newBuilder().setNameIndex(mapName(metric.name)).setNumber(numberToProto(metric.value))
|
||||
ewmaToProto(metric.average).map(builder.setEwma(_)).getOrElse(builder)
|
||||
}
|
||||
|
||||
def nodeMetricsToProto(nodeMetrics: NodeMetrics): cm.NodeMetrics.Builder =
|
||||
cm.NodeMetrics.newBuilder().setAddressIndex(mapAddress(nodeMetrics.address)).setTimestamp(nodeMetrics.timestamp).
|
||||
addAllMetrics(nodeMetrics.metrics.map(metricToProto(_).build).asJava)
|
||||
|
||||
val nodeMetrics: Iterable[cm.NodeMetrics] = mgossip.nodes.map(nodeMetricsToProto(_).build)
|
||||
|
||||
cm.MetricsGossipEnvelope.newBuilder().setFrom(addressToProto(envelope.from)).setGossip(
|
||||
cm.MetricsGossip.newBuilder().addAllAllAddresses(allAddresses.map(addressToProto(_).build()).asJava).
|
||||
addAllAllMetricNames(allMetricNames.asJava).addAllNodeMetrics(nodeMetrics.asJava)).
|
||||
setReply(envelope.reply).build
|
||||
}
|
||||
|
||||
private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope =
|
||||
metricsGossipEnvelopeFromProto(cm.MetricsGossipEnvelope.parseFrom(decompress(bytes)))
|
||||
|
||||
private def metricsGossipEnvelopeFromProto(envelope: cm.MetricsGossipEnvelope): MetricsGossipEnvelope = {
|
||||
val mgossip = envelope.getGossip
|
||||
val addressMapping: Vector[Address] = mgossip.getAllAddressesList.asScala.map(addressFromProto)(breakOut)
|
||||
val metricNameMapping: Vector[String] = mgossip.getAllMetricNamesList.asScala.toVector
|
||||
|
||||
def ewmaFromProto(ewma: cm.NodeMetrics.EWMA): Option[EWMA] =
|
||||
Some(EWMA(ewma.getValue, ewma.getAlpha))
|
||||
|
||||
def numberFromProto(number: cm.NodeMetrics.Number): Number = {
|
||||
import cm.NodeMetrics.Number
|
||||
import cm.NodeMetrics.NumberType
|
||||
number.getType.getNumber match {
|
||||
case NumberType.Double_VALUE ⇒ jl.Double.longBitsToDouble(number.getValue64)
|
||||
case NumberType.Long_VALUE ⇒ number.getValue64
|
||||
case NumberType.Float_VALUE ⇒ jl.Float.intBitsToFloat(number.getValue32)
|
||||
case NumberType.Integer_VALUE ⇒ number.getValue32
|
||||
case NumberType.Serialized_VALUE ⇒
|
||||
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader,
|
||||
new ByteArrayInputStream(number.getSerialized.toByteArray))
|
||||
val obj = in.readObject
|
||||
in.close()
|
||||
obj.asInstanceOf[jl.Number]
|
||||
}
|
||||
}
|
||||
|
||||
def metricFromProto(metric: cm.NodeMetrics.Metric): Metric =
|
||||
Metric(metricNameMapping(metric.getNameIndex), numberFromProto(metric.getNumber),
|
||||
if (metric.hasEwma) ewmaFromProto(metric.getEwma) else None)
|
||||
|
||||
def nodeMetricsFromProto(nodeMetrics: cm.NodeMetrics): NodeMetrics =
|
||||
NodeMetrics(addressMapping(nodeMetrics.getAddressIndex), nodeMetrics.getTimestamp,
|
||||
nodeMetrics.getMetricsList.asScala.map(metricFromProto)(breakOut))
|
||||
|
||||
val nodeMetrics: Set[NodeMetrics] = mgossip.getNodeMetricsList.asScala.map(nodeMetricsFromProto)(breakOut)
|
||||
|
||||
MetricsGossipEnvelope(addressFromProto(envelope.getFrom), MetricsGossip(nodeMetrics), envelope.getReply)
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue