!clu #3657 Lazy deserialization and TTL of Gossip message payload

This commit is contained in:
Patrik Nordwall 2013-10-17 10:35:37 +02:00
parent beff53f0a6
commit 7d5a3ec30b
9 changed files with 169 additions and 219 deletions

View file

@ -19,6 +19,7 @@ import com.google.protobuf.MessageLite
import scala.annotation.tailrec
import akka.cluster.protobuf.msg.{ ClusterMessages cm }
import scala.collection.JavaConverters._
import scala.concurrent.duration.Deadline
/**
* Protobuf serializer of cluster messages.
@ -26,6 +27,8 @@ import scala.collection.JavaConverters._
class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializer {
private final val BufferSize = 1024 * 4
// must be lazy because serializer is initialized from Cluster extension constructor
private lazy val GossipTimeToLive = Cluster(system).settings.GossipTimeToLive
private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMessage], Array[Byte] AnyRef](
classOf[InternalClusterAction.Join] -> {
@ -58,7 +61,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case ClusterHeartbeatReceiver.Heartbeat(from) addressToProtoByteArray(from)
case m: GossipEnvelope compress(gossipEnvelopeToProto(m))
case m: GossipEnvelope gossipEnvelopeToProto(m).toByteArray
case m: GossipStatus gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) joinToProto(node, roles).toByteArray
@ -209,8 +212,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
}
private def gossipEnvelopeToProto(envelope: GossipEnvelope): cm.GossipEnvelope =
cm.GossipEnvelope.newBuilder().setFrom(uniqueAddressToProto(envelope.from)).setTo(uniqueAddressToProto(envelope.to)).
setGossip(gossipToProto(envelope.gossip)).build
cm.GossipEnvelope.newBuilder().
setFrom(uniqueAddressToProto(envelope.from)).
setTo(uniqueAddressToProto(envelope.to)).
setSerializedGossip(ByteString.copyFrom(compress(gossipToProto(envelope.gossip).build))).
build
private def gossipStatusToProto(status: GossipStatus): cm.GossipStatus = {
val allHashes = status.version.versions.keys.toVector
@ -220,7 +226,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
}
private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope =
gossipEnvelopeFromProto(cm.GossipEnvelope.parseFrom(decompress(bytes)))
gossipEnvelopeFromProto(cm.GossipEnvelope.parseFrom(bytes))
private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus =
gossipStatusFromProto(cm.GossipStatus.parseFrom(bytes))
@ -266,9 +272,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
v (VectorClock.Node.fromHash(hashMapping(v.getHashIndex)), v.getTimestamp))(breakOut))
}
private def gossipEnvelopeFromProto(envelope: cm.GossipEnvelope): GossipEnvelope =
private def gossipEnvelopeFromProto(envelope: cm.GossipEnvelope): GossipEnvelope = {
val serializedGossip = envelope.getSerializedGossip
GossipEnvelope(uniqueAddressFromProto(envelope.getFrom), uniqueAddressFromProto(envelope.getTo),
gossipFromProto(envelope.getGossip))
Deadline.now + GossipTimeToLive, () gossipFromProto(cm.Gossip.parseFrom(decompress(serializedGossip.toByteArray))))
}
private def gossipStatusFromProto(status: cm.GossipStatus): GossipStatus =
GossipStatus(uniqueAddressFromProto(status.getFrom), vectorClockFromProto(status.getVersion,