diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java index 757fbffd88..a0a0c305aa 100644 --- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java +++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java @@ -1717,19 +1717,15 @@ public final class ClusterMessages { */ akka.cluster.protobuf.msg.ClusterMessages.UniqueAddressOrBuilder getToOrBuilder(); - // required .Gossip gossip = 3; + // required bytes serializedGossip = 3; /** - * required .Gossip gossip = 3; + * required bytes serializedGossip = 3; */ - boolean hasGossip(); + boolean hasSerializedGossip(); /** - * required .Gossip gossip = 3; + * required bytes serializedGossip = 3; */ - akka.cluster.protobuf.msg.ClusterMessages.Gossip getGossip(); - /** - * required .Gossip gossip = 3; - */ - akka.cluster.protobuf.msg.ClusterMessages.GossipOrBuilder getGossipOrBuilder(); + com.google.protobuf.ByteString getSerializedGossip(); } /** * Protobuf type {@code GossipEnvelope} @@ -1814,16 +1810,8 @@ public final class ClusterMessages { break; } case 26: { - akka.cluster.protobuf.msg.ClusterMessages.Gossip.Builder subBuilder = null; - if (((bitField0_ & 0x00000004) == 0x00000004)) { - subBuilder = gossip_.toBuilder(); - } - gossip_ = input.readMessage(akka.cluster.protobuf.msg.ClusterMessages.Gossip.PARSER, extensionRegistry); - if (subBuilder != null) { - subBuilder.mergeFrom(gossip_); - gossip_ = subBuilder.buildPartial(); - } bitField0_ |= 0x00000004; + serializedGossip_ = input.readBytes(); break; } } @@ -1910,32 +1898,26 @@ public final class ClusterMessages { return to_; } - // required .Gossip gossip = 3; - public static final int GOSSIP_FIELD_NUMBER = 3; - private akka.cluster.protobuf.msg.ClusterMessages.Gossip gossip_; + // required bytes serializedGossip = 3; + public static final int SERIALIZEDGOSSIP_FIELD_NUMBER = 3; + private com.google.protobuf.ByteString serializedGossip_; /** - * required .Gossip gossip = 3; + * required bytes serializedGossip = 3; */ - public boolean hasGossip() { + public boolean hasSerializedGossip() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * required .Gossip gossip = 3; + * required bytes serializedGossip = 3; */ - public akka.cluster.protobuf.msg.ClusterMessages.Gossip getGossip() { - return gossip_; - } - /** - * required .Gossip gossip = 3; - */ - public akka.cluster.protobuf.msg.ClusterMessages.GossipOrBuilder getGossipOrBuilder() { - return gossip_; + public com.google.protobuf.ByteString getSerializedGossip() { + return serializedGossip_; } private void initFields() { from_ = akka.cluster.protobuf.msg.ClusterMessages.UniqueAddress.getDefaultInstance(); to_ = akka.cluster.protobuf.msg.ClusterMessages.UniqueAddress.getDefaultInstance(); - gossip_ = akka.cluster.protobuf.msg.ClusterMessages.Gossip.getDefaultInstance(); + serializedGossip_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1950,7 +1932,7 @@ public final class ClusterMessages { memoizedIsInitialized = 0; return false; } - if (!hasGossip()) { + if (!hasSerializedGossip()) { memoizedIsInitialized = 0; return false; } @@ -1962,10 +1944,6 @@ public final class ClusterMessages { memoizedIsInitialized = 0; return false; } - if (!getGossip().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -1980,7 +1958,7 @@ public final class ClusterMessages { output.writeMessage(2, to_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeMessage(3, gossip_); + output.writeBytes(3, serializedGossip_); } getUnknownFields().writeTo(output); } @@ -2001,7 +1979,7 @@ public final class ClusterMessages { } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(3, gossip_); + .computeBytesSize(3, serializedGossip_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -2118,7 +2096,6 @@ public final class ClusterMessages { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getFromFieldBuilder(); getToFieldBuilder(); - getGossipFieldBuilder(); } } private static Builder create() { @@ -2139,11 +2116,7 @@ public final class ClusterMessages { toBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000002); - if (gossipBuilder_ == null) { - gossip_ = akka.cluster.protobuf.msg.ClusterMessages.Gossip.getDefaultInstance(); - } else { - gossipBuilder_.clear(); - } + serializedGossip_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -2192,11 +2165,7 @@ public final class ClusterMessages { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - if (gossipBuilder_ == null) { - result.gossip_ = gossip_; - } else { - result.gossip_ = gossipBuilder_.build(); - } + result.serializedGossip_ = serializedGossip_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -2219,8 +2188,8 @@ public final class ClusterMessages { if (other.hasTo()) { mergeTo(other.getTo()); } - if (other.hasGossip()) { - mergeGossip(other.getGossip()); + if (other.hasSerializedGossip()) { + setSerializedGossip(other.getSerializedGossip()); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -2235,7 +2204,7 @@ public final class ClusterMessages { return false; } - if (!hasGossip()) { + if (!hasSerializedGossip()) { return false; } @@ -2247,10 +2216,6 @@ public final class ClusterMessages { return false; } - if (!getGossip().isInitialized()) { - - return false; - } return true; } @@ -2507,121 +2472,40 @@ public final class ClusterMessages { return toBuilder_; } - // required .Gossip gossip = 3; - private akka.cluster.protobuf.msg.ClusterMessages.Gossip gossip_ = akka.cluster.protobuf.msg.ClusterMessages.Gossip.getDefaultInstance(); - private com.google.protobuf.SingleFieldBuilder< - akka.cluster.protobuf.msg.ClusterMessages.Gossip, akka.cluster.protobuf.msg.ClusterMessages.Gossip.Builder, akka.cluster.protobuf.msg.ClusterMessages.GossipOrBuilder> gossipBuilder_; + // required bytes serializedGossip = 3; + private com.google.protobuf.ByteString serializedGossip_ = com.google.protobuf.ByteString.EMPTY; /** - * required .Gossip gossip = 3; + * required bytes serializedGossip = 3; */ - public boolean hasGossip() { + public boolean hasSerializedGossip() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * required .Gossip gossip = 3; + * required bytes serializedGossip = 3; */ - public akka.cluster.protobuf.msg.ClusterMessages.Gossip getGossip() { - if (gossipBuilder_ == null) { - return gossip_; - } else { - return gossipBuilder_.getMessage(); - } + public com.google.protobuf.ByteString getSerializedGossip() { + return serializedGossip_; } /** - * required .Gossip gossip = 3; + * required bytes serializedGossip = 3; */ - public Builder setGossip(akka.cluster.protobuf.msg.ClusterMessages.Gossip value) { - if (gossipBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - gossip_ = value; - onChanged(); - } else { - gossipBuilder_.setMessage(value); - } - bitField0_ |= 0x00000004; - return this; - } - /** - * required .Gossip gossip = 3; - */ - public Builder setGossip( - akka.cluster.protobuf.msg.ClusterMessages.Gossip.Builder builderForValue) { - if (gossipBuilder_ == null) { - gossip_ = builderForValue.build(); - onChanged(); - } else { - gossipBuilder_.setMessage(builderForValue.build()); - } - bitField0_ |= 0x00000004; - return this; - } - /** - * required .Gossip gossip = 3; - */ - public Builder mergeGossip(akka.cluster.protobuf.msg.ClusterMessages.Gossip value) { - if (gossipBuilder_ == null) { - if (((bitField0_ & 0x00000004) == 0x00000004) && - gossip_ != akka.cluster.protobuf.msg.ClusterMessages.Gossip.getDefaultInstance()) { - gossip_ = - akka.cluster.protobuf.msg.ClusterMessages.Gossip.newBuilder(gossip_).mergeFrom(value).buildPartial(); - } else { - gossip_ = value; - } - onChanged(); - } else { - gossipBuilder_.mergeFrom(value); - } - bitField0_ |= 0x00000004; - return this; - } - /** - * required .Gossip gossip = 3; - */ - public Builder clearGossip() { - if (gossipBuilder_ == null) { - gossip_ = akka.cluster.protobuf.msg.ClusterMessages.Gossip.getDefaultInstance(); - onChanged(); - } else { - gossipBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000004); - return this; - } - /** - * required .Gossip gossip = 3; - */ - public akka.cluster.protobuf.msg.ClusterMessages.Gossip.Builder getGossipBuilder() { - bitField0_ |= 0x00000004; + public Builder setSerializedGossip(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + serializedGossip_ = value; onChanged(); - return getGossipFieldBuilder().getBuilder(); + return this; } /** - * required .Gossip gossip = 3; + * required bytes serializedGossip = 3; */ - public akka.cluster.protobuf.msg.ClusterMessages.GossipOrBuilder getGossipOrBuilder() { - if (gossipBuilder_ != null) { - return gossipBuilder_.getMessageOrBuilder(); - } else { - return gossip_; - } - } - /** - * required .Gossip gossip = 3; - */ - private com.google.protobuf.SingleFieldBuilder< - akka.cluster.protobuf.msg.ClusterMessages.Gossip, akka.cluster.protobuf.msg.ClusterMessages.Gossip.Builder, akka.cluster.protobuf.msg.ClusterMessages.GossipOrBuilder> - getGossipFieldBuilder() { - if (gossipBuilder_ == null) { - gossipBuilder_ = new com.google.protobuf.SingleFieldBuilder< - akka.cluster.protobuf.msg.ClusterMessages.Gossip, akka.cluster.protobuf.msg.ClusterMessages.Gossip.Builder, akka.cluster.protobuf.msg.ClusterMessages.GossipOrBuilder>( - gossip_, - getParentForChildren(), - isClean()); - gossip_ = null; - } - return gossipBuilder_; + public Builder clearSerializedGossip() { + bitField0_ = (bitField0_ & ~0x00000004); + serializedGossip_ = getDefaultInstance().getSerializedGossip(); + onChanged(); + return this; } // @@protoc_insertion_point(builder_scope:GossipEnvelope) @@ -16856,53 +16740,53 @@ public final class ClusterMessages { "\n\025ClusterMessages.proto\"3\n\004Join\022\034\n\004node\030" + "\001 \002(\0132\016.UniqueAddress\022\r\n\005roles\030\002 \003(\t\"@\n\007" + "Welcome\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\027\n" + - "\006gossip\030\002 \002(\0132\007.Gossip\"c\n\016GossipEnvelope" + + "\006gossip\030\002 \002(\0132\007.Gossip\"d\n\016GossipEnvelope" + "\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\032\n\002to\030\002 \002" + - "(\0132\016.UniqueAddress\022\027\n\006gossip\030\003 \002(\0132\007.Gos" + - "sip\"^\n\014GossipStatus\022\034\n\004from\030\001 \002(\0132\016.Uniq" + - "ueAddress\022\021\n\tallHashes\030\002 \003(\t\022\035\n\007version\030" + - "\003 \002(\0132\014.VectorClock\"\257\001\n\006Gossip\022$\n\014allAdd" + - "resses\030\001 \003(\0132\016.UniqueAddress\022\020\n\010allRoles", - "\030\002 \003(\t\022\021\n\tallHashes\030\003 \003(\t\022\030\n\007members\030\004 \003" + - "(\0132\007.Member\022!\n\010overview\030\005 \002(\0132\017.GossipOv" + - "erview\022\035\n\007version\030\006 \002(\0132\014.VectorClock\"S\n" + - "\016GossipOverview\022\014\n\004seen\030\001 \003(\005\0223\n\024observe" + - "rReachability\030\002 \003(\0132\025.ObserverReachabili" + - "ty\"p\n\024ObserverReachability\022\024\n\014addressInd" + - "ex\030\001 \002(\005\022\017\n\007version\030\004 \002(\003\0221\n\023subjectReac" + - "hability\030\002 \003(\0132\024.SubjectReachability\"a\n\023" + - "SubjectReachability\022\024\n\014addressIndex\030\001 \002(" + - "\005\022#\n\006status\030\003 \002(\0162\023.ReachabilityStatus\022\017", - "\n\007version\030\004 \002(\003\"i\n\006Member\022\024\n\014addressInde" + - "x\030\001 \002(\005\022\020\n\010upNumber\030\002 \002(\005\022\035\n\006status\030\003 \002(" + - "\0162\r.MemberStatus\022\030\n\014rolesIndexes\030\004 \003(\005B\002" + - "\020\001\"y\n\013VectorClock\022\021\n\ttimestamp\030\001 \001(\003\022&\n\010" + - "versions\030\002 \003(\0132\024.VectorClock.Version\032/\n\007" + - "Version\022\021\n\thashIndex\030\001 \002(\005\022\021\n\ttimestamp\030" + - "\002 \002(\003\"^\n\025MetricsGossipEnvelope\022\026\n\004from\030\001" + - " \002(\0132\010.Address\022\036\n\006gossip\030\002 \002(\0132\016.Metrics" + - "Gossip\022\r\n\005reply\030\003 \002(\010\"j\n\rMetricsGossip\022\036" + - "\n\014allAddresses\030\001 \003(\0132\010.Address\022\026\n\016allMet", - "ricNames\030\002 \003(\t\022!\n\013nodeMetrics\030\003 \003(\0132\014.No" + - "deMetrics\"\230\003\n\013NodeMetrics\022\024\n\014addressInde" + - "x\030\001 \002(\005\022\021\n\ttimestamp\030\002 \002(\003\022$\n\007metrics\030\003 " + - "\003(\0132\023.NodeMetrics.Metric\032e\n\006Number\022%\n\004ty" + - "pe\030\001 \002(\0162\027.NodeMetrics.NumberType\022\017\n\007val" + - "ue32\030\002 \001(\r\022\017\n\007value64\030\003 \001(\004\022\022\n\nserialize" + - "d\030\004 \001(\014\032$\n\004EWMA\022\r\n\005value\030\001 \002(\001\022\r\n\005alpha\030" + - "\002 \002(\001\032a\n\006Metric\022\021\n\tnameIndex\030\001 \002(\005\022#\n\006nu" + - "mber\030\002 \002(\0132\023.NodeMetrics.Number\022\037\n\004ewma\030" + - "\003 \001(\0132\021.NodeMetrics.EWMA\"J\n\nNumberType\022\016", - "\n\nSerialized\020\000\022\n\n\006Double\020\001\022\t\n\005Float\020\002\022\013\n" + - "\007Integer\020\003\022\010\n\004Long\020\004\"\007\n\005Empty\"K\n\007Address" + - "\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004po" + - "rt\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"7\n\rUniqueAddr" + - "ess\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 " + - "\002(\r*D\n\022ReachabilityStatus\022\r\n\tReachable\020\000" + - "\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*T\n\014Mem" + - "berStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leavin" + - "g\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005B" + - "\035\n\031akka.cluster.protobuf.msgH\001" + "(\0132\016.UniqueAddress\022\030\n\020serializedGossip\030\003" + + " \002(\014\"^\n\014GossipStatus\022\034\n\004from\030\001 \002(\0132\016.Uni" + + "queAddress\022\021\n\tallHashes\030\002 \003(\t\022\035\n\007version" + + "\030\003 \002(\0132\014.VectorClock\"\257\001\n\006Gossip\022$\n\014allAd" + + "dresses\030\001 \003(\0132\016.UniqueAddress\022\020\n\010allRole", + "s\030\002 \003(\t\022\021\n\tallHashes\030\003 \003(\t\022\030\n\007members\030\004 " + + "\003(\0132\007.Member\022!\n\010overview\030\005 \002(\0132\017.GossipO" + + "verview\022\035\n\007version\030\006 \002(\0132\014.VectorClock\"S" + + "\n\016GossipOverview\022\014\n\004seen\030\001 \003(\005\0223\n\024observ" + + "erReachability\030\002 \003(\0132\025.ObserverReachabil" + + "ity\"p\n\024ObserverReachability\022\024\n\014addressIn" + + "dex\030\001 \002(\005\022\017\n\007version\030\004 \002(\003\0221\n\023subjectRea" + + "chability\030\002 \003(\0132\024.SubjectReachability\"a\n" + + "\023SubjectReachability\022\024\n\014addressIndex\030\001 \002" + + "(\005\022#\n\006status\030\003 \002(\0162\023.ReachabilityStatus\022", + "\017\n\007version\030\004 \002(\003\"i\n\006Member\022\024\n\014addressInd" + + "ex\030\001 \002(\005\022\020\n\010upNumber\030\002 \002(\005\022\035\n\006status\030\003 \002" + + "(\0162\r.MemberStatus\022\030\n\014rolesIndexes\030\004 \003(\005B" + + "\002\020\001\"y\n\013VectorClock\022\021\n\ttimestamp\030\001 \001(\003\022&\n" + + "\010versions\030\002 \003(\0132\024.VectorClock.Version\032/\n" + + "\007Version\022\021\n\thashIndex\030\001 \002(\005\022\021\n\ttimestamp" + + "\030\002 \002(\003\"^\n\025MetricsGossipEnvelope\022\026\n\004from\030" + + "\001 \002(\0132\010.Address\022\036\n\006gossip\030\002 \002(\0132\016.Metric" + + "sGossip\022\r\n\005reply\030\003 \002(\010\"j\n\rMetricsGossip\022" + + "\036\n\014allAddresses\030\001 \003(\0132\010.Address\022\026\n\016allMe", + "tricNames\030\002 \003(\t\022!\n\013nodeMetrics\030\003 \003(\0132\014.N" + + "odeMetrics\"\230\003\n\013NodeMetrics\022\024\n\014addressInd" + + "ex\030\001 \002(\005\022\021\n\ttimestamp\030\002 \002(\003\022$\n\007metrics\030\003" + + " \003(\0132\023.NodeMetrics.Metric\032e\n\006Number\022%\n\004t" + + "ype\030\001 \002(\0162\027.NodeMetrics.NumberType\022\017\n\007va" + + "lue32\030\002 \001(\r\022\017\n\007value64\030\003 \001(\004\022\022\n\nserializ" + + "ed\030\004 \001(\014\032$\n\004EWMA\022\r\n\005value\030\001 \002(\001\022\r\n\005alpha" + + "\030\002 \002(\001\032a\n\006Metric\022\021\n\tnameIndex\030\001 \002(\005\022#\n\006n" + + "umber\030\002 \002(\0132\023.NodeMetrics.Number\022\037\n\004ewma" + + "\030\003 \001(\0132\021.NodeMetrics.EWMA\"J\n\nNumberType\022", + "\016\n\nSerialized\020\000\022\n\n\006Double\020\001\022\t\n\005Float\020\002\022\013" + + "\n\007Integer\020\003\022\010\n\004Long\020\004\"\007\n\005Empty\"K\n\007Addres" + + "s\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004p" + + "ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"7\n\rUniqueAdd" + + "ress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002" + + " \002(\r*D\n\022ReachabilityStatus\022\r\n\tReachable\020" + + "\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*T\n\014Me" + + "mberStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leavi" + + "ng\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005" + + "B\035\n\031akka.cluster.protobuf.msgH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -16926,7 +16810,7 @@ public final class ClusterMessages { internal_static_GossipEnvelope_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_GossipEnvelope_descriptor, - new java.lang.String[] { "From", "To", "Gossip", }); + new java.lang.String[] { "From", "To", "SerializedGossip", }); internal_static_GossipStatus_descriptor = getDescriptor().getMessageTypes().get(3); internal_static_GossipStatus_fieldAccessorTable = new diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 5a72e5eaae..eacccafb2a 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -89,7 +89,7 @@ message Welcome { message GossipEnvelope { required UniqueAddress from = 1; required UniqueAddress to = 2; - required Gossip gossip = 3; + required bytes serializedGossip = 3; } /** diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index ea9a208a9d..19c59ba5e1 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -67,6 +67,9 @@ akka { # how often should the node send out gossip information? gossip-interval = 1s + + # discard incoming gossip messages if not handled within this duration + gossip-time-to-live = 2s # how often should the leader perform maintenance tasks? leader-actions-interval = 1s diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index e73889d6f6..0e80545d59 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -600,7 +600,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val remoteGossip = envelope.gossip val localGossip = latestGossip - if (envelope.to != selfUniqueAddress) { + if (remoteGossip eq Gossip.empty) { + log.debug("Cluster Node [{}] - Ignoring received gossip from [{}] to protect against overload", selfAddress, from) + Ignored + } else if (envelope.to != selfUniqueAddress) { logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) Ignored } else if (!remoteGossip.overview.reachability.isReachable(selfUniqueAddress)) { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index e88d9156e6..b456aa892e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -50,6 +50,9 @@ final class ClusterSettings(val config: Config, val systemName: String) { } val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS) val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS) + val GossipTimeToLive: FiniteDuration = { + Duration(cc.getMilliseconds("gossip-time-to-live"), MILLISECONDS) + } requiring (_ > Duration.Zero, "gossip-time-to-live must be > 0") val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS) val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS) val PublishStatsInterval: Duration = { diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 9349d5c424..ea6b672280 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -6,6 +6,8 @@ package akka.cluster import scala.collection.immutable import MemberStatus._ +import akka.cluster.protobuf.ClusterMessageSerializer +import scala.concurrent.duration.Deadline /** * INTERNAL API @@ -218,6 +220,14 @@ private[cluster] case class GossipOverview( s"GossipOverview(reachability = [$reachability], seen = [${seen.mkString(", ")}])" } +object GossipEnvelope { + def apply(from: UniqueAddress, to: UniqueAddress, gossip: Gossip): GossipEnvelope = + new GossipEnvelope(from, to, gossip, null, null) + + def apply(from: UniqueAddress, to: UniqueAddress, serDeadline: Deadline, ser: () ⇒ Gossip): GossipEnvelope = + new GossipEnvelope(from, to, null, serDeadline, ser) +} + /** * INTERNAL API * Envelope adding a sender and receiver address to the gossip. @@ -226,8 +236,36 @@ private[cluster] case class GossipOverview( * the node with same host:port. The `uid` in the `UniqueAddress` is * different in that case. */ -@SerialVersionUID(1L) -private[cluster] case class GossipEnvelope(from: UniqueAddress, to: UniqueAddress, gossip: Gossip) extends ClusterMessage +@SerialVersionUID(2L) +private[cluster] class GossipEnvelope private ( + val from: UniqueAddress, + val to: UniqueAddress, + @volatile var g: Gossip, + serDeadline: Deadline, + @transient @volatile var ser: () ⇒ Gossip) extends ClusterMessage { + + def gossip: Gossip = { + deserialize() + g + } + + private def deserialize(): Unit = { + if ((g eq null) && (ser ne null)) { + if (serDeadline.hasTimeLeft) + g = ser() + else + g = Gossip.empty + ser = null + } + } + + @throws(classOf[java.io.ObjectStreamException]) + private def writeReplace(): AnyRef = { + deserialize() + this + } + +} /** * INTERNAL API diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index c37680764a..f01d1d2ee9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -19,6 +19,7 @@ import com.google.protobuf.MessageLite import scala.annotation.tailrec import akka.cluster.protobuf.msg.{ ClusterMessages ⇒ cm } import scala.collection.JavaConverters._ +import scala.concurrent.duration.Deadline /** * Protobuf serializer of cluster messages. @@ -26,6 +27,8 @@ import scala.collection.JavaConverters._ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializer { private final val BufferSize = 1024 * 4 + // must be lazy because serializer is initialized from Cluster extension constructor + private lazy val GossipTimeToLive = Cluster(system).settings.GossipTimeToLive private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMessage], Array[Byte] ⇒ AnyRef]( classOf[InternalClusterAction.Join] -> { @@ -58,7 +61,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ def toBinary(obj: AnyRef): Array[Byte] = obj match { case ClusterHeartbeatReceiver.Heartbeat(from) ⇒ addressToProtoByteArray(from) - case m: GossipEnvelope ⇒ compress(gossipEnvelopeToProto(m)) + case m: GossipEnvelope ⇒ gossipEnvelopeToProto(m).toByteArray case m: GossipStatus ⇒ gossipStatusToProto(m).toByteArray case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) case InternalClusterAction.Join(node, roles) ⇒ joinToProto(node, roles).toByteArray @@ -209,8 +212,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ } private def gossipEnvelopeToProto(envelope: GossipEnvelope): cm.GossipEnvelope = - cm.GossipEnvelope.newBuilder().setFrom(uniqueAddressToProto(envelope.from)).setTo(uniqueAddressToProto(envelope.to)). - setGossip(gossipToProto(envelope.gossip)).build + cm.GossipEnvelope.newBuilder(). + setFrom(uniqueAddressToProto(envelope.from)). + setTo(uniqueAddressToProto(envelope.to)). + setSerializedGossip(ByteString.copyFrom(compress(gossipToProto(envelope.gossip).build))). + build private def gossipStatusToProto(status: GossipStatus): cm.GossipStatus = { val allHashes = status.version.versions.keys.toVector @@ -220,7 +226,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ } private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = - gossipEnvelopeFromProto(cm.GossipEnvelope.parseFrom(decompress(bytes))) + gossipEnvelopeFromProto(cm.GossipEnvelope.parseFrom(bytes)) private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus = gossipStatusFromProto(cm.GossipStatus.parseFrom(bytes)) @@ -266,9 +272,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ v ⇒ (VectorClock.Node.fromHash(hashMapping(v.getHashIndex)), v.getTimestamp))(breakOut)) } - private def gossipEnvelopeFromProto(envelope: cm.GossipEnvelope): GossipEnvelope = + private def gossipEnvelopeFromProto(envelope: cm.GossipEnvelope): GossipEnvelope = { + val serializedGossip = envelope.getSerializedGossip GossipEnvelope(uniqueAddressFromProto(envelope.getFrom), uniqueAddressFromProto(envelope.getTo), - gossipFromProto(envelope.getGossip)) + Deadline.now + GossipTimeToLive, () ⇒ gossipFromProto(cm.Gossip.parseFrom(decompress(serializedGossip.toByteArray)))) + } private def gossipStatusFromProto(status: cm.GossipStatus): GossipStatus = GossipStatus(uniqueAddressFromProto(status.getFrom), vectorClockFromProto(status.getVersion, diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 22b2c8eb83..6341ff4665 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -29,6 +29,7 @@ class ClusterConfigSpec extends AkkaSpec { RetryUnsuccessfulJoinAfter must be(10 seconds) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) + GossipTimeToLive must be(2 seconds) HeartbeatInterval must be(1 second) MonitoredByNrOfMembers must be(5) HeartbeatRequestDelay must be(10 seconds) diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index a5e0aeff7e..ab0b4ee73f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -10,14 +10,24 @@ import akka.testkit.AkkaSpec import java.math.BigInteger @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ClusterMessageSerializerSpec extends AkkaSpec { +class ClusterMessageSerializerSpec extends AkkaSpec( + "akka.actor.provider = akka.cluster.ClusterActorRefProvider") { val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) def checkSerialization(obj: AnyRef): Unit = { val blob = serializer.toBinary(obj) val ref = serializer.fromBinary(blob, obj.getClass) - ref must be(obj) + obj match { + case env: GossipEnvelope ⇒ + val env2 = obj.asInstanceOf[GossipEnvelope] + env2.from must be(env.from) + env2.to must be(env.to) + env2.gossip must be(env.gossip) + case _ ⇒ + ref must be(obj) + } + } import MemberStatus._