Reducing the use of intermediate datastructures for the ClusterMessageSerializer

This commit is contained in:
Viktor Klang 2013-06-26 19:23:42 +02:00
parent 007a8b2399
commit a55d024627

View file

@ -52,34 +52,20 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
def toBinary(obj: AnyRef): Array[Byte] =
obj match {
case ClusterHeartbeatReceiver.Heartbeat(from)
addressToProto(from).toByteArray
case m: GossipEnvelope
compress(gossipEnvelopeToProto(m))
case m: GossipStatus
gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope
compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles)
msg.Join(uniqueAddressToProto(node), roles.toVector).toByteArray
case InternalClusterAction.Welcome(from, gossip)
compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip)))
case ClusterUserAction.Leave(address)
addressToProto(address).toByteArray
case ClusterUserAction.Down(address)
addressToProto(address).toByteArray
case InternalClusterAction.InitJoin
msg.Empty().toByteArray
case InternalClusterAction.InitJoinAck(address)
addressToProto(address).toByteArray
case InternalClusterAction.InitJoinNack(address)
addressToProto(address).toByteArray
case ClusterHeartbeatReceiver.EndHeartbeat(from)
addressToProto(from).toByteArray
case ClusterHeartbeatSender.HeartbeatRequest(from)
addressToProto(from).toByteArray
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
case ClusterHeartbeatReceiver.Heartbeat(from) addressToProto(from).toByteArray
case m: GossipEnvelope compress(gossipEnvelopeToProto(m))
case m: GossipStatus gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) msg.Join(uniqueAddressToProto(node), roles.toVector).toByteArray
case InternalClusterAction.Welcome(from, gossip) compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip)))
case ClusterUserAction.Leave(address) addressToProto(address).toByteArray
case ClusterUserAction.Down(address) addressToProto(address).toByteArray
case InternalClusterAction.InitJoin msg.Empty().toByteArray
case InternalClusterAction.InitJoinAck(address) addressToProto(address).toByteArray
case InternalClusterAction.InitJoinNack(address) addressToProto(address).toByteArray
case ClusterHeartbeatReceiver.EndHeartbeat(from) addressToProto(from).toByteArray
case ClusterHeartbeatSender.HeartbeatRequest(from) addressToProto(from).toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}
def compress(msg: MessageLite): Array[Byte] = {
@ -95,52 +81,43 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
@tailrec def readChunk(): Unit = {
val n = in.read(buffer)
if (n != -1) {
@tailrec def readChunk(): Unit = in.read(buffer) match {
case -1 ()
case n
out.write(buffer, 0, n)
readChunk()
}
}
readChunk()
readChunk()
out.toByteArray
}
def fromBinary(bytes: Array[Byte],
clazz: Option[Class[_]]): AnyRef = {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
clazz match {
case Some(c) fromBinaryMap.get(c.asInstanceOf[Class[ClusterMessage]]) match {
case Some(f) f(bytes)
case None throw new IllegalArgumentException(s"Unimplemented deserialization of message class ${c} in ClusterSerializer")
case None throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in ClusterSerializer")
}
case _ throw new IllegalArgumentException("Need a cluster message class to be able to deserialize bytes in ClusterSerializer")
}
}
private def addressFromBinary(bytes: Array[Byte]): Address = {
private def addressFromBinary(bytes: Array[Byte]): Address =
addressFromProto(msg.Address.defaultInstance.mergeFrom(bytes))
}
private def uniqueAddressFromBinary(bytes: Array[Byte]): UniqueAddress = {
private def uniqueAddressFromBinary(bytes: Array[Byte]): UniqueAddress =
uniqueAddressFromProto(msg.UniqueAddress.defaultInstance.mergeFrom(bytes))
}
private def addressToProto(address: Address): msg.Address = {
private def addressToProto(address: Address): msg.Address =
msg.Address(address.system, address.host.getOrElse(""), address.port.getOrElse(0), Some(address.protocol))
}
private def uniqueAddressToProto(uniqueAddress: UniqueAddress): msg.UniqueAddress = {
private def uniqueAddressToProto(uniqueAddress: UniqueAddress): msg.UniqueAddress =
msg.UniqueAddress(addressToProto(uniqueAddress.address), uniqueAddress.uid)
}
private def addressFromProto(address: msg.Address): Address = {
private def addressFromProto(address: msg.Address): Address =
Address(address.protocol.getOrElse(""), address.system, address.hostname, address.port)
}
private def uniqueAddressFromProto(uniqueAddress: msg.UniqueAddress): UniqueAddress = {
private def uniqueAddressFromProto(uniqueAddress: msg.UniqueAddress): UniqueAddress =
UniqueAddress(addressFromProto(uniqueAddress.address), uniqueAddress.uid)
}
private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int](
MemberStatus.Joining -> msg.MemberStatus.Joining_VALUE,
@ -158,32 +135,31 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
}
private def gossipToProto(gossip: Gossip): msg.Gossip = {
val allMembers = List(gossip.members, gossip.overview.unreachable).flatMap(identity)
val allAddresses = allMembers.map(_.uniqueAddress).to[Vector]
import scala.collection.breakOut
val allMembers = (gossip.members.iterator ++ gossip.overview.unreachable.iterator).toIndexedSeq
val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress)(breakOut)
val addressMapping = allAddresses.zipWithIndex.toMap
val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) acc ++ m.roles).to[Vector]
val roleMapping = allRoles.zipWithIndex.toMap
val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.toSet) {
case (s, VectorClock(v)) s ++ v.keys
}.to[Vector]
val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.toSet)(
{ case (s, VectorClock(v)) s ++ v.keys }).to[Vector]
val hashMapping = allHashes.zipWithIndex.toMap
def mapUniqueAddress(uniqueAddress: UniqueAddress) = mapWithErrorMessage(addressMapping, uniqueAddress, "address")
def mapRole(role: String) = mapWithErrorMessage(roleMapping, role, "role")
def memberToProto(member: Member) = {
def memberToProto(member: Member) =
msg.Member(mapUniqueAddress(member.uniqueAddress), member.upNumber,
msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector])
msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole)(breakOut))
def seenToProto(seen: (UniqueAddress, VectorClock)) = {
val (address, version) = seen
msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version, hashMapping))
}
def seenToProto(seen: (UniqueAddress, VectorClock)) = seen match {
case (address: UniqueAddress, version: VectorClock)
msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version, hashMapping))
}
val unreachable = gossip.overview.unreachable.map(memberToProto).to[Vector]
val members = gossip.members.toSeq.map(memberToProto).to[Vector]
val seen = gossip.overview.seen.map(seenToProto).to[Vector]
val unreachable: Vector[msg.Member] = gossip.overview.unreachable.map(memberToProto)(breakOut)
val members: Vector[msg.Member] = gossip.members.map(memberToProto)(breakOut)
val seen: Vector[msg.GossipOverview.Seen] = gossip.overview.seen.map(seenToProto)(breakOut)
val overview = msg.GossipOverview(seen, unreachable)
@ -192,46 +168,43 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
}
private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = {
def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash")
msg.VectorClock(None,
version.versions.map { case (n, t) msg.VectorClock.Version(mapHash(n), t.time) }.to[Vector])
val versions: Vector[msg.VectorClock.Version] = version.versions.map({
case (n, t) msg.VectorClock.Version(mapWithErrorMessage(hashMapping, n, "hash"), t.time)
})(breakOut)
msg.VectorClock(None, versions)
}
private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = {
msg.GossipEnvelope(uniqueAddressToProto(envelope.from), uniqueAddressToProto(envelope.to),
gossipToProto(envelope.gossip))
}
private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope =
msg.GossipEnvelope(uniqueAddressToProto(envelope.from), uniqueAddressToProto(envelope.to), gossipToProto(envelope.gossip))
private def gossipStatusToProto(status: GossipStatus): msg.GossipStatus = {
val allHashes: Vector[String] = status.version.versions.keys.toVector
val allHashes = status.version.versions.keys.toVector
val hashMapping = allHashes.zipWithIndex.toMap
msg.GossipStatus(uniqueAddressToProto(status.from), allHashes, vectorClockToProto(status.version, hashMapping))
}
private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = {
private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope =
gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(decompress(bytes)))
}
private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus = {
private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus =
gossipStatusFromProto(msg.GossipStatus.defaultInstance.mergeFrom(bytes))
}
private def gossipFromProto(gossip: msg.Gossip): Gossip = {
import scala.collection.breakOut
val addressMapping = gossip.allAddresses.map(uniqueAddressFromProto)
val roleMapping = gossip.allRoles
val hashMapping = gossip.allHashes
def memberFromProto(member: msg.Member) = {
def memberFromProto(member: msg.Member) =
new Member(addressMapping(member.addressIndex), member.upNumber, memberStatusFromInt(member.status.id),
member.rolesIndexes.map(roleMapping).to[Set])
}
member.rolesIndexes.map(roleMapping)(breakOut))
def seenFromProto(seen: msg.GossipOverview.Seen) =
(addressMapping(seen.addressIndex), vectorClockFromProto(seen.version, hashMapping))
val members = gossip.members.map(memberFromProto).to[immutable.SortedSet]
val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet
val seen = gossip.overview.seen.map(seenFromProto)(breakOut): TreeMap[UniqueAddress, VectorClock]
val members: immutable.SortedSet[Member] = gossip.members.map(memberFromProto)(breakOut)
val unreachable: immutable.Set[Member] = gossip.overview.unreachable.map(memberFromProto)(breakOut)
val seen: immutable.TreeMap[UniqueAddress, VectorClock] = gossip.overview.seen.map(seenFromProto)(breakOut)
val overview = GossipOverview(seen, unreachable)
Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping))
@ -239,34 +212,27 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = {
VectorClock(version.versions.map({
case msg.VectorClock.Version(h, t)
(VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t))
})(breakOut): TreeMap[VectorClock.Node, VectorClock.Timestamp])
case msg.VectorClock.Version(h, t) (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t))
})(breakOut))
}
private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = {
GossipEnvelope(uniqueAddressFromProto(envelope.from), uniqueAddressFromProto(envelope.to),
gossipFromProto(envelope.gossip))
}
private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope =
GossipEnvelope(uniqueAddressFromProto(envelope.from), uniqueAddressFromProto(envelope.to), gossipFromProto(envelope.gossip))
private def gossipStatusFromProto(status: msg.GossipStatus): GossipStatus = {
val hashMapping = status.allHashes
GossipStatus(uniqueAddressFromProto(status.from), vectorClockFromProto(status.version, hashMapping))
}
private def gossipStatusFromProto(status: msg.GossipStatus): GossipStatus =
GossipStatus(uniqueAddressFromProto(status.from), vectorClockFromProto(status.version, status.allHashes))
private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): msg.MetricsGossipEnvelope = {
val mgossip = envelope.gossip
val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) s + n.address).to[Vector]
val addressMapping = allAddresses.zipWithIndex.toMap
val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) s ++ n.metrics.map(_.name)).to[Vector]
val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) s ++ n.metrics.iterator.map(_.name)).to[Vector]
val metricNamesMapping = allMetricNames.zipWithIndex.toMap
def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address")
def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address")
def ewmaToProto(ewma: Option[EWMA]): Option[msg.NodeMetrics.EWMA] = {
ewma.map(x msg.NodeMetrics.EWMA(x.value, x.alpha))
}
def ewmaToProto(ewma: Option[EWMA]): Option[msg.NodeMetrics.EWMA] = ewma.map(x msg.NodeMetrics.EWMA(x.value, x.alpha))
def numberToProto(number: Number): msg.NodeMetrics.Number = {
import msg.NodeMetrics.Number
@ -281,38 +247,31 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
val out = new ObjectOutputStream(bos)
out.writeObject(number)
out.close()
msg.NodeMetrics.Number(msg.NodeMetrics.NumberType.Serialized, None, None,
Some(ByteString.copyFrom(bos.toByteArray)))
msg.NodeMetrics.Number(msg.NodeMetrics.NumberType.Serialized, None, None, Some(ByteString.copyFrom(bos.toByteArray)))
}
}
def metricToProto(metric: Metric): msg.NodeMetrics.Metric = {
def metricToProto(metric: Metric): msg.NodeMetrics.Metric =
msg.NodeMetrics.Metric(mapName(metric.name), numberToProto(metric.value), ewmaToProto(metric.average))
}
def nodeMetricsToProto(nodeMetrics: NodeMetrics): msg.NodeMetrics = {
msg.NodeMetrics(mapAddress(nodeMetrics.address), nodeMetrics.timestamp,
nodeMetrics.metrics.map(metricToProto).to[Vector])
}
def nodeMetricsToProto(nodeMetrics: NodeMetrics): msg.NodeMetrics =
msg.NodeMetrics(mapAddress(nodeMetrics.address), nodeMetrics.timestamp, nodeMetrics.metrics.map(metricToProto)(breakOut))
val nodeMetrics = mgossip.nodes.map(nodeMetricsToProto).to[Vector]
val nodeMetrics: Vector[msg.NodeMetrics] = mgossip.nodes.map(nodeMetricsToProto)(breakOut)
msg.MetricsGossipEnvelope(addressToProto(envelope.from),
msg.MetricsGossip(allAddresses.map(addressToProto), allMetricNames, nodeMetrics), envelope.reply)
}
private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope = {
private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope =
metricsGossipEnvelopeFromProto(msg.MetricsGossipEnvelope.defaultInstance.mergeFrom(decompress(bytes)))
}
private def metricsGossipEnvelopeFromProto(envelope: msg.MetricsGossipEnvelope): MetricsGossipEnvelope = {
val mgossip = envelope.gossip
val addressMapping = mgossip.allAddresses.map(addressFromProto)
val metricNameMapping = mgossip.allMetricNames
def ewmaFromProto(ewma: Option[msg.NodeMetrics.EWMA]): Option[EWMA] = {
ewma.map(x EWMA(x.value, x.alpha))
}
def ewmaFromProto(ewma: Option[msg.NodeMetrics.EWMA]): Option[EWMA] = ewma.map(x EWMA(x.value, x.alpha))
def numberFromProto(number: msg.NodeMetrics.Number): Number = {
import msg.NodeMetrics.Number
@ -323,27 +282,22 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
case Number(NumberType.Float, Some(n), _, _) jl.Float.intBitsToFloat(n)
case Number(NumberType.Integer, Some(n), _, _) n
case Number(NumberType.Serialized, _, _, Some(b))
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader,
new ByteArrayInputStream(b.toByteArray))
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new ByteArrayInputStream(b.toByteArray))
val obj = in.readObject
in.close()
obj.asInstanceOf[jl.Number]
}
}
def metricFromProto(metric: msg.NodeMetrics.Metric): Metric = {
def metricFromProto(metric: msg.NodeMetrics.Metric): Metric =
Metric(metricNameMapping(metric.nameIndex), numberFromProto(metric.number), ewmaFromProto(metric.ewma))
}
def nodeMetricsFromProto(nodeMetrics: msg.NodeMetrics): NodeMetrics = {
NodeMetrics(addressMapping(nodeMetrics.addressIndex), nodeMetrics.timestamp,
nodeMetrics.metrics.map(metricFromProto).toSet)
}
def nodeMetricsFromProto(nodeMetrics: msg.NodeMetrics): NodeMetrics =
NodeMetrics(addressMapping(nodeMetrics.addressIndex), nodeMetrics.timestamp, nodeMetrics.metrics.map(metricFromProto)(breakOut))
val nodeMetrics = mgossip.nodeMetrics.map(nodeMetricsFromProto).toSet
val nodeMetrics: Set[NodeMetrics] = mgossip.nodeMetrics.map(nodeMetricsFromProto)(breakOut)
MetricsGossipEnvelope(addressFromProto(envelope.from),
MetricsGossip(nodeMetrics), envelope.reply)
MetricsGossipEnvelope(addressFromProto(envelope.from), MetricsGossip(nodeMetrics), envelope.reply)
}
}