Protobuf serialization of cluster messages. See #1910

This commit is contained in:
Björn Antonsson 2013-04-04 17:56:29 +02:00
parent c39542240c
commit 73f0f44ddb
12 changed files with 629 additions and 31 deletions

View file

@ -0,0 +1,278 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.protobuf
import akka.serialization.Serializer
import akka.cluster._
import scala.collection.breakOut
import akka.actor.{ ExtendedActorSystem, Address }
import scala.Some
import scala.collection.immutable
import java.io.{ ByteArrayInputStream, ObjectOutputStream, ByteArrayOutputStream }
import com.google.protobuf.ByteString
import akka.util.ClassLoaderObjectInputStream
import java.{ lang jl }
class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializer {
private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMessage], Array[Byte] AnyRef](
classOf[ClusterUserAction.Join] -> {
case bytes
val m = msg.Join.defaultInstance.mergeFrom(bytes)
ClusterUserAction.Join(addressFromProto(m.address), m.roles.toSet)
},
classOf[ClusterUserAction.Leave] -> (bytes ClusterUserAction.Leave(addressFromBinary(bytes))),
classOf[ClusterUserAction.Down] -> (bytes ClusterUserAction.Down(addressFromBinary(bytes))),
InternalClusterAction.InitJoin.getClass -> (_ InternalClusterAction.InitJoin),
classOf[InternalClusterAction.InitJoinAck] -> (bytes InternalClusterAction.InitJoinAck(addressFromBinary(bytes))),
classOf[InternalClusterAction.InitJoinNack] -> (bytes InternalClusterAction.InitJoinNack(addressFromBinary(bytes))),
classOf[ClusterLeaderAction.Exit] -> (bytes ClusterLeaderAction.Exit(addressFromBinary(bytes))),
classOf[ClusterLeaderAction.Remove] -> (bytes ClusterLeaderAction.Remove(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))),
classOf[GossipEnvelope] -> gossipEnvelopeFromBinary,
classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary)
def includeManifest: Boolean = true
def identifier = 5
def toBinary(obj: AnyRef): Array[Byte] = (obj match {
case ClusterHeartbeatReceiver.Heartbeat(from)
addressToProto(from)
case m: GossipEnvelope
gossipEnvelopeToProto(m)
case m: MetricsGossipEnvelope
metricsGossipEnvelopeToProto(m)
case ClusterUserAction.Join(address, roles)
msg.Join(addressToProto(address), roles.map(identity)(breakOut): Vector[String])
case ClusterUserAction.Leave(address)
addressToProto(address)
case ClusterUserAction.Down(address)
addressToProto(address)
case InternalClusterAction.InitJoin
msg.Empty()
case InternalClusterAction.InitJoinAck(address)
addressToProto(address)
case InternalClusterAction.InitJoinNack(address)
addressToProto(address)
case ClusterLeaderAction.Exit(address)
addressToProto(address)
case ClusterLeaderAction.Remove(address)
addressToProto(address)
case ClusterHeartbeatReceiver.EndHeartbeat(from)
addressToProto(from)
case ClusterHeartbeatSender.HeartbeatRequest(from)
addressToProto(from)
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}).toByteArray
def fromBinary(bytes: Array[Byte],
clazz: Option[Class[_]]): AnyRef = {
clazz match {
case Some(c) fromBinaryMap.get(c.asInstanceOf[Class[ClusterMessage]]) match {
case Some(f) f(bytes)
case None throw new IllegalArgumentException(s"Unimplemented deserialization of message class ${c} in ClusterSerializer")
}
case _ throw new IllegalArgumentException("Need a cluster message class to be able to deserialize bytes in ClusterSerializer")
}
}
private def addressFromBinary(bytes: Array[Byte]): Address = {
addressFromProto(msg.Address.defaultInstance.mergeFrom(bytes))
}
private def addressToProto(address: Address): msg.Address = {
msg.Address(address.system, address.host.getOrElse(""), address.port.getOrElse(0), Some(address.protocol))
}
private def addressFromProto(address: msg.Address): Address = {
Address(address.protocol.getOrElse(""), address.system, address.hostname, address.port)
}
private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int](
MemberStatus.Joining -> msg.MemberStatus.Joining_VALUE,
MemberStatus.Up -> msg.MemberStatus.Up_VALUE,
MemberStatus.Leaving -> msg.MemberStatus.Leaving_VALUE,
MemberStatus.Exiting -> msg.MemberStatus.Exiting_VALUE,
MemberStatus.Down -> msg.MemberStatus.Down_VALUE,
MemberStatus.Removed -> msg.MemberStatus.Removed_VALUE)
private val memberStatusFromInt = memberStatusToInt.map { case (a, b) (b, a) }
private def mapWithErrorMessage[T](map: Map[T, Int], value: T, unknown: String): Int = map.get(value) match {
case Some(x) x
case _ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message")
}
private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = {
val gossip = envelope.gossip
val allMembers = List(gossip.members, gossip.overview.unreachable).flatMap(identity)
val allAddresses = allMembers.map(_.address).to[Vector]
val addressMapping = allAddresses.zipWithIndex.toMap
val allRoles = allMembers.flatMap(_.roles).to[Vector]
val roleMapping = allRoles.zipWithIndex.toMap
val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.map(_.hash).toSet) {
case (s, VectorClock(t, v)) s ++ v.keys.map(_.hash)
}.to[Vector]
val hashMapping = allHashes.zipWithIndex.toMap
def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address")
def mapRole(role: String) = mapWithErrorMessage(roleMapping, role, "role")
def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash")
def memberToProto(member: Member) = {
msg.Member(mapAddress(member.address), msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector])
}
def vectorClockToProto(version: VectorClock) = {
msg.VectorClock(version.timestamp.time,
version.versions.map { case (n, t) msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector])
}
def seenToProto(seen: (Address, VectorClock)) = seen match {
case (address: Address, version: VectorClock)
msg.GossipOverview.Seen(mapAddress(address), vectorClockToProto(version))
}
val unreachable = gossip.overview.unreachable.map(memberToProto).to[Vector]
val members = gossip.members.toSeq.map(memberToProto).to[Vector]
val seen = gossip.overview.seen.map(seenToProto).to[Vector]
val overview = msg.GossipOverview(seen, unreachable)
msg.GossipEnvelope(addressToProto(envelope.from), msg.Gossip(allAddresses.map(addressToProto),
allRoles, allHashes, members, overview, vectorClockToProto(gossip.version)),
envelope.conversation)
}
private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = {
gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(bytes))
}
private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = {
val gossip = envelope.gossip
val addressMapping = gossip.allAddresses.map(addressFromProto)
val roleMapping = gossip.allRoles
val hashMapping = gossip.allHashes
def memberFromProto(member: msg.Member) = {
Member(addressMapping(member.addressIndex), memberStatusFromInt(member.status.id),
member.rolesIndexes.map(roleMapping).to[Set])
}
def vectorClockFromProto(version: msg.VectorClock) = {
VectorClock(VectorClock.Timestamp(version.timestamp),
version.versions.map {
case msg.VectorClock.Version(h, t)
(VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t))
}.toMap)
}
def seenFromProto(seen: msg.GossipOverview.Seen) =
(addressMapping(seen.addressIndex), vectorClockFromProto(seen.version))
val members = gossip.members.map(memberFromProto).to[immutable.SortedSet]
val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet
val seen = gossip.overview.seen.map(seenFromProto).toMap
val overview = GossipOverview(seen, unreachable)
GossipEnvelope(addressFromProto(envelope.from), Gossip(members, overview, vectorClockFromProto(gossip.version)))
}
private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): msg.MetricsGossipEnvelope = {
val mgossip = envelope.gossip
val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) s + n.address).to[Vector]
val addressMapping = allAddresses.zipWithIndex.toMap
val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) s ++ n.metrics.map(_.name)).to[Vector]
val metricNamesMapping = allMetricNames.zipWithIndex.toMap
def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address")
def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address")
def ewmaToProto(ewma: Option[EWMA]): Option[msg.NodeMetrics.EWMA] = {
ewma.map(x msg.NodeMetrics.EWMA(x.value, x.alpha))
}
def numberToProto(number: Number): msg.NodeMetrics.Number = {
import msg.NodeMetrics.Number
import msg.NodeMetrics.NumberType
number match {
case n: jl.Double Number(NumberType.Double, None, Some(jl.Double.doubleToLongBits(n)), None)
case n: jl.Long Number(NumberType.Long, None, Some(n), None)
case n: jl.Float Number(NumberType.Float, Some(jl.Float.floatToIntBits(n)), None, None)
case n: jl.Integer Number(NumberType.Integer, Some(n), None, None)
case _
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(number)
out.close()
msg.NodeMetrics.Number(msg.NodeMetrics.NumberType.Serialized, None, None,
Some(ByteString.copyFrom(bos.toByteArray)))
}
}
def metricToProto(metric: Metric): msg.NodeMetrics.Metric = {
msg.NodeMetrics.Metric(mapName(metric.name), numberToProto(metric.value), ewmaToProto(metric.average))
}
def nodeMetricsToProto(nodeMetrics: NodeMetrics): msg.NodeMetrics = {
msg.NodeMetrics(mapAddress(nodeMetrics.address), nodeMetrics.timestamp,
nodeMetrics.metrics.map(metricToProto).to[Vector])
}
val nodeMetrics = mgossip.nodes.map(nodeMetricsToProto).to[Vector]
msg.MetricsGossipEnvelope(addressToProto(envelope.from),
msg.MetricsGossip(allAddresses.map(addressToProto), allMetricNames, nodeMetrics), envelope.reply)
}
private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope = {
metricsGossipEnvelopeFromProto(msg.MetricsGossipEnvelope.defaultInstance.mergeFrom(bytes))
}
private def metricsGossipEnvelopeFromProto(envelope: msg.MetricsGossipEnvelope): MetricsGossipEnvelope = {
val mgossip = envelope.gossip
val addressMapping = mgossip.allAddresses.map(addressFromProto)
val metricNameMapping = mgossip.allMetricNames
def ewmaFromProto(ewma: Option[msg.NodeMetrics.EWMA]): Option[EWMA] = {
ewma.map(x EWMA(x.value, x.alpha))
}
def numberFromProto(number: msg.NodeMetrics.Number): Number = {
import msg.NodeMetrics.Number
import msg.NodeMetrics.NumberType
number match {
case Number(NumberType.Double, _, Some(n), _) jl.Double.longBitsToDouble(n)
case Number(NumberType.Long, _, Some(n), _) n
case Number(NumberType.Float, Some(n), _, _) jl.Float.intBitsToFloat(n)
case Number(NumberType.Integer, Some(n), _, _) n
case Number(NumberType.Serialized, _, _, Some(b))
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader,
new ByteArrayInputStream(b.toByteArray))
val obj = in.readObject
in.close()
obj.asInstanceOf[jl.Number]
}
}
def metricFromProto(metric: msg.NodeMetrics.Metric): Metric = {
Metric(metricNameMapping(metric.nameIndex), numberFromProto(metric.number), ewmaFromProto(metric.ewma))
}
def nodeMetricsFromProto(nodeMetrics: msg.NodeMetrics): NodeMetrics = {
NodeMetrics(addressMapping(nodeMetrics.addressIndex), nodeMetrics.timestamp,
nodeMetrics.metrics.map(metricFromProto).toSet)
}
val nodeMetrics = mgossip.nodeMetrics.map(nodeMetricsFromProto).toSet
MetricsGossipEnvelope(addressFromProto(envelope.from),
MetricsGossip(nodeMetrics), envelope.reply)
}
}