diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto new file mode 100644 index 0000000000..ebbb04baf7 --- /dev/null +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -0,0 +1,224 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +option java_package = "akka.cluster.protobuf.msg"; +option optimize_for = SPEED; + +/**************************************** + * Cluster User Messages + ****************************************/ + +/** + * Join + */ +message Join { + required Address address = 1; + repeated string roles = 2; +} + +/** + * Leave + * Sends an Address + */ + +/** + * Down + * Sends an Address + */ + +/**************************************** + * Internal Cluster Action Messages + ****************************************/ + +/** + * InitJoin + * Sends Empty + */ + +/** + * InitJoinAck + * Sends an Address + */ + +/** + * InitJoinNack + * Sends an Address + */ + +/**************************************** + * Cluster Leader Action Messages + ****************************************/ + +/** + * Exit + * Sends an Address + */ + +/** + * Remove + * Sends an Address + */ + + +/**************************************** + * Cluster Heartbeat Messages + ****************************************/ + +/** + * Heartbeat + * Sends an Address + */ + +/** + * EndHeartbeat + * Sends an Address + */ + +/** + * HeartbeatRequest + * Sends an Address + */ + +/**************************************** + * Cluster Gossip Messages + ****************************************/ + +/** + * Gossip Envelope + */ +message GossipEnvelope { + required Address from = 1; + required Gossip gossip = 2; + required bool conversation = 3; +} + +/** + * Gossip + */ +message Gossip { + repeated Address allAddresses = 1; + repeated string allRoles = 2; + repeated string allHashes = 3; + repeated Member members = 4; + required GossipOverview overview = 5; + required VectorClock version = 6; +} + +/** + * Gossip Overview + */ +message GossipOverview { + message Seen { + required int32 addressIndex = 1; + required VectorClock version = 2; + } + repeated Seen seen = 1; + repeated Member unreachable = 2; +} + +/** + * Member + */ +message Member { + required int32 addressIndex = 1; + required MemberStatus status = 2; + repeated int32 rolesIndexes = 3 [packed = true]; +} + +/** + * Member Status + */ +enum MemberStatus { + Joining = 0; + Up = 1; + Leaving = 2; + Exiting = 3; + Down = 4; + Removed = 5; +} + +/** + * Vector Clock + */ +message VectorClock { + message Version { + required int32 hashIndex = 1; + required int64 timestamp = 2; + } + required int64 timestamp = 1; + repeated Version versions = 2; +} + +/**************************************** + * Metrics Gossip Messages + ****************************************/ + +/** + * Metrics Gossip Envelope + */ +message MetricsGossipEnvelope { + required Address from = 1; + required MetricsGossip gossip = 2; + required bool reply = 3; +} + +/** + * Metrics Gossip + */ +message MetricsGossip { + repeated Address allAddresses = 1; + repeated string allMetricNames = 2; + repeated NodeMetrics nodeMetrics = 3; +} + +/** + * Node Metrics + */ +message NodeMetrics { + enum NumberType { + Serialized = 0; + Double = 1; + Float = 2; + Integer = 3; + Long = 4; + } + message Number { + required NumberType type = 1; + optional uint32 value32 = 2; + optional uint64 value64 = 3; + optional bytes serialized = 4; + } + message EWMA { + required double value = 1; + required double alpha = 2; + } + message Metric { + required int32 nameIndex = 1; + required Number number = 2; + optional EWMA ewma = 3; + } + required int32 addressIndex = 1; + required int64 timestamp = 2; + repeated Metric metrics = 3; +} + +/**************************************** + * Common Datatypes and Messages + ****************************************/ + +/** + * An empty message + */ + message Empty { + } + +/** + * Defines a remote address. + */ +message Address { + required string system = 1; + required string hostname = 2; + required uint32 port = 3; + optional string protocol = 4; +} diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index b9918447e3..71b10b6ce6 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -223,4 +223,15 @@ akka { } + # Protobuf serializer for cluster messages + actor { + serializers { + akka-cluster = "akka.cluster.protobuf.ClusterMessageSerializer" + } + + serialization-bindings { + "akka.cluster.ClusterMessage" = akka-cluster + } + } + } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 12f6a90c4b..311bc73072 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -12,20 +12,13 @@ import scala.util.control.NonFatal import java.util.UUID import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler } import akka.actor.OneForOneStrategy -import akka.actor.Status.Failure import akka.actor.SupervisorStrategy.Stop -import akka.actor.Terminated -import akka.event.EventStream -import akka.pattern.ask -import akka.util.Timeout import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import akka.actor.ActorSelection /** * Base trait for all cluster messages. All ClusterMessage's are serializable. - * - * FIXME Protobuf all ClusterMessages */ trait ClusterMessage extends Serializable @@ -38,16 +31,19 @@ object ClusterUserAction { * Command to join the cluster. Sent when a node (represented by 'address') * wants to join another node (the receiver). */ + @SerialVersionUID(1L) case class Join(address: Address, roles: Set[String]) extends ClusterMessage /** * Command to leave the cluster. */ + @SerialVersionUID(1L) case class Leave(address: Address) extends ClusterMessage /** * Command to mark node as temporary down. */ + @SerialVersionUID(1L) case class Down(address: Address) extends ClusterMessage } @@ -61,7 +57,7 @@ private[cluster] object InternalClusterAction { * Command to initiate join another node (represented by 'address'). * Join will be sent to the other node. */ - case class JoinTo(address: Address) extends ClusterMessage + case class JoinTo(address: Address) /** * Command to initiate the process to join the specified @@ -77,21 +73,24 @@ private[cluster] object InternalClusterAction { * If a node is uninitialized it will reply to `InitJoin` with * `InitJoinNack`. */ - case object JoinSeedNode extends ClusterMessage + case object JoinSeedNode /** * @see JoinSeedNode */ + @SerialVersionUID(1L) case object InitJoin extends ClusterMessage /** * @see JoinSeedNode */ + @SerialVersionUID(1L) case class InitJoinAck(address: Address) extends ClusterMessage /** * @see JoinSeedNode */ + @SerialVersionUID(1L) case class InitJoinNack(address: Address) extends ClusterMessage /** @@ -149,11 +148,13 @@ private[cluster] object ClusterLeaderAction { * Command to mark a node to be removed from the cluster immediately. * Can only be sent by the leader. */ + @SerialVersionUID(1L) case class Exit(address: Address) extends ClusterMessage /** * Command to remove a node from the cluster immediately. */ + @SerialVersionUID(1L) case class Remove(address: Address) extends ClusterMessage } @@ -1010,6 +1011,7 @@ private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with /** * INTERNAL API */ +@SerialVersionUID(1L) private[cluster] case class ClusterStats( receivedGossipCount: Long = 0L, mergeCount: Long = 0L, diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 84b174ee99..8a436dbc36 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -5,15 +5,12 @@ package akka.cluster import language.postfixOps import scala.collection.immutable -import scala.collection.immutable.{ VectorBuilder, SortedSet } +import scala.collection.immutable.VectorBuilder import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream import akka.actor.AddressTerminated -import java.lang.Iterable -import akka.japi.Util.immutableSeq -import akka.util.Collections.EmptyImmutableSeq /** * Domain events published to the event bus. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 34587903e7..6a6b78888b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -181,6 +181,7 @@ private[cluster] object MetricsGossip { * * @param nodes metrics per node */ +@SerialVersionUID(1L) private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) { /** @@ -220,6 +221,7 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) { * INTERNAL API * Envelope adding a sender address to the gossip. */ +@SerialVersionUID(1L) private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip, reply: Boolean) extends ClusterMessage @@ -267,7 +269,8 @@ object EWMA { * This value is always used as the previous EWMA to calculate the new EWMA. * */ -private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMessage { +@SerialVersionUID(1L) +private[cluster] case class EWMA(value: Double, alpha: Double) { require(0.0 <= alpha && alpha <= 1.0, "alpha must be between 0.0 and 1.0") @@ -296,8 +299,9 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe * @param average the data stream of the metric value, for trending over time. Metrics that are already * averages (e.g. system load average) or finite (e.g. as number of processors), are not trended. */ -case class Metric private (name: String, value: Number, private val average: Option[EWMA]) - extends ClusterMessage with MetricNumericConverter { +@SerialVersionUID(1L) +case class Metric private[cluster] (name: String, value: Number, private[cluster] val average: Option[EWMA]) + extends MetricNumericConverter { require(defined(value), s"Invalid Metric [$name] value [$value]") @@ -378,7 +382,8 @@ object Metric extends MetricNumericConverter { * @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC * @param metrics the set of sampled [[akka.actor.Metric]] */ -case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage { +@SerialVersionUID(1L) +case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) { /** * Returns the most recent data. @@ -473,6 +478,7 @@ object StandardMetrics { * @param max the maximum amount of memory (in bytes) that can be used for JVM memory management. * Can be undefined on some OS. */ + @SerialVersionUID(1L) case class HeapMemory(address: Address, timestamp: Long, used: Long, committed: Long, max: Option[Long]) { require(committed > 0L, "committed heap expected to be > 0 bytes") require(max.isEmpty || max.get > 0L, "max heap expected to be > 0 bytes") @@ -516,6 +522,7 @@ object StandardMetrics { * much more it could theoretically. * @param processors the number of available processors */ + @SerialVersionUID(1L) case class Cpu( address: Address, timestamp: Long, diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index b29a61a623..5fadea4286 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -55,12 +55,12 @@ private[cluster] object Gossip { * `Removed` by removing it from the `members` set and sending a `Removed` command to the * removed node telling it to shut itself down. */ +@SerialVersionUID(1L) private[cluster] case class Gossip( members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address overview: GossipOverview = GossipOverview(), version: VectorClock = VectorClock()) // vector clock version - extends ClusterMessage // is a serializable cluster message - with Versioned[Gossip] { + extends Versioned[Gossip] { // FIXME can be disabled as optimization assertInvariants() @@ -226,6 +226,7 @@ private[cluster] case class Gossip( * INTERNAL API * Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes. */ +@SerialVersionUID(1L) private[cluster] case class GossipOverview( seen: Map[Address, VectorClock] = Map.empty, unreachable: Set[Member] = Set.empty) { @@ -243,4 +244,5 @@ private[cluster] case class GossipOverview( * INTERNAL API * Envelope adding a sender address to the gossip. */ +@SerialVersionUID(1L) private[cluster] case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 308b0d81f0..d60ce2afc3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -17,7 +17,8 @@ import MemberStatus._ * Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus` * and roles. */ -class Member(val address: Address, val status: MemberStatus, val roles: Set[String]) extends ClusterMessage with Serializable { +@SerialVersionUID(1L) +class Member(val address: Address, val status: MemberStatus, val roles: Set[String]) extends Serializable { override def hashCode = address.## override def equals(other: Any) = other match { case m: Member ⇒ address == m.address @@ -123,15 +124,15 @@ object Member { * * Can be one of: Joining, Up, Leaving, Exiting and Down. */ -abstract class MemberStatus extends ClusterMessage +abstract class MemberStatus object MemberStatus { - case object Joining extends MemberStatus - case object Up extends MemberStatus - case object Leaving extends MemberStatus - case object Exiting extends MemberStatus - case object Down extends MemberStatus - case object Removed extends MemberStatus + @SerialVersionUID(1L) case object Joining extends MemberStatus + @SerialVersionUID(1L) case object Up extends MemberStatus + @SerialVersionUID(1L) case object Leaving extends MemberStatus + @SerialVersionUID(1L) case object Exiting extends MemberStatus + @SerialVersionUID(1L) case object Down extends MemberStatus + @SerialVersionUID(1L) case object Removed extends MemberStatus /** * Java API: retrieve the “joining” status singleton diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala index 45fa26aa5c..8e53416810 100644 --- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala +++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala @@ -72,15 +72,21 @@ object VectorClock { /** * Hash representation of a versioned node name. */ - sealed trait Node extends Serializable + sealed trait Node extends Serializable { + def hash: String + } object Node { + @SerialVersionUID(1L) private case class NodeImpl(name: String) extends Node { override def toString(): String = "Node(" + name + ")" + override def hash: String = name } def apply(name: String): Node = NodeImpl(hash(name)) + def fromHash(hash: String): Node = NodeImpl(hash) + private def hash(name: String): String = { val digester = MessageDigest.getInstance("MD5") digester update name.getBytes("UTF-8") @@ -91,7 +97,8 @@ object VectorClock { /** * Timestamp representation a unique 'Ordered' timestamp. */ - case class Timestamp private (time: Long) extends Ordered[Timestamp] { + @SerialVersionUID(1L) + case class Timestamp(time: Long) extends Ordered[Timestamp] { def max(other: Timestamp) = { if (this < other) other else this @@ -132,6 +139,7 @@ object VectorClock { * * Based on code from the 'vlock' VectorClock library by Coda Hale. */ +@SerialVersionUID(1L) case class VectorClock( timestamp: VectorClock.Timestamp = VectorClock.Timestamp(), versions: Map[VectorClock.Node, VectorClock.Timestamp] = Map.empty[VectorClock.Node, VectorClock.Timestamp]) diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala new file mode 100644 index 0000000000..a8bdfb5285 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -0,0 +1,278 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.cluster.protobuf + +import akka.serialization.Serializer +import akka.cluster._ +import scala.collection.breakOut +import akka.actor.{ ExtendedActorSystem, Address } +import scala.Some +import scala.collection.immutable +import java.io.{ ByteArrayInputStream, ObjectOutputStream, ByteArrayOutputStream } +import com.google.protobuf.ByteString +import akka.util.ClassLoaderObjectInputStream +import java.{ lang ⇒ jl } + +class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializer { + + private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMessage], Array[Byte] ⇒ AnyRef]( + classOf[ClusterUserAction.Join] -> { + case bytes ⇒ + val m = msg.Join.defaultInstance.mergeFrom(bytes) + ClusterUserAction.Join(addressFromProto(m.address), m.roles.toSet) + }, + classOf[ClusterUserAction.Leave] -> (bytes ⇒ ClusterUserAction.Leave(addressFromBinary(bytes))), + classOf[ClusterUserAction.Down] -> (bytes ⇒ ClusterUserAction.Down(addressFromBinary(bytes))), + InternalClusterAction.InitJoin.getClass -> (_ ⇒ InternalClusterAction.InitJoin), + classOf[InternalClusterAction.InitJoinAck] -> (bytes ⇒ InternalClusterAction.InitJoinAck(addressFromBinary(bytes))), + classOf[InternalClusterAction.InitJoinNack] -> (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))), + classOf[ClusterLeaderAction.Exit] -> (bytes ⇒ ClusterLeaderAction.Exit(addressFromBinary(bytes))), + classOf[ClusterLeaderAction.Remove] -> (bytes ⇒ ClusterLeaderAction.Remove(addressFromBinary(bytes))), + classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))), + classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))), + classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))), + classOf[GossipEnvelope] -> gossipEnvelopeFromBinary, + classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary) + + def includeManifest: Boolean = true + + def identifier = 5 + + def toBinary(obj: AnyRef): Array[Byte] = (obj match { + case ClusterHeartbeatReceiver.Heartbeat(from) ⇒ + addressToProto(from) + case m: GossipEnvelope ⇒ + gossipEnvelopeToProto(m) + case m: MetricsGossipEnvelope ⇒ + metricsGossipEnvelopeToProto(m) + case ClusterUserAction.Join(address, roles) ⇒ + msg.Join(addressToProto(address), roles.map(identity)(breakOut): Vector[String]) + case ClusterUserAction.Leave(address) ⇒ + addressToProto(address) + case ClusterUserAction.Down(address) ⇒ + addressToProto(address) + case InternalClusterAction.InitJoin ⇒ + msg.Empty() + case InternalClusterAction.InitJoinAck(address) ⇒ + addressToProto(address) + case InternalClusterAction.InitJoinNack(address) ⇒ + addressToProto(address) + case ClusterLeaderAction.Exit(address) ⇒ + addressToProto(address) + case ClusterLeaderAction.Remove(address) ⇒ + addressToProto(address) + case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ + addressToProto(from) + case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ + addressToProto(from) + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") + }).toByteArray + + def fromBinary(bytes: Array[Byte], + clazz: Option[Class[_]]): AnyRef = { + clazz match { + case Some(c) ⇒ fromBinaryMap.get(c.asInstanceOf[Class[ClusterMessage]]) match { + case Some(f) ⇒ f(bytes) + case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class ${c} in ClusterSerializer") + } + case _ ⇒ throw new IllegalArgumentException("Need a cluster message class to be able to deserialize bytes in ClusterSerializer") + } + } + + private def addressFromBinary(bytes: Array[Byte]): Address = { + addressFromProto(msg.Address.defaultInstance.mergeFrom(bytes)) + } + + private def addressToProto(address: Address): msg.Address = { + msg.Address(address.system, address.host.getOrElse(""), address.port.getOrElse(0), Some(address.protocol)) + } + + private def addressFromProto(address: msg.Address): Address = { + Address(address.protocol.getOrElse(""), address.system, address.hostname, address.port) + } + + private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int]( + MemberStatus.Joining -> msg.MemberStatus.Joining_VALUE, + MemberStatus.Up -> msg.MemberStatus.Up_VALUE, + MemberStatus.Leaving -> msg.MemberStatus.Leaving_VALUE, + MemberStatus.Exiting -> msg.MemberStatus.Exiting_VALUE, + MemberStatus.Down -> msg.MemberStatus.Down_VALUE, + MemberStatus.Removed -> msg.MemberStatus.Removed_VALUE) + + private val memberStatusFromInt = memberStatusToInt.map { case (a, b) ⇒ (b, a) } + + private def mapWithErrorMessage[T](map: Map[T, Int], value: T, unknown: String): Int = map.get(value) match { + case Some(x) ⇒ x + case _ ⇒ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message") + } + + private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = { + val gossip = envelope.gossip + val allMembers = List(gossip.members, gossip.overview.unreachable).flatMap(identity) + val allAddresses = allMembers.map(_.address).to[Vector] + val addressMapping = allAddresses.zipWithIndex.toMap + val allRoles = allMembers.flatMap(_.roles).to[Vector] + val roleMapping = allRoles.zipWithIndex.toMap + val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.map(_.hash).toSet) { + case (s, VectorClock(t, v)) ⇒ s ++ v.keys.map(_.hash) + }.to[Vector] + val hashMapping = allHashes.zipWithIndex.toMap + + def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address") + def mapRole(role: String) = mapWithErrorMessage(roleMapping, role, "role") + def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash") + + def memberToProto(member: Member) = { + msg.Member(mapAddress(member.address), msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector]) + } + + def vectorClockToProto(version: VectorClock) = { + msg.VectorClock(version.timestamp.time, + version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector]) + } + + def seenToProto(seen: (Address, VectorClock)) = seen match { + case (address: Address, version: VectorClock) ⇒ + msg.GossipOverview.Seen(mapAddress(address), vectorClockToProto(version)) + } + + val unreachable = gossip.overview.unreachable.map(memberToProto).to[Vector] + val members = gossip.members.toSeq.map(memberToProto).to[Vector] + val seen = gossip.overview.seen.map(seenToProto).to[Vector] + + val overview = msg.GossipOverview(seen, unreachable) + + msg.GossipEnvelope(addressToProto(envelope.from), msg.Gossip(allAddresses.map(addressToProto), + allRoles, allHashes, members, overview, vectorClockToProto(gossip.version)), + envelope.conversation) + } + + private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = { + gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(bytes)) + } + + private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = { + val gossip = envelope.gossip + val addressMapping = gossip.allAddresses.map(addressFromProto) + val roleMapping = gossip.allRoles + val hashMapping = gossip.allHashes + + def memberFromProto(member: msg.Member) = { + Member(addressMapping(member.addressIndex), memberStatusFromInt(member.status.id), + member.rolesIndexes.map(roleMapping).to[Set]) + } + + def vectorClockFromProto(version: msg.VectorClock) = { + VectorClock(VectorClock.Timestamp(version.timestamp), + version.versions.map { + case msg.VectorClock.Version(h, t) ⇒ + (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) + }.toMap) + } + + def seenFromProto(seen: msg.GossipOverview.Seen) = + (addressMapping(seen.addressIndex), vectorClockFromProto(seen.version)) + + val members = gossip.members.map(memberFromProto).to[immutable.SortedSet] + val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet + val seen = gossip.overview.seen.map(seenFromProto).toMap + val overview = GossipOverview(seen, unreachable) + + GossipEnvelope(addressFromProto(envelope.from), Gossip(members, overview, vectorClockFromProto(gossip.version))) + } + + private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): msg.MetricsGossipEnvelope = { + val mgossip = envelope.gossip + val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) ⇒ s + n.address).to[Vector] + val addressMapping = allAddresses.zipWithIndex.toMap + val allMetricNames = mgossip.nodes.foldLeft(Set.empty[String])((s, n) ⇒ s ++ n.metrics.map(_.name)).to[Vector] + val metricNamesMapping = allMetricNames.zipWithIndex.toMap + + def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address") + def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address") + + def ewmaToProto(ewma: Option[EWMA]): Option[msg.NodeMetrics.EWMA] = { + ewma.map(x ⇒ msg.NodeMetrics.EWMA(x.value, x.alpha)) + } + + def numberToProto(number: Number): msg.NodeMetrics.Number = { + import msg.NodeMetrics.Number + import msg.NodeMetrics.NumberType + number match { + case n: jl.Double ⇒ Number(NumberType.Double, None, Some(jl.Double.doubleToLongBits(n)), None) + case n: jl.Long ⇒ Number(NumberType.Long, None, Some(n), None) + case n: jl.Float ⇒ Number(NumberType.Float, Some(jl.Float.floatToIntBits(n)), None, None) + case n: jl.Integer ⇒ Number(NumberType.Integer, Some(n), None, None) + case _ ⇒ + val bos = new ByteArrayOutputStream + val out = new ObjectOutputStream(bos) + out.writeObject(number) + out.close() + msg.NodeMetrics.Number(msg.NodeMetrics.NumberType.Serialized, None, None, + Some(ByteString.copyFrom(bos.toByteArray))) + } + } + + def metricToProto(metric: Metric): msg.NodeMetrics.Metric = { + msg.NodeMetrics.Metric(mapName(metric.name), numberToProto(metric.value), ewmaToProto(metric.average)) + } + + def nodeMetricsToProto(nodeMetrics: NodeMetrics): msg.NodeMetrics = { + msg.NodeMetrics(mapAddress(nodeMetrics.address), nodeMetrics.timestamp, + nodeMetrics.metrics.map(metricToProto).to[Vector]) + } + + val nodeMetrics = mgossip.nodes.map(nodeMetricsToProto).to[Vector] + + msg.MetricsGossipEnvelope(addressToProto(envelope.from), + msg.MetricsGossip(allAddresses.map(addressToProto), allMetricNames, nodeMetrics), envelope.reply) + } + + private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope = { + metricsGossipEnvelopeFromProto(msg.MetricsGossipEnvelope.defaultInstance.mergeFrom(bytes)) + } + + private def metricsGossipEnvelopeFromProto(envelope: msg.MetricsGossipEnvelope): MetricsGossipEnvelope = { + val mgossip = envelope.gossip + val addressMapping = mgossip.allAddresses.map(addressFromProto) + val metricNameMapping = mgossip.allMetricNames + + def ewmaFromProto(ewma: Option[msg.NodeMetrics.EWMA]): Option[EWMA] = { + ewma.map(x ⇒ EWMA(x.value, x.alpha)) + } + + def numberFromProto(number: msg.NodeMetrics.Number): Number = { + import msg.NodeMetrics.Number + import msg.NodeMetrics.NumberType + number match { + case Number(NumberType.Double, _, Some(n), _) ⇒ jl.Double.longBitsToDouble(n) + case Number(NumberType.Long, _, Some(n), _) ⇒ n + case Number(NumberType.Float, Some(n), _, _) ⇒ jl.Float.intBitsToFloat(n) + case Number(NumberType.Integer, Some(n), _, _) ⇒ n + case Number(NumberType.Serialized, _, _, Some(b)) ⇒ + val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, + new ByteArrayInputStream(b.toByteArray)) + val obj = in.readObject + in.close() + obj.asInstanceOf[jl.Number] + } + } + + def metricFromProto(metric: msg.NodeMetrics.Metric): Metric = { + Metric(metricNameMapping(metric.nameIndex), numberFromProto(metric.number), ewmaFromProto(metric.ewma)) + } + + def nodeMetricsFromProto(nodeMetrics: msg.NodeMetrics): NodeMetrics = { + NodeMetrics(addressMapping(nodeMetrics.addressIndex), nodeMetrics.timestamp, + nodeMetrics.metrics.map(metricFromProto).toSet) + } + + val nodeMetrics = mgossip.nodeMetrics.map(nodeMetricsFromProto).toSet + + MetricsGossipEnvelope(addressFromProto(envelope.from), + MetricsGossip(nodeMetrics), envelope.reply) + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala new file mode 100644 index 0000000000..ba2d885395 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.cluster.protobuf + +import akka.cluster._ +import akka.actor.{ ExtendedActorSystem, Address } +import collection.immutable.SortedSet +import akka.testkit.AkkaSpec +import java.math.BigInteger + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterMessageSerializerSpec extends AkkaSpec { + + val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) + + def checkSerialization(obj: AnyRef): Unit = { + val blob = serializer.toBinary(obj) + val ref = serializer.fromBinary(blob, obj.getClass) + ref must be(obj) + } + + import MemberStatus._ + + val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty) + val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set("r1")) + val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, Set("r2")) + val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1", "r2")) + val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Down, Set("r3")) + val f1 = Member(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r2", "r3")) + + "ClusterMessages" must { + + "be serializable" in { + val address = Address("akka.tcp", "system", "some.host.org", 4711) + checkSerialization(ClusterUserAction.Join(address, Set("foo", "bar"))) + checkSerialization(ClusterUserAction.Leave(address)) + checkSerialization(ClusterUserAction.Down(address)) + checkSerialization(InternalClusterAction.InitJoin) + checkSerialization(InternalClusterAction.InitJoinAck(address)) + checkSerialization(InternalClusterAction.InitJoinNack(address)) + checkSerialization(ClusterLeaderAction.Exit(address)) + checkSerialization(ClusterLeaderAction.Remove(address)) + checkSerialization(ClusterHeartbeatReceiver.Heartbeat(address)) + checkSerialization(ClusterHeartbeatReceiver.EndHeartbeat(address)) + checkSerialization(ClusterHeartbeatSender.HeartbeatRequest(address)) + + val node1 = VectorClock.Node("node1") + val node2 = VectorClock.Node("node2") + val g1 = (Gossip(SortedSet(a1, b1, c1, d1)) :+ node1).seen(a1.address).seen(b1.address) + val g2 = (g1 :+ node2).seen(a1.address).seen(c1.address) + checkSerialization(GossipEnvelope(a1.address, g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1))))) + + val mg = MetricsGossip(Set(NodeMetrics(a1.address, 4711, Set(Metric("foo", 1.2, None))), + NodeMetrics(b1.address, 4712, Set(Metric("foo", 2.1, Some(EWMA(value = 100.0, alpha = 0.18))), + Metric("bar1", Double.MinPositiveValue, None), + Metric("bar2", Float.MaxValue, None), + Metric("bar3", Int.MaxValue, None), + Metric("bar4", Long.MaxValue, None), + Metric("bar5", BigInt(Long.MaxValue), None))))) + checkSerialization(MetricsGossipEnvelope(a1.address, mg, true)) + } + } +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 47300a0813..672166afa2 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -26,6 +26,7 @@ import java.nio.charset.Charset import java.util.Properties import annotation.tailrec import Unidoc.{ JavaDoc, javadocSettings, junidocSources, sunidoc, unidocExclude } +import scalabuff.ScalaBuffPlugin._ object AkkaBuild extends Build { System.setProperty("akka.mode", "test") // Is there better place for this? @@ -180,7 +181,8 @@ object AkkaBuild extends Build { id = "akka-cluster-experimental", base = file("akka-cluster"), dependencies = Seq(remote, remoteTests % "test->test" , testkit % "test->test"), - settings = defaultSettings ++ scaladocSettings ++ javadocSettings ++ multiJvmSettings ++ OSGi.cluster ++ experimentalSettings ++ Seq( + settings = defaultSettings ++ scaladocSettings ++ javadocSettings ++ multiJvmSettings ++ OSGi.cluster ++ experimentalSettings ++ + scalabuffSettings ++ Seq( libraryDependencies ++= Dependencies.cluster, // disable parallel tests parallelExecution in Test := false, @@ -190,7 +192,7 @@ object AkkaBuild extends Build { scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, previousArtifact := akkaPreviousArtifact("akka-cluster-experimental") ) - ) configs (MultiJvm) + ) configs (MultiJvm, ScalaBuff) lazy val slf4j = Project( id = "akka-slf4j", diff --git a/project/plugins.sbt b/project/plugins.sbt index ba3f4b7f33..ead12a47f3 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -15,3 +15,5 @@ addSbtPlugin("com.typesafe.sbtosgi" % "sbtosgi" % "0.3.0") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.3") addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.2") + +addSbtPlugin("com.github.sbt" %% "sbt-scalabuff" % "0.2")