diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala index ebd1440b9d..661cfdc401 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala @@ -4,31 +4,19 @@ package akka.cluster.metrics.protobuf -import java.io.ByteArrayInputStream -import java.io.ByteArrayOutputStream -import java.io.ObjectOutputStream +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream } +import java.util.zip.{ GZIPInputStream, GZIPOutputStream } import java.{ lang ⇒ jl } -import java.util.zip.GZIPInputStream -import java.util.zip.GZIPOutputStream -import scala.annotation.tailrec -import scala.collection.JavaConverters.asJavaIterableConverter -import scala.collection.JavaConverters.asScalaBufferConverter -import scala.collection.JavaConverters.setAsJavaSetConverter -import scala.collection.breakOut -import com.google.protobuf.ByteString -import com.google.protobuf.MessageLite -import akka.actor.Address -import akka.actor.ExtendedActorSystem -import akka.cluster.metrics.EWMA -import akka.cluster.metrics.Metric -import akka.cluster.metrics.MetricsGossip -import akka.cluster.metrics.NodeMetrics + +import akka.actor.{ Address, ExtendedActorSystem } import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages ⇒ cm } +import akka.cluster.metrics.{ ClusterMetricsMessage, ClusterMetricsSettings, EWMA, Metric, MetricsGossip, MetricsGossipEnvelope, NodeMetrics } import akka.serialization.Serializer import akka.util.ClassLoaderObjectInputStream -import akka.cluster.metrics.ClusterMetricsMessage -import akka.cluster.metrics.MetricsGossipEnvelope -import akka.cluster.metrics.ClusterMetricsSettings +import com.google.protobuf.{ ByteString, MessageLite } + +import scala.annotation.tailrec +import scala.collection.JavaConverters.{ asJavaIterableConverter, asScalaBufferConverter, setAsJavaSetConverter } /** * Protobuf serializer for [[ClusterMetricsMessage]] types. @@ -83,17 +71,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { case _ ⇒ throw new IllegalArgumentException("Need a metrics message class to be able to deserialize bytes in metrics") } - private def addressFromBinary(bytes: Array[Byte]): Address = - addressFromProto(cm.Address.parseFrom(bytes)) - private def addressToProto(address: Address): cm.Address.Builder = address match { - case Address(protocol, system, Some(host), Some(port)) ⇒ - cm.Address.newBuilder().setSystem(system).setHostname(host).setPort(port).setProtocol(protocol) - case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.") + case Address(protocol, actorSystem, Some(host), Some(port)) ⇒ + cm.Address.newBuilder().setSystem(actorSystem).setHostname(host).setPort(port).setProtocol(protocol) + case _ ⇒ throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.") } - private def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray - @volatile private var protocolCache: String = null @volatile @@ -124,16 +107,16 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { private def mapWithErrorMessage[T](map: Map[T, Int], value: T, unknown: String): Int = map.get(value) match { case Some(x) ⇒ x - case _ ⇒ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message") + case _ ⇒ throw new IllegalArgumentException(s"Unknown $unknown [$value] in cluster message") } private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): cm.MetricsGossipEnvelope = { - val mgossip = envelope.gossip - val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) ⇒ s + n.address) + import scala.collection.breakOut + val allNodeMetrics = envelope.gossip.nodes + val allAddresses: Vector[Address] = allNodeMetrics.map(_.address)(breakOut) val addressMapping = allAddresses.zipWithIndex.toMap - val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) ⇒ s ++ n.metrics.iterator.map(_.name)) + val allMetricNames: Vector[String] = allNodeMetrics.foldLeft(Set.empty[String])((s, n) ⇒ s ++ n.metrics.iterator.map(_.name)).toVector val metricNamesMapping = allMetricNames.zipWithIndex.toMap - def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address") def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address") @@ -148,7 +131,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { case n: jl.Double ⇒ Number.newBuilder().setType(NumberType.Double).setValue64(jl.Double.doubleToLongBits(n)) case n: jl.Long ⇒ Number.newBuilder().setType(NumberType.Long).setValue64(n) case n: jl.Float ⇒ Number.newBuilder().setType(NumberType.Float).setValue32(jl.Float.floatToIntBits(n)) - case n: jl.Integer ⇒ Number.newBuilder().setType(NumberType.Integer) setValue32 (n) + case n: jl.Integer ⇒ Number.newBuilder().setType(NumberType.Integer).setValue32(n) case _ ⇒ val bos = new ByteArrayOutputStream val out = new ObjectOutputStream(bos) @@ -160,14 +143,14 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { def metricToProto(metric: Metric): cm.NodeMetrics.Metric.Builder = { val builder = cm.NodeMetrics.Metric.newBuilder().setNameIndex(mapName(metric.name)).setNumber(numberToProto(metric.value)) - ewmaToProto(metric.average).map(builder.setEwma(_)).getOrElse(builder) + ewmaToProto(metric.average).map(builder.setEwma).getOrElse(builder) } def nodeMetricsToProto(nodeMetrics: NodeMetrics): cm.NodeMetrics.Builder = cm.NodeMetrics.newBuilder().setAddressIndex(mapAddress(nodeMetrics.address)).setTimestamp(nodeMetrics.timestamp). addAllMetrics(nodeMetrics.metrics.map(metricToProto(_).build).asJava) - val nodeMetrics: Iterable[cm.NodeMetrics] = mgossip.nodes.map(nodeMetricsToProto(_).build) + val nodeMetrics: Iterable[cm.NodeMetrics] = allNodeMetrics.map(nodeMetricsToProto(_).build) cm.MetricsGossipEnvelope.newBuilder().setFrom(addressToProto(envelope.from)).setGossip( cm.MetricsGossip.newBuilder().addAllAllAddresses(allAddresses.map(addressToProto(_).build()).asJava). @@ -179,6 +162,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { metricsGossipEnvelopeFromProto(cm.MetricsGossipEnvelope.parseFrom(decompress(bytes))) private def metricsGossipEnvelopeFromProto(envelope: cm.MetricsGossipEnvelope): MetricsGossipEnvelope = { + import scala.collection.breakOut val mgossip = envelope.getGossip val addressMapping: Vector[Address] = mgossip.getAllAddressesList.asScala.map(addressFromProto)(breakOut) val metricNameMapping: Vector[String] = mgossip.getAllMetricNamesList.asScala.toVector @@ -187,7 +171,6 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { Some(EWMA(ewma.getValue, ewma.getAlpha)) def numberFromProto(number: cm.NodeMetrics.Number): Number = { - import cm.NodeMetrics.Number import cm.NodeMetrics.NumberType number.getType.getNumber match { case NumberType.Double_VALUE ⇒ jl.Double.longBitsToDouble(number.getValue64) diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 7aed37b6b6..c6de909716 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -3,22 +3,20 @@ */ package akka.cluster.protobuf -import akka.serialization.Serializer -import akka.cluster._ -import scala.collection.breakOut -import akka.actor.{ ExtendedActorSystem, Address } -import scala.Some -import scala.collection.immutable -import java.io.{ ByteArrayInputStream, ObjectOutputStream, ByteArrayOutputStream } -import com.google.protobuf.ByteString -import akka.util.ClassLoaderObjectInputStream +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream } +import java.util.zip.{ GZIPInputStream, GZIPOutputStream } import java.{ lang ⇒ jl } -import java.util.zip.GZIPOutputStream -import java.util.zip.GZIPInputStream -import com.google.protobuf.MessageLite -import scala.annotation.tailrec + +import akka.actor.{ Address, ExtendedActorSystem } +import akka.cluster._ import akka.cluster.protobuf.msg.{ ClusterMessages ⇒ cm } +import akka.serialization.Serializer +import akka.util.ClassLoaderObjectInputStream +import com.google.protobuf.{ ByteString, MessageLite } + +import scala.annotation.tailrec import scala.collection.JavaConverters._ +import scala.collection.immutable import scala.concurrent.duration.Deadline /** @@ -113,9 +111,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ uniqueAddressFromProto(cm.UniqueAddress.parseFrom(bytes)) private def addressToProto(address: Address): cm.Address.Builder = address match { - case Address(protocol, system, Some(host), Some(port)) ⇒ - cm.Address.newBuilder().setSystem(system).setHostname(host).setPort(port).setProtocol(protocol) - case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.") + case Address(protocol, actorSystem, Some(host), Some(port)) ⇒ + cm.Address.newBuilder().setSystem(actorSystem).setHostname(host).setPort(port).setProtocol(protocol) + case _ ⇒ throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.") } private def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray @@ -177,7 +175,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ private def mapWithErrorMessage[T](map: Map[T, Int], value: T, unknown: String): Int = map.get(value) match { case Some(x) ⇒ x - case _ ⇒ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message") + case _ ⇒ throw new IllegalArgumentException(s"Unknown $unknown [$value] in cluster message") } private def joinToProto(node: UniqueAddress, roles: Set[String]): cm.Join = @@ -187,7 +185,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ cm.Welcome.newBuilder().setFrom(uniqueAddressToProto(from)).setGossip(gossipToProto(gossip)).build() private def gossipToProto(gossip: Gossip): cm.Gossip.Builder = { - import scala.collection.breakOut val allMembers = gossip.members.toVector val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress) val addressMapping = allAddresses.zipWithIndex.toMap @@ -293,6 +290,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ } private def vectorClockFromProto(version: cm.VectorClock, hashMapping: immutable.Seq[String]) = { + import scala.collection.breakOut VectorClock(version.getVersionsList.asScala.map( v ⇒ (VectorClock.Node.fromHash(hashMapping(v.getHashIndex)), v.getTimestamp))(breakOut)) } @@ -308,12 +306,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ status.getAllHashesList.asScala.toVector)) private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): cm.MetricsGossipEnvelope = { - val mgossip = envelope.gossip - val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) ⇒ s + n.address) + import scala.collection.breakOut + val allNodeMetrics = envelope.gossip.nodes + val allAddresses: Vector[Address] = allNodeMetrics.map(_.address)(breakOut) val addressMapping = allAddresses.zipWithIndex.toMap - val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) ⇒ s ++ n.metrics.iterator.map(_.name)) + val allMetricNames: Vector[String] = allNodeMetrics.foldLeft(Set.empty[String])((s, n) ⇒ s ++ n.metrics.iterator.map(_.name)).toVector val metricNamesMapping = allMetricNames.zipWithIndex.toMap - def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address") def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address") @@ -328,7 +326,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ case n: jl.Double ⇒ Number.newBuilder().setType(NumberType.Double).setValue64(jl.Double.doubleToLongBits(n)) case n: jl.Long ⇒ Number.newBuilder().setType(NumberType.Long).setValue64(n) case n: jl.Float ⇒ Number.newBuilder().setType(NumberType.Float).setValue32(jl.Float.floatToIntBits(n)) - case n: jl.Integer ⇒ Number.newBuilder().setType(NumberType.Integer) setValue32 (n) + case n: jl.Integer ⇒ Number.newBuilder().setType(NumberType.Integer).setValue32(n) case _ ⇒ val bos = new ByteArrayOutputStream val out = new ObjectOutputStream(bos) @@ -340,14 +338,14 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ def metricToProto(metric: Metric): cm.NodeMetrics.Metric.Builder = { val builder = cm.NodeMetrics.Metric.newBuilder().setNameIndex(mapName(metric.name)).setNumber(numberToProto(metric.value)) - ewmaToProto(metric.average).map(builder.setEwma(_)).getOrElse(builder) + ewmaToProto(metric.average).map(builder.setEwma).getOrElse(builder) } def nodeMetricsToProto(nodeMetrics: NodeMetrics): cm.NodeMetrics.Builder = cm.NodeMetrics.newBuilder().setAddressIndex(mapAddress(nodeMetrics.address)).setTimestamp(nodeMetrics.timestamp). addAllMetrics(nodeMetrics.metrics.map(metricToProto(_).build).asJava) - val nodeMetrics: Iterable[cm.NodeMetrics] = mgossip.nodes.map(nodeMetricsToProto(_).build) + val nodeMetrics: Iterable[cm.NodeMetrics] = allNodeMetrics.map(nodeMetricsToProto(_).build) cm.MetricsGossipEnvelope.newBuilder().setFrom(addressToProto(envelope.from)).setGossip( cm.MetricsGossip.newBuilder().addAllAllAddresses(allAddresses.map(addressToProto(_).build()).asJava). @@ -359,6 +357,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ metricsGossipEnvelopeFromProto(cm.MetricsGossipEnvelope.parseFrom(decompress(bytes))) private def metricsGossipEnvelopeFromProto(envelope: cm.MetricsGossipEnvelope): MetricsGossipEnvelope = { + import scala.collection.breakOut val mgossip = envelope.getGossip val addressMapping: Vector[Address] = mgossip.getAllAddressesList.asScala.map(addressFromProto)(breakOut) val metricNameMapping: Vector[String] = mgossip.getAllMetricNamesList.asScala.toVector @@ -367,7 +366,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ Some(EWMA(ewma.getValue, ewma.getAlpha)) def numberFromProto(number: cm.NodeMetrics.Number): Number = { - import cm.NodeMetrics.Number import cm.NodeMetrics.NumberType number.getType.getNumber match { case NumberType.Double_VALUE ⇒ jl.Double.longBitsToDouble(number.getValue64)