diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
index 9e8b1ce958..533e3dd8a0 100644
--- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
+++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
@@ -3574,6 +3574,31 @@ public final class ClusterMessages {
* required .VectorClock version = 6;
*/
akka.cluster.protobuf.msg.ClusterMessages.VectorClockOrBuilder getVersionOrBuilder();
+
+ // repeated .Tombstone tombstones = 7;
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ java.util.List<akka.cluster.protobuf.msg.ClusterMessages.Tombstone>
+ getTombstonesList();
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone getTombstones(int index);
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ int getTombstonesCount();
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ java.util.List<? extends akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder>
+ getTombstonesOrBuilderList();
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder getTombstonesOrBuilder(
+ int index);
}
/**
* Protobuf type {@code Gossip}
@@ -3689,6 +3714,14 @@ public final class ClusterMessages {
bitField0_ |= 0x00000002;
break;
}
+ case 58: {
+ if (!((mutable_bitField0_ & 0x00000040) == 0x00000040)) {
+ tombstones_ = new java.util.ArrayList<akka.cluster.protobuf.msg.ClusterMessages.Tombstone>();
+ mutable_bitField0_ |= 0x00000040;
+ }
+ tombstones_.add(input.readMessage(akka.cluster.protobuf.msg.ClusterMessages.Tombstone.PARSER, extensionRegistry));
+ break;
+ }
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@@ -3709,6 +3742,9 @@ public final class ClusterMessages {
if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
members_ = java.util.Collections.unmodifiableList(members_);
}
+ if (((mutable_bitField0_ & 0x00000040) == 0x00000040)) {
+ tombstones_ = java.util.Collections.unmodifiableList(tombstones_);
+ }
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@@ -3917,6 +3953,42 @@ public final class ClusterMessages {
return version_;
}
+ // repeated .Tombstone tombstones = 7;
+ public static final int TOMBSTONES_FIELD_NUMBER = 7;
+ private java.util.List<akka.cluster.protobuf.msg.ClusterMessages.Tombstone> tombstones_;
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public java.util.List<akka.cluster.protobuf.msg.ClusterMessages.Tombstone> getTombstonesList() {
+ return tombstones_;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public java.util.List<? extends akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder>
+ getTombstonesOrBuilderList() {
+ return tombstones_;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public int getTombstonesCount() {
+ return tombstones_.size();
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public akka.cluster.protobuf.msg.ClusterMessages.Tombstone getTombstones(int index) {
+ return tombstones_.get(index);
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder getTombstonesOrBuilder(
+ int index) {
+ return tombstones_.get(index);
+ }
+
private void initFields() {
allAddresses_ = java.util.Collections.emptyList();
allRoles_ = akka.protobuf.LazyStringArrayList.EMPTY;
@@ -3924,6 +3996,7 @@ public final class ClusterMessages {
members_ = java.util.Collections.emptyList();
overview_ = akka.cluster.protobuf.msg.ClusterMessages.GossipOverview.getDefaultInstance();
version_ = akka.cluster.protobuf.msg.ClusterMessages.VectorClock.getDefaultInstance();
+ tombstones_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -3958,6 +4031,12 @@ public final class ClusterMessages {
memoizedIsInitialized = 0;
return false;
}
+ for (int i = 0; i < getTombstonesCount(); i++) {
+ if (!getTombstones(i).isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -3983,6 +4062,9 @@ public final class ClusterMessages {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(6, version_);
}
+ for (int i = 0; i < tombstones_.size(); i++) {
+ output.writeMessage(7, tombstones_.get(i));
+ }
getUnknownFields().writeTo(output);
}
@@ -4026,6 +4108,10 @@ public final class ClusterMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(6, version_);
}
+ for (int i = 0; i < tombstones_.size(); i++) {
+ size += akka.protobuf.CodedOutputStream
+ .computeMessageSize(7, tombstones_.get(i));
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4143,6 +4229,7 @@ public final class ClusterMessages {
getMembersFieldBuilder();
getOverviewFieldBuilder();
getVersionFieldBuilder();
+ getTombstonesFieldBuilder();
}
}
private static Builder create() {
@@ -4179,6 +4266,12 @@ public final class ClusterMessages {
versionBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000020);
+ if (tombstonesBuilder_ == null) {
+ tombstones_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000040);
+ } else {
+ tombstonesBuilder_.clear();
+ }
return this;
}
@@ -4253,6 +4346,15 @@ public final class ClusterMessages {
} else {
result.version_ = versionBuilder_.build();
}
+ if (tombstonesBuilder_ == null) {
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ tombstones_ = java.util.Collections.unmodifiableList(tombstones_);
+ bitField0_ = (bitField0_ & ~0x00000040);
+ }
+ result.tombstones_ = tombstones_;
+ } else {
+ result.tombstones_ = tombstonesBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4347,6 +4449,32 @@ public final class ClusterMessages {
if (other.hasVersion()) {
mergeVersion(other.getVersion());
}
+ if (tombstonesBuilder_ == null) {
+ if (!other.tombstones_.isEmpty()) {
+ if (tombstones_.isEmpty()) {
+ tombstones_ = other.tombstones_;
+ bitField0_ = (bitField0_ & ~0x00000040);
+ } else {
+ ensureTombstonesIsMutable();
+ tombstones_.addAll(other.tombstones_);
+ }
+ onChanged();
+ }
+ } else {
+ if (!other.tombstones_.isEmpty()) {
+ if (tombstonesBuilder_.isEmpty()) {
+ tombstonesBuilder_.dispose();
+ tombstonesBuilder_ = null;
+ tombstones_ = other.tombstones_;
+ bitField0_ = (bitField0_ & ~0x00000040);
+ tombstonesBuilder_ =
+ akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+ getTombstonesFieldBuilder() : null;
+ } else {
+ tombstonesBuilder_.addAllMessages(other.tombstones_);
+ }
+ }
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -4380,6 +4508,12 @@ public final class ClusterMessages {
return false;
}
+ for (int i = 0; i < getTombstonesCount(); i++) {
+ if (!getTombstones(i).isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -5302,6 +5436,246 @@ public final class ClusterMessages {
return versionBuilder_;
}
+ // repeated .Tombstone tombstones = 7;
+ private java.util.List<akka.cluster.protobuf.msg.ClusterMessages.Tombstone> tombstones_ =
+ java.util.Collections.emptyList();
+ private void ensureTombstonesIsMutable() {
+ if (!((bitField0_ & 0x00000040) == 0x00000040)) {
+ tombstones_ = new java.util.ArrayList<akka.cluster.protobuf.msg.ClusterMessages.Tombstone>(tombstones_);
+ bitField0_ |= 0x00000040;
+ }
+ }
+
+ private akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone, akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder, akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder> tombstonesBuilder_;
+
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public java.util.List<akka.cluster.protobuf.msg.ClusterMessages.Tombstone> getTombstonesList() {
+ if (tombstonesBuilder_ == null) {
+ return java.util.Collections.unmodifiableList(tombstones_);
+ } else {
+ return tombstonesBuilder_.getMessageList();
+ }
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public int getTombstonesCount() {
+ if (tombstonesBuilder_ == null) {
+ return tombstones_.size();
+ } else {
+ return tombstonesBuilder_.getCount();
+ }
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public akka.cluster.protobuf.msg.ClusterMessages.Tombstone getTombstones(int index) {
+ if (tombstonesBuilder_ == null) {
+ return tombstones_.get(index);
+ } else {
+ return tombstonesBuilder_.getMessage(index);
+ }
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder setTombstones(
+ int index, akka.cluster.protobuf.msg.ClusterMessages.Tombstone value) {
+ if (tombstonesBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureTombstonesIsMutable();
+ tombstones_.set(index, value);
+ onChanged();
+ } else {
+ tombstonesBuilder_.setMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder setTombstones(
+ int index, akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder builderForValue) {
+ if (tombstonesBuilder_ == null) {
+ ensureTombstonesIsMutable();
+ tombstones_.set(index, builderForValue.build());
+ onChanged();
+ } else {
+ tombstonesBuilder_.setMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder addTombstones(akka.cluster.protobuf.msg.ClusterMessages.Tombstone value) {
+ if (tombstonesBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureTombstonesIsMutable();
+ tombstones_.add(value);
+ onChanged();
+ } else {
+ tombstonesBuilder_.addMessage(value);
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder addTombstones(
+ int index, akka.cluster.protobuf.msg.ClusterMessages.Tombstone value) {
+ if (tombstonesBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureTombstonesIsMutable();
+ tombstones_.add(index, value);
+ onChanged();
+ } else {
+ tombstonesBuilder_.addMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder addTombstones(
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder builderForValue) {
+ if (tombstonesBuilder_ == null) {
+ ensureTombstonesIsMutable();
+ tombstones_.add(builderForValue.build());
+ onChanged();
+ } else {
+ tombstonesBuilder_.addMessage(builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder addTombstones(
+ int index, akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder builderForValue) {
+ if (tombstonesBuilder_ == null) {
+ ensureTombstonesIsMutable();
+ tombstones_.add(index, builderForValue.build());
+ onChanged();
+ } else {
+ tombstonesBuilder_.addMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder addAllTombstones(
+ java.lang.Iterable<? extends akka.cluster.protobuf.msg.ClusterMessages.Tombstone> values) {
+ if (tombstonesBuilder_ == null) {
+ ensureTombstonesIsMutable();
+ super.addAll(values, tombstones_);
+ onChanged();
+ } else {
+ tombstonesBuilder_.addAllMessages(values);
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder clearTombstones() {
+ if (tombstonesBuilder_ == null) {
+ tombstones_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000040);
+ onChanged();
+ } else {
+ tombstonesBuilder_.clear();
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public Builder removeTombstones(int index) {
+ if (tombstonesBuilder_ == null) {
+ ensureTombstonesIsMutable();
+ tombstones_.remove(index);
+ onChanged();
+ } else {
+ tombstonesBuilder_.remove(index);
+ }
+ return this;
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder getTombstonesBuilder(
+ int index) {
+ return getTombstonesFieldBuilder().getBuilder(index);
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder getTombstonesOrBuilder(
+ int index) {
+ if (tombstonesBuilder_ == null) {
+ return tombstones_.get(index); } else {
+ return tombstonesBuilder_.getMessageOrBuilder(index);
+ }
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public java.util.List<? extends akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder>
+ getTombstonesOrBuilderList() {
+ if (tombstonesBuilder_ != null) {
+ return tombstonesBuilder_.getMessageOrBuilderList();
+ } else {
+ return java.util.Collections.unmodifiableList(tombstones_);
+ }
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder addTombstonesBuilder() {
+ return getTombstonesFieldBuilder().addBuilder(
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone.getDefaultInstance());
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder addTombstonesBuilder(
+ int index) {
+ return getTombstonesFieldBuilder().addBuilder(
+ index, akka.cluster.protobuf.msg.ClusterMessages.Tombstone.getDefaultInstance());
+ }
+ /**
+ * repeated .Tombstone tombstones = 7;
+ */
+ public java.util.List<akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder>
+ getTombstonesBuilderList() {
+ return getTombstonesFieldBuilder().getBuilderList();
+ }
+ private akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone, akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder, akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder>
+ getTombstonesFieldBuilder() {
+ if (tombstonesBuilder_ == null) {
+ tombstonesBuilder_ = new akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone, akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder, akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder>(
+ tombstones_,
+ ((bitField0_ & 0x00000040) == 0x00000040),
+ getParentForChildren(),
+ isClean());
+ tombstones_ = null;
+ }
+ return tombstonesBuilder_;
+ }
+
// @@protoc_insertion_point(builder_scope:Gossip)
}
@@ -7686,6 +8060,499 @@ public final class ClusterMessages {
// @@protoc_insertion_point(class_scope:SubjectReachability)
}
+ public interface TombstoneOrBuilder
+ extends akka.protobuf.MessageOrBuilder {
+
+ // required int32 addressIndex = 1;
+ /**
+ * required int32 addressIndex = 1;
+ */
+ boolean hasAddressIndex();
+ /**
+ * required int32 addressIndex = 1;
+ */
+ int getAddressIndex();
+
+ // required int64 timestamp = 2;
+ /**
+ * required int64 timestamp = 2;
+ */
+ boolean hasTimestamp();
+ /**
+ * required int64 timestamp = 2;
+ */
+ long getTimestamp();
+ }
+ /**
+ * Protobuf type {@code Tombstone}
+ */
+ public static final class Tombstone extends
+ akka.protobuf.GeneratedMessage
+ implements TombstoneOrBuilder {
+ // Use Tombstone.newBuilder() to construct.
+ private Tombstone(akka.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private Tombstone(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final Tombstone defaultInstance;
+ public static Tombstone getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public Tombstone getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final akka.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final akka.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private Tombstone(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ akka.protobuf.UnknownFieldSet.Builder unknownFields =
+ akka.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ addressIndex_ = input.readInt32();
+ break;
+ }
+ case 16: {
+ bitField0_ |= 0x00000002;
+ timestamp_ = input.readInt64();
+ break;
+ }
+ }
+ }
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new akka.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.protobuf.msg.ClusterMessages.internal_static_Tombstone_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.protobuf.msg.ClusterMessages.internal_static_Tombstone_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone.class, akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder.class);
+ }
+
+ public static akka.protobuf.Parser<Tombstone> PARSER =
+ new akka.protobuf.AbstractParser<Tombstone>() {
+ public Tombstone parsePartialFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return new Tombstone(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public akka.protobuf.Parser<Tombstone> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // required int32 addressIndex = 1;
+ public static final int ADDRESSINDEX_FIELD_NUMBER = 1;
+ private int addressIndex_;
+ /**
+ * required int32 addressIndex = 1;
+ */
+ public boolean hasAddressIndex() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required int32 addressIndex = 1;
+ */
+ public int getAddressIndex() {
+ return addressIndex_;
+ }
+
+ // required int64 timestamp = 2;
+ public static final int TIMESTAMP_FIELD_NUMBER = 2;
+ private long timestamp_;
+ /**
+ * required int64 timestamp = 2;
+ */
+ public boolean hasTimestamp() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required int64 timestamp = 2;
+ */
+ public long getTimestamp() {
+ return timestamp_;
+ }
+
+ private void initFields() {
+ addressIndex_ = 0;
+ timestamp_ = 0L;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasAddressIndex()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasTimestamp()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(akka.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeInt32(1, addressIndex_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeInt64(2, timestamp_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeInt32Size(1, addressIndex_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeInt64Size(2, timestamp_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseFrom(
+ akka.protobuf.ByteString data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseFrom(
+ akka.protobuf.ByteString data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseFrom(byte[] data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseFrom(
+ byte[] data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseDelimitedFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseFrom(
+ akka.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.protobuf.msg.ClusterMessages.Tombstone parseFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(akka.cluster.protobuf.msg.ClusterMessages.Tombstone prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code Tombstone}
+ */
+ public static final class Builder extends
akka.protobuf.GeneratedMessage.Builder<Builder>
+ implements akka.cluster.protobuf.msg.ClusterMessages.TombstoneOrBuilder {
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.protobuf.msg.ClusterMessages.internal_static_Tombstone_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.protobuf.msg.ClusterMessages.internal_static_Tombstone_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone.class, akka.cluster.protobuf.msg.ClusterMessages.Tombstone.Builder.class);
+ }
+
+ // Construct using akka.cluster.protobuf.msg.ClusterMessages.Tombstone.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ addressIndex_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ timestamp_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public akka.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return akka.cluster.protobuf.msg.ClusterMessages.internal_static_Tombstone_descriptor;
+ }
+
+ public akka.cluster.protobuf.msg.ClusterMessages.Tombstone getDefaultInstanceForType() {
+ return akka.cluster.protobuf.msg.ClusterMessages.Tombstone.getDefaultInstance();
+ }
+
+ public akka.cluster.protobuf.msg.ClusterMessages.Tombstone build() {
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public akka.cluster.protobuf.msg.ClusterMessages.Tombstone buildPartial() {
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone result = new akka.cluster.protobuf.msg.ClusterMessages.Tombstone(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.addressIndex_ = addressIndex_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.timestamp_ = timestamp_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(akka.protobuf.Message other) {
+ if (other instanceof akka.cluster.protobuf.msg.ClusterMessages.Tombstone) {
+ return mergeFrom((akka.cluster.protobuf.msg.ClusterMessages.Tombstone)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(akka.cluster.protobuf.msg.ClusterMessages.Tombstone other) {
+ if (other == akka.cluster.protobuf.msg.ClusterMessages.Tombstone.getDefaultInstance()) return this;
+ if (other.hasAddressIndex()) {
+ setAddressIndex(other.getAddressIndex());
+ }
+ if (other.hasTimestamp()) {
+ setTimestamp(other.getTimestamp());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasAddressIndex()) {
+
+ return false;
+ }
+ if (!hasTimestamp()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ akka.cluster.protobuf.msg.ClusterMessages.Tombstone parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (akka.cluster.protobuf.msg.ClusterMessages.Tombstone) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required int32 addressIndex = 1;
+ private int addressIndex_ ;
+ /**
+ * required int32 addressIndex = 1;
+ */
+ public boolean hasAddressIndex() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required int32 addressIndex = 1;
+ */
+ public int getAddressIndex() {
+ return addressIndex_;
+ }
+ /**
+ * required int32 addressIndex = 1;
+ */
+ public Builder setAddressIndex(int value) {
+ bitField0_ |= 0x00000001;
+ addressIndex_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required int32 addressIndex = 1;
+ */
+ public Builder clearAddressIndex() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ addressIndex_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // required int64 timestamp = 2;
+ private long timestamp_ ;
+ /**
+ * required int64 timestamp = 2;
+ */
+ public boolean hasTimestamp() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required int64 timestamp = 2;
+ */
+ public long getTimestamp() {
+ return timestamp_;
+ }
+ /**
+ * required int64 timestamp = 2;
+ */
+ public Builder setTimestamp(long value) {
+ bitField0_ |= 0x00000002;
+ timestamp_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required int64 timestamp = 2;
+ */
+ public Builder clearTimestamp() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ timestamp_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:Tombstone)
+ }
+
+ static {
+ defaultInstance = new Tombstone(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:Tombstone)
+ }
+
public interface MemberOrBuilder
extends akka.protobuf.MessageOrBuilder {
@@ -13880,6 +14747,11 @@ public final class ClusterMessages {
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_SubjectReachability_fieldAccessorTable;
+ private static akka.protobuf.Descriptors.Descriptor
+ internal_static_Tombstone_descriptor;
+ private static
+ akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_Tombstone_fieldAccessorTable;
private static akka.protobuf.Descriptors.Descriptor
internal_static_Member_descriptor;
private static
@@ -13942,40 +14814,43 @@ public final class ClusterMessages {
"(\0132\016.UniqueAddress\022\030\n\020serializedGossip\030\003" +
" \002(\014\"^\n\014GossipStatus\022\034\n\004from\030\001 \002(\0132\016.Uni" +
"queAddress\022\021\n\tallHashes\030\002 \003(\t\022\035\n\007version" +
- "\030\003 \002(\0132\014.VectorClock\"\257\001\n\006Gossip\022$\n\014allAd" +
+ "\030\003 \002(\0132\014.VectorClock\"\317\001\n\006Gossip\022$\n\014allAd" +
"dresses\030\001 \003(\0132\016.UniqueAddress\022\020\n\010allRole",
"s\030\002 \003(\t\022\021\n\tallHashes\030\003 \003(\t\022\030\n\007members\030\004 " +
"\003(\0132\007.Member\022!\n\010overview\030\005 \002(\0132\017.GossipO" +
- "verview\022\035\n\007version\030\006 \002(\0132\014.VectorClock\"S" +
- "\n\016GossipOverview\022\014\n\004seen\030\001 \003(\005\0223\n\024observ" +
- "erReachability\030\002 \003(\0132\025.ObserverReachabil" +
- "ity\"p\n\024ObserverReachability\022\024\n\014addressIn" +
- "dex\030\001 \002(\005\022\017\n\007version\030\004 \002(\003\0221\n\023subjectRea" +
- "chability\030\002 \003(\0132\024.SubjectReachability\"a\n" +
- "\023SubjectReachability\022\024\n\014addressIndex\030\001 \002" +
- "(\005\022#\n\006status\030\003 \002(\0162\023.ReachabilityStatus\022",
- "\017\n\007version\030\004 \002(\003\"i\n\006Member\022\024\n\014addressInd" +
- "ex\030\001 \002(\005\022\020\n\010upNumber\030\002 \002(\005\022\035\n\006status\030\003 \002" +
- "(\0162\r.MemberStatus\022\030\n\014rolesIndexes\030\004 \003(\005B" +
- "\002\020\001\"y\n\013VectorClock\022\021\n\ttimestamp\030\001 \001(\003\022&\n" +
- "\010versions\030\002 \003(\0132\024.VectorClock.Version\032/\n" +
- "\007Version\022\021\n\thashIndex\030\001 \002(\005\022\021\n\ttimestamp" +
- "\030\002 \002(\003\"\007\n\005Empty\"K\n\007Address\022\016\n\006system\030\001 \002" +
- "(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r\022\020\n\010pr" +
- "otocol\030\004 \001(\t\"E\n\rUniqueAddress\022\031\n\007address" +
- "\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002(\r\022\014\n\004uid2\030\003 ",
- "\001(\r\"V\n\021ClusterRouterPool\022\023\n\004pool\030\001 \002(\0132\005" +
- ".Pool\022,\n\010settings\030\002 \002(\0132\032.ClusterRouterP" +
- "oolSettings\"<\n\004Pool\022\024\n\014serializerId\030\001 \002(" +
- "\r\022\020\n\010manifest\030\002 \002(\t\022\014\n\004data\030\003 \002(\014\"|\n\031Clu" +
- "sterRouterPoolSettings\022\026\n\016totalInstances" +
- "\030\001 \002(\r\022\033\n\023maxInstancesPerNode\030\002 \002(\r\022\031\n\021a" +
- "llowLocalRoutees\030\003 \002(\010\022\017\n\007useRole\030\004 \001(\t*" +
- "D\n\022ReachabilityStatus\022\r\n\tReachable\020\000\022\017\n\013" +
- "Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014MemberS" +
- "tatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022",
- "\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010W" +
- "eaklyUp\020\006B\035\n\031akka.cluster.protobuf.msgH\001"
+ "verview\022\035\n\007version\030\006 \002(\0132\014.VectorClock\022\036" +
+ "\n\ntombstones\030\007 \003(\0132\n.Tombstone\"S\n\016Gossip" +
+ "Overview\022\014\n\004seen\030\001 \003(\005\0223\n\024observerReacha" +
+ "bility\030\002 \003(\0132\025.ObserverReachability\"p\n\024O" +
+ "bserverReachability\022\024\n\014addressIndex\030\001 \002(" +
+ "\005\022\017\n\007version\030\004 \002(\003\0221\n\023subjectReachabilit" +
+ "y\030\002 \003(\0132\024.SubjectReachability\"a\n\023Subject" +
+ "Reachability\022\024\n\014addressIndex\030\001 \002(\005\022#\n\006st",
+ "atus\030\003 \002(\0162\023.ReachabilityStatus\022\017\n\007versi" +
+ "on\030\004 \002(\003\"4\n\tTombstone\022\024\n\014addressIndex\030\001 " +
+ "\002(\005\022\021\n\ttimestamp\030\002 \002(\003\"i\n\006Member\022\024\n\014addr" +
+ "essIndex\030\001 \002(\005\022\020\n\010upNumber\030\002 \002(\005\022\035\n\006stat" +
+ "us\030\003 \002(\0162\r.MemberStatus\022\030\n\014rolesIndexes\030" +
+ "\004 \003(\005B\002\020\001\"y\n\013VectorClock\022\021\n\ttimestamp\030\001 " +
+ "\001(\003\022&\n\010versions\030\002 \003(\0132\024.VectorClock.Vers" +
+ "ion\032/\n\007Version\022\021\n\thashIndex\030\001 \002(\005\022\021\n\ttim" +
+ "estamp\030\002 \002(\003\"\007\n\005Empty\"K\n\007Address\022\016\n\006syst" +
+ "em\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r",
+ "\022\020\n\010protocol\030\004 \001(\t\"E\n\rUniqueAddress\022\031\n\007a" +
+ "ddress\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002(\r\022\014\n\004u" +
+ "id2\030\003 \001(\r\"V\n\021ClusterRouterPool\022\023\n\004pool\030\001" +
+ " \002(\0132\005.Pool\022,\n\010settings\030\002 \002(\0132\032.ClusterR" +
+ "outerPoolSettings\"<\n\004Pool\022\024\n\014serializerI" +
+ "d\030\001 \002(\r\022\020\n\010manifest\030\002 \002(\t\022\014\n\004data\030\003 \002(\014\"" +
+ "|\n\031ClusterRouterPoolSettings\022\026\n\016totalIns" +
+ "tances\030\001 \002(\r\022\033\n\023maxInstancesPerNode\030\002 \002(" +
+ "\r\022\031\n\021allowLocalRoutees\030\003 \002(\010\022\017\n\007useRole\030" +
+ "\004 \001(\t*D\n\022ReachabilityStatus\022\r\n\tReachable",
+ "\020\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014M" +
+ "emberStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leav" +
+ "ing\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020" +
+ "\005\022\014\n\010WeaklyUp\020\006B\035\n\031akka.cluster.protobuf" +
+ ".msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14011,7 +14886,7 @@ public final class ClusterMessages {
internal_static_Gossip_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Gossip_descriptor,
- new java.lang.String[] { "AllAddresses", "AllRoles", "AllHashes", "Members", "Overview", "Version", });
+ new java.lang.String[] { "AllAddresses", "AllRoles", "AllHashes", "Members", "Overview", "Version", "Tombstones", });
internal_static_GossipOverview_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_GossipOverview_fieldAccessorTable = new
@@ -14030,14 +14905,20 @@ public final class ClusterMessages {
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SubjectReachability_descriptor,
new java.lang.String[] { "AddressIndex", "Status", "Version", });
- internal_static_Member_descriptor =
+ internal_static_Tombstone_descriptor =
getDescriptor().getMessageTypes().get(8);
+ internal_static_Tombstone_fieldAccessorTable = new
+ akka.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_Tombstone_descriptor,
+ new java.lang.String[] { "AddressIndex", "Timestamp", });
+ internal_static_Member_descriptor =
+ getDescriptor().getMessageTypes().get(9);
internal_static_Member_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Member_descriptor,
new java.lang.String[] { "AddressIndex", "UpNumber", "Status", "RolesIndexes", });
internal_static_VectorClock_descriptor =
- getDescriptor().getMessageTypes().get(9);
+ getDescriptor().getMessageTypes().get(10);
internal_static_VectorClock_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_VectorClock_descriptor,
@@ -14049,37 +14930,37 @@ public final class ClusterMessages {
internal_static_VectorClock_Version_descriptor,
new java.lang.String[] { "HashIndex", "Timestamp", });
internal_static_Empty_descriptor =
- getDescriptor().getMessageTypes().get(10);
+ getDescriptor().getMessageTypes().get(11);
internal_static_Empty_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Empty_descriptor,
new java.lang.String[] { });
internal_static_Address_descriptor =
- getDescriptor().getMessageTypes().get(11);
+ getDescriptor().getMessageTypes().get(12);
internal_static_Address_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Address_descriptor,
new java.lang.String[] { "System", "Hostname", "Port", "Protocol", });
internal_static_UniqueAddress_descriptor =
- getDescriptor().getMessageTypes().get(12);
+ getDescriptor().getMessageTypes().get(13);
internal_static_UniqueAddress_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_UniqueAddress_descriptor,
new java.lang.String[] { "Address", "Uid", "Uid2", });
internal_static_ClusterRouterPool_descriptor =
- getDescriptor().getMessageTypes().get(13);
+ getDescriptor().getMessageTypes().get(14);
internal_static_ClusterRouterPool_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ClusterRouterPool_descriptor,
new java.lang.String[] { "Pool", "Settings", });
internal_static_Pool_descriptor =
- getDescriptor().getMessageTypes().get(14);
+ getDescriptor().getMessageTypes().get(15);
internal_static_Pool_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Pool_descriptor,
new java.lang.String[] { "SerializerId", "Manifest", "Data", });
internal_static_ClusterRouterPoolSettings_descriptor =
- getDescriptor().getMessageTypes().get(15);
+ getDescriptor().getMessageTypes().get(16);
internal_static_ClusterRouterPoolSettings_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ClusterRouterPoolSettings_descriptor,
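The generated Java above adds a repeated `tombstones` field (tag 7) to `Gossip` and a new `Tombstone` message with two required fields, `addressIndex` and `timestamp`. As a quick orientation (not part of the patch), this is how the builder API shown in the diff would be exercised from Scala; the field values here are illustrative:

```scala
import akka.cluster.protobuf.msg.ClusterMessages

// Build a Tombstone via the accessors generated above; addressIndex is an
// index into Gossip.allAddresses, timestamp is the removal time in millis.
val tombstone: ClusterMessages.Tombstone =
  ClusterMessages.Tombstone.newBuilder()
    .setAddressIndex(0)
    .setTimestamp(System.currentTimeMillis())
    .build()

// Attach it to a Gossip under construction (repeated field 7 from the diff):
// gossipBuilder.addTombstones(tombstone)
```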
diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto
index fa4358dfac..9ff9926337 100644
--- a/akka-cluster/src/main/protobuf/ClusterMessages.proto
+++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto
@@ -101,6 +101,7 @@ message Gossip {
repeated Member members = 4;
required GossipOverview overview = 5;
required VectorClock version = 6;
+ repeated Tombstone tombstones = 7;
}
/**
@@ -127,6 +128,11 @@ message SubjectReachability {
required int64 version = 4;
}
+message Tombstone {
+ required int32 addressIndex = 1;
+ required int64 timestamp = 2;
+}
+
/**
* Reachability status
*/
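The proto change is small: each `Tombstone` pairs an `addressIndex` (into `Gossip.allAddresses`) with the removal `timestamp`. The reason for carrying tombstones in the gossip is that a member that was removed must not be re-introduced by stale gossip from the other side of a partition. Below is a minimal sketch of that bookkeeping, deliberately independent of the real `Gossip` class; the names are illustrative, but `removeAll` mirrors the call added to `ClusterDaemon` further down:

```scala
// Simplified model of tombstone bookkeeping; not the actual akka.cluster.Gossip.
final case class MiniGossip(members: Set[String], tombstones: Map[String, Long]) {

  // Remove members and remember when they were removed (cf. removeAll in the
  // leaderActionsOnConvergence change further down).
  def removeAll(nodes: Set[String], removalTimestamp: Long): MiniGossip =
    copy(
      members = members -- nodes,
      tombstones = tombstones ++ nodes.map(_ -> removalTimestamp))

  // A node with a live tombstone must not be merged back in from remote gossip.
  def reintroductionAllowed(node: String): Boolean = !tombstones.contains(node)

  // Tombstones older than the retention window are dropped
  // (prune-gossip-tombstones-after, see reference.conf below).
  def pruneTombstones(removeEarlierThan: Long): MiniGossip =
    copy(tombstones = tombstones.filter { case (_, ts) => ts > removeEarlierThan })
}
```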
diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index f5b49f6c81..ad9e3414bd 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -74,6 +74,8 @@ akka {
# The roles are part of the membership information and can be used by
# routers or other services to distribute work to certain member types,
# e.g. front-end and back-end nodes.
+ # Roles are not allowed to start with "team-" as that prefix is reserved for
+ # the special role derived from the team a node belongs to (see above)
roles = []
# Run the coordinated shutdown from phase 'cluster-shutdown' when the cluster
@@ -145,6 +147,11 @@ akka {
# greater than this value.
reduce-gossip-different-view-probability = 400
+ # When a node is removed, the removal is marked with a tombstone, which is
+ # kept at least this long before being pruned. If a network partition lasts
+ # longer than this, removed nodes could be re-added to the cluster.
+ prune-gossip-tombstones-after = 24h
+
# Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable
# members.
@@ -209,6 +216,9 @@ akka {
debug {
# log heartbeat events (very verbose, useful mostly when debugging heartbeating issues)
verbose-heartbeat-logging = off
+
+ # log verbose details about gossip
+ verbose-gossip-logging = off
}
}
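For the new `prune-gossip-tombstones-after` setting above, here is a hedged sketch of how a pruning pass could compute its cutoff. The settings-class field name is not visible in this diff, so the value is read directly from Typesafe Config:

```scala
import com.typesafe.config.ConfigFactory
import java.util.concurrent.TimeUnit

val config = ConfigFactory.parseString(
  "akka.cluster.prune-gossip-tombstones-after = 24h")

// Length of the retention window in milliseconds (24h -> 86400000).
val pruneAfterMillis =
  config.getDuration("akka.cluster.prune-gossip-tombstones-after", TimeUnit.MILLISECONDS)

// Tombstones stamped before this instant may be pruned.
val removeEarlierThan = System.currentTimeMillis() - pruneAfterMillis
```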
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 94605654bf..558f1420f8 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -420,13 +420,32 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
private[cluster] object InfoLogger {
def logInfo(message: String): Unit =
- if (LogInfo) log.info("Cluster Node [{}] - {}", selfAddress, message)
+ if (LogInfo)
+ if (settings.Team == ClusterSettings.DefaultTeam)
+ log.info("Cluster Node [{}] - {}", selfAddress, message)
+ else
+ log.info("Cluster Node [{}] team [{}] - {}", selfAddress, settings.Team, message)
def logInfo(template: String, arg1: Any): Unit =
- if (LogInfo) log.info("Cluster Node [{}] - " + template, selfAddress, arg1)
+ if (LogInfo)
+ if (settings.Team == ClusterSettings.DefaultTeam)
+ log.info("Cluster Node [{}] - " + template, selfAddress, arg1)
+ else
+ log.info("Cluster Node [{}] team [{}] - " + template, selfAddress, settings.Team, arg1)
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
- if (LogInfo) log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
+ if (LogInfo)
+ if (settings.Team == ClusterSettings.DefaultTeam)
+ log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
+ else
+ log.info("Cluster Node [{}] team [{}] - " + template, selfAddress, settings.Team, arg1, arg2)
+
+ def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
+ if (LogInfo)
+ if (settings.Team == ClusterSettings.DefaultTeam)
+ log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
+ else
+ log.info("Cluster Node [{}] team [" + settings.Team + "] - " + template, selfAddress, arg1, arg2, arg3)
}
}
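The `Cluster.scala` change above makes `InfoLogger` include the team in the log prefix whenever the node is not in the default team. The `ClusterDaemon.scala` changes below then scope leader election, reachability, and convergence to the node's own team (`selfTeam`). A simplified sketch of per-team leader selection, under stated assumptions (the real logic lives in `Gossip` and orders members with `Member.leaderStatusOrdering`):

```scala
// Illustrative only: each team picks its own leader among its reachable
// members in a leader-eligible status, so one team's partition does not
// stall another team's leader actions.
final case class Node(address: String, team: String, status: String, reachable: Boolean)

def teamLeader(team: String, members: Seq[Node]): Option[String] =
  members
    .filter(m => m.team == team && m.reachable)
    .filter(m => m.status == "Up" || m.status == "Leaving") // assumed eligibility
    .sortBy(_.address) // stand-in for Member.leaderStatusOrdering
    .headOption
    .map(_.address)
```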
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index b24b1249c2..b0e1e38130 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -7,19 +7,24 @@ import language.existentials
import scala.collection.immutable
import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom
+
import scala.util.control.NonFatal
import akka.actor._
import akka.actor.SupervisorStrategy.Stop
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
-import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
+import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
+
import scala.collection.breakOut
import akka.remote.QuarantinedEvent
import java.util.ArrayList
import java.util.Collections
+
import akka.pattern.ask
import akka.util.Timeout
import akka.Done
+import akka.annotation.InternalApi
+
import scala.concurrent.Future
import scala.concurrent.Promise
@@ -266,9 +271,22 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
/**
* INTERNAL API.
*/
+@InternalApi
+private[cluster] object ClusterCoreDaemon {
+ def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}"
+
+ val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
+ val MaxGossipsBeforeShuttingDownMyself = 5
+}
+
+/**
+ * INTERNAL API.
+ */
+@InternalApi
private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
+ import ClusterCoreDaemon._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfRoles, scheduler, failureDetector }
@@ -277,10 +295,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
protected def selfUniqueAddress = cluster.selfUniqueAddress
- val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
- val MaxGossipsBeforeShuttingDownMyself = 5
-
- def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}"
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
// note that self is not initially member,
@@ -316,6 +330,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
var exitingConfirmed = Set.empty[UniqueAddress]
+ def selfTeam = cluster.settings.Team
+
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
@@ -544,28 +560,28 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Received `Join` message and replies with `Welcome` message, containing
* current gossip state, including the new joining member.
*/
- def joining(node: UniqueAddress, roles: Set[String]): Unit = {
+ def joining(joiningNode: UniqueAddress, roles: Set[String]): Unit = {
val selfStatus = latestGossip.member(selfUniqueAddress).status
- if (node.address.protocol != selfAddress.protocol)
+ if (joiningNode.address.protocol != selfAddress.protocol)
log.warning(
"Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
- selfAddress.protocol, node.address.protocol)
- else if (node.address.system != selfAddress.system)
+ selfAddress.protocol, joiningNode.address.protocol)
+ else if (joiningNode.address.system != selfAddress.system)
log.warning(
"Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
- selfAddress.system, node.address.system)
+ selfAddress.system, joiningNode.address.system)
else if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
- logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", node, selfStatus)
+ logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", joiningNode, selfStatus)
else {
val localMembers = latestGossip.members
// check by address without uid to make sure that node with same host:port is not allowed
// to join until previous node with that host:port has been removed from the cluster
- localMembers.find(_.address == node.address) match {
- case Some(m) if m.uniqueAddress == node ⇒
+ localMembers.find(_.address == joiningNode.address) match {
+ case Some(m) if m.uniqueAddress == joiningNode ⇒
// node retried join attempt, probably due to lost Welcome message
logInfo("Existing member [{}] is joining again.", m)
- if (node != selfUniqueAddress)
+ if (joiningNode != selfUniqueAddress)
sender() ! Welcome(selfUniqueAddress, latestGossip)
case Some(m) ⇒
// node restarted, same host:port as existing member, but with different uid
@@ -584,17 +600,17 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
case None ⇒
// remove the node from the failure detector
- failureDetector.remove(node.address)
+ failureDetector.remove(joiningNode.address)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
- val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
+ val newMembers = localMembers + Member(joiningNode, roles) + Member(selfUniqueAddress, cluster.selfRoles)
val newGossip = latestGossip copy (members = newMembers)
updateLatestGossip(newGossip)
- logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
- if (node == selfUniqueAddress) {
+ logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", "))
+ if (joiningNode == selfUniqueAddress) {
if (localMembers.isEmpty)
leaderActions() // important for deterministic oldest when bootstrapping
} else
@@ -613,8 +629,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (joinWith != from.address)
logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith)
else {
- logInfo("Welcome from [{}]", from.address)
latestGossip = gossip seen selfUniqueAddress
+ logInfo("Welcome from [{}]", from.address)
assertLatestGossip()
publish(latestGossip)
if (from != selfUniqueAddress)
@@ -663,11 +679,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits)
// send ExitingConfirmed to two potential leaders
- val membersWithoutSelf = latestGossip.members.filterNot(_.uniqueAddress == selfUniqueAddress)
- latestGossip.leaderOf(membersWithoutSelf, selfUniqueAddress) match {
+ val membersExceptSelf = latestGossip.members.filter(_.uniqueAddress != selfUniqueAddress)
+
+ latestGossip.leaderOf(selfTeam, membersExceptSelf, selfUniqueAddress) match {
case Some(node1) ⇒
clusterCore(node1.address) ! ExitingConfirmed(selfUniqueAddress)
- latestGossip.leaderOf(membersWithoutSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
+ latestGossip.leaderOf(selfTeam, membersExceptSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
case Some(node2) ⇒
clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress)
case None ⇒ // no more potential leader
@@ -706,26 +723,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
- val localReachability = localOverview.reachability
+ val localReachability = localGossip.teamReachability(selfTeam)
// check if the node to DOWN is in the `members` set
localMembers.find(_.address == address) match {
- case Some(m) if (m.status != Down) ⇒
+ case Some(m) if m.status != Down ⇒
if (localReachability.isReachable(m.uniqueAddress))
logInfo("Marking node [{}] as [{}]", m.address, Down)
else
logInfo("Marking unreachable node [{}] as [{}]", m.address, Down)
- // replace member (changed status)
- val newMembers = localMembers - m + m.copy(status = Down)
- // remove nodes marked as DOWN from the `seen` table
- val newSeen = localSeen - m.uniqueAddress
-
- // update gossip overview
- val newOverview = localOverview copy (seen = newSeen)
- val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
+ val newGossip = localGossip.markAsDown(m)
updateLatestGossip(newGossip)
-
publish(latestGossip)
case Some(_) ⇒ // already down
case None ⇒
@@ -751,7 +760,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def receiveGossipStatus(status: GossipStatus): Unit = {
val from = status.from
- if (!latestGossip.overview.reachability.isReachable(selfUniqueAddress, from))
+ if (!latestGossip.isReachable(selfUniqueAddress, from))
logInfo("Ignoring received gossip status from unreachable [{}] ", from)
else if (latestGossip.members.forall(_.uniqueAddress != from))
log.debug("Cluster Node [{}] - Ignoring received gossip status from unknown [{}]", selfAddress, from)
@@ -778,6 +787,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Receive new gossip.
*/
def receiveGossip(envelope: GossipEnvelope): ReceiveGossipType = {
+
val from = envelope.from
val remoteGossip = envelope.gossip
val localGossip = latestGossip
@@ -788,7 +798,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} else if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
Ignored
- } else if (!localGossip.overview.reachability.isReachable(selfUniqueAddress, from)) {
+ } else if (!localGossip.isReachable(selfUniqueAddress, from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from)
Ignored
} else if (localGossip.members.forall(_.uniqueAddress != from)) {
@@ -839,10 +849,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// Don't mark gossip state as seen while exiting is in progress, e.g.
// shutting down singleton actors. This delays removal of the member until
// the exiting tasks have been completed.
- if (exitingTasksInProgress)
- latestGossip = winningGossip
- else
- latestGossip = winningGossip seen selfUniqueAddress
+ latestGossip =
+ if (exitingTasksInProgress) winningGossip
+ else winningGossip seen selfUniqueAddress
assertLatestGossip()
// for all new joining nodes we remove them from the failure detector
@@ -852,7 +861,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
- if (comparison == VectorClock.Concurrent) {
+ if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) {
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
@@ -995,11 +1004,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Runs periodic leader actions, such as member status transitions, assigning partitions etc.
*/
def leaderActions(): Unit = {
- if (latestGossip.isLeader(selfUniqueAddress, selfUniqueAddress)) {
- // only run the leader actions if we are the LEADER
+ if (latestGossip.isTeamLeader(selfTeam, selfUniqueAddress, selfUniqueAddress)) {
+ // only run the leader actions if we are the LEADER of the team
val firstNotice = 20
val periodicNotice = 60
- if (latestGossip.convergence(selfUniqueAddress, exitingConfirmed)) {
+ if (latestGossip.convergence(selfTeam, selfUniqueAddress, exitingConfirmed)) {
if (leaderActionCounter >= firstNotice)
logInfo("Leader can perform its duties again")
leaderActionCounter = 0
@@ -1012,9 +1021,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
logInfo(
"Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
- latestGossip.reachabilityExcludingDownedObservers,
- latestGossip.members.map(m ⇒
- s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", "))
+ latestGossip.teamReachabilityExcludingDownedObservers(selfTeam),
+ latestGossip.members.collect {
+ case m if m.team == selfTeam ⇒
+ s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}"
+ }.mkString(", "))
}
}
cleanupExitingConfirmed()
@@ -1025,8 +1036,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (latestGossip.member(selfUniqueAddress).status == Down) {
// When all reachable have seen the state this member will shutdown itself when it has
// status Down. The down commands should spread before we shutdown.
- val unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated
- val downed = latestGossip.members.collect { case m if m.status == Down ⇒ m.uniqueAddress }
+ val unreachable = latestGossip.teamReachability(selfTeam).allUnreachableOrTerminated
+ val downed = latestGossip.teamMembers(selfTeam).collect { case m if m.status == Down ⇒ m.uniqueAddress }
if (downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) {
// the reason for not shutting down immediately is to give the gossip a chance to spread
// the downing information to other downed nodes, so that they can shutdown themselves
@@ -1059,95 +1070,85 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* 9. Update the state with the new gossip
*/
def leaderActionsOnConvergence(): Unit = {
- val localGossip = latestGossip
- val localMembers = localGossip.members
- val localOverview = localGossip.overview
- val localSeen = localOverview.seen
-
- val enoughMembers: Boolean = isMinNrOfMembersFulfilled
- def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
val removedUnreachable = for {
- node ← localOverview.reachability.allUnreachableOrTerminated
- m = localGossip.member(node)
- if Gossip.removeUnreachableWithMemberStatus(m.status)
+ node ← latestGossip.teamReachability(selfTeam).allUnreachableOrTerminated
+ m = latestGossip.member(node)
+ if m.team == selfTeam && Gossip.removeUnreachableWithMemberStatus(m.status)
} yield m
- val removedExitingConfirmed = exitingConfirmed.filter(n ⇒ localGossip.member(n).status == Exiting)
+ val removedExitingConfirmed = exitingConfirmed.filter { n ⇒
+ val member = latestGossip.member(n)
+ member.team == selfTeam && member.status == Exiting
+ }
- val changedMembers = localMembers collect {
- var upNumber = 0
+ val changedMembers = {
+ val enoughMembers: Boolean = isMinNrOfMembersFulfilled
+ def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
- {
- case m if isJoiningToUp(m) ⇒
- // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
- // and minimum number of nodes have joined the cluster
- if (upNumber == 0) {
- // It is alright to use same upNumber as already used by a removed member, since the upNumber
- // is only used for comparing age of current cluster members (Member.isOlderThan)
- val youngest = localGossip.youngestMember
- upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
- } else {
- upNumber += 1
- }
- m.copyUp(upNumber)
+ latestGossip.members collect {
+ var upNumber = 0
- case m if m.status == Leaving ⇒
- // Move LEAVING => EXITING (once we have a convergence on LEAVING)
- m copy (status = Exiting)
+ {
+ case m if m.team == selfTeam && isJoiningToUp(m) ⇒
+ // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
+ // and minimum number of nodes have joined the cluster
+ if (upNumber == 0) {
+ // It is alright to use same upNumber as already used by a removed member, since the upNumber
+ // is only used for comparing age of current cluster members (Member.isOlderThan)
+ val youngest = latestGossip.youngestMember
+ upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
+ } else {
+ upNumber += 1
+ }
+ m.copyUp(upNumber)
+
+ case m if m.team == selfTeam && m.status == Leaving ⇒
+ // Move LEAVING => EXITING (once we have a convergence on LEAVING)
+ m copy (status = Exiting)
+ }
}
}
- if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) {
- // handle changes
+ val updatedGossip: Gossip =
+ if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) {
- // replace changed members
- val newMembers = changedMembers.union(localMembers).diff(removedUnreachable)
- .filterNot(m ⇒ removedExitingConfirmed(m.uniqueAddress))
+ // replace changed members
+ val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
+ val newGossip =
+ latestGossip.update(changedMembers).removeAll(removed, System.currentTimeMillis())
- // removing REMOVED nodes from the `seen` table
- val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
- val newSeen = localSeen diff removed
- // removing REMOVED nodes from the `reachability` table
- val newReachability = localOverview.reachability.remove(removed)
- val newOverview = localOverview copy (seen = newSeen, reachability = newReachability)
- // Clear the VectorClock when member is removed. The change made by the leader is stamped
- // and will propagate as is if there are no other changes on other nodes.
- // If other concurrent changes on other nodes (e.g. join) the pruning is also
- // taken care of when receiving gossips.
- val newVersion = removed.foldLeft(localGossip.version) { (v, node) ⇒
- v.prune(VectorClock.Node(vclockName(node)))
- }
- val newGossip = localGossip copy (members = newMembers, overview = newOverview, version = newVersion)
+ if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
+ // Leader is moving itself from Leaving to Exiting.
+ // ExitingCompleted will be received via CoordinatedShutdown to continue
+ // the leaving process. Meanwhile the gossip state is not marked as seen.
+ exitingTasksInProgress = true
+ logInfo("Exiting (leader), starting coordinated shutdown")
+ selfExiting.trySuccess(Done)
+ coordShutdown.run()
+ }
- if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
- // Leader is moving itself from Leaving to Exiting.
- // ExitingCompleted will be received via CoordinatedShutdown to continue
- // the leaving process. Meanwhile the gossip state is not marked as seen.
- exitingTasksInProgress = true
- logInfo("Exiting (leader), starting coordinated shutdown")
- selfExiting.trySuccess(Done)
- coordShutdown.run()
- }
+ exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
- updateLatestGossip(newGossip)
- exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
+ changedMembers foreach { m ⇒
+ logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
+ }
+ removedUnreachable foreach { m ⇒
+ val status = if (m.status == Exiting) "exiting" else "unreachable"
+ logInfo("Leader is removing {} node [{}]", status, m.address)
+ }
+ removedExitingConfirmed.foreach { n ⇒
+ logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
+ }
- // log status changes
- changedMembers foreach { m ⇒
- logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
- }
+ newGossip
+ } else
+ latestGossip
- // log the removal of the unreachable nodes
- removedUnreachable foreach { m ⇒
- val status = if (m.status == Exiting) "exiting" else "unreachable"
- logInfo("Leader is removing {} node [{}]", status, m.address)
- }
- removedExitingConfirmed.foreach { n ⇒
- logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
- }
-
- publish(latestGossip)
+ val pruned = updatedGossip.pruneTombstones(System.currentTimeMillis() - PruneGossipTombstonesAfter.toMillis)
+ if (pruned ne latestGossip) {
+ updateLatestGossip(pruned)
+ publish(pruned)
}
}
@@ -1157,7 +1158,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
def isJoiningToWeaklyUp(m: Member): Boolean =
- m.status == Joining && enoughMembers && latestGossip.reachabilityExcludingDownedObservers.isReachable(m.uniqueAddress)
+ m.team == selfTeam &&
+ m.status == Joining &&
+ enoughMembers &&
+ latestGossip.teamReachabilityExcludingDownedObservers(selfTeam).isReachable(m.uniqueAddress)
val changedMembers = localMembers.collect {
case m if isJoiningToWeaklyUp(m) ⇒ m.copy(status = WeaklyUp)
}
@@ -1203,10 +1207,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (newlyDetectedUnreachableMembers.nonEmpty || newlyDetectedReachableMembers.nonEmpty) {
- val newReachability1 = (localOverview.reachability /: newlyDetectedUnreachableMembers) {
+ val newReachability1 = newlyDetectedUnreachableMembers.foldLeft(localOverview.reachability) {
(reachability, m) ⇒ reachability.unreachable(selfUniqueAddress, m.uniqueAddress)
}
- val newReachability2 = (newReachability1 /: newlyDetectedReachableMembers) {
+ val newReachability2 = newlyDetectedReachableMembers.foldLeft(newReachability1) {
(reachability, m) ⇒ reachability.reachable(selfUniqueAddress, m.uniqueAddress)
}
@@ -1265,8 +1269,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
def validNodeForGossip(node: UniqueAddress): Boolean =
- (node != selfUniqueAddress && latestGossip.hasMember(node) &&
- latestGossip.reachabilityExcludingDownedObservers.isReachable(node))
+ node != selfUniqueAddress && latestGossip.isReachableExcludingDownedObservers(selfTeam, node)
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes
@@ -1291,6 +1294,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
throw new IllegalStateException(s"Too many vector clock entries in gossip state ${latestGossip}")
def publish(newGossip: Gossip): Unit = {
+ if (cluster.settings.Debug.VerboseGossipLogging)
+ log.debug("Cluster Node [{}] team [{}] - New gossip published [{}]", selfAddress, cluster.settings.Team, newGossip)
+
publisher ! PublishChanges(newGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()
}
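To see the new verbose gossip logging in action, here is a minimal sketch of a configuration exercising both new settings (system name and config layering are assumptions; it relies on the akka-cluster reference configuration being on the classpath):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object VerboseGossipExample extends App {
  // `team` is read into cluster.settings.Team and the flag into settings.Debug.VerboseGossipLogging,
  // so publish() above logs every published gossip together with the team name
  val config = ConfigFactory.parseString(
    """
    akka.loglevel = DEBUG
    akka.actor.provider = cluster
    akka.cluster.team = "dc1"
    akka.cluster.debug.verbose-gossip-logging = on
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("VerboseGossipExample", config)
}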
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
index 319581b9cf..9b50818a6a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -7,11 +7,15 @@ import language.postfixOps
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
+import akka.cluster.ClusterSettings.Team
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
-import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
+import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.actor.DeadLetterSuppression
+import akka.annotation.InternalApi
+
+import scala.collection.breakOut
/**
* Domain events published to the event bus.
@@ -53,6 +57,8 @@ object ClusterEvent {
/**
* Current snapshot state of the cluster. Sent to new subscriber.
+ *
+ * @param leader leader of this node's team
*/
final case class CurrentClusterState(
members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
@@ -82,10 +88,21 @@ object ClusterEvent {
scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava
/**
- * Java API: get address of current leader, or null if none
+ * Java API: get address of current team leader, or null if none
*/
def getLeader: Address = leader orNull
+ /**
+ * get address of the current leader, if any, among the members of this node's team that have the given role
+ */
+ def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)
+
+ /**
+ * Java API: get address of the current leader, if any, among the members of this node's team
+ * that have the given role, or null if no such node exists
+ */
+ def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
+
/**
* All node roles in the cluster
*/
@@ -98,15 +115,16 @@ object ClusterEvent {
scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava
/**
- * get address of current leader, if any, within the role set
+ * All teams in the cluster
*/
- def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)
+ def allTeams: Set[String] = members.map(_.team)(breakOut)
/**
- * Java API: get address of current leader within the role set,
- * or null if no node with that role
+ * Java API: All teams in the cluster
*/
- def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
+ def getAllTeams: java.util.Set[String] =
+ scala.collection.JavaConverters.setAsJavaSetConverter(allTeams).asJava
+
}
/**
@@ -171,7 +189,7 @@ object ClusterEvent {
}
/**
- * Leader of the cluster members changed. Published when the state change
+ * Leader of this node's cluster team changed. Published when the state change
* is first seen on a node.
*/
final case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
@@ -183,7 +201,7 @@ object ClusterEvent {
}
/**
- * First member (leader) of the members within a role set changed.
+ * First member (leader) of the members within a role set (in the same team as this node, if cluster teams are used) changed.
* Published when the state change is first seen on a node.
*/
final case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
@@ -299,32 +317,35 @@ object ClusterEvent {
/**
* INTERNAL API
*/
- private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = {
- val newLeader = newGossip.leader(selfUniqueAddress)
- if (newLeader != oldGossip.leader(selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address)))
+ @InternalApi
+ private[cluster] def diffLeader(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = {
+ val newLeader = newGossip.teamLeader(team, selfUniqueAddress)
+ if (newLeader != oldGossip.teamLeader(team, selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address)))
else Nil
}
/**
* INTERNAL API
*/
- private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = {
+ @InternalApi
+ private[cluster] def diffRolesLeader(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = {
for {
- role ← (oldGossip.allRoles union newGossip.allRoles)
- newLeader = newGossip.roleLeader(role, selfUniqueAddress)
- if newLeader != oldGossip.roleLeader(role, selfUniqueAddress)
+ role ← oldGossip.allRoles union newGossip.allRoles
+ newLeader = newGossip.roleLeader(team, role, selfUniqueAddress)
+ if newLeader != oldGossip.roleLeader(team, role, selfUniqueAddress)
} yield RoleLeaderChanged(role, newLeader.map(_.address))
}
/**
* INTERNAL API
*/
- private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] =
+ @InternalApi
+ private[cluster] def diffSeen(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] =
if (newGossip eq oldGossip) Nil
else {
- val newConvergence = newGossip.convergence(selfUniqueAddress, Set.empty)
+ val newConvergence = newGossip.convergence(team, selfUniqueAddress, Set.empty)
val newSeenBy = newGossip.seenBy
- if (newConvergence != oldGossip.convergence(selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy)
+ if (newConvergence != oldGossip.convergence(team, selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy)
List(SeenChanged(newConvergence, newSeenBy.map(_.address)))
else Nil
}
@@ -332,6 +353,7 @@ object ClusterEvent {
/**
* INTERNAL API
*/
+ @InternalApi
private[cluster] def diffReachability(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachabilityChanged] =
if (newGossip.overview.reachability eq oldGossip.overview.reachability) Nil
else List(ReachabilityChanged(newGossip.overview.reachability))
@@ -347,8 +369,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
- val selfUniqueAddress = Cluster(context.system).selfUniqueAddress
+ val cluster = Cluster(context.system)
+ val selfUniqueAddress = cluster.selfUniqueAddress
var latestGossip: Gossip = Gossip.empty
+ def selfTeam = cluster.settings.Team
override def preRestart(reason: Throwable, message: Option[Any]) {
// don't postStop when restarted, no children to stop
@@ -383,9 +407,11 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
members = latestGossip.members,
unreachable = unreachable,
seenBy = latestGossip.seenBy.map(_.address),
- leader = latestGossip.leader(selfUniqueAddress).map(_.address),
- roleLeaderMap = latestGossip.allRoles.map(r ⇒ r → latestGossip.roleLeader(r, selfUniqueAddress)
- .map(_.address))(collection.breakOut))
+ leader = latestGossip.teamLeader(selfTeam, selfUniqueAddress).map(_.address),
+ roleLeaderMap = latestGossip.allRoles.map(r ⇒
+ r → latestGossip.roleLeader(selfTeam, r, selfUniqueAddress).map(_.address)
+ )(collection.breakOut)
+ )
receiver ! state
}
@@ -420,10 +446,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
diffMemberEvents(oldGossip, newGossip) foreach pub
diffUnreachable(oldGossip, newGossip, selfUniqueAddress) foreach pub
diffReachable(oldGossip, newGossip, selfUniqueAddress) foreach pub
- diffLeader(oldGossip, newGossip, selfUniqueAddress) foreach pub
- diffRolesLeader(oldGossip, newGossip, selfUniqueAddress) foreach pub
+ diffLeader(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub
+ diffRolesLeader(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub
// publish internal SeenState for testing purposes
- diffSeen(oldGossip, newGossip, selfUniqueAddress) foreach pub
+ diffSeen(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub
diffReachability(oldGossip, newGossip) foreach pub
}
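Since LeaderChanged is now scoped to the subscriber's own team, a minimal subscriber sketch illustrates what an application observes (actor name and log messages are illustrative only):

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent
import akka.cluster.ClusterEvent.{ CurrentClusterState, LeaderChanged }

class TeamLeaderListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit =
    cluster.subscribe(self, ClusterEvent.InitialStateAsSnapshot, classOf[LeaderChanged])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case state: CurrentClusterState ⇒
      // the snapshot leader is the leader of this node's own team
      log.info("Initial team leader: {}", state.leader)
    case LeaderChanged(leader) ⇒
      // with cluster teams this fires only when this team's leader changes
      log.info("Team leader changed to: {}", leader)
  }
}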
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
index 9bb9149651..10f7cb309a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
@@ -109,12 +109,12 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def status: MemberStatus = self.status
/**
- * Is this node the leader?
+ * Is this node the current leader of its own team?
*/
def isLeader: Boolean = leader.contains(selfAddress)
/**
- * Get the address of the current leader.
+ * Get the address of the current team leader.
*/
def leader: Option[Address] = state.leader
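The same team-scoped leader is visible from application code through the public state snapshot; a two-line sketch, assuming a cluster-enabled ActorSystem `system` is already running:

val cluster = Cluster(system)
// true only when this node leads its own team, mirroring ClusterReadView.isLeader above
val isTeamLeader = cluster.state.leader.contains(cluster.selfAddress)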
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index 44b92a8009..f6c8fca61d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -10,14 +10,31 @@ import com.typesafe.config.ConfigObject
import scala.concurrent.duration.Duration
import akka.actor.Address
import akka.actor.AddressFromURIString
+import akka.annotation.InternalApi
import akka.dispatch.Dispatchers
-import akka.util.Helpers.{ Requiring, ConfigOps, toRootLowerCase }
+import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase }
import scala.concurrent.duration.FiniteDuration
import akka.japi.Util.immutableSeq
-final class ClusterSettings(val config: Config, val systemName: String) {
+object ClusterSettings {
+ type Team = String
+ /**
+ * INTERNAL API.
+ */
+ @InternalApi
+ private[akka] val TeamRolePrefix = "team-"
+ /**
+ * INTERNAL API.
+ */
+ @InternalApi
+ private[akka] val DefaultTeam: Team = "default"
+
+}
+
+final class ClusterSettings(val config: Config, val systemName: String) {
+ import ClusterSettings._
private val cc = config.getConfig("akka.cluster")
val LogInfo: Boolean = cc.getBoolean("log-info")
@@ -58,6 +75,11 @@ final class ClusterSettings(val config: Config, val systemName: String) {
}
}
+ val PruneGossipTombstonesAfter: Duration = {
+ val key = "prune-gossip-tombstones-after"
+ cc.getMillisDuration(key) requiring (_ >= Duration.Zero, key + " >= 0s")
+ }
+
// specific to the [[akka.cluster.DefaultDowningProvider]]
val AutoDownUnreachableAfter: Duration = {
val key = "auto-down-unreachable-after"
@@ -93,8 +115,15 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
- val Team: String = cc.getString("team")
- val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet + s"team-$Team"
+ val Team: Team = cc.getString("team")
+ val Roles: Set[String] = {
+ val configuredRoles = (immutableSeq(cc.getStringList("roles")).toSet) requiring (
+ _.forall(!_.startsWith(TeamRolePrefix)),
+ s"Roles must not start with '${TeamRolePrefix}' as that is reserved for the cluster team setting"
+ )
+
+ configuredRoles + s"$TeamRolePrefix$Team"
+ }
val MinNrOfMembers: Int = {
cc.getInt("min-nr-of-members")
} requiring (_ > 0, "min-nr-of-members must be > 0")
@@ -118,6 +147,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
object Debug {
val VerboseHeartbeatLogging = cc.getBoolean("debug.verbose-heartbeat-logging")
+ val VerboseGossipLogging = cc.getBoolean("debug.verbose-gossip-logging")
}
}
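A spec-style sketch of the new role validation and the implicitly added team role (assumes the akka-cluster reference configuration on the classpath; system name is arbitrary):

import com.typesafe.config.ConfigFactory

// user-defined roles that collide with the reserved prefix are rejected by `requiring`
val bad = ConfigFactory.parseString("""akka.cluster.roles = ["team-x"] """)
  .withFallback(ConfigFactory.load())
intercept[IllegalArgumentException] {
  new ClusterSettings(bad, "sys")
}

// the configured team is always added as a synthetic "team-<name>" role
val ok = ConfigFactory.parseString("""akka.cluster.team = "dc1" """)
  .withFallback(ConfigFactory.load())
new ClusterSettings(ok, "sys").Roles should contain("team-dc1")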
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
index fee83bb317..6bc18f38a2 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
@@ -4,14 +4,18 @@
package akka.cluster
-import scala.collection.immutable
+import scala.collection.{ SortedSet, immutable }
+import ClusterSettings.Team
import MemberStatus._
+import akka.annotation.InternalApi
+
import scala.concurrent.duration.Deadline
/**
* INTERNAL API
*/
private[cluster] object Gossip {
+ type Timestamp = Long
val emptyMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty
val empty: Gossip = new Gossip(Gossip.emptyMembers)
@@ -59,10 +63,12 @@ private[cluster] object Gossip {
* removed node telling it to shut itself down.
*/
@SerialVersionUID(1L)
+@InternalApi
private[cluster] final case class Gossip(
- members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address
- overview: GossipOverview = GossipOverview(),
- version: VectorClock = VectorClock()) { // vector clock version
+ members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address
+ overview: GossipOverview = GossipOverview(),
+ version: VectorClock = VectorClock(), // vector clock version
+ tombstones: Map[UniqueAddress, Gossip.Timestamp] = Map.empty) {
if (Cluster.isAssertInvariantsEnabled) assertInvariants()
@@ -138,15 +144,18 @@ private[cluster] final case class Gossip(
this copy (overview = overview copy (seen = overview.seen union that.overview.seen))
/**
- * Merges two Gossip instances including membership tables, and the VectorClock histories.
+ * Merges two Gossip instances including membership tables, tombstones, and the VectorClock histories.
*/
def merge(that: Gossip): Gossip = {
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
+ // also merge the sets of tombstones, used below when merging members
+ val mergedTombstones = tombstones ++ that.tombstones
+
// 2. merge members by selecting the single Member with highest MemberStatus out of the Member groups
- val mergedMembers = Gossip.emptyMembers union Member.pickHighestPriority(this.members, that.members)
+ val mergedMembers = Gossip.emptyMembers union Member.pickHighestPriority(this.members, that.members, mergedTombstones)
// 3. merge reachability table by picking records with highest version
val mergedReachability = this.overview.reachability.merge(
@@ -156,29 +165,36 @@ private[cluster] final case class Gossip(
// 4. Nobody can have seen this new gossip yet
val mergedSeen = Set.empty[UniqueAddress]
- Gossip(mergedMembers, GossipOverview(mergedSeen, mergedReachability), mergedVClock)
+ Gossip(mergedMembers, GossipOverview(mergedSeen, mergedReachability), mergedVClock, mergedTombstones)
}
/**
- * Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence -
- * waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down).
+ * Checks if we have a cluster convergence. If any pair of nodes within the team cannot
+ * reach each other then we can't have convergence until they can reach each other again or one of them is downed
*
* @return true if convergence has been reached and false if not
*/
- def convergence(selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = {
- // First check that:
- // 1. we don't have any members that are unreachable, excluding observations from members
- // that have status DOWN, or
- // 2. all unreachable members in the set have status DOWN or EXITING
- // Else we can't continue to check for convergence
- // When that is done we check that all members with a convergence
- // status is in the seen table, i.e. has seen this version
- val unreachable = reachabilityExcludingDownedObservers.allUnreachableOrTerminated.collect {
- case node if (node != selfUniqueAddress && !exitingConfirmed(node)) ⇒ member(node)
+ def convergence(team: Team, selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = {
+ // Find cluster members in the team that are unreachable from other members of the team,
+ // excluding observations from members outside the team or with status DOWN, and skipping nodes passed in as confirmed exiting.
+ val unreachableInTeam = teamReachabilityExcludingDownedObservers(team).allUnreachableOrTerminated.collect {
+ case node if node != selfUniqueAddress && !exitingConfirmed(node) ⇒ member(node)
}
- unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) &&
- !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) &&
- !(seenByNode(m.uniqueAddress) || exitingConfirmed(m.uniqueAddress)))
+
+ // If any member of the team that is UP or LEAVING has neither seen this gossip
+ // nor been confirmed as exiting, convergence cannot be reached
+ def teamMemberHinderingConvergenceExists =
+ members.exists(member ⇒
+ member.team == team &&
+ Gossip.convergenceMemberStatus(member.status) &&
+ !(seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress))
+ )
+
+ // unreachables outside of the team or with status DOWN or EXITING do not affect convergence
+ def allUnreachablesCanBeIgnored =
+ unreachableInTeam.forall(unreachable ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(unreachable.status))
+
+ allUnreachablesCanBeIgnored && !teamMemberHinderingConvergenceExists
}
lazy val reachabilityExcludingDownedObservers: Reachability = {
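The effect of the team parameter is easiest to see with the TestMember fixtures used by GossipSpec further down in this diff; a spec-style sketch (addresses arbitrary):

val inDc1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty, team = "dc1")
val inDc2 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty, team = "dc2")
// dc1's only member has observed dc2's member as unreachable
val r = Reachability.empty.unreachable(inDc1.uniqueAddress, inDc2.uniqueAddress)
val g = Gossip(members = SortedSet(inDc1, inDc2), overview = GossipOverview(reachability = r))
  .seen(inDc1.uniqueAddress)

// cross-team unreachability and the unseen dc2 member do not block dc1
g.convergence("dc1", inDc1.uniqueAddress, Set.empty) // true
// dc2 has no convergence: its own member has not seen this gossip yet
g.convergence("dc2", inDc2.uniqueAddress, Set.empty) // false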
@@ -186,29 +202,81 @@ private[cluster] final case class Gossip(
overview.reachability.removeObservers(downed.map(_.uniqueAddress))
}
- def isLeader(node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean =
- leader(selfUniqueAddress).contains(node)
+ /**
+ * @return Reachability excluding observations from nodes outside of the team, but including observed unreachable
+ * nodes outside of the team
+ */
+ def teamReachability(team: Team): Reachability =
+ overview.reachability.removeObservers(members.collect { case m if m.team != team ⇒ m.uniqueAddress })
- def leader(selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
- leaderOf(members, selfUniqueAddress)
-
- def roleLeader(role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
- leaderOf(members.filter(_.hasRole(role)), selfUniqueAddress)
-
- def leaderOf(mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = {
- val reachableMembers =
- if (overview.reachability.isAllReachable) mbrs.filterNot(_.status == Down)
- else mbrs.filter(m ⇒ m.status != Down &&
- (overview.reachability.isReachable(m.uniqueAddress) || m.uniqueAddress == selfUniqueAddress))
- if (reachableMembers.isEmpty) None
- else reachableMembers.find(m ⇒ Gossip.leaderMemberStatus(m.status)).
- orElse(Some(reachableMembers.min(Member.leaderStatusOrdering))).map(_.uniqueAddress)
+ /**
+ * @return reachability for team nodes, with observations from outside the team or from downed nodes, as well as records about nodes outside the team, filtered out
+ */
+ def teamReachabilityExcludingDownedObservers(team: Team): Reachability = {
+ val membersToExclude = members.collect { case m if m.status == Down || m.team != team ⇒ m.uniqueAddress }
+ overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.team != team ⇒ m.uniqueAddress })
}
+ def teamMembers(team: Team): SortedSet[Member] =
+ members.filter(_.team == team)
+
+ def isTeamLeader(team: Team, node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean =
+ teamLeader(team, selfUniqueAddress).contains(node)
+
+ def teamLeader(team: Team, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
+ leaderOf(team, members, selfUniqueAddress)
+
+ def roleLeader(team: Team, role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
+ leaderOf(team, members.filter(_.hasRole(role)), selfUniqueAddress)
+
+ def leaderOf(team: Team, mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = {
+ val reachability = teamReachability(team)
+
+ val reachableTeamMembers =
+ if (reachability.isAllReachable) mbrs.filter(m ⇒ m.team == team && m.status != Down)
+ else mbrs.filter(m ⇒
+ m.team == team &&
+ m.status != Down &&
+ (reachability.isReachable(m.uniqueAddress) || m.uniqueAddress == selfUniqueAddress))
+ if (reachableTeamMembers.isEmpty) None
+ else reachableTeamMembers.find(m ⇒ Gossip.leaderMemberStatus(m.status))
+ .orElse(Some(reachableTeamMembers.min(Member.leaderStatusOrdering)))
+ .map(_.uniqueAddress)
+ }
+
+ def allTeams: Set[Team] = members.map(_.team)
+
def allRoles: Set[String] = members.flatMap(_.roles)
def isSingletonCluster: Boolean = members.size == 1
+ /**
+ * @return true if toAddress should be considered reachable from fromTeam. Within a team
+ * only team-local observations count; across teams every
+ * observation of toAddress counts.
+ */
+ def isReachableExcludingDownedObservers(fromTeam: Team, toAddress: UniqueAddress): Boolean =
+ if (!hasMember(toAddress)) false
+ else {
+ val to = member(toAddress)
+
+ // if member is in the same team, we ignore cross-team unreachability
+ if (fromTeam == to.team) teamReachabilityExcludingDownedObservers(fromTeam).isReachable(toAddress)
+ // otherwise a single unreachable observation from any non-downed node makes it unreachable
+ else reachabilityExcludingDownedObservers.isReachable(toAddress)
+ }
+
+ /**
+ * @return true if fromAddress should be able to reach toAddress based on the unreachability data and their
+ * respective teams
+ */
+ def isReachable(fromAddress: UniqueAddress, toAddress: UniqueAddress): Boolean =
+ if (!hasMember(toAddress)) false
+ else {
+ // since this looks up the specific unreachability entry for the node pair we don't have to filter on team
+ overview.reachability.isReachable(fromAddress, toAddress)
+ }
+
def member(node: UniqueAddress): Member = {
membersMap.getOrElse(
node,
@@ -222,14 +290,60 @@ private[cluster] final case class Gossip(
members.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber)
}
+ def removeAll(nodes: Iterable[UniqueAddress], removalTimestamp: Long): Gossip = {
+ nodes.foldLeft(this)((gossip, node) ⇒ gossip.remove(node, removalTimestamp))
+ }
+
+ def update(updatedMembers: immutable.SortedSet[Member]): Gossip = {
+ copy(members = updatedMembers union members)
+ }
+
+ /**
+ * Remove the given member from the set of members and mark its removal with a tombstone to avoid having it
+ * reintroduced when merging with another gossip that has not seen the removal.
+ */
+ def remove(node: UniqueAddress, removalTimestamp: Long): Gossip = {
+ // removing REMOVED nodes from the `seen` table
+ val newSeen = overview.seen - node
+ // removing REMOVED nodes from the `reachability` table
+ val newReachability = overview.reachability.remove(node :: Nil)
+ val newOverview = overview.copy(seen = newSeen, reachability = newReachability)
+
+ // Clear the VectorClock when member is removed. The change made by the leader is stamped
+ // and will propagate as is if there are no other changes on other nodes.
+ // If other concurrent changes on other nodes (e.g. join) the pruning is also
+ // taken care of when receiving gossips.
+ val newVersion = version.prune(VectorClock.Node(ClusterCoreDaemon.vclockName(node)))
+ val newMembers = members.filterNot(_.uniqueAddress == node)
+ val newTombstones = tombstones + (node → removalTimestamp)
+ copy(version = newVersion, members = newMembers, overview = newOverview, tombstones = newTombstones)
+ }
+
+ def markAsDown(member: Member): Gossip = {
+ // replace member (changed status)
+ val newMembers = members - member + member.copy(status = Down)
+ // remove nodes marked as DOWN from the `seen` table
+ val newSeen = overview.seen - member.uniqueAddress
+
+ // update gossip overview
+ val newOverview = overview copy (seen = newSeen)
+ copy(members = newMembers, overview = newOverview) // update gossip
+ }
+
def prune(removedNode: VectorClock.Node): Gossip = {
val newVersion = version.prune(removedNode)
if (newVersion eq version) this
else copy(version = newVersion)
}
+ def pruneTombstones(removeEarlierThan: Gossip.Timestamp): Gossip = {
+ val newTombstones = tombstones.filter { case (_, timestamp) ⇒ timestamp > removeEarlierThan }
+ if (newTombstones.size == tombstones.size) this
+ else copy(tombstones = newTombstones)
+ }
+
override def toString =
- s"Gossip(members = [${members.mkString(", ")}], overview = ${overview}, version = ${version})"
+ s"Gossip(members = [${members.mkString(", ")}], overview = $overview, version = $version, tombstones = $tombstones)"
}
/**
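Why the tombstones are needed is easiest to see in a merge with a gossip that predates a removal; a spec-style sketch with the same fixtures:

val m = TestMember(Address("akka.tcp", "sys", "a", 2552), Up)
val other = TestMember(Address("akka.tcp", "sys", "b", 2552), Up)
val before = Gossip(members = SortedSet(m, other))
val after = before.remove(m.uniqueAddress, removalTimestamp = System.currentTimeMillis())

// `before` still contains m, but the tombstone carried by `after` wins the merge,
// so the removed member is not resurrected by pickHighestPriority
val merged = after merge before
merged.members.map(_.address) // only `other`'s address
merged.tombstones.contains(m.uniqueAddress) // true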
diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala
index c0195d5e39..79b6ac7b77 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Member.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala
@@ -6,6 +6,8 @@ package akka.cluster
import akka.actor.Address
import MemberStatus._
+import akka.annotation.InternalApi
+import akka.cluster.ClusterSettings.Team
import scala.runtime.AbstractFunction2
@@ -22,8 +24,9 @@ class Member private[cluster] (
val status: MemberStatus,
val roles: Set[String]) extends Serializable {
- lazy val team: String = roles.find(_.startsWith("team-"))
+ lazy val team: String = roles.find(_.startsWith(ClusterSettings.TeamRolePrefix))
.getOrElse(throw new IllegalStateException("Team undefined, should not be possible"))
+ .substring(ClusterSettings.TeamRolePrefix.length)
def address: Address = uniqueAddress.address
@@ -32,7 +35,11 @@ class Member private[cluster] (
case m: Member ⇒ uniqueAddress == m.uniqueAddress
case _ ⇒ false
}
- override def toString = s"Member(address = ${address}, status = ${status})"
+ override def toString =
+ if (team == ClusterSettings.DefaultTeam)
+ s"Member(address = $address, status = $status)"
+ else
+ s"Member(address = $address, team = $team, status = $status)"
def hasRole(role: String): Boolean = roles.contains(role)
@@ -46,7 +53,9 @@ class Member private[cluster] (
* Is this member older, has been part of cluster longer, than another
* member. It is only correct when comparing two existing members in a
* cluster. A member that joined after removal of another member may be
- * considered older than the removed member.
+ * considered older than the removed member. Note that it only makes
+ * sense to compare members within the same team (upNumber has
+ * a higher risk of being reused across teams).
*/
def isOlderThan(other: Member): Boolean =
if (upNumber == other.upNumber)
@@ -87,7 +96,8 @@ object Member {
/**
* INTERNAL API
*/
- private[cluster] def removed(node: UniqueAddress): Member = new Member(node, Int.MaxValue, Removed, Set.empty)
+ private[cluster] def removed(node: UniqueAddress): Member =
+ new Member(node, Int.MaxValue, Removed, Set(ClusterSettings.TeamRolePrefix + "-N/A"))
/**
* `Address` ordering type class, sorts addresses by host and port.
@@ -136,16 +146,24 @@ object Member {
(a, b) ⇒ a.isOlderThan(b)
}
- def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
+ @deprecated("Was accidentally made a public API, internal", since = "2.5.4")
+ def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] =
+ pickHighestPriority(a, b, Map.empty)
+
+ /**
+ * INTERNAL API.
+ */
+ @InternalApi
+ private[akka] def pickHighestPriority(a: Set[Member], b: Set[Member], tombstones: Map[UniqueAddress, Long]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.uniqueAddress)
// pick highest MemberStatus
- (Member.none /: groupedByAddress) {
+ groupedByAddress.foldLeft(Member.none) {
case (acc, (_, members)) ⇒
if (members.size == 2) acc + members.reduceLeft(highestPriorityOf)
else {
val m = members.head
- if (Gossip.removeUnreachableWithMemberStatus(m.status)) acc // removed
+ if (tombstones.contains(m.uniqueAddress) || Gossip.removeUnreachableWithMemberStatus(m.status)) acc // removed
else acc + m
}
}
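The team name round-trips through the reserved role prefix; a tiny sketch of the derivation performed by the lazy val above:

val roles = Set("frontend", ClusterSettings.TeamRolePrefix + "dc1")
val team = roles.find(_.startsWith(ClusterSettings.TeamRolePrefix))
  .map(_.substring(ClusterSettings.TeamRolePrefix.length)) // Some("dc1")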
diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala
index c27e6b3710..9b8c39e336 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala
@@ -3,6 +3,8 @@
*/
package akka.cluster
+import akka.annotation.InternalApi
+
import scala.collection.immutable
import scala.collection.breakOut
@@ -46,6 +48,7 @@ private[cluster] object Reachability {
* - Reachable otherwise, i.e. no observer node considers it as Unreachable
*/
@SerialVersionUID(1L)
+@InternalApi
private[cluster] class Reachability private (
val records: immutable.IndexedSeq[Reachability.Record],
val versions: Map[UniqueAddress, Long]) extends Serializable {
@@ -205,8 +208,14 @@ private[cluster] class Reachability private (
else if (cache.allUnreachable(node)) Unreachable
else Reachable
+ /**
+ * @return true if no observer has marked the node as unreachable or terminated
+ */
def isReachable(node: UniqueAddress): Boolean = isAllReachable || !allUnreachableOrTerminated.contains(node)
+ /**
+ * @return true if there is no specific record where the observer has marked the subject as unreachable
+ */
def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean =
status(observer, subject) == Reachable
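A short sketch contrasting the two overloads documented above (obs1, obs2 and subj are assumed UniqueAddress values):

val r = Reachability.empty.unreachable(obs1, subj)
r.isReachable(subj)       // false: some observer marked subj unreachable
r.isReachable(obs2, subj) // true: obs2 itself has no unreachable record for subj
r.isReachable(obs1, subj) // false: obs1 recorded subj as unreachable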
diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
index 23c8834001..4896c706e3 100644
--- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
@@ -244,7 +244,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
private def gossipToProto(gossip: Gossip): cm.Gossip.Builder = {
val allMembers = gossip.members.toVector
- val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress)
+ val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress) ++ gossip.tombstones.keys
val addressMapping = allAddresses.zipWithIndex.toMap
val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) ⇒ acc union m.roles).to[Vector]
val roleMapping = allRoles.zipWithIndex.toMap
@@ -271,6 +271,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
}
}
+ def tombstoneToProto(t: (UniqueAddress, Long)): cm.Tombstone =
+ cm.Tombstone.newBuilder()
+ .setAddressIndex(mapUniqueAddress(t._1))
+ .setTimestamp(t._2)
+ .build()
+
val reachability = reachabilityToProto(gossip.overview.reachability)
val members = gossip.members.map(memberToProto)
val seen = gossip.overview.seen.map(mapUniqueAddress)
@@ -279,8 +285,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
addAllObserverReachability(reachability.map(_.build).asJava)
cm.Gossip.newBuilder().addAllAllAddresses(allAddresses.map(uniqueAddressToProto(_).build).asJava).
- addAllAllRoles(allRoles.asJava).addAllAllHashes(allHashes.asJava).addAllMembers(members.map(_.build).asJava).
- setOverview(overview).setVersion(vectorClockToProto(gossip.version, hashMapping))
+ addAllAllRoles(allRoles.asJava)
+ .addAllAllHashes(allHashes.asJava)
+ .addAllMembers(members.map(_.build).asJava)
+ .setOverview(overview)
+ .setVersion(vectorClockToProto(gossip.version, hashMapping))
+ .addAllTombstones(gossip.tombstones.map(tombstoneToProto).asJava)
}
private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): cm.VectorClock.Builder = {
@@ -338,13 +348,17 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
new Member(addressMapping(member.getAddressIndex), member.getUpNumber, memberStatusFromInt(member.getStatus.getNumber),
member.getRolesIndexesList.asScala.map(roleMapping(_))(breakOut))
+ def tombstoneFromProto(tombstone: cm.Tombstone): (UniqueAddress, Long) =
+ (addressMapping(tombstone.getAddressIndex), tombstone.getTimestamp)
+
val members: immutable.SortedSet[Member] = gossip.getMembersList.asScala.map(memberFromProto)(breakOut)
val reachability = reachabilityFromProto(gossip.getOverview.getObserverReachabilityList.asScala)
val seen: Set[UniqueAddress] = gossip.getOverview.getSeenList.asScala.map(addressMapping(_))(breakOut)
val overview = GossipOverview(seen, reachability)
+ val tombstones: Map[UniqueAddress, Long] = gossip.getTombstonesList.asScala.map(tombstoneFromProto)(breakOut)
- Gossip(members, overview, vectorClockFromProto(gossip.getVersion, hashMapping))
+ Gossip(members, overview, vectorClockFromProto(gossip.getVersion, hashMapping), tombstones)
}
private def vectorClockFromProto(version: cm.VectorClock, hashMapping: immutable.Seq[String]) = {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamClusterSpec.scala
new file mode 100644
index 0000000000..a1355f6ea9
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamClusterSpec.scala
@@ -0,0 +1,153 @@
+/**
+ * Copyright (C) 2009-2017 Lightbend Inc.
+ */
+package akka.cluster
+
+import akka.cluster.MemberStatus.Up
+import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
+import akka.remote.transport.ThrottlerTransportAdapter.Direction
+import com.typesafe.config.ConfigFactory
+
+import scala.concurrent.duration._
+
+object MultiTeamMultiJvmSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+ val fourth = role("fourth")
+ val fifth = role("fifth")
+
+ commonConfig(MultiNodeClusterSpec.clusterConfig)
+
+ nodeConfig(first, second)(ConfigFactory.parseString(
+ """
+ akka.cluster.team = "dc1"
+ akka.loglevel = INFO
+ """))
+
+ nodeConfig(third, fourth, fifth)(ConfigFactory.parseString(
+ """
+ akka.cluster.team = "dc2"
+ akka.loglevel = INFO
+ """))
+
+ testTransport(on = true)
+}
+
+class MultiTeamMultiJvmNode1 extends MultiTeamSpec
+class MultiTeamMultiJvmNode2 extends MultiTeamSpec
+class MultiTeamMultiJvmNode3 extends MultiTeamSpec
+class MultiTeamMultiJvmNode4 extends MultiTeamSpec
+class MultiTeamMultiJvmNode5 extends MultiTeamSpec
+
+abstract class MultiTeamSpec
+ extends MultiNodeSpec(MultiTeamMultiJvmSpec)
+ with MultiNodeClusterSpec {
+
+ import MultiTeamMultiJvmSpec._
+
+ "A cluster with multiple cluster teams" must {
+ "be able to form" in {
+
+ runOn(first) {
+ cluster.join(first)
+ }
+ runOn(second, third, fourth) {
+ cluster.join(first)
+ }
+ enterBarrier("form-cluster-join-attempt")
+
+ runOn(first, second, third, fourth) {
+ within(20.seconds) {
+ awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (4))
+ }
+ }
+
+ enterBarrier("cluster started")
+ }
+
+ "have a leader per team" in {
+ runOn(first, second) {
+ cluster.settings.Team should ===("dc1")
+ clusterView.leader shouldBe defined
+ val dc1 = Set(address(first), address(second))
+ dc1 should contain(clusterView.leader.get)
+ }
+ runOn(third, fourth) {
+ cluster.settings.Team should ===("dc2")
+ clusterView.leader shouldBe defined
+ val dc2 = Set(address(third), address(fourth))
+ dc2 should contain(clusterView.leader.get)
+ }
+
+ enterBarrier("leader per team")
+ }
+
+ "be able to have team member changes while there is inter-team unreachability" in within(20.seconds) {
+ runOn(first) {
+ testConductor.blackhole(first, third, Direction.Both).await
+ }
+ runOn(first, second, third, fourth) {
+ awaitAssert(clusterView.unreachableMembers should not be empty)
+ }
+ enterBarrier("inter-team unreachability")
+
+ runOn(fifth) {
+ cluster.join(third)
+ }
+
+ // should be able to join and become up since the
+ // unreachable is between dc1 and dc2,
+ within(10.seconds) {
+ awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (5))
+ }
+
+ runOn(first) {
+ testConductor.passThrough(first, third, Direction.Both).await
+ }
+ runOn(first, second, third, fourth) {
+ awaitAssert(clusterView.unreachableMembers should ===(Set.empty))
+ }
+ enterBarrier("inter-team unreachability end")
+ }
+
+ "be able to have team member changes while there is unreachability in another team" in within(20.seconds) {
+ runOn(first) {
+ testConductor.blackhole(first, second, Direction.Both).await
+ }
+ runOn(first, second, third, fourth) {
+ awaitAssert(clusterView.unreachableMembers should not be empty)
+ }
+ enterBarrier("other-team-internal-unreachable")
+
+ runOn(third) {
+ cluster.join(fifth)
+ // should be able to join and leave
+ // since the unreachable nodes are inside of dc1
+ cluster.leave(fourth)
+
+ awaitAssert(clusterView.members.map(_.address) should not contain (address(fourth)))
+ awaitAssert(clusterView.members.collect { case m if m.status == Up ⇒ m.address } should contain(address(fifth)))
+ }
+
+ enterBarrier("other-team-internal-unreachable changed")
+
+ runOn(first) {
+ testConductor.passThrough(first, second, Direction.Both).await
+ }
+ enterBarrier("other-team-internal-unreachable end")
+ }
+
+ "be able to down a member of another team" in within(20.seconds) {
+ runOn(fifth) {
+ cluster.down(address(second))
+ }
+
+ runOn(first, third, fifth) {
+ awaitAssert(clusterView.members.map(_.address) should not contain (address(second)))
+ }
+ enterBarrier("cross-team-downed")
+ }
+
+ }
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamSplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamSplitBrainSpec.scala
new file mode 100644
index 0000000000..772f2de585
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamSplitBrainSpec.scala
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2009-2017 Lightbend Inc.
+ */
+package akka.cluster
+
+import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
+import akka.remote.transport.ThrottlerTransportAdapter.Direction
+import com.typesafe.config.ConfigFactory
+
+import scala.concurrent.duration._
+
+object MultiTeamSplitBrainMultiJvmSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+ val fourth = role("fourth")
+
+ commonConfig(MultiNodeClusterSpec.clusterConfig)
+
+ nodeConfig(first, second)(ConfigFactory.parseString(
+ """
+ akka.cluster.team = "dc1"
+ akka.loglevel = INFO
+ """))
+
+ nodeConfig(third, fourth)(ConfigFactory.parseString(
+ """
+ akka.cluster.team = "dc2"
+ akka.loglevel = INFO
+ """))
+
+ testTransport(on = true)
+}
+
+class MultiTeamSplitBrainMultiJvmNode1 extends MultiTeamSplitBrainSpec
+class MultiTeamSplitBrainMultiJvmNode2 extends MultiTeamSplitBrainSpec
+class MultiTeamSplitBrainMultiJvmNode3 extends MultiTeamSplitBrainSpec
+class MultiTeamSplitBrainMultiJvmNode4 extends MultiTeamSplitBrainSpec
+
+abstract class MultiTeamSplitBrainSpec
+ extends MultiNodeSpec(MultiTeamSplitBrainMultiJvmSpec)
+ with MultiNodeClusterSpec {
+
+ import MultiTeamSplitBrainMultiJvmSpec._
+
+ val dc1 = List(first, second)
+ val dc2 = List(third, fourth)
+
+ def splitTeams(): Unit = {
+ runOn(first) {
+ for {
+ dc1Node ← dc1
+ dc2Node ← dc2
+ } {
+ testConductor.blackhole(dc1Node, dc2Node, Direction.Both).await
+ }
+ }
+
+ runOn(dc1: _*) {
+ awaitAssert(clusterView.unreachableMembers.map(_.address) should ===(dc2.map(address).toSet))
+ }
+ runOn(dc2: _*) {
+ awaitAssert(clusterView.unreachableMembers.map(_.address) should ===(dc1.map(address).toSet))
+ }
+
+ }
+
+ def unsplitTeams(): Unit = {
+ runOn(first) {
+ for {
+ dc1Node ← dc1
+ dc2Node ← dc2
+ } {
+ testConductor.passThrough(dc1Node, dc2Node, Direction.Both).await
+ }
+ }
+
+ awaitAllReachable()
+ }
+
+ "A cluster with multiple cluster teams" must {
+ "be able to form two teams" in {
+ awaitClusterUp(first, second, third)
+ }
+
+ "be able to have a team member join while there is inter-team split" in within(20.seconds) {
+ // introduce a split between teams
+ splitTeams()
+ enterBarrier("team-split-1")
+
+ runOn(fourth) {
+ cluster.join(third)
+ }
+ enterBarrier("inter-team unreachability")
+
+ // should be able to join and become up since the
+ // split is between dc1 and dc2
+ runOn(third, fourth) {
+ awaitAssert(clusterView.members.collect {
+ case m if m.team == "dc2" && m.status == MemberStatus.Up ⇒ m.address
+ } should ===(Set(address(third), address(fourth))))
+ }
+ enterBarrier("dc2-join-completed")
+
+ unsplitTeams()
+ enterBarrier("team-unsplit-1")
+
+ runOn(dc1: _*) {
+ awaitAssert(clusterView.members.collect {
+ case m if m.team == "dc2" && m.status == MemberStatus.Up ⇒ m.address
+ } should ===(Set(address(third), address(fourth))))
+ }
+
+ enterBarrier("inter-team-split-1-done")
+ }
+
+ "be able to have team member leave while there is inter-team split" in within(20.seconds) {
+ splitTeams()
+ enterBarrier("team-split-2")
+
+ runOn(fourth) {
+ cluster.leave(fourth)
+ }
+
+ runOn(third) {
+ awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty))
+ }
+ enterBarrier("node-4-left")
+
+ unsplitTeams()
+ enterBarrier("team-unsplit-2")
+
+ runOn(first, second) {
+ awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty))
+ }
+ enterBarrier("inter-team-split-2-done")
+ }
+
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
index e9d1897fce..1d78f51a85 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
@@ -131,7 +131,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[RoleLeaderChanged]))
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp)))
- subscriber.expectMsg(RoleLeaderChanged("GRP", Some(dUp.address)))
+ subscriber.expectMsgAllOf(
+ RoleLeaderChanged("GRP", Some(dUp.address)),
+ RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(dUp.address))
+ )
publisher ! PublishChanges(Gossip(members = SortedSet(cUp, dUp)))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address)))
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala
index e474340b81..f3c5f54ab6 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala
@@ -52,7 +52,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq(MemberUp(bUp), MemberJoined(eJoining)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
- diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
+ diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for changed status of members" in {
@@ -61,7 +61,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq(MemberUp(aUp), MemberLeft(cLeaving), MemberJoined(eJoining)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
- diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
+ diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for members in unreachable" in {
@@ -76,7 +76,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq(UnreachableMember(bDown)))
// never include self member in unreachable
diffUnreachable(g1, g2, bDown.uniqueAddress) should ===(Seq())
- diffSeen(g1, g2, selfDummyAddress) should ===(Seq.empty)
+ diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq.empty)
}
"be produced for members becoming reachable after unreachable" in {
@@ -104,7 +104,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(dRemoved, Exiting)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
- diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
+ diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for convergence changes" in {
@@ -113,10 +113,10 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq.empty)
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
- diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address))))
+ diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address))))
diffMemberEvents(g2, g1) should ===(Seq.empty)
diffUnreachable(g2, g1, selfDummyAddress) should ===(Seq.empty)
- diffSeen(g2, g1, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address))))
+ diffSeen(ClusterSettings.DefaultTeam, g2, g1, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address))))
}
"be produced for leader changes" in {
@@ -125,27 +125,38 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(aRemoved, Up)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
- diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
- diffLeader(g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address))))
+ diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
+ diffLeader(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address))))
}
- "be produced for role leader changes" in {
+ "be produced for role leader changes in the same team" in {
val g0 = Gossip.empty
val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining))
val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining))
- diffRolesLeader(g0, g1, selfDummyAddress) should ===(
+ diffRolesLeader(ClusterSettings.DefaultTeam, g0, g1, selfDummyAddress) should ===(
Set(
+ // since this role is implicitly added
+ RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(aUp.address)),
RoleLeaderChanged("AA", Some(aUp.address)),
RoleLeaderChanged("AB", Some(aUp.address)),
RoleLeaderChanged("BB", Some(bUp.address)),
RoleLeaderChanged("DD", Some(dLeaving.address)),
RoleLeaderChanged("DE", Some(dLeaving.address)),
RoleLeaderChanged("EE", Some(eUp.address))))
- diffRolesLeader(g1, g2, selfDummyAddress) should ===(
+ diffRolesLeader(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(
Set(
+ RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(bUp.address)),
RoleLeaderChanged("AA", None),
RoleLeaderChanged("AB", Some(bUp.address)),
RoleLeaderChanged("DE", Some(eJoining.address))))
}
+
+ "not be produced for role leader changes in other teams" in {
+ val g0 = Gossip.empty
+ val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining))
+ val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining))
+ diffRolesLeader("dc2", g0, g1, selfDummyAddress) should ===(Set.empty)
+ diffRolesLeader("dc2", g1, g2, selfDummyAddress) should ===(Set.empty)
+ }
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
index 956814d9df..810555adfc 100644
--- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
@@ -7,6 +7,8 @@ package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.Matchers
import akka.actor.Address
+import akka.cluster.ClusterSettings.DefaultTeam
+
import scala.collection.immutable.SortedSet
class GossipSpec extends WordSpec with Matchers {
@@ -25,43 +27,55 @@ class GossipSpec extends WordSpec with Matchers {
val e2 = TestMember(e1.address, Up)
val e3 = TestMember(e1.address, Down)
+ val dc1a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty, team = "dc1")
+ val dc1b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty, team = "dc1")
+ val dc2c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Up, Set.empty, team = "dc2")
+ val dc2d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set.empty, team = "dc2")
+ val dc2d2 = TestMember(dc2d1.address, status = Down, roles = Set.empty, team = dc2d1.team)
+
"A Gossip" must {
+ "have correct test setup" in {
+ List(a1, a2, b1, b2, c1, c2, c3, d1, e1, e2, e3).foreach(m ⇒
+ m.team should ===(DefaultTeam)
+ )
+ }
+
"reach convergence when it's empty" in {
- Gossip.empty.convergence(a1.uniqueAddress, Set.empty) should ===(true)
+ Gossip.empty.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence for one node" in {
val g1 = Gossip(members = SortedSet(a1)).seen(a1.uniqueAddress)
- g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
+ g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
}
"not reach convergence until all have seen version" in {
val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress)
- g1.convergence(a1.uniqueAddress, Set.empty) should ===(false)
+ g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(false)
}
"reach convergence for two nodes" in {
val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
- g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
+ g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping joining" in {
// e1 is joining
val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
- g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
+ g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping down" in {
// e3 is down
val g1 = Gossip(members = SortedSet(a1, b1, e3)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
- g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
+ g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping Leaving with exitingConfirmed" in {
// c1 is Leaving
val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
- g1.convergence(a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
+ g1.convergence(DefaultTeam, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
}
"reach convergence, skipping unreachable Leaving with exitingConfirmed" in {
@@ -69,16 +83,16 @@ class GossipSpec extends WordSpec with Matchers {
val r1 = Reachability.empty.unreachable(b1.uniqueAddress, c1.uniqueAddress)
val g1 = Gossip(members = SortedSet(a1, b1, c1), overview = GossipOverview(reachability = r1))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress)
- g1.convergence(a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
+ g1.convergence(DefaultTeam, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
}
"not reach convergence when unreachable" in {
val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress)
val g1 = (Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1)))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress)
- g1.convergence(b1.uniqueAddress, Set.empty) should ===(false)
+ g1.convergence(DefaultTeam, b1.uniqueAddress, Set.empty) should ===(false)
// but from a1's point of view (it knows that itself is not unreachable)
- g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
+ g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence when downed node has observed unreachable" in {
@@ -86,7 +100,7 @@ class GossipSpec extends WordSpec with Matchers {
val r1 = Reachability.empty.unreachable(e3.uniqueAddress, a1.uniqueAddress)
val g1 = (Gossip(members = SortedSet(a1, b1, e3), overview = GossipOverview(reachability = r1)))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress).seen(e3.uniqueAddress)
- g1.convergence(b1.uniqueAddress, Set.empty) should ===(true)
+ g1.convergence(DefaultTeam, b1.uniqueAddress, Set.empty) should ===(true)
}
"merge members by status priority" in {
@@ -133,21 +147,37 @@ class GossipSpec extends WordSpec with Matchers {
}
"have leader as first member based on ordering, except Exiting status" in {
- Gossip(members = SortedSet(c2, e2)).leader(c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
- Gossip(members = SortedSet(c3, e2)).leader(c3.uniqueAddress) should ===(Some(e2.uniqueAddress))
- Gossip(members = SortedSet(c3)).leader(c3.uniqueAddress) should ===(Some(c3.uniqueAddress))
+ Gossip(members = SortedSet(c2, e2)).teamLeader(DefaultTeam, c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
+ Gossip(members = SortedSet(c3, e2)).teamLeader(DefaultTeam, c3.uniqueAddress) should ===(Some(e2.uniqueAddress))
+ Gossip(members = SortedSet(c3)).teamLeader(DefaultTeam, c3.uniqueAddress) should ===(Some(c3.uniqueAddress))
}
"have leader as first reachable member based on ordering" in {
val r1 = Reachability.empty.unreachable(e2.uniqueAddress, c2.uniqueAddress)
val g1 = Gossip(members = SortedSet(c2, e2), overview = GossipOverview(reachability = r1))
- g1.leader(e2.uniqueAddress) should ===(Some(e2.uniqueAddress))
+ g1.teamLeader(DefaultTeam, e2.uniqueAddress) should ===(Some(e2.uniqueAddress))
// but when c2 is selfUniqueAddress
- g1.leader(c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
+ g1.teamLeader(DefaultTeam, c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
}
"not have Down member as leader" in {
- Gossip(members = SortedSet(e3)).leader(e3.uniqueAddress) should ===(None)
+ Gossip(members = SortedSet(e3)).teamLeader(DefaultTeam, e3.uniqueAddress) should ===(None)
+ }
+
+ "have a leader per team" in {
+ val g1 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
+
+ // from everybody's point of view, dc1a1 is the leader of dc1
+ g1.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g1.teamLeader("dc1", dc1b1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g1.teamLeader("dc1", dc2c1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g1.teamLeader("dc1", dc2d1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+
+ // and dc2c1 is the leader of dc2
+ g1.teamLeader("dc2", dc1a1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g1.teamLeader("dc2", dc1b1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g1.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g1.teamLeader("dc2", dc2d1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
}
"merge seen table correctly" in {
@@ -182,5 +212,217 @@ class GossipSpec extends WordSpec with Matchers {
val g3 = Gossip(members = SortedSet(a2, b1.copyUp(3), e2.copyUp(4)))
g3.youngestMember should ===(e2)
}
+
+ "reach convergence per team" in {
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
+ .seen(dc1a1.uniqueAddress)
+ .seen(dc1b1.uniqueAddress)
+ .seen(dc2c1.uniqueAddress)
+ .seen(dc2d1.uniqueAddress)
+ g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true)
+
+ g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(true)
+ }
+
+ "reach convergence per team even if members of another team has not seen the gossip" in {
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
+ .seen(dc1a1.uniqueAddress)
+ .seen(dc1b1.uniqueAddress)
+ .seen(dc2c1.uniqueAddress)
+ // dc2d1 has not seen the gossip
+
+ // so dc1 can reach convergence
+ g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true)
+
+ // but dc2 cannot
+ g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(false)
+ }
+
+ "reach convergence per team even if another team contains unreachable" in {
+ val r1 = Reachability.empty.unreachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress)
+
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1))
+ .seen(dc1a1.uniqueAddress)
+ .seen(dc1b1.uniqueAddress)
+ .seen(dc2c1.uniqueAddress)
+ .seen(dc2d1.uniqueAddress)
+
+ // dc1 doesn't care about dc2's reachability problems and can reach convergence
+ g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true)
+
+ // dc2 cannot reach convergence because of unreachability within the team
+ g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(false)
+ }
+
+ "reach convergence per team even if there is unreachable nodes in another team" in {
+ val r1 = Reachability.empty
+ .unreachable(dc1a1.uniqueAddress, dc2d1.uniqueAddress)
+ .unreachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress)
+
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1))
+ .seen(dc1a1.uniqueAddress)
+ .seen(dc1b1.uniqueAddress)
+ .seen(dc2c1.uniqueAddress)
+ .seen(dc2d1.uniqueAddress)
+
+ // neither team is affected by the inter-team unreachability as far as convergence goes
+ g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true)
+
+ g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(true)
+ }
+
+ "ignore cross team unreachability when determining inside of team reachability" in {
+ val r1 = Reachability.empty
+ .unreachable(dc1a1.uniqueAddress, dc2c1.uniqueAddress)
+ .unreachable(dc2c1.uniqueAddress, dc1a1.uniqueAddress)
+
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1))
+
+ // inside the teams we don't care about cross-team unreachability
+ g.isReachable(dc1a1.uniqueAddress, dc1b1.uniqueAddress) should ===(true)
+ g.isReachable(dc1b1.uniqueAddress, dc1a1.uniqueAddress) should ===(true)
+ g.isReachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress) should ===(true)
+ g.isReachable(dc2d1.uniqueAddress, dc2c1.uniqueAddress) should ===(true)
+
+ g.isReachableExcludingDownedObservers(dc1a1.team, dc1b1.uniqueAddress) should ===(true)
+ g.isReachableExcludingDownedObservers(dc1b1.team, dc1a1.uniqueAddress) should ===(true)
+ g.isReachableExcludingDownedObservers(dc2c1.team, dc2d1.uniqueAddress) should ===(true)
+ g.isReachableExcludingDownedObservers(dc2d1.team, dc2c1.uniqueAddress) should ===(true)
+
+ // between teams it matters though
+ g.isReachable(dc1a1.uniqueAddress, dc2c1.uniqueAddress) should ===(false)
+ g.isReachable(dc2c1.uniqueAddress, dc1a1.uniqueAddress) should ===(false)
+ // this isReachable overload only returns false when there is an unreachability record between the two given nodes
+ g.isReachable(dc1b1.uniqueAddress, dc2c1.uniqueAddress) should ===(true)
+ g.isReachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress) should ===(true)
+
+ // this one looks at all unreachability records for the target address
+ g.isReachableExcludingDownedObservers(dc1a1.team, dc2c1.uniqueAddress) should ===(false)
+ g.isReachableExcludingDownedObservers(dc1b1.team, dc2c1.uniqueAddress) should ===(false)
+ g.isReachableExcludingDownedObservers(dc2c1.team, dc1a1.uniqueAddress) should ===(false)
+ g.isReachableExcludingDownedObservers(dc2d1.team, dc1a1.uniqueAddress) should ===(false)
+
+ // between the two other nodes there is no unreachability
+ g.isReachable(dc1b1.uniqueAddress, dc2d1.uniqueAddress) should ===(true)
+ g.isReachable(dc2d1.uniqueAddress, dc1b1.uniqueAddress) should ===(true)
+
+ g.isReachableExcludingDownedObservers(dc1b1.team, dc2d1.uniqueAddress) should ===(true)
+ g.isReachableExcludingDownedObservers(dc2d1.team, dc1b1.uniqueAddress) should ===(true)
+ }
+
+ "not returning a downed team leader" in {
+ val g = Gossip(members = SortedSet(dc1a1.copy(Down), dc1b1))
+ g.leaderOf("dc1", g.members, dc1b1.uniqueAddress) should ===(Some(dc1b1.uniqueAddress))
+ }
+
+ "ignore cross team unreachability when determining team leader" in {
+ val r1 = Reachability.empty
+ .unreachable(dc1a1.uniqueAddress, dc2d1.uniqueAddress)
+ .unreachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress)
+
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1))
+
+ g.leaderOf("dc1", g.members, dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g.leaderOf("dc1", g.members, dc1b1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g.leaderOf("dc1", g.members, dc2c1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+ g.leaderOf("dc1", g.members, dc2d1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
+
+ g.leaderOf("dc2", g.members, dc1a1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g.leaderOf("dc2", g.members, dc1b1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g.leaderOf("dc2", g.members, dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ g.leaderOf("dc2", g.members, dc2d1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
+ }
+
+ // TODO test coverage for when leaderOf returns None - so far unable to construct such a case
+
+ "clear out a bunch of stuff when removing a node" in {
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2d2))
+ .remove(dc1b1.uniqueAddress, System.currentTimeMillis())
+
+ g.seenBy should not contain (dc1b1.uniqueAddress)
+ g.overview.reachability.records.exists(_.observer == dc1b1.uniqueAddress) should be(false)
+ g.overview.reachability.records.exists(_.subject == dc1b1.uniqueAddress) should be(false)
+ g.version.versions should have size (0)
+
+ // sort order should be kept
+ g.members.toList should ===(List(dc1a1, dc2d2))
+ }
+
+ "not reintroduce members from out-of-team gossip when merging" in {
+ // dc1 does not know about any unreachability nor that the node has been downed
+ val gdc1 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
+
+ // dc2 has downed the dc2d1 node, seen it as unreachable and removed it
+ val gdc2 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
+ .remove(dc2d1.uniqueAddress, System.currentTimeMillis())
+
+ gdc2.tombstones.keys should contain(dc2d1.uniqueAddress)
+ gdc2.members should not contain (dc2d1)
+ gdc2.overview.reachability.records.filter(r ⇒ r.subject == dc2d1.uniqueAddress || r.observer == dc2d1.uniqueAddress) should be(empty)
+ gdc2.overview.reachability.versions.keys should not contain (dc2d1.uniqueAddress)
+
+ // when we merge the two, the removed node should not be reintroduced
+ val merged1 = gdc2 merge gdc1
+ merged1.members should ===(SortedSet(dc1a1, dc1b1, dc2c1))
+
+ merged1.tombstones.keys should contain(dc2d1.uniqueAddress)
+ merged1.members should not contain (dc2d1)
+ merged1.overview.reachability.records.filter(r ⇒ r.subject == dc2d1.uniqueAddress || r.observer == dc2d1.uniqueAddress) should be(empty)
+ merged1.overview.reachability.versions.keys should not contain (dc2d1.uniqueAddress)
+ }
+
+ "prune old tombstones" in {
+ val timestamp = 352684800
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1))
+ .remove(dc1b1.uniqueAddress, timestamp)
+
+ g.tombstones.keys should contain(dc1b1.uniqueAddress)
+
+ val pruned = g.pruneTombstones(timestamp + 1)
+
+ // after pruning, the tombstone should be gone
+ pruned.tombstones.keys should not contain (dc1b1.uniqueAddress)
+ }
+
+ "mark a node as down" in {
+ val g = Gossip(members = SortedSet(dc1a1, dc1b1))
+ .seen(dc1a1.uniqueAddress)
+ .seen(dc1b1.uniqueAddress)
+ .markAsDown(dc1b1)
+
+ g.member(dc1b1.uniqueAddress).status should ===(MemberStatus.Down)
+ g.overview.seen should not contain (dc1b1.uniqueAddress)
+
+ // obviously the other member should be unaffected
+ g.member(dc1a1.uniqueAddress).status should ===(dc1a1.status)
+ g.overview.seen should contain(dc1a1.uniqueAddress)
+ }
+
+ "update members" in {
+ val joining = TestMember(Address("akka.tcp", "sys", "d", 2552), Joining, Set.empty, team = "dc2")
+ val g = Gossip(members = SortedSet(dc1a1, joining))
+
+ g.member(joining.uniqueAddress).status should ===(Joining)
+ val oldMembers = g.members
+
+ val updated = g.update(SortedSet(joining.copy(status = Up)))
+
+ updated.member(joining.uniqueAddress).status should ===(Up)
+
+ // obviously the other member should be unaffected
+ updated.member(dc1a1.uniqueAddress).status should ===(dc1a1.status)
+
+ // order should be kept
+ updated.members.toList.map(_.uniqueAddress) should ===(List(dc1a1.uniqueAddress, joining.uniqueAddress))
+ }
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala
index ef5299fa8a..028a727f33 100644
--- a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala
@@ -9,6 +9,6 @@ object TestMember {
def apply(address: Address, status: MemberStatus): Member =
apply(address, status, Set.empty)
- def apply(address: Address, status: MemberStatus, roles: Set[String]): Member =
- new Member(UniqueAddress(address, 0L), Int.MaxValue, status, roles)
+ def apply(address: Address, status: MemberStatus, roles: Set[String], team: ClusterSettings.Team = ClusterSettings.DefaultTeam): Member =
+ new Member(UniqueAddress(address, 0L), Int.MaxValue, status, roles + (ClusterSettings.TeamRolePrefix + team))
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala
index 4bc6c998f0..db327a7fbc 100644
--- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala
@@ -21,7 +21,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
val ref = serializer.fromBinary(blob, obj.getClass)
obj match {
case env: GossipEnvelope ⇒
- val env2 = obj.asInstanceOf[GossipEnvelope]
+ val env2 = ref.asInstanceOf[GossipEnvelope]
env2.from should ===(env.from)
env2.to should ===(env.to)
env2.gossip should ===(env.gossip)
@@ -65,9 +65,11 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
val g2 = (g1 :+ node3 :+ node4).seen(a1.uniqueAddress).seen(c1.uniqueAddress)
val reachability3 = Reachability.empty.unreachable(a1.uniqueAddress, e1.uniqueAddress).unreachable(b1.uniqueAddress, e1.uniqueAddress)
val g3 = g2.copy(members = SortedSet(a1, b1, c1, d1, e1), overview = g2.overview.copy(reachability = reachability3))
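+ // gossip with a removed member, so the envelope carries a tombstone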
+ val g4 = g1.remove(d1.uniqueAddress, 352684800)
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g1))
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g2))
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g3))
+ checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g4))
checkSerialization(GossipStatus(a1.uniqueAddress, g1.version))
checkSerialization(GossipStatus(a1.uniqueAddress, g2.version))
diff --git a/project/MiMa.scala b/project/MiMa.scala
index 6c15c4a27c..8eb9783025 100644
--- a/project/MiMa.scala
+++ b/project/MiMa.scala
@@ -1229,7 +1229,28 @@ object MiMa extends AutoPlugin {
// #23023 added a new overload with implementation to trait, so old transport implementations compiled against
// older versions will be missing the method. We accept that incompatibility for now.
- ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate"),
+
+ // #23228 single leader per cluster team
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.apply"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.copy"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.convergence"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.isLeader"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.leader"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.leaderOf"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.roleLeader"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.NumberOfGossipsBeforeShutdownWhenLeaderExits"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.vclockName"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.MaxGossipsBeforeShuttingDownMyself"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.diffLeader"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.diffRolesLeader"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterEvent.diffSeen"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#GossipOrBuilder.getTombstonesCount"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#GossipOrBuilder.getTombstones"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#GossipOrBuilder.getTombstonesList"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#GossipOrBuilder.getTombstonesOrBuilderList"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#GossipOrBuilder.getTombstonesOrBuilder")
)
)