diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index f02653bca4..529ae5595e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -52,34 +52,20 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ def toBinary(obj: AnyRef): Array[Byte] = obj match { - case ClusterHeartbeatReceiver.Heartbeat(from) ⇒ - addressToProto(from).toByteArray - case m: GossipEnvelope ⇒ - compress(gossipEnvelopeToProto(m)) - case m: GossipStatus ⇒ - gossipStatusToProto(m).toByteArray - case m: MetricsGossipEnvelope ⇒ - compress(metricsGossipEnvelopeToProto(m)) - case InternalClusterAction.Join(node, roles) ⇒ - msg.Join(uniqueAddressToProto(node), roles.toVector).toByteArray - case InternalClusterAction.Welcome(from, gossip) ⇒ - compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip))) - case ClusterUserAction.Leave(address) ⇒ - addressToProto(address).toByteArray - case ClusterUserAction.Down(address) ⇒ - addressToProto(address).toByteArray - case InternalClusterAction.InitJoin ⇒ - msg.Empty().toByteArray - case InternalClusterAction.InitJoinAck(address) ⇒ - addressToProto(address).toByteArray - case InternalClusterAction.InitJoinNack(address) ⇒ - addressToProto(address).toByteArray - case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ - addressToProto(from).toByteArray - case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ - addressToProto(from).toByteArray - case _ ⇒ - throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") + case ClusterHeartbeatReceiver.Heartbeat(from) ⇒ addressToProto(from).toByteArray + case m: GossipEnvelope ⇒ compress(gossipEnvelopeToProto(m)) + case m: GossipStatus ⇒ gossipStatusToProto(m).toByteArray + case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) + case InternalClusterAction.Join(node, roles) ⇒ msg.Join(uniqueAddressToProto(node), roles.toVector).toByteArray + case InternalClusterAction.Welcome(from, gossip) ⇒ compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip))) + case ClusterUserAction.Leave(address) ⇒ addressToProto(address).toByteArray + case ClusterUserAction.Down(address) ⇒ addressToProto(address).toByteArray + case InternalClusterAction.InitJoin ⇒ msg.Empty().toByteArray + case InternalClusterAction.InitJoinAck(address) ⇒ addressToProto(address).toByteArray + case InternalClusterAction.InitJoinNack(address) ⇒ addressToProto(address).toByteArray + case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ addressToProto(from).toByteArray + case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ addressToProto(from).toByteArray + case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") } def compress(msg: MessageLite): Array[Byte] = { @@ -95,52 +81,43 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val out = new ByteArrayOutputStream() val buffer = new Array[Byte](BufferSize) - @tailrec def readChunk(): Unit = { - val n = in.read(buffer) - if (n != -1) { + @tailrec def readChunk(): Unit = in.read(buffer) match { + case -1 ⇒ () + case n ⇒ out.write(buffer, 0, n) readChunk() - } } - readChunk() + readChunk() out.toByteArray } - def fromBinary(bytes: Array[Byte], - clazz: Option[Class[_]]): AnyRef = { + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match { case Some(c) ⇒ fromBinaryMap.get(c.asInstanceOf[Class[ClusterMessage]]) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class ${c} in ClusterSerializer") + case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in ClusterSerializer") } case _ ⇒ throw new IllegalArgumentException("Need a cluster message class to be able to deserialize bytes in ClusterSerializer") } - } - private def addressFromBinary(bytes: Array[Byte]): Address = { + private def addressFromBinary(bytes: Array[Byte]): Address = addressFromProto(msg.Address.defaultInstance.mergeFrom(bytes)) - } - private def uniqueAddressFromBinary(bytes: Array[Byte]): UniqueAddress = { + private def uniqueAddressFromBinary(bytes: Array[Byte]): UniqueAddress = uniqueAddressFromProto(msg.UniqueAddress.defaultInstance.mergeFrom(bytes)) - } - private def addressToProto(address: Address): msg.Address = { + private def addressToProto(address: Address): msg.Address = msg.Address(address.system, address.host.getOrElse(""), address.port.getOrElse(0), Some(address.protocol)) - } - private def uniqueAddressToProto(uniqueAddress: UniqueAddress): msg.UniqueAddress = { + private def uniqueAddressToProto(uniqueAddress: UniqueAddress): msg.UniqueAddress = msg.UniqueAddress(addressToProto(uniqueAddress.address), uniqueAddress.uid) - } - private def addressFromProto(address: msg.Address): Address = { + private def addressFromProto(address: msg.Address): Address = Address(address.protocol.getOrElse(""), address.system, address.hostname, address.port) - } - private def uniqueAddressFromProto(uniqueAddress: msg.UniqueAddress): UniqueAddress = { + private def uniqueAddressFromProto(uniqueAddress: msg.UniqueAddress): UniqueAddress = UniqueAddress(addressFromProto(uniqueAddress.address), uniqueAddress.uid) - } private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int]( MemberStatus.Joining -> msg.MemberStatus.Joining_VALUE, @@ -158,32 +135,31 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ } private def gossipToProto(gossip: Gossip): msg.Gossip = { - val allMembers = List(gossip.members, gossip.overview.unreachable).flatMap(identity) - val allAddresses = allMembers.map(_.uniqueAddress).to[Vector] + import scala.collection.breakOut + val allMembers = (gossip.members.iterator ++ gossip.overview.unreachable.iterator).toIndexedSeq + val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress)(breakOut) val addressMapping = allAddresses.zipWithIndex.toMap val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) ⇒ acc ++ m.roles).to[Vector] val roleMapping = allRoles.zipWithIndex.toMap - val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.toSet) { - case (s, VectorClock(v)) ⇒ s ++ v.keys - }.to[Vector] + val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.toSet)( + { case (s, VectorClock(v)) ⇒ s ++ v.keys }).to[Vector] val hashMapping = allHashes.zipWithIndex.toMap def mapUniqueAddress(uniqueAddress: UniqueAddress) = mapWithErrorMessage(addressMapping, uniqueAddress, "address") def mapRole(role: String) = mapWithErrorMessage(roleMapping, role, "role") - def memberToProto(member: Member) = { + def memberToProto(member: Member) = msg.Member(mapUniqueAddress(member.uniqueAddress), member.upNumber, - msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector]) + msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole)(breakOut)) + + def seenToProto(seen: (UniqueAddress, VectorClock)) = { + val (address, version) = seen + msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version, hashMapping)) } - def seenToProto(seen: (UniqueAddress, VectorClock)) = seen match { - case (address: UniqueAddress, version: VectorClock) ⇒ - msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version, hashMapping)) - } - - val unreachable = gossip.overview.unreachable.map(memberToProto).to[Vector] - val members = gossip.members.toSeq.map(memberToProto).to[Vector] - val seen = gossip.overview.seen.map(seenToProto).to[Vector] + val unreachable: Vector[msg.Member] = gossip.overview.unreachable.map(memberToProto)(breakOut) + val members: Vector[msg.Member] = gossip.members.map(memberToProto)(breakOut) + val seen: Vector[msg.GossipOverview.Seen] = gossip.overview.seen.map(seenToProto)(breakOut) val overview = msg.GossipOverview(seen, unreachable) @@ -192,46 +168,43 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ } private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = { - def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash") - msg.VectorClock(None, - version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n), t.time) }.to[Vector]) + val versions: Vector[msg.VectorClock.Version] = version.versions.map({ + case (n, t) ⇒ msg.VectorClock.Version(mapWithErrorMessage(hashMapping, n, "hash"), t.time) + })(breakOut) + msg.VectorClock(None, versions) } - private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = { - msg.GossipEnvelope(uniqueAddressToProto(envelope.from), uniqueAddressToProto(envelope.to), - gossipToProto(envelope.gossip)) - } + private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = + msg.GossipEnvelope(uniqueAddressToProto(envelope.from), uniqueAddressToProto(envelope.to), gossipToProto(envelope.gossip)) private def gossipStatusToProto(status: GossipStatus): msg.GossipStatus = { - val allHashes: Vector[String] = status.version.versions.keys.toVector + val allHashes = status.version.versions.keys.toVector val hashMapping = allHashes.zipWithIndex.toMap msg.GossipStatus(uniqueAddressToProto(status.from), allHashes, vectorClockToProto(status.version, hashMapping)) } - private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = { + private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(decompress(bytes))) - } - private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus = { + private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus = gossipStatusFromProto(msg.GossipStatus.defaultInstance.mergeFrom(bytes)) - } private def gossipFromProto(gossip: msg.Gossip): Gossip = { + import scala.collection.breakOut val addressMapping = gossip.allAddresses.map(uniqueAddressFromProto) val roleMapping = gossip.allRoles val hashMapping = gossip.allHashes - def memberFromProto(member: msg.Member) = { + def memberFromProto(member: msg.Member) = new Member(addressMapping(member.addressIndex), member.upNumber, memberStatusFromInt(member.status.id), - member.rolesIndexes.map(roleMapping).to[Set]) - } + member.rolesIndexes.map(roleMapping)(breakOut)) def seenFromProto(seen: msg.GossipOverview.Seen) = (addressMapping(seen.addressIndex), vectorClockFromProto(seen.version, hashMapping)) - val members = gossip.members.map(memberFromProto).to[immutable.SortedSet] - val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet - val seen = gossip.overview.seen.map(seenFromProto)(breakOut): TreeMap[UniqueAddress, VectorClock] + val members: immutable.SortedSet[Member] = gossip.members.map(memberFromProto)(breakOut) + val unreachable: immutable.Set[Member] = gossip.overview.unreachable.map(memberFromProto)(breakOut) + val seen: immutable.TreeMap[UniqueAddress, VectorClock] = gossip.overview.seen.map(seenFromProto)(breakOut) val overview = GossipOverview(seen, unreachable) Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping)) @@ -239,34 +212,27 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = { VectorClock(version.versions.map({ - case msg.VectorClock.Version(h, t) ⇒ - (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) - })(breakOut): TreeMap[VectorClock.Node, VectorClock.Timestamp]) + case msg.VectorClock.Version(h, t) ⇒ (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) + })(breakOut)) } - private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = { - GossipEnvelope(uniqueAddressFromProto(envelope.from), uniqueAddressFromProto(envelope.to), - gossipFromProto(envelope.gossip)) - } + private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = + GossipEnvelope(uniqueAddressFromProto(envelope.from), uniqueAddressFromProto(envelope.to), gossipFromProto(envelope.gossip)) - private def gossipStatusFromProto(status: msg.GossipStatus): GossipStatus = { - val hashMapping = status.allHashes - GossipStatus(uniqueAddressFromProto(status.from), vectorClockFromProto(status.version, hashMapping)) - } + private def gossipStatusFromProto(status: msg.GossipStatus): GossipStatus = + GossipStatus(uniqueAddressFromProto(status.from), vectorClockFromProto(status.version, status.allHashes)) private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): msg.MetricsGossipEnvelope = { val mgossip = envelope.gossip val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) ⇒ s + n.address).to[Vector] val addressMapping = allAddresses.zipWithIndex.toMap - val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) ⇒ s ++ n.metrics.map(_.name)).to[Vector] + val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) ⇒ s ++ n.metrics.iterator.map(_.name)).to[Vector] val metricNamesMapping = allMetricNames.zipWithIndex.toMap def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address") def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address") - def ewmaToProto(ewma: Option[EWMA]): Option[msg.NodeMetrics.EWMA] = { - ewma.map(x ⇒ msg.NodeMetrics.EWMA(x.value, x.alpha)) - } + def ewmaToProto(ewma: Option[EWMA]): Option[msg.NodeMetrics.EWMA] = ewma.map(x ⇒ msg.NodeMetrics.EWMA(x.value, x.alpha)) def numberToProto(number: Number): msg.NodeMetrics.Number = { import msg.NodeMetrics.Number @@ -281,38 +247,31 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val out = new ObjectOutputStream(bos) out.writeObject(number) out.close() - msg.NodeMetrics.Number(msg.NodeMetrics.NumberType.Serialized, None, None, - Some(ByteString.copyFrom(bos.toByteArray))) + msg.NodeMetrics.Number(msg.NodeMetrics.NumberType.Serialized, None, None, Some(ByteString.copyFrom(bos.toByteArray))) } } - def metricToProto(metric: Metric): msg.NodeMetrics.Metric = { + def metricToProto(metric: Metric): msg.NodeMetrics.Metric = msg.NodeMetrics.Metric(mapName(metric.name), numberToProto(metric.value), ewmaToProto(metric.average)) - } - def nodeMetricsToProto(nodeMetrics: NodeMetrics): msg.NodeMetrics = { - msg.NodeMetrics(mapAddress(nodeMetrics.address), nodeMetrics.timestamp, - nodeMetrics.metrics.map(metricToProto).to[Vector]) - } + def nodeMetricsToProto(nodeMetrics: NodeMetrics): msg.NodeMetrics = + msg.NodeMetrics(mapAddress(nodeMetrics.address), nodeMetrics.timestamp, nodeMetrics.metrics.map(metricToProto)(breakOut)) - val nodeMetrics = mgossip.nodes.map(nodeMetricsToProto).to[Vector] + val nodeMetrics: Vector[msg.NodeMetrics] = mgossip.nodes.map(nodeMetricsToProto)(breakOut) msg.MetricsGossipEnvelope(addressToProto(envelope.from), msg.MetricsGossip(allAddresses.map(addressToProto), allMetricNames, nodeMetrics), envelope.reply) } - private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope = { + private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope = metricsGossipEnvelopeFromProto(msg.MetricsGossipEnvelope.defaultInstance.mergeFrom(decompress(bytes))) - } private def metricsGossipEnvelopeFromProto(envelope: msg.MetricsGossipEnvelope): MetricsGossipEnvelope = { val mgossip = envelope.gossip val addressMapping = mgossip.allAddresses.map(addressFromProto) val metricNameMapping = mgossip.allMetricNames - def ewmaFromProto(ewma: Option[msg.NodeMetrics.EWMA]): Option[EWMA] = { - ewma.map(x ⇒ EWMA(x.value, x.alpha)) - } + def ewmaFromProto(ewma: Option[msg.NodeMetrics.EWMA]): Option[EWMA] = ewma.map(x ⇒ EWMA(x.value, x.alpha)) def numberFromProto(number: msg.NodeMetrics.Number): Number = { import msg.NodeMetrics.Number @@ -323,27 +282,22 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ case Number(NumberType.Float, Some(n), _, _) ⇒ jl.Float.intBitsToFloat(n) case Number(NumberType.Integer, Some(n), _, _) ⇒ n case Number(NumberType.Serialized, _, _, Some(b)) ⇒ - val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, - new ByteArrayInputStream(b.toByteArray)) + val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new ByteArrayInputStream(b.toByteArray)) val obj = in.readObject in.close() obj.asInstanceOf[jl.Number] } } - def metricFromProto(metric: msg.NodeMetrics.Metric): Metric = { + def metricFromProto(metric: msg.NodeMetrics.Metric): Metric = Metric(metricNameMapping(metric.nameIndex), numberFromProto(metric.number), ewmaFromProto(metric.ewma)) - } - def nodeMetricsFromProto(nodeMetrics: msg.NodeMetrics): NodeMetrics = { - NodeMetrics(addressMapping(nodeMetrics.addressIndex), nodeMetrics.timestamp, - nodeMetrics.metrics.map(metricFromProto).toSet) - } + def nodeMetricsFromProto(nodeMetrics: msg.NodeMetrics): NodeMetrics = + NodeMetrics(addressMapping(nodeMetrics.addressIndex), nodeMetrics.timestamp, nodeMetrics.metrics.map(metricFromProto)(breakOut)) - val nodeMetrics = mgossip.nodeMetrics.map(nodeMetricsFromProto).toSet + val nodeMetrics: Set[NodeMetrics] = mgossip.nodeMetrics.map(nodeMetricsFromProto)(breakOut) - MetricsGossipEnvelope(addressFromProto(envelope.from), - MetricsGossip(nodeMetrics), envelope.reply) + MetricsGossipEnvelope(addressFromProto(envelope.from), MetricsGossip(nodeMetrics), envelope.reply) } }