diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
index 227f148965..590254ef83 100644
--- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
+++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
@@ -7622,6 +7622,16 @@ public final class ReplicatorMessages {
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.AddressOrBuilder getSeenOrBuilder(
int index);
+
+ // optional sint64 obsoleteTime = 5;
+ /**
+ * optional sint64 obsoleteTime = 5;
+ */
+ boolean hasObsoleteTime();
+ /**
+ * optional sint64 obsoleteTime = 5;
+ */
+ long getObsoleteTime();
}
/**
* Protobuf type {@code akka.cluster.ddata.DataEnvelope.PruningEntry}
@@ -7713,6 +7723,11 @@ public final class ReplicatorMessages {
seen_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Address.PARSER, extensionRegistry));
break;
}
+ case 40: {
+ bitField0_ |= 0x00000008;
+ obsoleteTime_ = input.readSInt64();
+ break;
+ }
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@@ -7852,11 +7867,28 @@ public final class ReplicatorMessages {
return seen_.get(index);
}
+ // optional sint64 obsoleteTime = 5;
+ public static final int OBSOLETETIME_FIELD_NUMBER = 5;
+ private long obsoleteTime_;
+ /**
+ * optional sint64 obsoleteTime = 5;
+ */
+ public boolean hasObsoleteTime() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * optional sint64 obsoleteTime = 5;
+ */
+ public long getObsoleteTime() {
+ return obsoleteTime_;
+ }
+
private void initFields() {
removedAddress_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance();
ownerAddress_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance();
performed_ = false;
seen_ = java.util.Collections.emptyList();
+ obsoleteTime_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7908,6 +7940,9 @@ public final class ReplicatorMessages {
for (int i = 0; i < seen_.size(); i++) {
output.writeMessage(4, seen_.get(i));
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeSInt64(5, obsoleteTime_);
+ }
getUnknownFields().writeTo(output);
}
@@ -7933,6 +7968,10 @@ public final class ReplicatorMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(4, seen_.get(i));
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeSInt64Size(5, obsoleteTime_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -8072,6 +8111,8 @@ public final class ReplicatorMessages {
} else {
seenBuilder_.clear();
}
+ obsoleteTime_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -8129,6 +8170,10 @@ public final class ReplicatorMessages {
} else {
result.seen_ = seenBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.obsoleteTime_ = obsoleteTime_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -8180,6 +8225,9 @@ public final class ReplicatorMessages {
}
}
}
+ if (other.hasObsoleteTime()) {
+ setObsoleteTime(other.getObsoleteTime());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -8740,6 +8788,39 @@ public final class ReplicatorMessages {
return seenBuilder_;
}
+ // optional sint64 obsoleteTime = 5;
+ private long obsoleteTime_ ;
+ /**
+ * optional sint64 obsoleteTime = 5;
+ */
+ public boolean hasObsoleteTime() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * optional sint64 obsoleteTime = 5;
+ */
+ public long getObsoleteTime() {
+ return obsoleteTime_;
+ }
+ /**
+ * optional sint64 obsoleteTime = 5;
+ */
+ public Builder setObsoleteTime(long value) {
+ bitField0_ |= 0x00000010;
+ obsoleteTime_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional sint64 obsoleteTime = 5;
+ */
+ public Builder clearObsoleteTime() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ obsoleteTime_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DataEnvelope.PruningEntry)
}
@@ -14781,6 +14862,31 @@ public final class ReplicatorMessages {
* required .akka.cluster.ddata.OtherMessage data = 1;
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder();
+
+ // repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ java.util.List
+ getPruningList();
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int index);
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ int getPruningCount();
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ java.util.List extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>
+ getPruningOrBuilderList();
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder(
+ int index);
}
/**
* Protobuf type {@code akka.cluster.ddata.DurableDataEnvelope}
@@ -14846,6 +14952,14 @@ public final class ReplicatorMessages {
bitField0_ |= 0x00000001;
break;
}
+ case 18: {
+ if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+ pruning_ = new java.util.ArrayList();
+ mutable_bitField0_ |= 0x00000002;
+ }
+ pruning_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.PARSER, extensionRegistry));
+ break;
+ }
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@@ -14854,6 +14968,9 @@ public final class ReplicatorMessages {
throw new akka.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
+ if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+ pruning_ = java.util.Collections.unmodifiableList(pruning_);
+ }
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@@ -14908,8 +15025,45 @@ public final class ReplicatorMessages {
return data_;
}
+ // repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ public static final int PRUNING_FIELD_NUMBER = 2;
+ private java.util.List pruning_;
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public java.util.List getPruningList() {
+ return pruning_;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public java.util.List extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>
+ getPruningOrBuilderList() {
+ return pruning_;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public int getPruningCount() {
+ return pruning_.size();
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int index) {
+ return pruning_.get(index);
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder(
+ int index) {
+ return pruning_.get(index);
+ }
+
private void initFields() {
data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
+ pruning_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -14924,6 +15078,12 @@ public final class ReplicatorMessages {
memoizedIsInitialized = 0;
return false;
}
+ for (int i = 0; i < getPruningCount(); i++) {
+ if (!getPruning(i).isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -14934,6 +15094,9 @@ public final class ReplicatorMessages {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeMessage(1, data_);
}
+ for (int i = 0; i < pruning_.size(); i++) {
+ output.writeMessage(2, pruning_.get(i));
+ }
getUnknownFields().writeTo(output);
}
@@ -14947,6 +15110,10 @@ public final class ReplicatorMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(1, data_);
}
+ for (int i = 0; i < pruning_.size(); i++) {
+ size += akka.protobuf.CodedOutputStream
+ .computeMessageSize(2, pruning_.get(i));
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -15056,6 +15223,7 @@ public final class ReplicatorMessages {
private void maybeForceBuilderInitialization() {
if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getDataFieldBuilder();
+ getPruningFieldBuilder();
}
}
private static Builder create() {
@@ -15070,6 +15238,12 @@ public final class ReplicatorMessages {
dataBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
+ if (pruningBuilder_ == null) {
+ pruning_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000002);
+ } else {
+ pruningBuilder_.clear();
+ }
return this;
}
@@ -15106,6 +15280,15 @@ public final class ReplicatorMessages {
} else {
result.data_ = dataBuilder_.build();
}
+ if (pruningBuilder_ == null) {
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ pruning_ = java.util.Collections.unmodifiableList(pruning_);
+ bitField0_ = (bitField0_ & ~0x00000002);
+ }
+ result.pruning_ = pruning_;
+ } else {
+ result.pruning_ = pruningBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -15125,6 +15308,32 @@ public final class ReplicatorMessages {
if (other.hasData()) {
mergeData(other.getData());
}
+ if (pruningBuilder_ == null) {
+ if (!other.pruning_.isEmpty()) {
+ if (pruning_.isEmpty()) {
+ pruning_ = other.pruning_;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ } else {
+ ensurePruningIsMutable();
+ pruning_.addAll(other.pruning_);
+ }
+ onChanged();
+ }
+ } else {
+ if (!other.pruning_.isEmpty()) {
+ if (pruningBuilder_.isEmpty()) {
+ pruningBuilder_.dispose();
+ pruningBuilder_ = null;
+ pruning_ = other.pruning_;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ pruningBuilder_ =
+ akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+ getPruningFieldBuilder() : null;
+ } else {
+ pruningBuilder_.addAllMessages(other.pruning_);
+ }
+ }
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -15138,6 +15347,12 @@ public final class ReplicatorMessages {
return false;
}
+ for (int i = 0; i < getPruningCount(); i++) {
+ if (!getPruning(i).isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -15277,6 +15492,246 @@ public final class ReplicatorMessages {
return dataBuilder_;
}
+ // repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ private java.util.List pruning_ =
+ java.util.Collections.emptyList();
+ private void ensurePruningIsMutable() {
+ if (!((bitField0_ & 0x00000002) == 0x00000002)) {
+ pruning_ = new java.util.ArrayList(pruning_);
+ bitField0_ |= 0x00000002;
+ }
+ }
+
+ private akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder> pruningBuilder_;
+
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public java.util.List getPruningList() {
+ if (pruningBuilder_ == null) {
+ return java.util.Collections.unmodifiableList(pruning_);
+ } else {
+ return pruningBuilder_.getMessageList();
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public int getPruningCount() {
+ if (pruningBuilder_ == null) {
+ return pruning_.size();
+ } else {
+ return pruningBuilder_.getCount();
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int index) {
+ if (pruningBuilder_ == null) {
+ return pruning_.get(index);
+ } else {
+ return pruningBuilder_.getMessage(index);
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder setPruning(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) {
+ if (pruningBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePruningIsMutable();
+ pruning_.set(index, value);
+ onChanged();
+ } else {
+ pruningBuilder_.setMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder setPruning(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) {
+ if (pruningBuilder_ == null) {
+ ensurePruningIsMutable();
+ pruning_.set(index, builderForValue.build());
+ onChanged();
+ } else {
+ pruningBuilder_.setMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder addPruning(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) {
+ if (pruningBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePruningIsMutable();
+ pruning_.add(value);
+ onChanged();
+ } else {
+ pruningBuilder_.addMessage(value);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder addPruning(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) {
+ if (pruningBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePruningIsMutable();
+ pruning_.add(index, value);
+ onChanged();
+ } else {
+ pruningBuilder_.addMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder addPruning(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) {
+ if (pruningBuilder_ == null) {
+ ensurePruningIsMutable();
+ pruning_.add(builderForValue.build());
+ onChanged();
+ } else {
+ pruningBuilder_.addMessage(builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder addPruning(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) {
+ if (pruningBuilder_ == null) {
+ ensurePruningIsMutable();
+ pruning_.add(index, builderForValue.build());
+ onChanged();
+ } else {
+ pruningBuilder_.addMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder addAllPruning(
+ java.lang.Iterable extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry> values) {
+ if (pruningBuilder_ == null) {
+ ensurePruningIsMutable();
+ super.addAll(values, pruning_);
+ onChanged();
+ } else {
+ pruningBuilder_.addAllMessages(values);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder clearPruning() {
+ if (pruningBuilder_ == null) {
+ pruning_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000002);
+ onChanged();
+ } else {
+ pruningBuilder_.clear();
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public Builder removePruning(int index) {
+ if (pruningBuilder_ == null) {
+ ensurePruningIsMutable();
+ pruning_.remove(index);
+ onChanged();
+ } else {
+ pruningBuilder_.remove(index);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder getPruningBuilder(
+ int index) {
+ return getPruningFieldBuilder().getBuilder(index);
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder(
+ int index) {
+ if (pruningBuilder_ == null) {
+ return pruning_.get(index); } else {
+ return pruningBuilder_.getMessageOrBuilder(index);
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public java.util.List extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>
+ getPruningOrBuilderList() {
+ if (pruningBuilder_ != null) {
+ return pruningBuilder_.getMessageOrBuilderList();
+ } else {
+ return java.util.Collections.unmodifiableList(pruning_);
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder addPruningBuilder() {
+ return getPruningFieldBuilder().addBuilder(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.getDefaultInstance());
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder addPruningBuilder(
+ int index) {
+ return getPruningFieldBuilder().addBuilder(
+ index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.getDefaultInstance());
+ }
+ /**
+ * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
+ */
+ public java.util.List
+ getPruningBuilderList() {
+ return getPruningFieldBuilder().getBuilderList();
+ }
+ private akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>
+ getPruningFieldBuilder() {
+ if (pruningBuilder_ == null) {
+ pruningBuilder_ = new akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>(
+ pruning_,
+ ((bitField0_ & 0x00000002) == 0x00000002),
+ getParentForChildren(),
+ isClean());
+ pruning_ = null;
+ }
+ return pruningBuilder_;
+ }
+
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DurableDataEnvelope)
}
@@ -15431,32 +15886,34 @@ public final class ReplicatorMessages {
"key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.clust" +
"er.ddata.DataEnvelope\"\007\n\005Empty\"\023\n\004Read\022\013" +
"\n\003key\030\001 \002(\t\"@\n\nReadResult\0222\n\010envelope\030\001 " +
- "\001(\0132 .akka.cluster.ddata.DataEnvelope\"\301\002" +
+ "\001(\0132 .akka.cluster.ddata.DataEnvelope\"\327\002" +
"\n\014DataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.clus" +
"ter.ddata.OtherMessage\022>\n\007pruning\030\002 \003(\0132" +
"-.akka.cluster.ddata.DataEnvelope.Prunin" +
- "gEntry\032\300\001\n\014PruningEntry\0229\n\016removedAddres" +
+ "gEntry\032\326\001\n\014PruningEntry\0229\n\016removedAddres" +
"s\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddre",
"ss\0227\n\014ownerAddress\030\002 \002(\0132!.akka.cluster." +
"ddata.UniqueAddress\022\021\n\tperformed\030\003 \002(\010\022)" +
"\n\004seen\030\004 \003(\0132\033.akka.cluster.ddata.Addres" +
- "s\"\203\001\n\006Status\022\r\n\005chunk\030\001 \002(\r\022\021\n\ttotChunks" +
- "\030\002 \002(\r\0221\n\007entries\030\003 \003(\0132 .akka.cluster.d" +
- "data.Status.Entry\032$\n\005Entry\022\013\n\003key\030\001 \002(\t\022" +
- "\016\n\006digest\030\002 \002(\014\"\227\001\n\006Gossip\022\020\n\010sendBack\030\001" +
- " \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda" +
- "ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" +
- "\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat",
- "aEnvelope\"X\n\rUniqueAddress\022,\n\007address\030\001 " +
- "\002(\0132\033.akka.cluster.ddata.Address\022\013\n\003uid\030" +
- "\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010hostna" +
- "me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"V\n\014OtherMessage\022\027" +
- "\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializerId\030" +
- "\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nString" +
- "GSet\022\020\n\010elements\030\001 \003(\t\"E\n\023DurableDataEnv" +
- "elope\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata" +
- ".OtherMessageB#\n\037akka.cluster.ddata.prot" +
- "obuf.msgH\001"
+ "s\022\024\n\014obsoleteTime\030\005 \001(\022\"\203\001\n\006Status\022\r\n\005ch" +
+ "unk\030\001 \002(\r\022\021\n\ttotChunks\030\002 \002(\r\0221\n\007entries\030" +
+ "\003 \003(\0132 .akka.cluster.ddata.Status.Entry\032" +
+ "$\n\005Entry\022\013\n\003key\030\001 \002(\t\022\016\n\006digest\030\002 \002(\014\"\227\001" +
+ "\n\006Gossip\022\020\n\010sendBack\030\001 \002(\010\0221\n\007entries\030\002 " +
+ "\003(\0132 .akka.cluster.ddata.Gossip.Entry\032H\n" +
+ "\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .",
+ "akka.cluster.ddata.DataEnvelope\"X\n\rUniqu" +
+ "eAddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster" +
+ ".ddata.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(" +
+ "\017\")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002" +
+ " \002(\r\"V\n\014OtherMessage\022\027\n\017enclosedMessage\030" +
+ "\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageMa" +
+ "nifest\030\004 \001(\014\"\036\n\nStringGSet\022\020\n\010elements\030\001" +
+ " \003(\t\"\205\001\n\023DurableDataEnvelope\022.\n\004data\030\001 \002" +
+ "(\0132 .akka.cluster.ddata.OtherMessage\022>\n\007" +
+ "pruning\030\002 \003(\0132-.akka.cluster.ddata.DataE",
+ "nvelope.PruningEntryB#\n\037akka.cluster.dda" +
+ "ta.protobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15540,7 +15997,7 @@ public final class ReplicatorMessages {
internal_static_akka_cluster_ddata_DataEnvelope_PruningEntry_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_DataEnvelope_PruningEntry_descriptor,
- new java.lang.String[] { "RemovedAddress", "OwnerAddress", "Performed", "Seen", });
+ new java.lang.String[] { "RemovedAddress", "OwnerAddress", "Performed", "Seen", "ObsoleteTime", });
internal_static_akka_cluster_ddata_Status_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_akka_cluster_ddata_Status_fieldAccessorTable = new
@@ -15594,7 +16051,7 @@ public final class ReplicatorMessages {
internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor,
- new java.lang.String[] { "Data", });
+ new java.lang.String[] { "Data", "Pruning", });
return null;
}
};
diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
index 69755f2314..7c57b84215 100644
--- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
+++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
@@ -68,6 +68,7 @@ message DataEnvelope {
required UniqueAddress ownerAddress = 2;
required bool performed = 3;
repeated Address seen = 4;
+ optional sint64 obsoleteTime = 5;
}
required OtherMessage data = 1;
@@ -119,4 +120,5 @@ message StringGSet {
message DurableDataEnvelope {
required OtherMessage data = 1;
+ repeated DataEnvelope.PruningEntry pruning = 2;
}
diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf
index ad8a7502a1..0df2bf195f 100644
--- a/akka-distributed-data/src/main/resources/reference.conf
+++ b/akka-distributed-data/src/main/resources/reference.conf
@@ -32,14 +32,26 @@ akka.cluster.distributed-data {
use-dispatcher = ""
# How often the Replicator checks for pruning of data associated with
- # removed cluster nodes.
- pruning-interval = 30 s
+ # removed cluster nodes. If this is set to 'off' the pruning feature will
+ # be completely disabled.
+ pruning-interval = 120 s
- # How long time it takes (worst case) to spread the data to all other replica nodes.
+ # How long time it takes to spread the data to all other replica nodes.
# This is used when initiating and completing the pruning process of data associated
# with removed cluster nodes. The time measurement is stopped when any replica is
- # unreachable, so it should be configured to worst case in a healthy cluster.
- max-pruning-dissemination = 60 s
+ # unreachable, but it's still recommended to configure this with certain margin.
+ # It should be in the magnitude of minutes even though typical dissemination time
+ # is shorter (grows logarithmic with number of nodes). There is no advantage of
+ # setting this too low. Setting it to large value will delay the pruning process.
+ max-pruning-dissemination = 300 s
+
+ # The markers of that pruning has been performed for a removed node are kept for this
+ # time and thereafter removed. If and old data entry that was never pruned is somehow
+ # injected and merged with existing data after this time the value will not be correct.
+ # This would be possible (although unlikely) in the case of a long network partition.
+ # It should be in the magnitude of hours. For durable data it is configured by
+ # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'.
+ pruning-marker-time-to-live = 6 h
# Serialized Write and Read messages are cached when they are sent to
# several nodes. If no further activity they are removed from the cache
@@ -51,6 +63,17 @@ akka.cluster.distributed-data {
# end of a key.
keys = []
+ # The markers of that pruning has been performed for a removed node are kept for this
+ # time and thereafter removed. If and old data entry that was never pruned is
+ # injected and merged with existing data after this time the value will not be correct.
+ # This would be possible if replica with durable data didn't participate in the pruning
+ # (e.g. it was shutdown) and later started after this time. A durable replica should not
+ # be stopped for longer time than this duration and if it is joining again after this
+ # duration its data should first be manually removed (from the lmdb directory).
+ # It should be in the magnitude of days. Note that there is a corresponding setting
+ # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'.
+ pruning-marker-time-to-live = 10 d
+
# Fully qualified class name of the durable store actor. It must be a subclass
# of akka.actor.Actor and handle the protocol defined in
# akka.cluster.ddata.DurableStore. The class must have a constructor with
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala
index fd9d6437c5..04f744936e 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala
@@ -19,6 +19,7 @@ import akka.actor.DeadLetterSuppression
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.Replicator.ReplicatorMessage
+import akka.cluster.ddata.Replicator.Internal.DataEnvelope
import akka.io.DirectByteBufferPool
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
@@ -52,7 +53,7 @@ object DurableStore {
* should be used to signal success or failure of the operation to the contained
* `replyTo` actor.
*/
- final case class Store(key: String, data: ReplicatedData, reply: Option[StoreReply])
+ final case class Store(key: String, data: DurableDataEnvelope, reply: Option[StoreReply])
final case class StoreReply(successMsg: Any, failureMsg: Any, replyTo: ActorRef)
/**
@@ -65,7 +66,7 @@ object DurableStore {
* will stop itself and the durable store.
*/
case object LoadAll
- final case class LoadData(data: Map[String, ReplicatedData])
+ final case class LoadData(data: Map[String, DurableDataEnvelope])
case object LoadAllCompleted
class LoadFailed(message: String, cause: Throwable) extends RuntimeException(message, cause) {
def this(message: String) = this(message, null)
@@ -77,7 +78,13 @@ object DurableStore {
* the wrapped `ReplicatedData` including its serializerId and
* manifest.
*/
- final class DurableDataEnvelope(val data: ReplicatedData) extends ReplicatorMessage {
+ final class DurableDataEnvelope private[akka] (
+ private[akka] val dataEnvelope: DataEnvelope) extends ReplicatorMessage {
+
+ def this(data: ReplicatedData) = this(DataEnvelope(data))
+
+ def data: ReplicatedData = dataEnvelope.data
+
override def toString(): String = s"DurableDataEnvelope($data)"
override def hashCode(): Int = data.hashCode
override def equals(o: Any): Boolean = o match {
@@ -136,7 +143,7 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
}
// pending write behind
- val pending = new java.util.HashMap[String, ReplicatedData]
+ val pending = new java.util.HashMap[String, DurableDataEnvelope]
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
@@ -171,7 +178,7 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
val valArray = Array.ofDim[Byte](entry.`val`.remaining)
entry.`val`.get(valArray)
val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope]
- key → envelope.data
+ key → envelope
}.toMap)
if (loadData.data.nonEmpty)
sender() ! loadData
@@ -220,10 +227,10 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
writeBehind()
}
- def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: String, data: ReplicatedData): Unit = {
+ def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: String, data: DurableDataEnvelope): Unit = {
try {
keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip()
- val value = serializer.toBinary(new DurableDataEnvelope(data))
+ val value = serializer.toBinary(data)
ensureValueBufferSize(value.length)
valueBuffer.put(value).flip()
tx match {
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala
index aba050fc73..3d1148f431 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala
@@ -102,6 +102,8 @@ final class GCounter private[akka] (
new GCounter(merged)
}
+ override def modifiedByNodes: Set[UniqueAddress] = state.keySet
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
state.contains(removedNode)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala
index 144db4609e..9a2c15a050 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala
@@ -143,6 +143,9 @@ final class LWWMap[A, B] private[akka] (
override def merge(that: LWWMap[A, B]): LWWMap[A, B] =
new LWWMap(underlying.merge(that.underlying))
+ override def modifiedByNodes: Set[UniqueAddress] =
+ underlying.modifiedByNodes
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
underlying.needPruningFrom(removedNode)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala
index 1c756d10ec..309d052ddc 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala
@@ -177,6 +177,13 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
new ORMap(mergedKeys, mergedValues)
}
+ override def modifiedByNodes: Set[UniqueAddress] = {
+ keys.modifiedByNodes union values.foldLeft(Set.empty[UniqueAddress]) {
+ case (acc, (_, data: RemovedNodePruning)) ⇒ acc union data.modifiedByNodes
+ case (acc, _) ⇒ acc
+ }
+ }
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean = {
keys.needPruningFrom(removedNode) || values.exists {
case (_, data: RemovedNodePruning) ⇒ data.needPruningFrom(removedNode)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala
index ae127aeb54..4bfea5b4f4 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala
@@ -204,6 +204,9 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
else
this
+ override def modifiedByNodes: Set[UniqueAddress] =
+ underlying.modifiedByNodes
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
underlying.needPruningFrom(removedNode)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala
index d58b83630c..17b37d018e 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala
@@ -307,6 +307,9 @@ final class ORSet[A] private[akka] (
}
}
+ override def modifiedByNodes: Set[UniqueAddress] =
+ vvector.modifiedByNodes
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
vvector.needPruningFrom(removedNode)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala
index 78e1124922..7135becc3b 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala
@@ -94,6 +94,9 @@ final class PNCounter private[akka] (
increments = that.increments.merge(this.increments),
decrements = that.decrements.merge(this.decrements))
+ override def modifiedByNodes: Set[UniqueAddress] =
+ increments.modifiedByNodes union decrements.modifiedByNodes
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
increments.needPruningFrom(removedNode) || decrements.needPruningFrom(removedNode)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala
index 593e81f000..301800a3df 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala
@@ -123,6 +123,9 @@ final class PNCounterMap[A] private[akka] (
override def merge(that: PNCounterMap[A]): PNCounterMap[A] =
new PNCounterMap(underlying.merge(that.underlying))
+ override def modifiedByNodes: Set[UniqueAddress] =
+ underlying.modifiedByNodes
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
underlying.needPruningFrom(removedNode)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
index 0c089c2d83..d16bd79c43 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
@@ -11,36 +11,37 @@ import akka.cluster.UniqueAddress
* INTERNAL API
*/
private[akka] object PruningState {
- sealed trait PruningPhase
- final case class PruningInitialized(seen: Set[Address]) extends PruningPhase
- case object PruningPerformed extends PruningPhase
+ final case class PruningInitialized(owner: UniqueAddress, seen: Set[Address]) extends PruningState {
+ override def addSeen(node: Address): PruningState = {
+ if (seen(node) || owner.address == node) this
+ else copy(seen = seen + node)
+ }
+ }
+ final case class PruningPerformed(obsoleteTime: Long) extends PruningState {
+ def isObsolete(currentTime: Long): Boolean = obsoleteTime <= currentTime
+ }
}
/**
* INTERNAL API
*/
-private[akka] final case class PruningState(owner: UniqueAddress, phase: PruningState.PruningPhase) {
+private[akka] sealed trait PruningState {
import PruningState._
def merge(that: PruningState): PruningState =
- (this.phase, that.phase) match {
- // FIXME this will add the PruningPerformed back again when one is None
- case (PruningPerformed, _) ⇒ this
- case (_, PruningPerformed) ⇒ that
- case (PruningInitialized(thisSeen), PruningInitialized(thatSeen)) ⇒
- if (this.owner == that.owner)
- copy(phase = PruningInitialized(thisSeen union thatSeen))
- else if (Member.addressOrdering.compare(this.owner.address, that.owner.address) > 0)
+ (this, that) match {
+ case (p1: PruningPerformed, p2: PruningPerformed) ⇒ if (p1.obsoleteTime >= p2.obsoleteTime) this else that
+ case (_: PruningPerformed, _) ⇒ this
+ case (_, _: PruningPerformed) ⇒ that
+ case (PruningInitialized(thisOwner, thisSeen), PruningInitialized(thatOwner, thatSeen)) ⇒
+ if (thisOwner == thatOwner)
+ PruningInitialized(thisOwner, thisSeen union thatSeen)
+ else if (Member.addressOrdering.compare(thisOwner.address, thatOwner.address) > 0)
that
else
this
}
- def addSeen(node: Address): PruningState = phase match {
- case PruningInitialized(seen) ⇒
- if (seen(node) || owner.address == node) this
- else copy(phase = PruningInitialized(seen + node))
- case _ ⇒ this
- }
+ def addSeen(node: Address): PruningState = this
}
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala
index 6c7cb9827a..b93793e0dd 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala
@@ -67,9 +67,19 @@ abstract class AbstractReplicatedData[D <: AbstractReplicatedData[D]] extends Re
* When a node is removed from the cluster these methods will be
* used by the [[Replicator]] to collapse data from the removed node
* into some other node in the cluster.
+ *
+ * See process description in the 'CRDT Garbage' section of the [[Replicator]]
+ * documentation.
*/
trait RemovedNodePruning extends ReplicatedData {
+ /**
+ * The nodes that have changed the state for this data
+ * and would need pruning when such node is no longer part
+ * of the cluster.
+ */
+ def modifiedByNodes: Set[UniqueAddress]
+
/**
* Does it have any state changes from a specific node,
* which has been removed from the cluster.
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
index 405d517689..86827413b9 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
@@ -43,6 +43,7 @@ import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.actor.ActorInitializationException
import java.util.concurrent.TimeUnit
+import akka.util.Helpers.toRootLowerCase
object ReplicatorSettings {
@@ -63,6 +64,11 @@ object ReplicatorSettings {
case id ⇒ id
}
+ val pruningInterval = toRootLowerCase(config.getString("pruning-interval")) match {
+ case "off" | "false" ⇒ Duration.Zero
+ case _ ⇒ config.getDuration("pruning-interval", MILLISECONDS).millis
+ }
+
import scala.collection.JavaConverters._
new ReplicatorSettings(
role = roleOption(config.getString("role")),
@@ -70,10 +76,12 @@ object ReplicatorSettings {
notifySubscribersInterval = config.getDuration("notify-subscribers-interval", MILLISECONDS).millis,
maxDeltaElements = config.getInt("max-delta-elements"),
dispatcher = dispatcher,
- pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis,
+ pruningInterval = pruningInterval,
maxPruningDissemination = config.getDuration("max-pruning-dissemination", MILLISECONDS).millis,
durableStoreProps = Left((config.getString("durable.store-actor-class"), config.getConfig("durable"))),
- durableKeys = config.getStringList("durable.keys").asScala.toSet)
+ durableKeys = config.getStringList("durable.keys").asScala.toSet,
+ pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis,
+ durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis)
}
/**
@@ -110,21 +118,30 @@ object ReplicatorSettings {
* in the `Set`.
*/
final class ReplicatorSettings(
- val role: Option[String],
- val gossipInterval: FiniteDuration,
- val notifySubscribersInterval: FiniteDuration,
- val maxDeltaElements: Int,
- val dispatcher: String,
- val pruningInterval: FiniteDuration,
- val maxPruningDissemination: FiniteDuration,
- val durableStoreProps: Either[(String, Config), Props],
- val durableKeys: Set[String]) {
+ val role: Option[String],
+ val gossipInterval: FiniteDuration,
+ val notifySubscribersInterval: FiniteDuration,
+ val maxDeltaElements: Int,
+ val dispatcher: String,
+ val pruningInterval: FiniteDuration,
+ val maxPruningDissemination: FiniteDuration,
+ val durableStoreProps: Either[(String, Config), Props],
+ val durableKeys: Set[String],
+ val pruningMarkerTimeToLive: FiniteDuration,
+ val durablePruningMarkerTimeToLive: FiniteDuration) {
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
- maxPruningDissemination, Right(Props.empty), Set.empty)
+ maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days)
+
+ // For backwards compatibility
+ def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
+ maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration,
+ durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) =
+ this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
+ maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days)
def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role))
@@ -150,6 +167,13 @@ final class ReplicatorSettings(
def withPruning(pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration): ReplicatorSettings =
copy(pruningInterval = pruningInterval, maxPruningDissemination = maxPruningDissemination)
+ def withPruningMarkerTimeToLive(
+ pruningMarkerTimeToLive: FiniteDuration,
+ durablePruningMarkerTimeToLive: FiniteDuration): ReplicatorSettings =
+ copy(
+ pruningMarkerTimeToLive = pruningMarkerTimeToLive,
+ durablePruningMarkerTimeToLive = durablePruningMarkerTimeToLive)
+
def withDurableStoreProps(durableStoreProps: Props): ReplicatorSettings =
copy(durableStoreProps = Right(durableStoreProps))
@@ -168,17 +192,20 @@ final class ReplicatorSettings(
}
private def copy(
- role: Option[String] = role,
- gossipInterval: FiniteDuration = gossipInterval,
- notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
- maxDeltaElements: Int = maxDeltaElements,
- dispatcher: String = dispatcher,
- pruningInterval: FiniteDuration = pruningInterval,
- maxPruningDissemination: FiniteDuration = maxPruningDissemination,
- durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
- durableKeys: Set[String] = durableKeys): ReplicatorSettings =
+ role: Option[String] = role,
+ gossipInterval: FiniteDuration = gossipInterval,
+ notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
+ maxDeltaElements: Int = maxDeltaElements,
+ dispatcher: String = dispatcher,
+ pruningInterval: FiniteDuration = pruningInterval,
+ maxPruningDissemination: FiniteDuration = maxPruningDissemination,
+ durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
+ durableKeys: Set[String] = durableKeys,
+ pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive,
+ durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive): ReplicatorSettings =
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
- pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys)
+ pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys,
+ pruningMarkerTimeToLive, durablePruningMarkerTimeToLive)
}
object Replicator {
@@ -562,6 +589,9 @@ object Replicator {
val LazyDigest: Digest = ByteString(0)
val NotFoundDigest: Digest = ByteString(-1)
+ /**
+ * The `DataEnvelope` wraps a data entry and carries state of the pruning process for the entry.
+ */
final case class DataEnvelope(
data: ReplicatedData,
pruning: Map[UniqueAddress, PruningState] = Map.empty)
@@ -576,36 +606,51 @@ object Replicator {
}
def initRemovedNodePruning(removed: UniqueAddress, owner: UniqueAddress): DataEnvelope = {
- copy(pruning = pruning.updated(removed, PruningState(owner, PruningInitialized(Set.empty))))
+ copy(pruning = pruning.updated(removed, PruningInitialized(owner, Set.empty)))
}
- def prune(from: UniqueAddress): DataEnvelope = {
+ def prune(from: UniqueAddress, pruningPerformed: PruningPerformed): DataEnvelope = {
data match {
case dataWithRemovedNodePruning: RemovedNodePruning ⇒
require(pruning.contains(from))
- val to = pruning(from).owner
- val prunedData = dataWithRemovedNodePruning.prune(from, to)
- copy(data = prunedData, pruning = pruning.updated(from, PruningState(to, PruningPerformed)))
+ pruning(from) match {
+ case PruningInitialized(owner, _) ⇒
+ val prunedData = dataWithRemovedNodePruning.prune(from, owner)
+ copy(data = prunedData, pruning = pruning.updated(from, pruningPerformed))
+ case _ ⇒
+ this
+ }
+
case _ ⇒ this
}
-
}
def merge(other: DataEnvelope): DataEnvelope =
if (other.data == DeletedData) DeletedEnvelope
else {
- var mergedRemovedNodePruning = other.pruning
- for ((key, thisValue) ← pruning) {
- mergedRemovedNodePruning.get(key) match {
- case None ⇒
- mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue)
- case Some(thatValue) ⇒
- mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue merge thatValue)
+ val mergedPruning =
+ pruning.foldLeft(other.pruning) {
+ case (acc, (key, thisValue)) ⇒
+ acc.get(key) match {
+ case None ⇒
+ acc.updated(key, thisValue)
+ case Some(thatValue) ⇒
+ acc.updated(key, thisValue merge thatValue)
+ }
+ }
+ val filteredMergedPruning = {
+ if (mergedPruning.isEmpty) mergedPruning
+ else {
+ val currentTime = System.currentTimeMillis()
+ mergedPruning.filter {
+ case (_, p: PruningPerformed) ⇒ !p.isObsolete(currentTime)
+ case _ ⇒ true
+ }
}
}
// cleanup both sides before merging, `merge((otherData: ReplicatedData)` will cleanup other.data
- copy(data = cleaned(data, mergedRemovedNodePruning), pruning = mergedRemovedNodePruning).merge(other.data)
+ copy(data = cleaned(data, filteredMergedPruning), pruning = filteredMergedPruning).merge(other.data)
}
def merge(otherData: ReplicatedData): DataEnvelope =
@@ -613,7 +658,7 @@ object Replicator {
else copy(data = data merge cleaned(otherData, pruning).asInstanceOf[data.T])
private def cleaned(c: ReplicatedData, p: Map[UniqueAddress, PruningState]): ReplicatedData = p.foldLeft(c) {
- case (c: RemovedNodePruning, (removed, PruningState(_, PruningPerformed))) ⇒
+ case (c: RemovedNodePruning, (removed, _: PruningPerformed)) ⇒
if (c.needPruningFrom(removed)) c.pruningCleanup(removed) else c
case (c, _) ⇒ c
}
@@ -801,7 +846,8 @@ object Replicator {
* When a node is removed from the cluster it is first important that all updates that were
* done by that node are disseminated to all other nodes. The pruning will not start before the
* `maxPruningDissemination` duration has elapsed. The time measurement is stopped when any
- * replica is unreachable, so it should be configured to worst case in a healthy cluster.
+ * replica is unreachable, but it's still recommended to configure this with certain margin.
+ * It should be in the magnitude of minutes.
* The nodes are ordered by their address and the node ordered first is called leader.
* The leader initiates the pruning by adding a `PruningInitialized` marker in the data envelope.
* This is gossiped to all other nodes and they mark it as seen when they receive it.
@@ -840,7 +886,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
import context.dispatcher
val gossipTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, GossipTick)
val notifyTask = context.system.scheduler.schedule(notifySubscribersInterval, notifySubscribersInterval, self, FlushChanges)
- val pruningTask = context.system.scheduler.schedule(pruningInterval, pruningInterval, self, RemovedNodePruningTick)
+ val pruningTask =
+ if (pruningInterval >= Duration.Zero)
+ Some(context.system.scheduler.schedule(pruningInterval, pruningInterval, self, RemovedNodePruningTick))
+ else None
val clockTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, ClockTick)
val serializer = SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
@@ -867,11 +916,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// cluster weaklyUp nodes, doesn't contain selfAddress
var weaklyUpNodes: Set[Address] = Set.empty
- // nodes removed from cluster, to be pruned, and tombstoned
var removedNodes: Map[UniqueAddress, Long] = Map.empty
- var pruningPerformed: Map[UniqueAddress, Long] = Map.empty
- var tombstoneNodes: Set[UniqueAddress] = Set.empty
-
var leader: Option[Address] = None
def isLeader: Boolean = leader.exists(_ == selfAddress)
@@ -921,7 +966,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
cluster.unsubscribe(self)
gossipTask.cancel()
notifyTask.cancel()
- pruningTask.cancel()
+ pruningTask.foreach(_.cancel())
clockTask.cancel()
}
@@ -964,11 +1009,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
count += data.size
data.foreach {
case (key, d) ⇒
- val envelope = DataEnvelope(d)
- write(key, envelope) match {
+ write(key, d.dataEnvelope) match {
case Some(newEnvelope) ⇒
- if (newEnvelope.data ne envelope.data)
- durableStore ! Store(key, newEnvelope.data, None)
+ if (newEnvelope.data ne d.dataEnvelope.data)
+ durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
}
@@ -1059,18 +1103,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
localValue match {
case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key, req)
case Some(envelope @ DataEnvelope(existing, _)) ⇒
- existing.merge(modify(Some(existing)).asInstanceOf[existing.T])
- case None ⇒ modify(None)
+ envelope.merge(modify(Some(existing)).asInstanceOf[existing.T])
+ case None ⇒ DataEnvelope(modify(None))
}
} match {
- case Success(newData) ⇒
- log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, newData)
- val envelope = DataEnvelope(pruningCleanupTombstoned(newData))
+ case Success(envelope) ⇒
+ log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, envelope.data)
setData(key.id, envelope)
val durable = isDurable(key.id)
if (isLocalUpdate(writeConsistency)) {
if (durable)
- durableStore ! Store(key.id, envelope.data,
+ durableStore ! Store(key.id, new DurableDataEnvelope(envelope),
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo)))
else
replyTo ! UpdateSuccess(key, req)
@@ -1079,7 +1122,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
- durableStore ! Store(key.id, envelope.data,
+ durableStore ! Store(key.id, new DurableDataEnvelope(envelope),
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
}
}
@@ -1106,7 +1149,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
- durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, replyTo)))
+ durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), Some(StoreReply(WriteAck, WriteNack, replyTo)))
else
replyTo ! WriteAck
case None ⇒
@@ -1115,10 +1158,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def write(key: String, writeEnvelope: DataEnvelope): Option[DataEnvelope] =
getData(key) match {
- case Some(DataEnvelope(DeletedData, _)) ⇒ Some(writeEnvelope) // already deleted
+ case Some(DataEnvelope(DeletedData, _)) ⇒ Some(DeletedEnvelope) // already deleted
case Some(envelope @ DataEnvelope(existing, _)) ⇒
if (existing.getClass == writeEnvelope.data.getClass || writeEnvelope.data == DeletedData) {
- val merged = envelope.merge(pruningCleanupTombstoned(writeEnvelope)).addSeen(selfAddress)
+ val merged = envelope.merge(writeEnvelope).addSeen(selfAddress)
setData(key, merged)
Some(merged)
} else {
@@ -1128,16 +1171,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
None
}
case None ⇒
- val cleaned = pruningCleanupTombstoned(writeEnvelope).addSeen(selfAddress)
- setData(key, cleaned)
- Some(cleaned)
+ val writeEnvelope2 = writeEnvelope.addSeen(selfAddress)
+ setData(key, writeEnvelope2)
+ Some(writeEnvelope2)
}
def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
write(key, writeEnvelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
- durableStore ! Store(key, newEnvelope.data, None)
+ durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
replyTo ! ReadRepairAck
@@ -1160,7 +1203,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val durable = isDurable(key.id)
if (isLocalUpdate(consistency)) {
if (durable)
- durableStore ! Store(key.id, DeletedData,
+ durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope),
Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), replyTo)))
else
replyTo ! DeleteSuccess(key, req)
@@ -1169,7 +1212,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
- durableStore ! Store(key.id, DeletedData,
+ durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope),
Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), writeAggregator)))
}
}
@@ -1313,7 +1356,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
- durableStore ! Store(key, newEnvelope.data, None)
+ durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
if (sendBack) getData(key) match {
@@ -1380,6 +1423,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else if (matchingRole(m)) {
nodes -= m.address
weaklyUpNodes -= m.address
+ log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
unreachable -= m.address
}
@@ -1402,12 +1446,31 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def receiveRemovedNodePruningTick(): Unit = {
- if (isLeader && removedNodes.nonEmpty) {
- initRemovedNodePruning()
+ // See 'CRDT Garbage' section in Replicator Scaladoc for description of the process
+ if (unreachable.isEmpty) {
+ if (isLeader) {
+ collectRemovedNodes()
+ initRemovedNodePruning()
+ }
+ performRemovedNodePruning()
+ deleteObsoletePruningPerformed()
+ }
+ }
+
+ def collectRemovedNodes(): Unit = {
+ val knownNodes = nodes union weaklyUpNodes union removedNodes.keySet.map(_.address)
+ val newRemovedNodes =
+ dataEntries.foldLeft(Set.empty[UniqueAddress]) {
+ case (acc, (_, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _))) ⇒
+ acc union data.modifiedByNodes.filterNot(n ⇒ n == selfUniqueAddress || knownNodes(n.address))
+ case (acc, _) ⇒
+ acc
+ }
+
+ newRemovedNodes.foreach { n ⇒
+ log.debug("Adding removed node [{}] from data", n)
+ removedNodes = removedNodes.updated(n, allReachableClockTime)
}
- performRemovedNodePruning()
- // FIXME tombstoneRemovedNodePruning doesn't work, since merge of PruningState will add the PruningPerformed back again
- // tombstoneRemovedNodePruning()
}
def initRemovedNodePruning(): Unit = {
@@ -1417,22 +1480,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}(collection.breakOut)
if (removedSet.nonEmpty) {
- // FIXME handle pruning of durable data, this is difficult and requires more thought
for ((key, (envelope, _)) ← dataEntries; removed ← removedSet) {
def init(): Unit = {
val newEnvelope = envelope.initRemovedNodePruning(removed, selfUniqueAddress)
- log.debug("Initiated pruning of [{}] for data key [{}]", removed, key)
+ log.debug("Initiated pruning of [{}] for data key [{}] to [{}]", removed, key, selfUniqueAddress)
setData(key, newEnvelope)
}
if (envelope.needPruningFrom(removed)) {
envelope.data match {
case dataWithRemovedNodePruning: RemovedNodePruning ⇒
-
envelope.pruning.get(removed) match {
case None ⇒ init()
- case Some(PruningState(owner, PruningInitialized(_))) if owner != selfUniqueAddress ⇒ init()
+ case Some(PruningInitialized(owner, _)) if owner != selfUniqueAddress ⇒ init()
case _ ⇒ // already in progress
}
case _ ⇒
@@ -1444,78 +1505,44 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def performRemovedNodePruning(): Unit = {
// perform pruning when all seen Init
+ val allNodes = nodes union weaklyUpNodes
+ val pruningPerformed = PruningPerformed(System.currentTimeMillis() + pruningMarkerTimeToLive.toMillis)
+ val durablePruningPerformed = PruningPerformed(System.currentTimeMillis() + durablePruningMarkerTimeToLive.toMillis)
dataEntries.foreach {
case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning), _)) ⇒
pruning.foreach {
- case (removed, PruningState(owner, PruningInitialized(seen))) if owner == selfUniqueAddress
- && (nodes.isEmpty || nodes.forall(seen)) ⇒
- val newEnvelope = envelope.prune(removed)
- pruningPerformed = pruningPerformed.updated(removed, allReachableClockTime)
+ case (removed, PruningInitialized(owner, seen)) if owner == selfUniqueAddress
+ && (allNodes.isEmpty || allNodes.forall(seen)) ⇒
+ val newEnvelope = envelope.prune(removed, if (isDurable(key)) durablePruningPerformed else pruningPerformed)
log.debug("Perform pruning of [{}] from [{}] to [{}]", key, removed, selfUniqueAddress)
setData(key, newEnvelope)
if ((newEnvelope.data ne data) && isDurable(key))
- durableStore ! Store(key, newEnvelope.data, None)
+ durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case _ ⇒
}
case _ ⇒ // deleted, or pruning not needed
}
}
- def tombstoneRemovedNodePruning(): Unit = {
-
- def allPruningPerformed(removed: UniqueAddress): Boolean = {
- dataEntries forall {
- case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning), _)) ⇒
- pruning.get(removed) match {
- case Some(PruningState(_, PruningInitialized(_))) ⇒ false
- case _ ⇒ true
- }
- case _ ⇒ true // deleted, or pruning not needed
- }
- }
-
- // FIXME pruningPerformed is only updated on one node, but tombstoneNodes should be on all
- pruningPerformed.foreach {
- case (removed, timestamp) if ((allReachableClockTime - timestamp) > maxPruningDisseminationNanos) &&
- allPruningPerformed(removed) ⇒
- log.debug("All pruning performed for [{}], tombstoned", removed)
- pruningPerformed -= removed
- removedNodes -= removed
- tombstoneNodes += removed
- dataEntries.foreach {
- case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _)) ⇒
- val newEnvelope = pruningCleanupTombstoned(removed, envelope)
- setData(key, newEnvelope)
- if ((newEnvelope.data ne data) && isDurable(key))
- durableStore ! Store(key, newEnvelope.data, None)
- case _ ⇒ // deleted, or pruning not needed
+ def deleteObsoletePruningPerformed(): Unit = {
+ val currentTime = System.currentTimeMillis()
+ dataEntries.foreach {
+ case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning), _)) ⇒
+ val newEnvelope = pruning.foldLeft(envelope) {
+ case (acc, (removed, p: PruningPerformed)) if p.isObsolete(currentTime) ⇒
+ log.debug("Removing obsolete pruning marker for [{}] in [{}]", removed, key)
+ removedNodes -= removed
+ acc.copy(pruning = acc.pruning - removed)
+ case (acc, _) ⇒ acc
}
- case (removed, timestamp) ⇒ // not ready
+ if (newEnvelope ne envelope)
+ setData(key, newEnvelope)
+
+ case _ ⇒ // deleted, or pruning not needed
}
+
}
- def pruningCleanupTombstoned(envelope: DataEnvelope): DataEnvelope =
- tombstoneNodes.foldLeft(envelope)((c, removed) ⇒ pruningCleanupTombstoned(removed, c))
-
- def pruningCleanupTombstoned(removed: UniqueAddress, envelope: DataEnvelope): DataEnvelope = {
- val pruningCleanuped = pruningCleanupTombstoned(removed, envelope.data)
- if ((pruningCleanuped ne envelope.data) || envelope.pruning.contains(removed))
- envelope.copy(data = pruningCleanuped, pruning = envelope.pruning - removed)
- else
- envelope
- }
-
- def pruningCleanupTombstoned(data: ReplicatedData): ReplicatedData =
- if (tombstoneNodes.isEmpty) data
- else tombstoneNodes.foldLeft(data)((c, removed) ⇒ pruningCleanupTombstoned(removed, c))
-
- def pruningCleanupTombstoned(removed: UniqueAddress, data: ReplicatedData): ReplicatedData =
- data match {
- case dataWithRemovedNodePruning: RemovedNodePruning ⇒
- if (dataWithRemovedNodePruning.needPruningFrom(removed)) dataWithRemovedNodePruning.pruningCleanup(removed) else data
- case _ ⇒ data
- }
-
def receiveGetReplicaCount(): Unit = {
// selfAddress is not included in the set
replyTo ! ReplicaCount(nodes.size + 1)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
index 8cb16d974e..e7b126047a 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
@@ -292,6 +292,9 @@ final case class OneVersionVector private[akka] (node: UniqueAddress, version: L
}
}
+ override def modifiedByNodes: Set[UniqueAddress] =
+ Set(node)
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
node == removedNode
@@ -353,6 +356,9 @@ final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) exten
}
}
+ override def modifiedByNodes: Set[UniqueAddress] =
+ versions.keySet
+
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
versions.contains(removedNode)
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
index 59fd93c8cb..8ba9dabede 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
@@ -28,6 +28,7 @@ import scala.concurrent.duration.FiniteDuration
import akka.cluster.ddata.DurableStore.DurableDataEnvelope
import akka.cluster.ddata.DurableStore.DurableDataEnvelope
import java.io.NotSerializableException
+import akka.actor.Address
/**
* INTERNAL API
@@ -155,6 +156,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
}(system.dispatcher)
private val writeAckBytes = dm.Empty.getDefaultInstance.toByteArray
+ private val dummyAddress = UniqueAddress(Address("a", "b", "c", 2552), 1L)
val GetManifest = "A"
val GetSuccessManifest = "B"
@@ -396,14 +398,17 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
dataEnvelope.pruning.foreach {
case (removedAddress, state) ⇒
val b = dm.DataEnvelope.PruningEntry.newBuilder().
- setRemovedAddress(uniqueAddressToProto(removedAddress)).
- setOwnerAddress(uniqueAddressToProto(state.owner))
- state.phase match {
- case PruningState.PruningInitialized(seen) ⇒
+ setRemovedAddress(uniqueAddressToProto(removedAddress))
+ state match {
+ case PruningState.PruningInitialized(owner, seen) ⇒
seen.toVector.sorted(Member.addressOrdering).map(addressToProto).foreach { a ⇒ b.addSeen(a) }
+ b.setOwnerAddress(uniqueAddressToProto(owner))
b.setPerformed(false)
- case PruningState.PruningPerformed ⇒
- b.setPerformed(true)
+ case PruningState.PruningPerformed(obsoleteTime) ⇒
+ b.setPerformed(true).setObsoleteTime(obsoleteTime)
+ // TODO ownerAddress is only needed for PruningInitialized, but kept here for
+ // wire backwards compatibility with 2.4.16 (required field)
+ b.setOwnerAddress(uniqueAddressToProto(dummyAddress))
}
dataEnvelopeBuilder.addPruning(b)
}
@@ -414,17 +419,28 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
dataEnvelopeFromProto(dm.DataEnvelope.parseFrom(bytes))
private def dataEnvelopeFromProto(dataEnvelope: dm.DataEnvelope): DataEnvelope = {
- val pruning: Map[UniqueAddress, PruningState] =
- dataEnvelope.getPruningList.asScala.map { pruningEntry ⇒
- val phase =
- if (pruningEntry.getPerformed) PruningState.PruningPerformed
- else PruningState.PruningInitialized(pruningEntry.getSeenList.asScala.map(addressFromProto)(breakOut))
- val state = PruningState(uniqueAddressFromProto(pruningEntry.getOwnerAddress), phase)
+ val data = otherMessageFromProto(dataEnvelope.getData).asInstanceOf[ReplicatedData]
+ val pruning = pruningFromProto(dataEnvelope.getPruningList)
+ DataEnvelope(data, pruning)
+ }
+
+ private def pruningFromProto(pruningEntries: java.util.List[dm.DataEnvelope.PruningEntry]): Map[UniqueAddress, PruningState] = {
+ if (pruningEntries.isEmpty)
+ Map.empty
+ else
+ pruningEntries.asScala.map { pruningEntry ⇒
+ val state =
+ if (pruningEntry.getPerformed) {
+ // for wire compatibility with Akka 2.4.x
+ val obsoleteTime = if (pruningEntry.hasObsoleteTime) pruningEntry.getObsoleteTime else Long.MaxValue
+ PruningState.PruningPerformed(obsoleteTime)
+ } else
+ PruningState.PruningInitialized(
+ uniqueAddressFromProto(pruningEntry.getOwnerAddress),
+ pruningEntry.getSeenList.asScala.map(addressFromProto)(breakOut))
val removed = uniqueAddressFromProto(pruningEntry.getRemovedAddress)
removed → state
}(breakOut)
- val data = otherMessageFromProto(dataEnvelope.getData).asInstanceOf[ReplicatedData]
- DataEnvelope(data, pruning)
}
private def writeToProto(write: Write): dm.Write =
@@ -472,6 +488,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
private def durableDataEnvelopeFromProto(durableDataEnvelope: dm.DurableDataEnvelope): DurableDataEnvelope = {
val data = otherMessageFromProto(durableDataEnvelope.getData).asInstanceOf[ReplicatedData]
+ val pruning = pruningFromProto(durableDataEnvelope.getPruningList)
+
new DurableDataEnvelope(data)
}
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala
index bfb8fb16f8..b4cf743c5f 100644
--- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala
@@ -23,7 +23,7 @@ final case class DurableDataSpecConfig(writeBehind: Boolean) extends MultiNodeCo
val second = role("second")
commonConfig(ConfigFactory.parseString(s"""
- akka.loglevel = DEBUG
+ akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
akka.cluster.distributed-data.durable.keys = ["durable*"]
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala
index 9535289015..553f0d727f 100644
--- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala
@@ -126,18 +126,19 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
}
within(15.seconds) {
+ var values = Set.empty[Int]
awaitAssert {
replicator ! Get(KeyA, ReadLocal)
val counter3 = expectMsgType[GetSuccess[GCounter]].dataValue
- counter3.value should be(10)
+ val value = counter3.value.intValue
+ values += value
+ value should be(10)
counter3.state.size should be(3)
}
+ values should ===(Set(10))
}
enterBarrier("pruned")
- // let it become tombstone
- Thread.sleep(5000)
-
runOn(first) {
val addr = cluster2.selfAddress
val sys3 = ActorSystem(system.name, ConfigFactory.parseString(s"""
@@ -150,15 +151,31 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
Cluster(sys3).join(node(first).address)
within(10.seconds) {
+ var values = Set.empty[Int]
awaitAssert {
replicator3.tell(Get(KeyA, ReadLocal), probe3.ref)
val counter4 = probe3.expectMsgType[GetSuccess[GCounter]].dataValue
- counter4.value should be(10)
+ val value = counter4.value.intValue
+ values += value
+ value should be(10)
counter4.state.size should be(3)
}
+ values should ===(Set(10))
}
+
+ // after merging with others
+ replicator3 ! Get(KeyA, ReadAll(remainingOrDefault))
+ val counter5 = expectMsgType[GetSuccess[GCounter]].dataValue
+ counter5.value should be(10)
+ counter5.state.size should be(3)
}
+ enterBarrier("sys3-started")
+ replicator ! Get(KeyA, ReadAll(remainingOrDefault))
+ val counter6 = expectMsgType[GetSuccess[GCounter]].dataValue
+ counter6.value should be(10)
+ counter6.state.size should be(3)
+
enterBarrier("after-1")
}
}
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala
index 6ac4dde83d..33154a59b0 100644
--- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala
@@ -122,14 +122,18 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
runOn(first, second) {
within(15.seconds) {
+ var values = Set.empty[Int]
awaitAssert {
replicator ! Get(KeyA, ReadLocal)
expectMsgPF() {
case g @ GetSuccess(KeyA, _) ⇒
- g.get(KeyA).value should be(9)
+ val value = g.get(KeyA).value.toInt
+ values += value
+ value should be(9)
g.get(KeyA).needPruningFrom(thirdUniqueAddress) should be(false)
}
}
+ values should ===(Set(9))
}
within(5.seconds) {
awaitAssert {
@@ -154,10 +158,12 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
}
enterBarrier("pruning-done")
- // on one of the nodes the data has been updated by the pruning,
- // client can update anyway
+ // after pruning performed we should not be able to update with data from removed node
def updateAfterPruning(expectedValue: Int): Unit = {
- replicator ! Update(KeyA, GCounter(), WriteAll(timeout), None)(_ + 1)
+ replicator ! Update(KeyA, GCounter(), WriteAll(timeout), None) { existing ⇒
+ // inject data from removed node to simulate bad data
+ existing.merge(oldCounter) + 1
+ }
expectMsgPF() {
case UpdateSuccess(KeyA, _) ⇒
replicator ! Get(KeyA, ReadLocal)
@@ -165,6 +171,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
retrieved.value should be(expectedValue)
}
}
+
runOn(first) {
updateAfterPruning(expectedValue = 10)
}
@@ -175,19 +182,19 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
}
enterBarrier("update-second-after-pruning")
- // after pruning performed and maxDissemination it is tombstoned
- // and we should still not be able to update with data from removed node
+ // after full replication should still not be able to update with data from removed node
+ // but it would not work after removal of the PruningPerformed markers
expectNoMsg(maxPruningDissemination + 3.seconds)
runOn(first) {
updateAfterPruning(expectedValue = 12)
}
- enterBarrier("update-first-after-tombstone")
+ enterBarrier("update-first-after-dissemination")
runOn(second) {
updateAfterPruning(expectedValue = 13)
}
- enterBarrier("update-second-after-tombstone")
+ enterBarrier("update-second-after-dissemination")
enterBarrier("after-1")
}
diff --git a/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java b/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java
index dc40be084f..e7762f85a3 100644
--- a/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java
+++ b/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java
@@ -13,6 +13,11 @@ public class JavaImplOfReplicatedData extends AbstractReplicatedData modifiedByNodes() {
+ return akka.japi.Util.immutableSeq(new java.util.ArrayList()).toSet();
+ }
+
@Override
public boolean needPruningFrom(UniqueAddress removedNode) {
return false;
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DataEnvelopeSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DataEnvelopeSpec.scala
new file mode 100644
index 0000000000..7f1ef32452
--- /dev/null
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DataEnvelopeSpec.scala
@@ -0,0 +1,70 @@
+/**
+ * Copyright (C) 2009-2017 Lightbend Inc.
+ */
+
+package akka.cluster.ddata
+
+import akka.actor.Address
+import akka.cluster.UniqueAddress
+import org.scalatest.Matchers
+import org.scalatest.WordSpec
+import akka.cluster.ddata.Replicator.Internal.DataEnvelope
+
+class DataEnvelopeSpec extends WordSpec with Matchers {
+ import PruningState._
+
+ val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)
+ val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L)
+ val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L)
+ val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4L)
+ val obsoleteTimeInFuture = System.currentTimeMillis() + 3600 * 1000
+ val oldObsoleteTime = System.currentTimeMillis() - 3600 * 1000
+
+ "DataEnvelope" must {
+
+ "handle pruning transitions" in {
+ val g1 = GCounter.empty.increment(node1, 1)
+ val d1 = DataEnvelope(g1)
+
+ val d2 = d1.initRemovedNodePruning(node1, node2)
+ d2.pruning(node1).isInstanceOf[PruningInitialized] should ===(true)
+ d2.pruning(node1).asInstanceOf[PruningInitialized].owner should ===(node2)
+
+ val d3 = d2.addSeen(node3.address)
+ d3.pruning(node1).asInstanceOf[PruningInitialized].seen should ===(Set(node3.address))
+
+ val d4 = d3.prune(node1, PruningPerformed(obsoleteTimeInFuture))
+ d4.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2))
+ }
+
+ "merge correctly" in {
+ val g1 = GCounter.empty.increment(node1, 1)
+ val d1 = DataEnvelope(g1)
+ val g2 = GCounter.empty.increment(node2, 2)
+ val d2 = DataEnvelope(g2)
+
+ val d3 = d1.merge(d2)
+ d3.data.asInstanceOf[GCounter].value should ===(3)
+ d3.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node1, node2))
+ val d4 = d3.initRemovedNodePruning(node1, node2)
+ val d5 = d4.prune(node1, PruningPerformed(obsoleteTimeInFuture))
+ d5.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2))
+
+ // late update from node1
+ val g11 = g1.increment(node1, 10)
+ val d6 = d5.merge(DataEnvelope(g11))
+ d6.data.asInstanceOf[GCounter].value should ===(3)
+ d6.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2))
+
+ // remove obsolete
+ val d7 = d5.copy(pruning = d5.pruning.updated(node1, PruningPerformed(oldObsoleteTime)))
+ val d8 = d5.copy(pruning = Map.empty)
+ d8.merge(d7).pruning should ===(Map.empty)
+ d7.merge(d8).pruning should ===(Map.empty)
+
+ d5.merge(d7).pruning(node1) should ===(PruningPerformed(obsoleteTimeInFuture))
+ d7.merge(d5).pruning(node1) should ===(PruningPerformed(obsoleteTimeInFuture))
+ }
+
+ }
+}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala
index 60571f1542..46a54f3661 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala
@@ -139,18 +139,22 @@ class GCounterSpec extends WordSpec with Matchers {
val c1 = GCounter()
val c2 = c1 increment node1
val c3 = c2 increment node2
+ c2.modifiedByNodes should ===(Set(node1))
c2.needPruningFrom(node1) should be(true)
c2.needPruningFrom(node2) should be(false)
+ c3.modifiedByNodes should ===(Set(node1, node2))
c3.needPruningFrom(node1) should be(true)
c3.needPruningFrom(node2) should be(true)
c3.value should be(2)
val c4 = c3.prune(node1, node2)
+ c4.modifiedByNodes should ===(Set(node2))
c4.needPruningFrom(node2) should be(true)
c4.needPruningFrom(node1) should be(false)
c4.value should be(2)
val c5 = (c4 increment node1).pruningCleanup(node1)
+ c5.modifiedByNodes should ===(Set(node2))
c5.needPruningFrom(node1) should be(false)
c4.value should be(2)
}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala
index 211c43c093..4d56d7dee6 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala
@@ -143,16 +143,20 @@ class PNCounterSpec extends WordSpec with Matchers {
val c1 = PNCounter()
val c2 = c1 increment node1
val c3 = c2 decrement node2
+ c2.modifiedByNodes should ===(Set(node1))
c2.needPruningFrom(node1) should be(true)
c2.needPruningFrom(node2) should be(false)
+ c3.modifiedByNodes should ===(Set(node1, node2))
c3.needPruningFrom(node1) should be(true)
c3.needPruningFrom(node2) should be(true)
val c4 = c3.prune(node1, node2)
+ c4.modifiedByNodes should ===(Set(node2))
c4.needPruningFrom(node2) should be(true)
c4.needPruningFrom(node1) should be(false)
val c5 = (c4 increment node1).pruningCleanup(node1)
+ c5.modifiedByNodes should ===(Set(node2))
c5.needPruningFrom(node1) should be(false)
}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PruningStateSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PruningStateSpec.scala
index 709cea1880..af37267baf 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PruningStateSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PruningStateSpec.scala
@@ -12,32 +12,37 @@ import org.scalatest.WordSpec
class PruningStateSpec extends WordSpec with Matchers {
import PruningState._
- val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1)
- val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2)
- val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3)
- val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4)
+ val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)
+ val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L)
+ val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L)
+ val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4L)
"Pruning state" must {
- "merge phase correctly" in {
- val p1 = PruningState(node1, PruningInitialized(Set.empty))
- val p2 = PruningState(node1, PruningPerformed)
- p1.merge(p2).phase should be(PruningPerformed)
- p2.merge(p1).phase should be(PruningPerformed)
+ "merge state correctly" in {
+ val p1 = PruningInitialized(node1, Set.empty)
+ val p2 = PruningPerformed(System.currentTimeMillis() + 3600 * 1000)
+ p1.merge(p2) should be(p2)
+ p2.merge(p1) should be(p2)
+
+ val p3 = p2.copy(p2.obsoleteTime - 1)
+ p2.merge(p3) should be(p2) // keep greatest obsoleteTime
+ p3.merge(p2) should be(p2)
+
}
"merge owner correctly" in {
- val p1 = PruningState(node1, PruningInitialized(Set.empty))
- val p2 = PruningState(node2, PruningInitialized(Set.empty))
- val expected = PruningState(node1, PruningInitialized(Set.empty))
+ val p1 = PruningInitialized(node1, Set.empty)
+ val p2 = PruningInitialized(node2, Set.empty)
+ val expected = PruningInitialized(node1, Set.empty)
p1.merge(p2) should be(expected)
p2.merge(p1) should be(expected)
}
"merge seen correctly" in {
- val p1 = PruningState(node1, PruningInitialized(Set(node2.address)))
- val p2 = PruningState(node1, PruningInitialized(Set(node4.address)))
- val expected = PruningState(node1, PruningInitialized(Set(node2.address, node4.address)))
+ val p1 = PruningInitialized(node1, Set(node2.address))
+ val p2 = PruningInitialized(node1, Set(node4.address))
+ val expected = PruningInitialized(node1, Set(node2.address, node4.address))
p1.merge(p2) should be(expected)
p2.merge(p1) should be(expected)
}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
index b47e9a5332..46bdf27e7b 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
@@ -70,8 +70,8 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Changed(keyA)(data1))
checkSerialization(DataEnvelope(data1))
checkSerialization(DataEnvelope(data1, pruning = Map(
- address1 → PruningState(address2, PruningPerformed),
- address3 → PruningState(address2, PruningInitialized(Set(address1.address))))))
+ address1 → PruningPerformed(System.currentTimeMillis()),
+ address3 → PruningInitialized(address2, Set(address1.address)))))
checkSerialization(Write("A", DataEnvelope(data1)))
checkSerialization(WriteAck)
checkSerialization(WriteNack)
@@ -85,6 +85,9 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"A" → DataEnvelope(data1),
"B" → DataEnvelope(GSet() + "b" + "c")), sendBack = true))
checkSerialization(new DurableDataEnvelope(data1))
+ checkSerialization(new DurableDataEnvelope(DataEnvelope(data1, pruning = Map(
+ address1 → PruningPerformed(System.currentTimeMillis()),
+ address3 → PruningInitialized(address2, Set(address1.address))))))
}
}
diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst
index 320a996616..777c0605cc 100644
--- a/akka-docs/rst/java/distributed-data.rst
+++ b/akka-docs/rst/java/distributed-data.rst
@@ -511,6 +511,18 @@ Note that you should be prepared to receive ``WriteFailure`` as reply to an ``Up
durable entry if the data could not be stored for some reason. When enabling ``write-behind-interval``
such errors will only be logged and ``UpdateSuccess`` will still be the reply to the ``Update``.
+There is one important caveat when it comes pruning of :ref:`crdt_garbage_java` for durable data.
+If and old data entry that was never pruned is injected and merged with existing data after
+that the pruning markers have been removed the value will not be correct. The time-to-live
+of the markers is defined by configuration
+``akka.cluster.distributed-data.durable.remove-pruning-marker-after`` and is in the magnitude of days.
+This would be possible if a node with durable data didn't participate in the pruning
+(e.g. it was shutdown) and later started after this time. A node with durable data should not
+be stopped for longer time than this duration and if it is joining again after this
+duration its data should first be manually removed (from the lmdb directory).
+
+.. _crdt_garbage_java:
+
CRDT Garbage
------------
@@ -519,7 +531,8 @@ For example a ``GCounter`` keeps track of one counter per node. If a ``GCounter`
from one node it will associate the identifier of that node forever. That can become a problem
for long running systems with many cluster nodes being added and removed. To solve this problem
the ``Replicator`` performs pruning of data associated with nodes that have been removed from the
-cluster. Data types that need pruning have to implement the ``RemovedNodePruning`` trait.
+cluster. Data types that need pruning have to implement the ``RemovedNodePruning`` trait. See the
+API documentation of the ``Replicator`` for details.
Samples
=======
diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst
index 366ada5788..4033e17e51 100644
--- a/akka-docs/rst/scala/distributed-data.rst
+++ b/akka-docs/rst/scala/distributed-data.rst
@@ -523,6 +523,18 @@ Note that you should be prepared to receive ``WriteFailure`` as reply to an ``Up
durable entry if the data could not be stored for some reason. When enabling ``write-behind-interval``
such errors will only be logged and ``UpdateSuccess`` will still be the reply to the ``Update``.
+There is one important caveat when it comes pruning of :ref:`crdt_garbage_scala` for durable data.
+If and old data entry that was never pruned is injected and merged with existing data after
+that the pruning markers have been removed the value will not be correct. The time-to-live
+of the markers is defined by configuration
+``akka.cluster.distributed-data.durable.remove-pruning-marker-after`` and is in the magnitude of days.
+This would be possible if a node with durable data didn't participate in the pruning
+(e.g. it was shutdown) and later started after this time. A node with durable data should not
+be stopped for longer time than this duration and if it is joining again after this
+duration its data should first be manually removed (from the lmdb directory).
+
+.. _crdt_garbage_scala:
+
CRDT Garbage
------------
@@ -531,7 +543,8 @@ For example a ``GCounter`` keeps track of one counter per node. If a ``GCounter`
from one node it will associate the identifier of that node forever. That can become a problem
for long running systems with many cluster nodes being added and removed. To solve this problem
the ``Replicator`` performs pruning of data associated with nodes that have been removed from the
-cluster. Data types that need pruning have to implement the ``RemovedNodePruning`` trait.
+cluster. Data types that need pruning have to implement the ``RemovedNodePruning`` trait. See the
+API documentation of the ``Replicator`` for details.
Samples
=======
diff --git a/project/MiMa.scala b/project/MiMa.scala
index 23bf5d8847..83adab3687 100644
--- a/project/MiMa.scala
+++ b/project/MiMa.scala
@@ -162,10 +162,16 @@ object MiMa extends AutoPlugin {
FilterAnyProblemStartingWith("akka.cluster.sharding.ClusterShardingGuardian"),
FilterAnyProblemStartingWith("akka.cluster.sharding.ShardRegion"),
+ // #21647 pruning
+ FilterAnyProblemStartingWith("akka.cluster.ddata.PruningState"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.RemovedNodePruning.modifiedByNodes"),
+ FilterAnyProblemStartingWith("akka.cluster.ddata.Replicator"),
+ FilterAnyProblemStartingWith("akka.cluster.ddata.protobuf.msg"),
+
// #21537 coordinated shutdown
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.removed"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.convergence"),
-
+
// #21423 removal of deprecated stages (in 2.5.x)
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SubSource.transform"),