diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index c11fbc3a3a..5248bb6b4b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -13,9 +13,15 @@ import java.io.{ ByteArrayInputStream, ObjectOutputStream, ByteArrayOutputStream import com.google.protobuf.ByteString import akka.util.ClassLoaderObjectInputStream import java.{ lang ⇒ jl } +import java.util.zip.GZIPOutputStream +import java.util.zip.GZIPInputStream +import com.google.protobuf.MessageLite +import scala.annotation.tailrec class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializer { + private final val BufferSize = 1024 * 4 + private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMessage], Array[Byte] ⇒ AnyRef]( classOf[InternalClusterAction.Join] -> { case bytes ⇒ @@ -24,7 +30,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ }, classOf[InternalClusterAction.Welcome] -> { case bytes ⇒ - val m = msg.Welcome.defaultInstance.mergeFrom(bytes) + val m = msg.Welcome.defaultInstance.mergeFrom(decompress(bytes)) InternalClusterAction.Welcome(uniqueAddressFromProto(m.from), gossipFromProto(m.gossip)) }, classOf[ClusterUserAction.Leave] -> (bytes ⇒ ClusterUserAction.Leave(addressFromBinary(bytes))), @@ -45,40 +51,66 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ def identifier = 5 - def toBinary(obj: AnyRef): Array[Byte] = (obj match { - case ClusterHeartbeatReceiver.Heartbeat(from) ⇒ - addressToProto(from) - case m: GossipEnvelope ⇒ - gossipEnvelopeToProto(m) - case m: GossipStatus ⇒ - gossipStatusToProto(m) - case m: MetricsGossipEnvelope ⇒ - metricsGossipEnvelopeToProto(m) - case InternalClusterAction.Join(node, roles) ⇒ - msg.Join(uniqueAddressToProto(node), roles.map(identity)(breakOut): Vector[String]) - case InternalClusterAction.Welcome(from, gossip) ⇒ - msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip)) - case ClusterUserAction.Leave(address) ⇒ - addressToProto(address) - case ClusterUserAction.Down(address) ⇒ - addressToProto(address) - case InternalClusterAction.InitJoin ⇒ - msg.Empty() - case InternalClusterAction.InitJoinAck(address) ⇒ - addressToProto(address) - case InternalClusterAction.InitJoinNack(address) ⇒ - addressToProto(address) - case ClusterLeaderAction.Exit(node) ⇒ - uniqueAddressToProto(node) - case ClusterLeaderAction.Shutdown(node) ⇒ - uniqueAddressToProto(node) - case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ - addressToProto(from) - case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ - addressToProto(from) - case _ ⇒ - throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") - }).toByteArray + def toBinary(obj: AnyRef): Array[Byte] = + obj match { + case ClusterHeartbeatReceiver.Heartbeat(from) ⇒ + addressToProto(from).toByteArray + case m: GossipEnvelope ⇒ + compress(gossipEnvelopeToProto(m)) + case m: GossipStatus ⇒ + gossipStatusToProto(m).toByteArray + case m: MetricsGossipEnvelope ⇒ + compress(metricsGossipEnvelopeToProto(m)) + case InternalClusterAction.Join(node, roles) ⇒ + msg.Join(uniqueAddressToProto(node), roles.map(identity)(breakOut): Vector[String]).toByteArray + case InternalClusterAction.Welcome(from, gossip) ⇒ + compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip))) + case ClusterUserAction.Leave(address) ⇒ + addressToProto(address).toByteArray + case ClusterUserAction.Down(address) ⇒ + addressToProto(address).toByteArray + case InternalClusterAction.InitJoin ⇒ + msg.Empty().toByteArray + case InternalClusterAction.InitJoinAck(address) ⇒ + addressToProto(address).toByteArray + case InternalClusterAction.InitJoinNack(address) ⇒ + addressToProto(address).toByteArray + case ClusterLeaderAction.Exit(node) ⇒ + uniqueAddressToProto(node).toByteArray + case ClusterLeaderAction.Shutdown(node) ⇒ + uniqueAddressToProto(node).toByteArray + case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ + addressToProto(from).toByteArray + case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ + addressToProto(from).toByteArray + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") + } + + def compress(msg: MessageLite): Array[Byte] = { + val bos = new ByteArrayOutputStream(BufferSize) + val zip = new GZIPOutputStream(bos) + msg.writeTo(zip) + zip.close() + bos.toByteArray + } + + def decompress(bytes: Array[Byte]): Array[Byte] = { + val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) + val out = new ByteArrayOutputStream() + val buffer = new Array[Byte](BufferSize) + + @tailrec def readChunk(): Unit = { + val n = in.read(buffer) + if (n != -1) { + out.write(buffer, 0, n) + readChunk() + } + } + readChunk() + + out.toByteArray + } def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { @@ -182,7 +214,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ } private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = { - gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(bytes)) + gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(decompress(bytes))) } private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus = { @@ -276,7 +308,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ } private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope = { - metricsGossipEnvelopeFromProto(msg.MetricsGossipEnvelope.defaultInstance.mergeFrom(bytes)) + metricsGossipEnvelopeFromProto(msg.MetricsGossipEnvelope.defaultInstance.mergeFrom(decompress(bytes))) } private def metricsGossipEnvelopeFromProto(envelope: msg.MetricsGossipEnvelope): MetricsGossipEnvelope = {