!clu #3416 Use plain protobuf and protoc

* Use the protobuf task to generate code for the cluster messages
* Convert the serializer to use plain protobuf instead of scalabuff
This commit is contained in:
Björn Antonsson 2013-09-13 11:47:32 +02:00
parent d0684c2f7e
commit 47a8affb25
8 changed files with 17228 additions and 232 deletions

View file

@ -6,7 +6,6 @@ package akka.cluster.protobuf
import akka.serialization.Serializer
import akka.cluster._
import scala.collection.breakOut
import scala.collection.immutable.TreeMap
import akka.actor.{ ExtendedActorSystem, Address }
import scala.Some
import scala.collection.immutable
@ -18,6 +17,8 @@ import java.util.zip.GZIPOutputStream
import java.util.zip.GZIPInputStream
import com.google.protobuf.MessageLite
import scala.annotation.tailrec
import akka.cluster.protobuf.msg.{ ClusterMessages cm }
import scala.collection.JavaConverters._
/**
* Protobuf serializer of cluster messages.
@ -29,13 +30,14 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMessage], Array[Byte] AnyRef](
classOf[InternalClusterAction.Join] -> {
case bytes
val m = msg.Join.defaultInstance.mergeFrom(bytes)
InternalClusterAction.Join(uniqueAddressFromProto(m.node), m.roles.toSet)
val m = cm.Join.parseFrom(bytes)
InternalClusterAction.Join(uniqueAddressFromProto(m.getNode),
Set.empty[String] ++ m.getRolesList.asScala)
},
classOf[InternalClusterAction.Welcome] -> {
case bytes
val m = msg.Welcome.defaultInstance.mergeFrom(decompress(bytes))
InternalClusterAction.Welcome(uniqueAddressFromProto(m.from), gossipFromProto(m.gossip))
val m = cm.Welcome.parseFrom(decompress(bytes))
InternalClusterAction.Welcome(uniqueAddressFromProto(m.getFrom), gossipFromProto(m.getGossip))
},
classOf[ClusterUserAction.Leave] -> (bytes ClusterUserAction.Leave(addressFromBinary(bytes))),
classOf[ClusterUserAction.Down] -> (bytes ClusterUserAction.Down(addressFromBinary(bytes))),
@ -54,24 +56,24 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
def identifier = 5
def toBinary(obj: AnyRef): Array[Byte] =
obj match {
case ClusterHeartbeatReceiver.Heartbeat(from) addressToProto(from).toByteArray
case m: GossipEnvelope compress(gossipEnvelopeToProto(m))
case m: GossipStatus gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) msg.Join(uniqueAddressToProto(node), roles.toVector).toByteArray
case InternalClusterAction.Welcome(from, gossip) compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip)))
case ClusterUserAction.Leave(address) addressToProto(address).toByteArray
case ClusterUserAction.Down(address) addressToProto(address).toByteArray
case InternalClusterAction.InitJoin msg.Empty().toByteArray
case InternalClusterAction.InitJoinAck(address) addressToProto(address).toByteArray
case InternalClusterAction.InitJoinNack(address) addressToProto(address).toByteArray
case ClusterHeartbeatReceiver.EndHeartbeat(from) addressToProto(from).toByteArray
case ClusterHeartbeatReceiver.EndHeartbeatAck(from) addressToProto(from).toByteArray
case ClusterHeartbeatSender.HeartbeatRequest(from) addressToProto(from).toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case ClusterHeartbeatReceiver.Heartbeat(from) addressToProtoByteArray(from)
case m: GossipEnvelope compress(gossipEnvelopeToProto(m))
case m: GossipStatus gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) joinToProto(node, roles).toByteArray
case InternalClusterAction.Welcome(from, gossip) compress(welcomeToProto(from, gossip))
case ClusterUserAction.Leave(address) addressToProtoByteArray(address)
case ClusterUserAction.Down(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoin cm.Empty.getDefaultInstance.toByteArray
case InternalClusterAction.InitJoinAck(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoinNack(address) addressToProtoByteArray(address)
case ClusterHeartbeatReceiver.EndHeartbeat(from) addressToProtoByteArray(from)
case ClusterHeartbeatReceiver.EndHeartbeatAck(from) addressToProtoByteArray(from)
case ClusterHeartbeatSender.HeartbeatRequest(from) addressToProtoByteArray(from)
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}
def compress(msg: MessageLite): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
@ -97,47 +99,51 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
out.toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
clazz match {
case Some(c) fromBinaryMap.get(c.asInstanceOf[Class[ClusterMessage]]) match {
case Some(f) f(bytes)
case None throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in ClusterSerializer")
}
case _ throw new IllegalArgumentException("Need a cluster message class to be able to deserialize bytes in ClusterSerializer")
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match {
case Some(c) fromBinaryMap.get(c.asInstanceOf[Class[ClusterMessage]]) match {
case Some(f) f(bytes)
case None throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in ClusterSerializer")
}
case _ throw new IllegalArgumentException("Need a cluster message class to be able to deserialize bytes in ClusterSerializer")
}
private def addressFromBinary(bytes: Array[Byte]): Address =
addressFromProto(msg.Address.defaultInstance.mergeFrom(bytes))
addressFromProto(cm.Address.parseFrom(bytes))
private def uniqueAddressFromBinary(bytes: Array[Byte]): UniqueAddress =
uniqueAddressFromProto(msg.UniqueAddress.defaultInstance.mergeFrom(bytes))
uniqueAddressFromProto(cm.UniqueAddress.parseFrom(bytes))
private def addressToProto(address: Address): msg.Address =
msg.Address(address.system, address.host.getOrElse(""), address.port.getOrElse(0), Some(address.protocol))
private def addressToProto(address: Address): cm.Address.Builder = address match {
case Address(protocol, system, Some(host), Some(port))
cm.Address.newBuilder().setSystem(system).setHostname(host).setPort(port).setProtocol(protocol)
case _ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")
}
private def uniqueAddressToProto(uniqueAddress: UniqueAddress): msg.UniqueAddress =
msg.UniqueAddress(addressToProto(uniqueAddress.address), uniqueAddress.uid)
private def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray
private def addressFromProto(address: msg.Address): Address =
Address(address.protocol.getOrElse(""), address.system, address.hostname, address.port)
private def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder =
cm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address)).setUid(uniqueAddress.uid)
private def uniqueAddressFromProto(uniqueAddress: msg.UniqueAddress): UniqueAddress =
UniqueAddress(addressFromProto(uniqueAddress.address), uniqueAddress.uid)
private def addressFromProto(address: cm.Address): Address =
Address(address.getProtocol, address.getSystem, address.getHostname, address.getPort)
private def uniqueAddressFromProto(uniqueAddress: cm.UniqueAddress): UniqueAddress =
UniqueAddress(addressFromProto(uniqueAddress.getAddress), uniqueAddress.getUid)
private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int](
MemberStatus.Joining -> msg.MemberStatus.Joining_VALUE,
MemberStatus.Up -> msg.MemberStatus.Up_VALUE,
MemberStatus.Leaving -> msg.MemberStatus.Leaving_VALUE,
MemberStatus.Exiting -> msg.MemberStatus.Exiting_VALUE,
MemberStatus.Down -> msg.MemberStatus.Down_VALUE,
MemberStatus.Removed -> msg.MemberStatus.Removed_VALUE)
MemberStatus.Joining -> cm.MemberStatus.Joining_VALUE,
MemberStatus.Up -> cm.MemberStatus.Up_VALUE,
MemberStatus.Leaving -> cm.MemberStatus.Leaving_VALUE,
MemberStatus.Exiting -> cm.MemberStatus.Exiting_VALUE,
MemberStatus.Down -> cm.MemberStatus.Down_VALUE,
MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE)
private val memberStatusFromInt = memberStatusToInt.map { case (a, b) (b, a) }
private val reachabilityStatusToInt = scala.collection.immutable.HashMap[Reachability.ReachabilityStatus, Int](
Reachability.Reachable -> msg.ReachabilityStatus.Reachable_VALUE,
Reachability.Unreachable -> msg.ReachabilityStatus.Unreachable_VALUE,
Reachability.Terminated -> msg.ReachabilityStatus.Terminated_VALUE)
Reachability.Reachable -> cm.ReachabilityStatus.Reachable_VALUE,
Reachability.Unreachable -> cm.ReachabilityStatus.Unreachable_VALUE,
Reachability.Terminated -> cm.ReachabilityStatus.Terminated_VALUE)
private val reachabilityStatusFromInt = reachabilityStatusToInt.map { case (a, b) (b, a) }
@ -146,7 +152,13 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
case _ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message")
}
private def gossipToProto(gossip: Gossip): msg.Gossip = {
private def joinToProto(node: UniqueAddress, roles: Set[String]): cm.Join =
cm.Join.newBuilder().setNode(uniqueAddressToProto(node)).addAllRoles(roles.asJava).build()
private def welcomeToProto(from: UniqueAddress, gossip: Gossip): cm.Welcome =
cm.Welcome.newBuilder().setFrom(uniqueAddressToProto(from)).setGossip(gossipToProto(gossip)).build()
private def gossipToProto(gossip: Gossip): cm.Gossip.Builder = {
import scala.collection.breakOut
val allMembers = gossip.members.toVector
val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress)
@ -156,70 +168,79 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
val allHashes = gossip.version.versions.keys.to[Vector]
val hashMapping = allHashes.zipWithIndex.toMap
def mapUniqueAddress(uniqueAddress: UniqueAddress) = mapWithErrorMessage(addressMapping, uniqueAddress, "address")
def mapRole(role: String) = mapWithErrorMessage(roleMapping, role, "role")
def mapUniqueAddress(uniqueAddress: UniqueAddress): Integer = mapWithErrorMessage(addressMapping, uniqueAddress, "address")
def mapRole(role: String): Integer = mapWithErrorMessage(roleMapping, role, "role")
def memberToProto(member: Member) =
msg.Member(mapUniqueAddress(member.uniqueAddress), member.upNumber,
msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole)(breakOut))
cm.Member.newBuilder.setAddressIndex(mapUniqueAddress(member.uniqueAddress)).setUpNumber(member.upNumber).
setStatus(cm.MemberStatus.valueOf(memberStatusToInt(member.status))).
addAllRolesIndexes(member.roles.map(mapRole).asJava)
def reachabilityToProto(reachability: Reachability): Vector[msg.ObserverReachability] = {
def reachabilityToProto(reachability: Reachability): Iterable[cm.ObserverReachability.Builder] = {
reachability.versions.map {
case (observer, version)
val subjectReachability = reachability.recordsFrom(observer).map(r
msg.SubjectReachability(mapUniqueAddress(r.subject),
msg.ReachabilityStatus.valueOf(reachabilityStatusToInt(r.status)), r.version))
msg.ObserverReachability(mapUniqueAddress(observer), version, subjectReachability)
}(breakOut)
cm.SubjectReachability.newBuilder().setAddressIndex(mapUniqueAddress(r.subject)).
setStatus(cm.ReachabilityStatus.valueOf(reachabilityStatusToInt(r.status))).
setVersion(r.version))
cm.ObserverReachability.newBuilder().setAddressIndex(mapUniqueAddress(observer)).setVersion(version).
addAllSubjectReachability(subjectReachability.map(_.build).asJava)
}
}
val reachability = reachabilityToProto(gossip.overview.reachability)
val members: Vector[msg.Member] = gossip.members.map(memberToProto)(breakOut)
val seen: Vector[Int] = gossip.overview.seen.map(mapUniqueAddress)(breakOut)
val members = gossip.members.map(memberToProto)
val seen = gossip.overview.seen.map(mapUniqueAddress)
val overview = msg.GossipOverview(seen, reachability)
val overview = cm.GossipOverview.newBuilder.addAllSeen(seen.asJava).
addAllObserverReachability(reachability.map(_.build).asJava)
msg.Gossip(allAddresses.map(uniqueAddressToProto),
allRoles, allHashes, members, overview, vectorClockToProto(gossip.version, hashMapping))
cm.Gossip.newBuilder().addAllAllAddresses(allAddresses.map(uniqueAddressToProto(_).build).asJava).
addAllAllRoles(allRoles.asJava).addAllAllHashes(allHashes.asJava).addAllMembers(members.map(_.build).asJava).
setOverview(overview).setVersion(vectorClockToProto(gossip.version, hashMapping))
}
private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = {
val versions: Vector[msg.VectorClock.Version] = version.versions.map({
case (n, t) msg.VectorClock.Version(mapWithErrorMessage(hashMapping, n, "hash"), t)
})(breakOut)
msg.VectorClock(None, versions)
private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): cm.VectorClock.Builder = {
val versions: Iterable[cm.VectorClock.Version.Builder] = version.versions.map {
case (n, t) cm.VectorClock.Version.newBuilder().setHashIndex(mapWithErrorMessage(hashMapping, n, "hash")).
setTimestamp(t)
}
cm.VectorClock.newBuilder().setTimestamp(0).addAllVersions(versions.map(_.build).asJava)
}
private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope =
msg.GossipEnvelope(uniqueAddressToProto(envelope.from), uniqueAddressToProto(envelope.to), gossipToProto(envelope.gossip))
private def gossipEnvelopeToProto(envelope: GossipEnvelope): cm.GossipEnvelope =
cm.GossipEnvelope.newBuilder().setFrom(uniqueAddressToProto(envelope.from)).setTo(uniqueAddressToProto(envelope.to)).
setGossip(gossipToProto(envelope.gossip)).build
private def gossipStatusToProto(status: GossipStatus): msg.GossipStatus = {
private def gossipStatusToProto(status: GossipStatus): cm.GossipStatus = {
val allHashes = status.version.versions.keys.toVector
val hashMapping = allHashes.zipWithIndex.toMap
msg.GossipStatus(uniqueAddressToProto(status.from), allHashes, vectorClockToProto(status.version, hashMapping))
cm.GossipStatus.newBuilder().setFrom(uniqueAddressToProto(status.from)).addAllAllHashes(allHashes.asJava).
setVersion(vectorClockToProto(status.version, hashMapping)).build()
}
private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope =
gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(decompress(bytes)))
gossipEnvelopeFromProto(cm.GossipEnvelope.parseFrom(decompress(bytes)))
private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus =
gossipStatusFromProto(msg.GossipStatus.defaultInstance.mergeFrom(bytes))
gossipStatusFromProto(cm.GossipStatus.parseFrom(bytes))
private def gossipFromProto(gossip: msg.Gossip): Gossip = {
private def gossipFromProto(gossip: cm.Gossip): Gossip = {
import scala.collection.breakOut
val addressMapping = gossip.allAddresses.map(uniqueAddressFromProto)
val roleMapping = gossip.allRoles
val hashMapping = gossip.allHashes
val addressMapping: Vector[UniqueAddress] =
gossip.getAllAddressesList.asScala.map(uniqueAddressFromProto)(breakOut)
val roleMapping: Vector[String] = gossip.getAllRolesList.asScala.map(identity)(breakOut)
val hashMapping: Vector[String] = gossip.getAllHashesList.asScala.map(identity)(breakOut)
def reachabilityFromProto(observerReachability: immutable.Seq[msg.ObserverReachability]): Reachability = {
def reachabilityFromProto(observerReachability: Iterable[cm.ObserverReachability]): Reachability = {
val recordBuilder = new immutable.VectorBuilder[Reachability.Record]
val versionsBuilder = new scala.collection.mutable.MapBuilder[UniqueAddress, Long, Map[UniqueAddress, Long]](Map.empty)
for (o observerReachability) {
val observer = addressMapping(o.addressIndex)
versionsBuilder += ((observer, o.version))
for (s o.subjectReachability) {
val subject = addressMapping(s.addressIndex)
val record = Reachability.Record(observer, subject, reachabilityStatusFromInt(s.status), s.version)
val observer = addressMapping(o.getAddressIndex)
versionsBuilder += ((observer, o.getVersion))
for (s o.getSubjectReachabilityList.asScala) {
val subject = addressMapping(s.getAddressIndex)
val record = Reachability.Record(observer, subject, reachabilityStatusFromInt(s.getStatus.getNumber), s.getVersion)
recordBuilder += record
}
}
@ -227,107 +248,119 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
Reachability.create(recordBuilder.result(), versionsBuilder.result())
}
def memberFromProto(member: msg.Member) =
new Member(addressMapping(member.addressIndex), member.upNumber, memberStatusFromInt(member.status.id),
member.rolesIndexes.map(roleMapping)(breakOut))
def memberFromProto(member: cm.Member) =
new Member(addressMapping(member.getAddressIndex), member.getUpNumber, memberStatusFromInt(member.getStatus.getNumber),
member.getRolesIndexesList.asScala.map(roleMapping(_))(breakOut))
val members: immutable.SortedSet[Member] = gossip.members.map(memberFromProto)(breakOut)
val members: immutable.SortedSet[Member] = gossip.getMembersList.asScala.map(memberFromProto)(breakOut)
val reachability = reachabilityFromProto(gossip.overview.observerReachability)
val seen: Set[UniqueAddress] = gossip.overview.seen.map(addressMapping)(breakOut)
val reachability = reachabilityFromProto(gossip.getOverview.getObserverReachabilityList.asScala)
val seen: Set[UniqueAddress] = gossip.getOverview.getSeenList.asScala.map(addressMapping(_))(breakOut)
val overview = GossipOverview(seen, reachability)
Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping))
Gossip(members, overview, vectorClockFromProto(gossip.getVersion, hashMapping))
}
private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = {
VectorClock(version.versions.map({
case msg.VectorClock.Version(h, t) (VectorClock.Node.fromHash(hashMapping(h)), t)
})(breakOut))
private def vectorClockFromProto(version: cm.VectorClock, hashMapping: immutable.Seq[String]) = {
VectorClock(version.getVersionsList.asScala.map(
v (VectorClock.Node.fromHash(hashMapping(v.getHashIndex)), v.getTimestamp))(breakOut))
}
private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope =
GossipEnvelope(uniqueAddressFromProto(envelope.from), uniqueAddressFromProto(envelope.to), gossipFromProto(envelope.gossip))
private def gossipEnvelopeFromProto(envelope: cm.GossipEnvelope): GossipEnvelope =
GossipEnvelope(uniqueAddressFromProto(envelope.getFrom), uniqueAddressFromProto(envelope.getTo),
gossipFromProto(envelope.getGossip))
private def gossipStatusFromProto(status: msg.GossipStatus): GossipStatus =
GossipStatus(uniqueAddressFromProto(status.from), vectorClockFromProto(status.version, status.allHashes))
private def gossipStatusFromProto(status: cm.GossipStatus): GossipStatus =
GossipStatus(uniqueAddressFromProto(status.getFrom), vectorClockFromProto(status.getVersion,
status.getAllHashesList.asScala.toVector))
private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): msg.MetricsGossipEnvelope = {
private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): cm.MetricsGossipEnvelope = {
val mgossip = envelope.gossip
val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) s + n.address).to[Vector]
val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) s + n.address)
val addressMapping = allAddresses.zipWithIndex.toMap
val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) s ++ n.metrics.iterator.map(_.name)).to[Vector]
val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) s ++ n.metrics.iterator.map(_.name))
val metricNamesMapping = allMetricNames.zipWithIndex.toMap
def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address")
def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address")
def ewmaToProto(ewma: Option[EWMA]): Option[msg.NodeMetrics.EWMA] = ewma.map(x msg.NodeMetrics.EWMA(x.value, x.alpha))
def ewmaToProto(ewma: Option[EWMA]): Option[cm.NodeMetrics.EWMA.Builder] = ewma.map {
x cm.NodeMetrics.EWMA.newBuilder().setValue(x.value).setAlpha(x.alpha)
}
def numberToProto(number: Number): msg.NodeMetrics.Number = {
import msg.NodeMetrics.Number
import msg.NodeMetrics.NumberType
def numberToProto(number: Number): cm.NodeMetrics.Number.Builder = {
import cm.NodeMetrics.Number
import cm.NodeMetrics.NumberType
number match {
case n: jl.Double Number(NumberType.Double, None, Some(jl.Double.doubleToLongBits(n)), None)
case n: jl.Long Number(NumberType.Long, None, Some(n), None)
case n: jl.Float Number(NumberType.Float, Some(jl.Float.floatToIntBits(n)), None, None)
case n: jl.Integer Number(NumberType.Integer, Some(n), None, None)
case n: jl.Double Number.newBuilder().setType(NumberType.Double).setValue64(jl.Double.doubleToLongBits(n))
case n: jl.Long Number.newBuilder().setType(NumberType.Long).setValue64(n)
case n: jl.Float Number.newBuilder().setType(NumberType.Float).setValue32(jl.Float.floatToIntBits(n))
case n: jl.Integer Number.newBuilder().setType(NumberType.Integer) setValue32 (n)
case _
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(number)
out.close()
msg.NodeMetrics.Number(msg.NodeMetrics.NumberType.Serialized, None, None, Some(ByteString.copyFrom(bos.toByteArray)))
Number.newBuilder().setType(NumberType.Serialized).setSerialized(ByteString.copyFrom(bos.toByteArray))
}
}
def metricToProto(metric: Metric): msg.NodeMetrics.Metric =
msg.NodeMetrics.Metric(mapName(metric.name), numberToProto(metric.value), ewmaToProto(metric.average))
def metricToProto(metric: Metric): cm.NodeMetrics.Metric.Builder = {
val builder = cm.NodeMetrics.Metric.newBuilder().setNameIndex(mapName(metric.name)).setNumber(numberToProto(metric.value))
ewmaToProto(metric.average).map(builder.setEwma(_)).getOrElse(builder)
}
def nodeMetricsToProto(nodeMetrics: NodeMetrics): msg.NodeMetrics =
msg.NodeMetrics(mapAddress(nodeMetrics.address), nodeMetrics.timestamp, nodeMetrics.metrics.map(metricToProto)(breakOut))
def nodeMetricsToProto(nodeMetrics: NodeMetrics): cm.NodeMetrics.Builder =
cm.NodeMetrics.newBuilder().setAddressIndex(mapAddress(nodeMetrics.address)).setTimestamp(nodeMetrics.timestamp).
addAllMetrics(nodeMetrics.metrics.map(metricToProto(_).build).asJava)
val nodeMetrics: Vector[msg.NodeMetrics] = mgossip.nodes.map(nodeMetricsToProto)(breakOut)
val nodeMetrics: Iterable[cm.NodeMetrics] = mgossip.nodes.map(nodeMetricsToProto(_).build)
msg.MetricsGossipEnvelope(addressToProto(envelope.from),
msg.MetricsGossip(allAddresses.map(addressToProto), allMetricNames, nodeMetrics), envelope.reply)
cm.MetricsGossipEnvelope.newBuilder().setFrom(addressToProto(envelope.from)).setGossip(
cm.MetricsGossip.newBuilder().addAllAllAddresses(allAddresses.map(addressToProto(_).build()).asJava).
addAllAllMetricNames(allMetricNames.asJava).addAllNodeMetrics(nodeMetrics.asJava)).
setReply(envelope.reply).build
}
private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope =
metricsGossipEnvelopeFromProto(msg.MetricsGossipEnvelope.defaultInstance.mergeFrom(decompress(bytes)))
metricsGossipEnvelopeFromProto(cm.MetricsGossipEnvelope.parseFrom(decompress(bytes)))
private def metricsGossipEnvelopeFromProto(envelope: msg.MetricsGossipEnvelope): MetricsGossipEnvelope = {
val mgossip = envelope.gossip
val addressMapping = mgossip.allAddresses.map(addressFromProto)
val metricNameMapping = mgossip.allMetricNames
private def metricsGossipEnvelopeFromProto(envelope: cm.MetricsGossipEnvelope): MetricsGossipEnvelope = {
val mgossip = envelope.getGossip
val addressMapping: Vector[Address] = mgossip.getAllAddressesList.asScala.map(addressFromProto)(breakOut)
val metricNameMapping: Vector[String] = mgossip.getAllMetricNamesList.asScala.toVector
def ewmaFromProto(ewma: Option[msg.NodeMetrics.EWMA]): Option[EWMA] = ewma.map(x EWMA(x.value, x.alpha))
def ewmaFromProto(ewma: cm.NodeMetrics.EWMA): Option[EWMA] =
Some(EWMA(ewma.getValue, ewma.getAlpha))
def numberFromProto(number: msg.NodeMetrics.Number): Number = {
import msg.NodeMetrics.Number
import msg.NodeMetrics.NumberType
number match {
case Number(NumberType.Double, _, Some(n), _) jl.Double.longBitsToDouble(n)
case Number(NumberType.Long, _, Some(n), _) n
case Number(NumberType.Float, Some(n), _, _) jl.Float.intBitsToFloat(n)
case Number(NumberType.Integer, Some(n), _, _) n
case Number(NumberType.Serialized, _, _, Some(b))
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new ByteArrayInputStream(b.toByteArray))
def numberFromProto(number: cm.NodeMetrics.Number): Number = {
import cm.NodeMetrics.Number
import cm.NodeMetrics.NumberType
number.getType.getNumber match {
case NumberType.Double_VALUE jl.Double.longBitsToDouble(number.getValue64)
case NumberType.Long_VALUE number.getValue64
case NumberType.Float_VALUE jl.Float.intBitsToFloat(number.getValue32)
case NumberType.Integer_VALUE number.getValue32
case NumberType.Serialized_VALUE
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader,
new ByteArrayInputStream(number.getSerialized.toByteArray))
val obj = in.readObject
in.close()
obj.asInstanceOf[jl.Number]
}
}
def metricFromProto(metric: msg.NodeMetrics.Metric): Metric =
Metric(metricNameMapping(metric.nameIndex), numberFromProto(metric.number), ewmaFromProto(metric.ewma))
def metricFromProto(metric: cm.NodeMetrics.Metric): Metric =
Metric(metricNameMapping(metric.getNameIndex), numberFromProto(metric.getNumber),
if (metric.hasEwma) ewmaFromProto(metric.getEwma) else None)
def nodeMetricsFromProto(nodeMetrics: msg.NodeMetrics): NodeMetrics =
NodeMetrics(addressMapping(nodeMetrics.addressIndex), nodeMetrics.timestamp, nodeMetrics.metrics.map(metricFromProto)(breakOut))
def nodeMetricsFromProto(nodeMetrics: cm.NodeMetrics): NodeMetrics =
NodeMetrics(addressMapping(nodeMetrics.getAddressIndex), nodeMetrics.getTimestamp,
nodeMetrics.getMetricsList.asScala.map(metricFromProto)(breakOut))
val nodeMetrics: Set[NodeMetrics] = mgossip.nodeMetrics.map(nodeMetricsFromProto)(breakOut)
val nodeMetrics: Set[NodeMetrics] = mgossip.getNodeMetricsList.asScala.map(nodeMetricsFromProto)(breakOut)
MetricsGossipEnvelope(addressFromProto(envelope.from), MetricsGossip(nodeMetrics), envelope.reply)
MetricsGossipEnvelope(addressFromProto(envelope.getFrom), MetricsGossip(nodeMetrics), envelope.getReply)
}
}