make pruning of CRDT garbage work, #21647

* fix merge issues of DataEnvelope and its pruning
* simplify by removing the tombstones, which didn't work in all cases anyway
* keep the PruningPerformed markers in the DataEnvelope until the configured
  TTL has elapsed (wall clock)
* simplify PruningState structure
* also store the pruning markers in durable data
* collect removed nodes from the data; listening on MemberRemoved is not enough
* possibility to disable pruning altogether
* documented caveat for durable data
Patrik Nordwall 2017-01-11 13:19:45 +01:00
parent c5d18c30d6
commit 952be31a7d
28 changed files with 951 additions and 229 deletions
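
For orientation, the core of the new marker lifecycle is small enough to sketch. The following standalone model mirrors the PruningPerformed.isObsolete check added in this commit; the MarkerLifecycle driver object and its values are illustrative only, not part of the change:

import scala.concurrent.duration._

// Sketch of the wall-clock TTL check this commit introduces for pruning markers
// (mirrors PruningState.PruningPerformed.isObsolete; the driver is illustrative).
final case class PruningPerformed(obsoleteTime: Long) {
  def isObsolete(currentTime: Long): Boolean = obsoleteTime <= currentTime
}

object MarkerLifecycle extends App {
  val ttl = 6.hours // default akka.cluster.distributed-data.pruning-marker-time-to-live
  val marker = PruningPerformed(System.currentTimeMillis() + ttl.toMillis)
  // The marker survives merges until the TTL has elapsed, then it is dropped.
  println(marker.isObsolete(System.currentTimeMillis())) // false until the TTL elapses
}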

View file

@ -7622,6 +7622,16 @@ public final class ReplicatorMessages {
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.AddressOrBuilder getSeenOrBuilder(
int index);
// optional sint64 obsoleteTime = 5;
/**
* <code>optional sint64 obsoleteTime = 5;</code>
*/
boolean hasObsoleteTime();
/**
* <code>optional sint64 obsoleteTime = 5;</code>
*/
long getObsoleteTime();
}
/**
* Protobuf type {@code akka.cluster.ddata.DataEnvelope.PruningEntry}
@ -7713,6 +7723,11 @@ public final class ReplicatorMessages {
seen_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Address.PARSER, extensionRegistry));
break;
}
case 40: {
bitField0_ |= 0x00000008;
obsoleteTime_ = input.readSInt64();
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -7852,11 +7867,28 @@ public final class ReplicatorMessages {
return seen_.get(index);
}
// optional sint64 obsoleteTime = 5;
public static final int OBSOLETETIME_FIELD_NUMBER = 5;
private long obsoleteTime_;
/**
* <code>optional sint64 obsoleteTime = 5;</code>
*/
public boolean hasObsoleteTime() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional sint64 obsoleteTime = 5;</code>
*/
public long getObsoleteTime() {
return obsoleteTime_;
}
private void initFields() {
removedAddress_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance();
ownerAddress_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance();
performed_ = false;
seen_ = java.util.Collections.emptyList();
obsoleteTime_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -7908,6 +7940,9 @@ public final class ReplicatorMessages {
for (int i = 0; i < seen_.size(); i++) {
output.writeMessage(4, seen_.get(i));
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeSInt64(5, obsoleteTime_);
}
getUnknownFields().writeTo(output);
}
@ -7933,6 +7968,10 @@ public final class ReplicatorMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(4, seen_.get(i));
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += akka.protobuf.CodedOutputStream
.computeSInt64Size(5, obsoleteTime_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -8072,6 +8111,8 @@ public final class ReplicatorMessages {
} else {
seenBuilder_.clear();
}
obsoleteTime_ = 0L;
bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@ -8129,6 +8170,10 @@ public final class ReplicatorMessages {
} else {
result.seen_ = seenBuilder_.build();
}
if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
to_bitField0_ |= 0x00000008;
}
result.obsoleteTime_ = obsoleteTime_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -8180,6 +8225,9 @@ public final class ReplicatorMessages {
}
}
}
if (other.hasObsoleteTime()) {
setObsoleteTime(other.getObsoleteTime());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -8740,6 +8788,39 @@ public final class ReplicatorMessages {
return seenBuilder_;
}
// optional sint64 obsoleteTime = 5;
private long obsoleteTime_ ;
/**
* <code>optional sint64 obsoleteTime = 5;</code>
*/
public boolean hasObsoleteTime() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional sint64 obsoleteTime = 5;</code>
*/
public long getObsoleteTime() {
return obsoleteTime_;
}
/**
* <code>optional sint64 obsoleteTime = 5;</code>
*/
public Builder setObsoleteTime(long value) {
bitField0_ |= 0x00000010;
obsoleteTime_ = value;
onChanged();
return this;
}
/**
* <code>optional sint64 obsoleteTime = 5;</code>
*/
public Builder clearObsoleteTime() {
bitField0_ = (bitField0_ & ~0x00000010);
obsoleteTime_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DataEnvelope.PruningEntry)
}
@ -14781,6 +14862,31 @@ public final class ReplicatorMessages {
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder();
// repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry>
getPruningList();
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int index);
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
int getPruningCount();
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
java.util.List<? extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>
getPruningOrBuilderList();
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder(
int index);
}
/**
* Protobuf type {@code akka.cluster.ddata.DurableDataEnvelope}
@ -14846,6 +14952,14 @@ public final class ReplicatorMessages {
bitField0_ |= 0x00000001;
break;
}
case 18: {
if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
pruning_ = new java.util.ArrayList<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry>();
mutable_bitField0_ |= 0x00000002;
}
pruning_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.PARSER, extensionRegistry));
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -14854,6 +14968,9 @@ public final class ReplicatorMessages {
throw new akka.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
pruning_ = java.util.Collections.unmodifiableList(pruning_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@ -14908,8 +15025,45 @@ public final class ReplicatorMessages {
return data_;
}
// repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
public static final int PRUNING_FIELD_NUMBER = 2;
private java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry> pruning_;
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry> getPruningList() {
return pruning_;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public java.util.List<? extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>
getPruningOrBuilderList() {
return pruning_;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public int getPruningCount() {
return pruning_.size();
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int index) {
return pruning_.get(index);
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder(
int index) {
return pruning_.get(index);
}
private void initFields() {
data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
pruning_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -14924,6 +15078,12 @@ public final class ReplicatorMessages {
memoizedIsInitialized = 0;
return false;
}
for (int i = 0; i < getPruningCount(); i++) {
if (!getPruning(i).isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
@ -14934,6 +15094,9 @@ public final class ReplicatorMessages {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeMessage(1, data_);
}
for (int i = 0; i < pruning_.size(); i++) {
output.writeMessage(2, pruning_.get(i));
}
getUnknownFields().writeTo(output);
}
@ -14947,6 +15110,10 @@ public final class ReplicatorMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(1, data_);
}
for (int i = 0; i < pruning_.size(); i++) {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(2, pruning_.get(i));
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -15056,6 +15223,7 @@ public final class ReplicatorMessages {
private void maybeForceBuilderInitialization() {
if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getDataFieldBuilder();
getPruningFieldBuilder();
}
}
private static Builder create() {
@ -15070,6 +15238,12 @@ public final class ReplicatorMessages {
dataBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
if (pruningBuilder_ == null) {
pruning_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000002);
} else {
pruningBuilder_.clear();
}
return this;
}
@ -15106,6 +15280,15 @@ public final class ReplicatorMessages {
} else {
result.data_ = dataBuilder_.build();
}
if (pruningBuilder_ == null) {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
pruning_ = java.util.Collections.unmodifiableList(pruning_);
bitField0_ = (bitField0_ & ~0x00000002);
}
result.pruning_ = pruning_;
} else {
result.pruning_ = pruningBuilder_.build();
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -15125,6 +15308,32 @@ public final class ReplicatorMessages {
if (other.hasData()) {
mergeData(other.getData());
}
if (pruningBuilder_ == null) {
if (!other.pruning_.isEmpty()) {
if (pruning_.isEmpty()) {
pruning_ = other.pruning_;
bitField0_ = (bitField0_ & ~0x00000002);
} else {
ensurePruningIsMutable();
pruning_.addAll(other.pruning_);
}
onChanged();
}
} else {
if (!other.pruning_.isEmpty()) {
if (pruningBuilder_.isEmpty()) {
pruningBuilder_.dispose();
pruningBuilder_ = null;
pruning_ = other.pruning_;
bitField0_ = (bitField0_ & ~0x00000002);
pruningBuilder_ =
akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
getPruningFieldBuilder() : null;
} else {
pruningBuilder_.addAllMessages(other.pruning_);
}
}
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -15138,6 +15347,12 @@ public final class ReplicatorMessages {
return false;
}
for (int i = 0; i < getPruningCount(); i++) {
if (!getPruning(i).isInitialized()) {
return false;
}
}
return true;
}
@ -15277,6 +15492,246 @@ public final class ReplicatorMessages {
return dataBuilder_;
}
// repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;
private java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry> pruning_ =
java.util.Collections.emptyList();
private void ensurePruningIsMutable() {
if (!((bitField0_ & 0x00000002) == 0x00000002)) {
pruning_ = new java.util.ArrayList<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry>(pruning_);
bitField0_ |= 0x00000002;
}
}
private akka.protobuf.RepeatedFieldBuilder<
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder> pruningBuilder_;
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry> getPruningList() {
if (pruningBuilder_ == null) {
return java.util.Collections.unmodifiableList(pruning_);
} else {
return pruningBuilder_.getMessageList();
}
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public int getPruningCount() {
if (pruningBuilder_ == null) {
return pruning_.size();
} else {
return pruningBuilder_.getCount();
}
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int index) {
if (pruningBuilder_ == null) {
return pruning_.get(index);
} else {
return pruningBuilder_.getMessage(index);
}
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder setPruning(
int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) {
if (pruningBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensurePruningIsMutable();
pruning_.set(index, value);
onChanged();
} else {
pruningBuilder_.setMessage(index, value);
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder setPruning(
int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) {
if (pruningBuilder_ == null) {
ensurePruningIsMutable();
pruning_.set(index, builderForValue.build());
onChanged();
} else {
pruningBuilder_.setMessage(index, builderForValue.build());
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder addPruning(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) {
if (pruningBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensurePruningIsMutable();
pruning_.add(value);
onChanged();
} else {
pruningBuilder_.addMessage(value);
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder addPruning(
int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) {
if (pruningBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensurePruningIsMutable();
pruning_.add(index, value);
onChanged();
} else {
pruningBuilder_.addMessage(index, value);
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder addPruning(
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) {
if (pruningBuilder_ == null) {
ensurePruningIsMutable();
pruning_.add(builderForValue.build());
onChanged();
} else {
pruningBuilder_.addMessage(builderForValue.build());
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder addPruning(
int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) {
if (pruningBuilder_ == null) {
ensurePruningIsMutable();
pruning_.add(index, builderForValue.build());
onChanged();
} else {
pruningBuilder_.addMessage(index, builderForValue.build());
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder addAllPruning(
java.lang.Iterable<? extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry> values) {
if (pruningBuilder_ == null) {
ensurePruningIsMutable();
super.addAll(values, pruning_);
onChanged();
} else {
pruningBuilder_.addAllMessages(values);
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder clearPruning() {
if (pruningBuilder_ == null) {
pruning_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000002);
onChanged();
} else {
pruningBuilder_.clear();
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public Builder removePruning(int index) {
if (pruningBuilder_ == null) {
ensurePruningIsMutable();
pruning_.remove(index);
onChanged();
} else {
pruningBuilder_.remove(index);
}
return this;
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder getPruningBuilder(
int index) {
return getPruningFieldBuilder().getBuilder(index);
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder(
int index) {
if (pruningBuilder_ == null) {
return pruning_.get(index); } else {
return pruningBuilder_.getMessageOrBuilder(index);
}
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public java.util.List<? extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>
getPruningOrBuilderList() {
if (pruningBuilder_ != null) {
return pruningBuilder_.getMessageOrBuilderList();
} else {
return java.util.Collections.unmodifiableList(pruning_);
}
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder addPruningBuilder() {
return getPruningFieldBuilder().addBuilder(
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.getDefaultInstance());
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder addPruningBuilder(
int index) {
return getPruningFieldBuilder().addBuilder(
index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.getDefaultInstance());
}
/**
* <code>repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2;</code>
*/
public java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder>
getPruningBuilderList() {
return getPruningFieldBuilder().getBuilderList();
}
private akka.protobuf.RepeatedFieldBuilder<
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>
getPruningFieldBuilder() {
if (pruningBuilder_ == null) {
pruningBuilder_ = new akka.protobuf.RepeatedFieldBuilder<
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>(
pruning_,
((bitField0_ & 0x00000002) == 0x00000002),
getParentForChildren(),
isClean());
pruning_ = null;
}
return pruningBuilder_;
}
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DurableDataEnvelope)
}
@ -15431,32 +15886,34 @@ public final class ReplicatorMessages {
"key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.clust" +
"er.ddata.DataEnvelope\"\007\n\005Empty\"\023\n\004Read\022\013" +
"\n\003key\030\001 \002(\t\"@\n\nReadResult\0222\n\010envelope\030\001 " +
"\001(\0132 .akka.cluster.ddata.DataEnvelope\"\301\002" +
"\001(\0132 .akka.cluster.ddata.DataEnvelope\"\327\002" +
"\n\014DataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.clus" +
"ter.ddata.OtherMessage\022>\n\007pruning\030\002 \003(\0132" +
"-.akka.cluster.ddata.DataEnvelope.Prunin" +
"gEntry\032\300\001\n\014PruningEntry\0229\n\016removedAddres" +
"gEntry\032\326\001\n\014PruningEntry\0229\n\016removedAddres" +
"s\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddre",
"ss\0227\n\014ownerAddress\030\002 \002(\0132!.akka.cluster." +
"ddata.UniqueAddress\022\021\n\tperformed\030\003 \002(\010\022)" +
"\n\004seen\030\004 \003(\0132\033.akka.cluster.ddata.Addres" +
"s\"\203\001\n\006Status\022\r\n\005chunk\030\001 \002(\r\022\021\n\ttotChunks" +
"\030\002 \002(\r\0221\n\007entries\030\003 \003(\0132 .akka.cluster.d" +
"data.Status.Entry\032$\n\005Entry\022\013\n\003key\030\001 \002(\t\022" +
"\016\n\006digest\030\002 \002(\014\"\227\001\n\006Gossip\022\020\n\010sendBack\030\001" +
" \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda" +
"ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" +
"\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat",
"aEnvelope\"X\n\rUniqueAddress\022,\n\007address\030\001 " +
"\002(\0132\033.akka.cluster.ddata.Address\022\013\n\003uid\030" +
"\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010hostna" +
"me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"V\n\014OtherMessage\022\027" +
"\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializerId\030" +
"\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nString" +
"GSet\022\020\n\010elements\030\001 \003(\t\"E\n\023DurableDataEnv" +
"elope\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata" +
".OtherMessageB#\n\037akka.cluster.ddata.prot" +
"obuf.msgH\001"
"s\022\024\n\014obsoleteTime\030\005 \001(\022\"\203\001\n\006Status\022\r\n\005ch" +
"unk\030\001 \002(\r\022\021\n\ttotChunks\030\002 \002(\r\0221\n\007entries\030" +
"\003 \003(\0132 .akka.cluster.ddata.Status.Entry\032" +
"$\n\005Entry\022\013\n\003key\030\001 \002(\t\022\016\n\006digest\030\002 \002(\014\"\227\001" +
"\n\006Gossip\022\020\n\010sendBack\030\001 \002(\010\0221\n\007entries\030\002 " +
"\003(\0132 .akka.cluster.ddata.Gossip.Entry\032H\n" +
"\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .",
"akka.cluster.ddata.DataEnvelope\"X\n\rUniqu" +
"eAddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster" +
".ddata.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(" +
"\017\")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002" +
" \002(\r\"V\n\014OtherMessage\022\027\n\017enclosedMessage\030" +
"\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageMa" +
"nifest\030\004 \001(\014\"\036\n\nStringGSet\022\020\n\010elements\030\001" +
" \003(\t\"\205\001\n\023DurableDataEnvelope\022.\n\004data\030\001 \002" +
"(\0132 .akka.cluster.ddata.OtherMessage\022>\n\007" +
"pruning\030\002 \003(\0132-.akka.cluster.ddata.DataE",
"nvelope.PruningEntryB#\n\037akka.cluster.dda" +
"ta.protobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -15540,7 +15997,7 @@ public final class ReplicatorMessages {
internal_static_akka_cluster_ddata_DataEnvelope_PruningEntry_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_DataEnvelope_PruningEntry_descriptor,
new java.lang.String[] { "RemovedAddress", "OwnerAddress", "Performed", "Seen", });
new java.lang.String[] { "RemovedAddress", "OwnerAddress", "Performed", "Seen", "ObsoleteTime", });
internal_static_akka_cluster_ddata_Status_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_akka_cluster_ddata_Status_fieldAccessorTable = new
@ -15594,7 +16051,7 @@ public final class ReplicatorMessages {
internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor,
new java.lang.String[] { "Data", });
new java.lang.String[] { "Data", "Pruning", });
return null;
}
};

View file

@ -68,6 +68,7 @@ message DataEnvelope {
required UniqueAddress ownerAddress = 2;
required bool performed = 3;
repeated Address seen = 4;
optional sint64 obsoleteTime = 5;
}
required OtherMessage data = 1;
@ -119,4 +120,5 @@ message StringGSet {
message DurableDataEnvelope {
required OtherMessage data = 1;
repeated DataEnvelope.PruningEntry pruning = 2;
}
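
The new optional field is reachable through the generated builder API. A hedged sketch (the PruningEntrySketch object is hypothetical, and buildPartial is used only because the required removedAddress/ownerAddress fields are omitted here):

import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages ⇒ dm }

object PruningEntrySketch extends App {
  // buildPartial: the required removedAddress/ownerAddress fields are left out
  // in this sketch, so build() would throw for missing required fields.
  val entry = dm.DataEnvelope.PruningEntry.newBuilder()
    .setPerformed(true)
    .setObsoleteTime(System.currentTimeMillis())
    .buildPartial()

  println(entry.hasObsoleteTime) // true
  println(entry.getObsoleteTime)
}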

View file

@ -32,14 +32,26 @@ akka.cluster.distributed-data {
use-dispatcher = ""
# How often the Replicator checks for pruning of data associated with
# removed cluster nodes.
pruning-interval = 30 s
# removed cluster nodes. If this is set to 'off' the pruning feature will
# be completely disabled.
pruning-interval = 120 s
# How long time it takes (worst case) to spread the data to all other replica nodes.
# How long it takes to spread the data to all other replica nodes.
# This is used when initiating and completing the pruning process of data associated
# with removed cluster nodes. The time measurement is stopped when any replica is
# unreachable, so it should be configured to worst case in a healthy cluster.
max-pruning-dissemination = 60 s
# unreachable, but it's still recommended to configure this with a certain margin.
# It should be on the order of minutes even though typical dissemination time
# is shorter (it grows logarithmically with the number of nodes). There is no advantage
# in setting this too low. Setting it to a large value will delay the pruning process.
max-pruning-dissemination = 300 s
# The markers that pruning has been performed for a removed node are kept for this
# time and thereafter removed. If an old data entry that was never pruned is somehow
# injected and merged with existing data after this time, the value will not be correct.
# This would be possible (although unlikely) in the case of a long network partition.
# It should be on the order of hours. For durable data it is configured by
# 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'.
pruning-marker-time-to-live = 6 h
# Serialized Write and Read messages are cached when they are sent to
# several nodes. If no further activity they are removed from the cache
@ -51,6 +63,17 @@ akka.cluster.distributed-data {
# end of a key.
keys = []
# The markers that pruning has been performed for a removed node are kept for this
# time and thereafter removed. If an old data entry that was never pruned is
# injected and merged with existing data after this time, the value will not be correct.
# This would be possible if a replica with durable data didn't participate in the pruning
# (e.g. it was shut down) and later started after this time. A durable replica should not
# be stopped for a longer time than this duration, and if it is joining again after this
# duration its data should first be manually removed (from the lmdb directory).
# It should be on the order of days. Note that there is a corresponding setting
# for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'.
pruning-marker-time-to-live = 10 d
# Fully qualified class name of the durable store actor. It must be a subclass
# of akka.actor.Actor and handle the protocol defined in
# akka.cluster.ddata.DurableStore. The class must have a constructor with
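
A hedged configuration sketch for the settings above; the PruningConfigSketch object, system name and override values are illustrative. Setting pruning-interval to 'off' maps to Duration.Zero in ReplicatorSettings, which disables the pruning tick:

import akka.actor.ActorSystem
import akka.cluster.ddata.ReplicatorSettings
import com.typesafe.config.ConfigFactory

object PruningConfigSketch extends App {
  // Illustrative overrides of the settings added in this commit.
  val config = ConfigFactory.parseString(
    """
    akka.cluster.distributed-data {
      pruning-interval = off                    # disables pruning altogether
      pruning-marker-time-to-live = 6 h
      durable.pruning-marker-time-to-live = 10 d
    }
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("sketch", config)
  println(ReplicatorSettings(system).pruningInterval) // Duration.Zero when disabled
  system.terminate()
}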

View file

@ -19,6 +19,7 @@ import akka.actor.DeadLetterSuppression
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.Replicator.ReplicatorMessage
import akka.cluster.ddata.Replicator.Internal.DataEnvelope
import akka.io.DirectByteBufferPool
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
@ -52,7 +53,7 @@ object DurableStore {
* should be used to signal success or failure of the operation to the contained
* `replyTo` actor.
*/
final case class Store(key: String, data: ReplicatedData, reply: Option[StoreReply])
final case class Store(key: String, data: DurableDataEnvelope, reply: Option[StoreReply])
final case class StoreReply(successMsg: Any, failureMsg: Any, replyTo: ActorRef)
/**
@ -65,7 +66,7 @@ object DurableStore {
* will stop itself and the durable store.
*/
case object LoadAll
final case class LoadData(data: Map[String, ReplicatedData])
final case class LoadData(data: Map[String, DurableDataEnvelope])
case object LoadAllCompleted
class LoadFailed(message: String, cause: Throwable) extends RuntimeException(message, cause) {
def this(message: String) = this(message, null)
@ -77,7 +78,13 @@ object DurableStore {
* the wrapped `ReplicatedData` including its serializerId and
* manifest.
*/
final class DurableDataEnvelope(val data: ReplicatedData) extends ReplicatorMessage {
final class DurableDataEnvelope private[akka] (
private[akka] val dataEnvelope: DataEnvelope) extends ReplicatorMessage {
def this(data: ReplicatedData) = this(DataEnvelope(data))
def data: ReplicatedData = dataEnvelope.data
override def toString(): String = s"DurableDataEnvelope($data)"
override def hashCode(): Int = data.hashCode
override def equals(o: Any): Boolean = o match {
@ -136,7 +143,7 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
}
// pending write behind
val pending = new java.util.HashMap[String, ReplicatedData]
val pending = new java.util.HashMap[String, DurableDataEnvelope]
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
@ -171,7 +178,7 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
val valArray = Array.ofDim[Byte](entry.`val`.remaining)
entry.`val`.get(valArray)
val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope]
key → envelope.data
key → envelope
}.toMap)
if (loadData.data.nonEmpty)
sender() ! loadData
@ -220,10 +227,10 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
writeBehind()
}
def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: String, data: ReplicatedData): Unit = {
def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: String, data: DurableDataEnvelope): Unit = {
try {
keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip()
val value = serializer.toBinary(new DurableDataEnvelope(data))
val value = serializer.toBinary(data)
ensureValueBufferSize(value.length)
valueBuffer.put(value).flip()
tx match {

View file

@ -102,6 +102,8 @@ final class GCounter private[akka] (
new GCounter(merged)
}
override def modifiedByNodes: Set[UniqueAddress] = state.keySet
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
state.contains(removedNode)

View file

@ -143,6 +143,9 @@ final class LWWMap[A, B] private[akka] (
override def merge(that: LWWMap[A, B]): LWWMap[A, B] =
new LWWMap(underlying.merge(that.underlying))
override def modifiedByNodes: Set[UniqueAddress] =
underlying.modifiedByNodes
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
underlying.needPruningFrom(removedNode)

View file

@ -177,6 +177,13 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
new ORMap(mergedKeys, mergedValues)
}
override def modifiedByNodes: Set[UniqueAddress] = {
keys.modifiedByNodes union values.foldLeft(Set.empty[UniqueAddress]) {
case (acc, (_, data: RemovedNodePruning)) ⇒ acc union data.modifiedByNodes
case (acc, _) ⇒ acc
}
}
override def needPruningFrom(removedNode: UniqueAddress): Boolean = {
keys.needPruningFrom(removedNode) || values.exists {
case (_, data: RemovedNodePruning) ⇒ data.needPruningFrom(removedNode)

View file

@ -204,6 +204,9 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
else
this
override def modifiedByNodes: Set[UniqueAddress] =
underlying.modifiedByNodes
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
underlying.needPruningFrom(removedNode)

View file

@ -307,6 +307,9 @@ final class ORSet[A] private[akka] (
}
}
override def modifiedByNodes: Set[UniqueAddress] =
vvector.modifiedByNodes
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
vvector.needPruningFrom(removedNode)

View file

@ -94,6 +94,9 @@ final class PNCounter private[akka] (
increments = that.increments.merge(this.increments),
decrements = that.decrements.merge(this.decrements))
override def modifiedByNodes: Set[UniqueAddress] =
increments.modifiedByNodes union decrements.modifiedByNodes
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
increments.needPruningFrom(removedNode) || decrements.needPruningFrom(removedNode)

View file

@ -123,6 +123,9 @@ final class PNCounterMap[A] private[akka] (
override def merge(that: PNCounterMap[A]): PNCounterMap[A] =
new PNCounterMap(underlying.merge(that.underlying))
override def modifiedByNodes: Set[UniqueAddress] =
underlying.modifiedByNodes
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
underlying.needPruningFrom(removedNode)

View file

@ -11,36 +11,37 @@ import akka.cluster.UniqueAddress
* INTERNAL API
*/
private[akka] object PruningState {
sealed trait PruningPhase
final case class PruningInitialized(seen: Set[Address]) extends PruningPhase
case object PruningPerformed extends PruningPhase
final case class PruningInitialized(owner: UniqueAddress, seen: Set[Address]) extends PruningState {
override def addSeen(node: Address): PruningState = {
if (seen(node) || owner.address == node) this
else copy(seen = seen + node)
}
}
final case class PruningPerformed(obsoleteTime: Long) extends PruningState {
def isObsolete(currentTime: Long): Boolean = obsoleteTime <= currentTime
}
}
/**
* INTERNAL API
*/
private[akka] final case class PruningState(owner: UniqueAddress, phase: PruningState.PruningPhase) {
private[akka] sealed trait PruningState {
import PruningState._
def merge(that: PruningState): PruningState =
(this.phase, that.phase) match {
// FIXME this will add the PruningPerformed back again when one is None
case (PruningPerformed, _) ⇒ this
case (_, PruningPerformed) ⇒ that
case (PruningInitialized(thisSeen), PruningInitialized(thatSeen)) ⇒
if (this.owner == that.owner)
copy(phase = PruningInitialized(thisSeen union thatSeen))
else if (Member.addressOrdering.compare(this.owner.address, that.owner.address) > 0)
(this, that) match {
case (p1: PruningPerformed, p2: PruningPerformed) ⇒ if (p1.obsoleteTime >= p2.obsoleteTime) this else that
case (_: PruningPerformed, _) ⇒ this
case (_, _: PruningPerformed) ⇒ that
case (PruningInitialized(thisOwner, thisSeen), PruningInitialized(thatOwner, thatSeen)) ⇒
if (thisOwner == thatOwner)
PruningInitialized(thisOwner, thisSeen union thatSeen)
else if (Member.addressOrdering.compare(thisOwner.address, thatOwner.address) > 0)
that
else
this
}
def addSeen(node: Address): PruningState = phase match {
case PruningInitialized(seen) ⇒
if (seen(node) || owner.address == node) this
else copy(phase = PruningInitialized(seen + node))
case _ ⇒ this
}
def addSeen(node: Address): PruningState = this
}
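
The merge rules above can be restated as a standalone model, since the real types are private[akka]. PruningPhaseModel is illustrative; owner ordering is simplified to plain string comparison instead of Member.addressOrdering:

object PruningPhaseModel {
  sealed trait Phase
  final case class Initialized(owner: String, seen: Set[String]) extends Phase
  final case class Performed(obsoleteTime: Long) extends Phase

  def merge(a: Phase, b: Phase): Phase = (a, b) match {
    case (p1: Performed, p2: Performed) ⇒ if (p1.obsoleteTime >= p2.obsoleteTime) p1 else p2
    case (p: Performed, _) ⇒ p // a Performed marker always wins over in-progress state
    case (_, p: Performed) ⇒ p
    case (Initialized(o1, s1), Initialized(o2, s2)) ⇒
      if (o1 == o2) Initialized(o1, s1 union s2) // same owner: union the seen sets
      else if (o1 > o2) Initialized(o2, s2) else Initialized(o1, s1) // deterministic owner pick
  }
}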

View file

@ -67,9 +67,19 @@ abstract class AbstractReplicatedData[D <: AbstractReplicatedData[D]] extends Re
* When a node is removed from the cluster these methods will be
* used by the [[Replicator]] to collapse data from the removed node
* into some other node in the cluster.
*
* See process description in the 'CRDT Garbage' section of the [[Replicator]]
* documentation.
*/
trait RemovedNodePruning extends ReplicatedData {
/**
* The nodes that have changed the state for this data
* and would need pruning when such a node is no longer part
* of the cluster.
*/
def modifiedByNodes: Set[UniqueAddress]
/**
* Does it have any state changes from a specific node,
* which has been removed from the cluster.
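
A minimal standalone sketch of the new hook, modeled on the GCounter change earlier in this commit; ToyCounter is hypothetical, and the real trait also declares prune and pruningCleanup, omitted here:

import akka.cluster.UniqueAddress

// State is keyed by the writing node, so both questions reduce to key lookups.
final case class ToyCounter(state: Map[UniqueAddress, Long]) {
  def modifiedByNodes: Set[UniqueAddress] = state.keySet
  def needPruningFrom(removedNode: UniqueAddress): Boolean = state.contains(removedNode)
}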

View file

@ -43,6 +43,7 @@ import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.actor.ActorInitializationException
import java.util.concurrent.TimeUnit
import akka.util.Helpers.toRootLowerCase
object ReplicatorSettings {
@ -63,6 +64,11 @@ object ReplicatorSettings {
case id ⇒ id
}
val pruningInterval = toRootLowerCase(config.getString("pruning-interval")) match {
case "off" | "false" ⇒ Duration.Zero
case _ ⇒ config.getDuration("pruning-interval", MILLISECONDS).millis
}
import scala.collection.JavaConverters._
new ReplicatorSettings(
role = roleOption(config.getString("role")),
@ -70,10 +76,12 @@ object ReplicatorSettings {
notifySubscribersInterval = config.getDuration("notify-subscribers-interval", MILLISECONDS).millis,
maxDeltaElements = config.getInt("max-delta-elements"),
dispatcher = dispatcher,
pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis,
pruningInterval = pruningInterval,
maxPruningDissemination = config.getDuration("max-pruning-dissemination", MILLISECONDS).millis,
durableStoreProps = Left((config.getString("durable.store-actor-class"), config.getConfig("durable"))),
durableKeys = config.getStringList("durable.keys").asScala.toSet)
durableKeys = config.getStringList("durable.keys").asScala.toSet,
pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis,
durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis)
}
/**
@ -110,21 +118,30 @@ object ReplicatorSettings {
* in the `Set`.
*/
final class ReplicatorSettings(
val role: Option[String],
val gossipInterval: FiniteDuration,
val notifySubscribersInterval: FiniteDuration,
val maxDeltaElements: Int,
val dispatcher: String,
val pruningInterval: FiniteDuration,
val maxPruningDissemination: FiniteDuration,
val durableStoreProps: Either[(String, Config), Props],
val durableKeys: Set[String]) {
val role: Option[String],
val gossipInterval: FiniteDuration,
val notifySubscribersInterval: FiniteDuration,
val maxDeltaElements: Int,
val dispatcher: String,
val pruningInterval: FiniteDuration,
val maxPruningDissemination: FiniteDuration,
val durableStoreProps: Either[(String, Config), Props],
val durableKeys: Set[String],
val pruningMarkerTimeToLive: FiniteDuration,
val durablePruningMarkerTimeToLive: FiniteDuration) {
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, Right(Props.empty), Set.empty)
maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days)
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration,
durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days)
def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role))
@ -150,6 +167,13 @@ final class ReplicatorSettings(
def withPruning(pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration): ReplicatorSettings =
copy(pruningInterval = pruningInterval, maxPruningDissemination = maxPruningDissemination)
def withPruningMarkerTimeToLive(
pruningMarkerTimeToLive: FiniteDuration,
durablePruningMarkerTimeToLive: FiniteDuration): ReplicatorSettings =
copy(
pruningMarkerTimeToLive = pruningMarkerTimeToLive,
durablePruningMarkerTimeToLive = durablePruningMarkerTimeToLive)
def withDurableStoreProps(durableStoreProps: Props): ReplicatorSettings =
copy(durableStoreProps = Right(durableStoreProps))
@ -168,17 +192,20 @@ final class ReplicatorSettings(
}
private def copy(
role: Option[String] = role,
gossipInterval: FiniteDuration = gossipInterval,
notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
maxDeltaElements: Int = maxDeltaElements,
dispatcher: String = dispatcher,
pruningInterval: FiniteDuration = pruningInterval,
maxPruningDissemination: FiniteDuration = maxPruningDissemination,
durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
durableKeys: Set[String] = durableKeys): ReplicatorSettings =
role: Option[String] = role,
gossipInterval: FiniteDuration = gossipInterval,
notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
maxDeltaElements: Int = maxDeltaElements,
dispatcher: String = dispatcher,
pruningInterval: FiniteDuration = pruningInterval,
maxPruningDissemination: FiniteDuration = maxPruningDissemination,
durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
durableKeys: Set[String] = durableKeys,
pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive,
durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive): ReplicatorSettings =
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys)
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys,
pruningMarkerTimeToLive, durablePruningMarkerTimeToLive)
}
object Replicator {
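
A hedged usage sketch of the new accessor above; the SettingsSketch object and system name are illustrative:

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.cluster.ddata.ReplicatorSettings

object SettingsSketch extends App {
  val system = ActorSystem("sketch")
  // Programmatic equivalent of the two new config entries.
  val tuned = ReplicatorSettings(system).withPruningMarkerTimeToLive(
    pruningMarkerTimeToLive = 6.hours,
    durablePruningMarkerTimeToLive = 10.days)
  println(tuned.durablePruningMarkerTimeToLive)
  system.terminate()
}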
@ -562,6 +589,9 @@ object Replicator {
val LazyDigest: Digest = ByteString(0)
val NotFoundDigest: Digest = ByteString(-1)
/**
* The `DataEnvelope` wraps a data entry and carries state of the pruning process for the entry.
*/
final case class DataEnvelope(
data: ReplicatedData,
pruning: Map[UniqueAddress, PruningState] = Map.empty)
@ -576,36 +606,51 @@ object Replicator {
}
def initRemovedNodePruning(removed: UniqueAddress, owner: UniqueAddress): DataEnvelope = {
copy(pruning = pruning.updated(removed, PruningState(owner, PruningInitialized(Set.empty))))
copy(pruning = pruning.updated(removed, PruningInitialized(owner, Set.empty)))
}
def prune(from: UniqueAddress): DataEnvelope = {
def prune(from: UniqueAddress, pruningPerformed: PruningPerformed): DataEnvelope = {
data match {
case dataWithRemovedNodePruning: RemovedNodePruning ⇒
require(pruning.contains(from))
val to = pruning(from).owner
val prunedData = dataWithRemovedNodePruning.prune(from, to)
copy(data = prunedData, pruning = pruning.updated(from, PruningState(to, PruningPerformed)))
pruning(from) match {
case PruningInitialized(owner, _) ⇒
val prunedData = dataWithRemovedNodePruning.prune(from, owner)
copy(data = prunedData, pruning = pruning.updated(from, pruningPerformed))
case _ ⇒
this
}
case _ ⇒ this
}
}
def merge(other: DataEnvelope): DataEnvelope =
if (other.data == DeletedData) DeletedEnvelope
else {
var mergedRemovedNodePruning = other.pruning
for ((key, thisValue) ← pruning) {
mergedRemovedNodePruning.get(key) match {
case None ⇒
mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue)
case Some(thatValue) ⇒
mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue merge thatValue)
val mergedPruning =
pruning.foldLeft(other.pruning) {
case (acc, (key, thisValue)) ⇒
acc.get(key) match {
case None ⇒
acc.updated(key, thisValue)
case Some(thatValue) ⇒
acc.updated(key, thisValue merge thatValue)
}
}
val filteredMergedPruning = {
if (mergedPruning.isEmpty) mergedPruning
else {
val currentTime = System.currentTimeMillis()
mergedPruning.filter {
case (_, p: PruningPerformed) ⇒ !p.isObsolete(currentTime)
case _ ⇒ true
}
}
}
// cleanup both sides before merging, `merge((otherData: ReplicatedData)` will cleanup other.data
copy(data = cleaned(data, mergedRemovedNodePruning), pruning = mergedRemovedNodePruning).merge(other.data)
copy(data = cleaned(data, filteredMergedPruning), pruning = filteredMergedPruning).merge(other.data)
}
def merge(otherData: ReplicatedData): DataEnvelope =
@ -613,7 +658,7 @@ object Replicator {
else copy(data = data merge cleaned(otherData, pruning).asInstanceOf[data.T])
private def cleaned(c: ReplicatedData, p: Map[UniqueAddress, PruningState]): ReplicatedData = p.foldLeft(c) {
case (c: RemovedNodePruning, (removed, PruningState(_, PruningPerformed))) ⇒
case (c: RemovedNodePruning, (removed, _: PruningPerformed)) ⇒
if (c.needPruningFrom(removed)) c.pruningCleanup(removed) else c
case (c, _) ⇒ c
}
@ -801,7 +846,8 @@ object Replicator {
* <li>When a node is removed from the cluster it is first important that all updates that were
* done by that node are disseminated to all other nodes. The pruning will not start before the
* `maxPruningDissemination` duration has elapsed. The time measurement is stopped when any
* replica is unreachable, so it should be configured to worst case in a healthy cluster.</li>
* replica is unreachable, but it's still recommended to configure this with a certain margin.
* It should be on the order of minutes.</li>
* <li>The nodes are ordered by their address and the node ordered first is called leader.
* The leader initiates the pruning by adding a `PruningInitialized` marker in the data envelope.
* This is gossiped to all other nodes and they mark it as seen when they receive it.</li>
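
A rough worked timeline under the defaults in this commit; PruningTimelineSketch and its timestamps are illustrative, and in reality pruning is performed only after all nodes have seen the PruningInitialized marker:

import scala.concurrent.duration._

object PruningTimelineSketch extends App {
  val maxPruningDissemination = 300.seconds // wait after removal before initiating
  val markerTtl = 6.hours                   // pruning-marker-time-to-live

  val removedAt = System.currentTimeMillis()
  val initiateAt = removedAt + maxPruningDissemination.toMillis // leader adds PruningInitialized
  val obsoleteAt = initiateAt + markerTtl.toMillis              // ~PruningPerformed.obsoleteTime
  println(s"pruning marker dropped after ~${(obsoleteAt - removedAt).millis.toHours} h")
}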
@ -840,7 +886,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
import context.dispatcher
val gossipTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, GossipTick)
val notifyTask = context.system.scheduler.schedule(notifySubscribersInterval, notifySubscribersInterval, self, FlushChanges)
val pruningTask = context.system.scheduler.schedule(pruningInterval, pruningInterval, self, RemovedNodePruningTick)
val pruningTask =
if (pruningInterval >= Duration.Zero)
Some(context.system.scheduler.schedule(pruningInterval, pruningInterval, self, RemovedNodePruningTick))
else None
val clockTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, ClockTick)
val serializer = SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
@ -867,11 +916,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// cluster weaklyUp nodes, doesn't contain selfAddress
var weaklyUpNodes: Set[Address] = Set.empty
// nodes removed from cluster, to be pruned, and tombstoned
var removedNodes: Map[UniqueAddress, Long] = Map.empty
var pruningPerformed: Map[UniqueAddress, Long] = Map.empty
var tombstoneNodes: Set[UniqueAddress] = Set.empty
var leader: Option[Address] = None
def isLeader: Boolean = leader.exists(_ == selfAddress)
@ -921,7 +966,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
cluster.unsubscribe(self)
gossipTask.cancel()
notifyTask.cancel()
pruningTask.cancel()
pruningTask.foreach(_.cancel())
clockTask.cancel()
}
@ -964,11 +1009,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
count += data.size
data.foreach {
case (key, d) ⇒
val envelope = DataEnvelope(d)
write(key, envelope) match {
write(key, d.dataEnvelope) match {
case Some(newEnvelope) ⇒
if (newEnvelope.data ne envelope.data)
durableStore ! Store(key, newEnvelope.data, None)
if (newEnvelope.data ne d.dataEnvelope.data)
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
}
@ -1059,18 +1103,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
localValue match {
case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key, req)
case Some(envelope @ DataEnvelope(existing, _)) ⇒
existing.merge(modify(Some(existing)).asInstanceOf[existing.T])
case None ⇒ modify(None)
envelope.merge(modify(Some(existing)).asInstanceOf[existing.T])
case None ⇒ DataEnvelope(modify(None))
}
} match {
case Success(newData) ⇒
log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, newData)
val envelope = DataEnvelope(pruningCleanupTombstoned(newData))
case Success(envelope) ⇒
log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, envelope.data)
setData(key.id, envelope)
val durable = isDurable(key.id)
if (isLocalUpdate(writeConsistency)) {
if (durable)
durableStore ! Store(key.id, envelope.data,
durableStore ! Store(key.id, new DurableDataEnvelope(envelope),
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo)))
else
replyTo ! UpdateSuccess(key, req)
@ -1079,7 +1122,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, envelope.data,
durableStore ! Store(key.id, new DurableDataEnvelope(envelope),
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
}
}
@ -1106,7 +1149,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, replyTo)))
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), Some(StoreReply(WriteAck, WriteNack, replyTo)))
else
replyTo ! WriteAck
case None ⇒
@ -1115,10 +1158,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def write(key: String, writeEnvelope: DataEnvelope): Option[DataEnvelope] =
getData(key) match {
case Some(DataEnvelope(DeletedData, _)) ⇒ Some(writeEnvelope) // already deleted
case Some(DataEnvelope(DeletedData, _)) ⇒ Some(DeletedEnvelope) // already deleted
case Some(envelope @ DataEnvelope(existing, _)) ⇒
if (existing.getClass == writeEnvelope.data.getClass || writeEnvelope.data == DeletedData) {
val merged = envelope.merge(pruningCleanupTombstoned(writeEnvelope)).addSeen(selfAddress)
val merged = envelope.merge(writeEnvelope).addSeen(selfAddress)
setData(key, merged)
Some(merged)
} else {
@ -1128,16 +1171,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
None
}
case None ⇒
val cleaned = pruningCleanupTombstoned(writeEnvelope).addSeen(selfAddress)
setData(key, cleaned)
Some(cleaned)
val writeEnvelope2 = writeEnvelope.addSeen(selfAddress)
setData(key, writeEnvelope2)
Some(writeEnvelope2)
}
def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
write(key, writeEnvelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, newEnvelope.data, None)
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
replyTo ! ReadRepairAck
@ -1160,7 +1203,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val durable = isDurable(key.id)
if (isLocalUpdate(consistency)) {
if (durable)
durableStore ! Store(key.id, DeletedData,
durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope),
Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), replyTo)))
else
replyTo ! DeleteSuccess(key, req)
@ -1169,7 +1212,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, DeletedData,
durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope),
Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), writeAggregator)))
}
}
@ -1313,7 +1356,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, newEnvelope.data, None)
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
if (sendBack) getData(key) match {
@ -1380,6 +1423,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else if (matchingRole(m)) {
nodes -= m.address
weaklyUpNodes -= m.address
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
unreachable -= m.address
}
@ -1402,12 +1446,31 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def receiveRemovedNodePruningTick(): Unit = {
if (isLeader && removedNodes.nonEmpty) {
initRemovedNodePruning()
// See 'CRDT Garbage' section in Replicator Scaladoc for description of the process
if (unreachable.isEmpty) {
if (isLeader) {
collectRemovedNodes()
initRemovedNodePruning()
}
performRemovedNodePruning()
deleteObsoletePruningPerformed()
}
}
def collectRemovedNodes(): Unit = {
val knownNodes = nodes union weaklyUpNodes union removedNodes.keySet.map(_.address)
val newRemovedNodes =
dataEntries.foldLeft(Set.empty[UniqueAddress]) {
case (acc, (_, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _))) ⇒
acc union data.modifiedByNodes.filterNot(n ⇒ n == selfUniqueAddress || knownNodes(n.address))
case (acc, _) ⇒
acc
}
newRemovedNodes.foreach { n ⇒
log.debug("Adding removed node [{}] from data", n)
removedNodes = removedNodes.updated(n, allReachableClockTime)
}
performRemovedNodePruning()
// FIXME tombstoneRemovedNodePruning doesn't work, since merge of PruningState will add the PruningPerformed back again
// tombstoneRemovedNodePruning()
}
def initRemovedNodePruning(): Unit = {
@ -1417,22 +1480,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}(collection.breakOut)
if (removedSet.nonEmpty) {
// FIXME handle pruning of durable data, this is difficult and requires more thought
for ((key, (envelope, _)) ← dataEntries; removed ← removedSet) {
def init(): Unit = {
val newEnvelope = envelope.initRemovedNodePruning(removed, selfUniqueAddress)
log.debug("Initiated pruning of [{}] for data key [{}]", removed, key)
log.debug("Initiated pruning of [{}] for data key [{}] to [{}]", removed, key, selfUniqueAddress)
setData(key, newEnvelope)
}
if (envelope.needPruningFrom(removed)) {
envelope.data match {
case dataWithRemovedNodePruning: RemovedNodePruning ⇒
envelope.pruning.get(removed) match {
case None ⇒ init()
case Some(PruningState(owner, PruningInitialized(_))) if owner != selfUniqueAddress ⇒ init()
case Some(PruningInitialized(owner, _)) if owner != selfUniqueAddress ⇒ init()
case _ ⇒ // already in progress
}
case _ ⇒
@ -1444,78 +1505,44 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def performRemovedNodePruning(): Unit = {
// perform pruning when all seen Init
val allNodes = nodes union weaklyUpNodes
val pruningPerformed = PruningPerformed(System.currentTimeMillis() + pruningMarkerTimeToLive.toMillis)
val durablePruningPerformed = PruningPerformed(System.currentTimeMillis() + durablePruningMarkerTimeToLive.toMillis)
dataEntries.foreach {
case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning), _)) ⇒
pruning.foreach {
case (removed, PruningState(owner, PruningInitialized(seen))) if owner == selfUniqueAddress
&& (nodes.isEmpty || nodes.forall(seen)) ⇒
val newEnvelope = envelope.prune(removed)
pruningPerformed = pruningPerformed.updated(removed, allReachableClockTime)
case (removed, PruningInitialized(owner, seen)) if owner == selfUniqueAddress
&& (allNodes.isEmpty || allNodes.forall(seen)) ⇒
val newEnvelope = envelope.prune(removed, if (isDurable(key)) durablePruningPerformed else pruningPerformed)
log.debug("Perform pruning of [{}] from [{}] to [{}]", key, removed, selfUniqueAddress)
setData(key, newEnvelope)
if ((newEnvelope.data ne data) && isDurable(key))
durableStore ! Store(key, newEnvelope.data, None)
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case _ ⇒
}
case _ ⇒ // deleted, or pruning not needed
}
}
def tombstoneRemovedNodePruning(): Unit = {
def allPruningPerformed(removed: UniqueAddress): Boolean = {
dataEntries forall {
case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning), _)) ⇒
pruning.get(removed) match {
case Some(PruningState(_, PruningInitialized(_))) ⇒ false
case _ ⇒ true
}
case _ ⇒ true // deleted, or pruning not needed
}
}
// FIXME pruningPerformed is only updated on one node, but tombstoneNodes should be on all
pruningPerformed.foreach {
case (removed, timestamp) if ((allReachableClockTime - timestamp) > maxPruningDisseminationNanos) &&
allPruningPerformed(removed) ⇒
log.debug("All pruning performed for [{}], tombstoned", removed)
pruningPerformed -= removed
removedNodes -= removed
tombstoneNodes += removed
dataEntries.foreach {
case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _)) ⇒
val newEnvelope = pruningCleanupTombstoned(removed, envelope)
setData(key, newEnvelope)
if ((newEnvelope.data ne data) && isDurable(key))
durableStore ! Store(key, newEnvelope.data, None)
case _ ⇒ // deleted, or pruning not needed
def deleteObsoletePruningPerformed(): Unit = {
val currentTime = System.currentTimeMillis()
dataEntries.foreach {
case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning), _)) ⇒
val newEnvelope = pruning.foldLeft(envelope) {
case (acc, (removed, p: PruningPerformed)) if p.isObsolete(currentTime) ⇒
log.debug("Removing obsolete pruning marker for [{}] in [{}]", removed, key)
removedNodes -= removed
acc.copy(pruning = acc.pruning - removed)
case (acc, _) acc
}
case (removed, timestamp) // not ready
if (newEnvelope ne envelope)
setData(key, newEnvelope)
case _ // deleted, or pruning not needed
}
}
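// Editor's sketch (assumption): the marker consulted by p.isObsolete(currentTime)
// above carries an absolute wall-clock deadline, along these lines:
final case class PruningPerformedSketch(obsoleteTime: Long) {
  def isObsolete(currentTime: Long): Boolean = obsoleteTime <= currentTime
}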
def receiveGetReplicaCount(): Unit = {
// selfAddress is not included in the set
replyTo ! ReplicaCount(nodes.size + 1)

View file

@ -292,6 +292,9 @@ final case class OneVersionVector private[akka] (node: UniqueAddress, version: L
}
}
override def modifiedByNodes: Set[UniqueAddress] =
Set(node)
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
node == removedNode
@ -353,6 +356,9 @@ final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) exten
}
}
override def modifiedByNodes: Set[UniqueAddress] =
versions.keySet
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
versions.contains(removedNode)
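// Editor's sketch (assumption, hypothetical helper): modifiedByNodes is what
// allows removed nodes to be collected from the data itself, instead of relying
// only on MemberRemoved events:
def collectRemovedNodesSketch(data: ReplicatedData, knownRemoved: Set[UniqueAddress]): Set[UniqueAddress] =
  data match {
    case r: RemovedNodePruning ⇒ r.modifiedByNodes intersect knownRemoved
    case _                     ⇒ Set.empty
  }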

View file

@ -28,6 +28,7 @@ import scala.concurrent.duration.FiniteDuration
import akka.cluster.ddata.DurableStore.DurableDataEnvelope
import java.io.NotSerializableException
import akka.actor.Address
/**
* INTERNAL API
@ -155,6 +156,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
}(system.dispatcher)
private val writeAckBytes = dm.Empty.getDefaultInstance.toByteArray
private val dummyAddress = UniqueAddress(Address("a", "b", "c", 2552), 1L)
val GetManifest = "A"
val GetSuccessManifest = "B"
@ -396,14 +398,17 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
dataEnvelope.pruning.foreach {
  case (removedAddress, state) ⇒
    val b = dm.DataEnvelope.PruningEntry.newBuilder().
      setRemovedAddress(uniqueAddressToProto(removedAddress))
    state match {
      case PruningState.PruningInitialized(owner, seen) ⇒
        seen.toVector.sorted(Member.addressOrdering).map(addressToProto).foreach { a ⇒ b.addSeen(a) }
        b.setOwnerAddress(uniqueAddressToProto(owner))
        b.setPerformed(false)
      case PruningState.PruningPerformed(obsoleteTime) ⇒
        b.setPerformed(true).setObsoleteTime(obsoleteTime)
        // TODO ownerAddress is only needed for PruningInitialized, but kept here for
        // wire backwards compatibility with 2.4.16 (required field)
        b.setOwnerAddress(uniqueAddressToProto(dummyAddress))
    }
    dataEnvelopeBuilder.addPruning(b)
}
@ -414,17 +419,28 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
dataEnvelopeFromProto(dm.DataEnvelope.parseFrom(bytes))
private def dataEnvelopeFromProto(dataEnvelope: dm.DataEnvelope): DataEnvelope = {
  val data = otherMessageFromProto(dataEnvelope.getData).asInstanceOf[ReplicatedData]
  val pruning = pruningFromProto(dataEnvelope.getPruningList)
  DataEnvelope(data, pruning)
}

private def pruningFromProto(pruningEntries: java.util.List[dm.DataEnvelope.PruningEntry]): Map[UniqueAddress, PruningState] = {
  if (pruningEntries.isEmpty)
    Map.empty
  else
    pruningEntries.asScala.map { pruningEntry ⇒
      val state =
        if (pruningEntry.getPerformed) {
          // for wire compatibility with Akka 2.4.x
          val obsoleteTime = if (pruningEntry.hasObsoleteTime) pruningEntry.getObsoleteTime else Long.MaxValue
          PruningState.PruningPerformed(obsoleteTime)
        } else
          PruningState.PruningInitialized(
            uniqueAddressFromProto(pruningEntry.getOwnerAddress),
            pruningEntry.getSeenList.asScala.map(addressFromProto)(breakOut))
      val removed = uniqueAddressFromProto(pruningEntry.getRemovedAddress)
      removed → state
    }(breakOut)
}
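// Editor's note (illustration, assuming isObsolete(t) = obsoleteTime <= t as
// sketched earlier): the 2.4.x fallback above yields a marker that never
// expires, so markers received from old nodes are never deleted locally.
private val legacyMarkerExample = PruningState.PruningPerformed(Long.MaxValue)
// legacyMarkerExample.isObsolete(System.currentTimeMillis()) is always false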
private def writeToProto(write: Write): dm.Write =
@ -472,6 +488,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
private def durableDataEnvelopeFromProto(durableDataEnvelope: dm.DurableDataEnvelope): DurableDataEnvelope = {
  val data = otherMessageFromProto(durableDataEnvelope.getData).asInstanceOf[ReplicatedData]
  val pruning = pruningFromProto(durableDataEnvelope.getPruningList)
  new DurableDataEnvelope(DataEnvelope(data, pruning))
}
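// Editor's sketch (assumed shape, not a verbatim copy of the class in this
// commit): with pruning markers now stored durably, DurableDataEnvelope is
// expected to wrap the whole DataEnvelope, keeping a data-only constructor
// for old call sites:
final class DurableDataEnvelopeSketch(val dataEnvelope: DataEnvelope) {
  def this(data: ReplicatedData) = this(DataEnvelope(data))
  def data: ReplicatedData = dataEnvelope.data
}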

View file

@ -23,7 +23,7 @@ final case class DurableDataSpecConfig(writeBehind: Boolean) extends MultiNodeCo
val second = role("second")
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
akka.cluster.distributed-data.durable.keys = ["durable*"]

View file

@ -126,18 +126,19 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
}
within(15.seconds) {
  var values = Set.empty[Int]
  awaitAssert {
    replicator ! Get(KeyA, ReadLocal)
    val counter3 = expectMsgType[GetSuccess[GCounter]].dataValue
    val value = counter3.value.intValue
    values += value
    value should be(10)
    counter3.state.size should be(3)
  }
  values should ===(Set(10))
}
enterBarrier("pruned")
runOn(first) {
val addr = cluster2.selfAddress
val sys3 = ActorSystem(system.name, ConfigFactory.parseString(s"""
@ -150,15 +151,31 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
Cluster(sys3).join(node(first).address)
within(10.seconds) {
  var values = Set.empty[Int]
  awaitAssert {
    replicator3.tell(Get(KeyA, ReadLocal), probe3.ref)
    val counter4 = probe3.expectMsgType[GetSuccess[GCounter]].dataValue
    val value = counter4.value.intValue
    values += value
    value should be(10)
    counter4.state.size should be(3)
  }
  values should ===(Set(10))
}
// after merging with others
replicator3 ! Get(KeyA, ReadAll(remainingOrDefault))
val counter5 = expectMsgType[GetSuccess[GCounter]].dataValue
counter5.value should be(10)
counter5.state.size should be(3)
}
enterBarrier("sys3-started")
replicator ! Get(KeyA, ReadAll(remainingOrDefault))
val counter6 = expectMsgType[GetSuccess[GCounter]].dataValue
counter6.value should be(10)
counter6.state.size should be(3)
enterBarrier("after-1")
}
}

View file

@ -122,14 +122,18 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
runOn(first, second) {
within(15.seconds) {
var values = Set.empty[Int]
awaitAssert {
  replicator ! Get(KeyA, ReadLocal)
  expectMsgPF() {
    case g @ GetSuccess(KeyA, _) ⇒
      val value = g.get(KeyA).value.toInt
      values += value
      value should be(9)
      g.get(KeyA).needPruningFrom(thirdUniqueAddress) should be(false)
  }
}
values should ===(Set(9))
}
within(5.seconds) {
awaitAssert {
@ -154,10 +158,12 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
}
enterBarrier("pruning-done")
// after pruning performed we should not be able to update with data from removed node
def updateAfterPruning(expectedValue: Int): Unit = {
  replicator ! Update(KeyA, GCounter(), WriteAll(timeout), None) { existing ⇒
    // inject data from removed node to simulate bad data
    existing.merge(oldCounter) + 1
  }
expectMsgPF() {
case UpdateSuccess(KeyA, _) ⇒
replicator ! Get(KeyA, ReadLocal)
@ -165,6 +171,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
retrieved.value should be(expectedValue)
}
}
runOn(first) {
updateAfterPruning(expectedValue = 10)
}
@ -175,19 +182,19 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
}
enterBarrier("update-second-after-pruning")
// after full replication should still not be able to update with data from removed node
// but it would not work after removal of the PruningPerformed markers
expectNoMsg(maxPruningDissemination + 3.seconds)
runOn(first) {
updateAfterPruning(expectedValue = 12)
}
enterBarrier("update-first-after-tombstone")
enterBarrier("update-first-after-dissemination")
runOn(second) {
updateAfterPruning(expectedValue = 13)
}
enterBarrier("update-second-after-tombstone")
enterBarrier("update-second-after-dissemination")
enterBarrier("after-1")
}

View file

@ -13,6 +13,11 @@ public class JavaImplOfReplicatedData extends AbstractReplicatedData<JavaImplOfR
return this;
}
@Override
public scala.collection.immutable.Set<UniqueAddress> modifiedByNodes() {
return akka.japi.Util.immutableSeq(new java.util.ArrayList<UniqueAddress>()).toSet();
}
@Override
public boolean needPruningFrom(UniqueAddress removedNode) {
return false;

View file

@ -0,0 +1,70 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.ddata
import akka.actor.Address
import akka.cluster.UniqueAddress
import org.scalatest.Matchers
import org.scalatest.WordSpec
import akka.cluster.ddata.Replicator.Internal.DataEnvelope
class DataEnvelopeSpec extends WordSpec with Matchers {
import PruningState._
val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)
val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L)
val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L)
val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4L)
val obsoleteTimeInFuture = System.currentTimeMillis() + 3600 * 1000
val oldObsoleteTime = System.currentTimeMillis() - 3600 * 1000
"DataEnvelope" must {
"handle pruning transitions" in {
val g1 = GCounter.empty.increment(node1, 1)
val d1 = DataEnvelope(g1)
val d2 = d1.initRemovedNodePruning(node1, node2)
d2.pruning(node1).isInstanceOf[PruningInitialized] should ===(true)
d2.pruning(node1).asInstanceOf[PruningInitialized].owner should ===(node2)
val d3 = d2.addSeen(node3.address)
d3.pruning(node1).asInstanceOf[PruningInitialized].seen should ===(Set(node3.address))
val d4 = d3.prune(node1, PruningPerformed(obsoleteTimeInFuture))
d4.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2))
}
"merge correctly" in {
val g1 = GCounter.empty.increment(node1, 1)
val d1 = DataEnvelope(g1)
val g2 = GCounter.empty.increment(node2, 2)
val d2 = DataEnvelope(g2)
val d3 = d1.merge(d2)
d3.data.asInstanceOf[GCounter].value should ===(3)
d3.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node1, node2))
val d4 = d3.initRemovedNodePruning(node1, node2)
val d5 = d4.prune(node1, PruningPerformed(obsoleteTimeInFuture))
d5.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2))
// late update from node1
val g11 = g1.increment(node1, 10)
val d6 = d5.merge(DataEnvelope(g11))
d6.data.asInstanceOf[GCounter].value should ===(3)
d6.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2))
// remove obsolete
val d7 = d5.copy(pruning = d5.pruning.updated(node1, PruningPerformed(oldObsoleteTime)))
val d8 = d5.copy(pruning = Map.empty)
d8.merge(d7).pruning should ===(Map.empty)
d7.merge(d8).pruning should ===(Map.empty)
d5.merge(d7).pruning(node1) should ===(PruningPerformed(obsoleteTimeInFuture))
d7.merge(d5).pruning(node1) should ===(PruningPerformed(obsoleteTimeInFuture))
}
}
}

View file

@ -139,18 +139,22 @@ class GCounterSpec extends WordSpec with Matchers {
val c1 = GCounter()
val c2 = c1 increment node1
val c3 = c2 increment node2
c2.modifiedByNodes should ===(Set(node1))
c2.needPruningFrom(node1) should be(true)
c2.needPruningFrom(node2) should be(false)
c3.modifiedByNodes should ===(Set(node1, node2))
c3.needPruningFrom(node1) should be(true)
c3.needPruningFrom(node2) should be(true)
c3.value should be(2)
val c4 = c3.prune(node1, node2)
c4.modifiedByNodes should ===(Set(node2))
c4.needPruningFrom(node2) should be(true)
c4.needPruningFrom(node1) should be(false)
c4.value should be(2)
val c5 = (c4 increment node1).pruningCleanup(node1)
c5.modifiedByNodes should ===(Set(node2))
c5.needPruningFrom(node1) should be(false)
c5.value should be(2)
}

View file

@ -143,16 +143,20 @@ class PNCounterSpec extends WordSpec with Matchers {
val c1 = PNCounter()
val c2 = c1 increment node1
val c3 = c2 decrement node2
c2.modifiedByNodes should ===(Set(node1))
c2.needPruningFrom(node1) should be(true)
c2.needPruningFrom(node2) should be(false)
c3.modifiedByNodes should ===(Set(node1, node2))
c3.needPruningFrom(node1) should be(true)
c3.needPruningFrom(node2) should be(true)
val c4 = c3.prune(node1, node2)
c4.modifiedByNodes should ===(Set(node2))
c4.needPruningFrom(node2) should be(true)
c4.needPruningFrom(node1) should be(false)
val c5 = (c4 increment node1).pruningCleanup(node1)
c5.modifiedByNodes should ===(Set(node2))
c5.needPruningFrom(node1) should be(false)
}

View file

@ -12,32 +12,37 @@ import org.scalatest.WordSpec
class PruningStateSpec extends WordSpec with Matchers {
import PruningState._
val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1)
val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2)
val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3)
val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4)
val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)
val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L)
val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L)
val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4L)
"Pruning state" must {
"merge phase correctly" in {
val p1 = PruningState(node1, PruningInitialized(Set.empty))
val p2 = PruningState(node1, PruningPerformed)
p1.merge(p2).phase should be(PruningPerformed)
p2.merge(p1).phase should be(PruningPerformed)
"merge state correctly" in {
val p1 = PruningInitialized(node1, Set.empty)
val p2 = PruningPerformed(System.currentTimeMillis() + 3600 * 1000)
p1.merge(p2) should be(p2)
p2.merge(p1) should be(p2)
val p3 = p2.copy(p2.obsoleteTime - 1)
p2.merge(p3) should be(p2) // keep greatest obsoleteTime
p3.merge(p2) should be(p2)
}
"merge owner correctly" in {
val p1 = PruningInitialized(node1, Set.empty)
val p2 = PruningInitialized(node2, Set.empty)
val expected = PruningInitialized(node1, Set.empty)
p1.merge(p2) should be(expected)
p2.merge(p1) should be(expected)
}
"merge seen correctly" in {
val p1 = PruningInitialized(node1, Set(node2.address))
val p2 = PruningInitialized(node1, Set(node4.address))
val expected = PruningInitialized(node1, Set(node2.address, node4.address))
p1.merge(p2) should be(expected)
p2.merge(p1) should be(expected)
}
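// Editor's sketch, consistent with the three tests above but not the actual
// implementation (assumes Member.addressOrdering breaks owner ties):
// PruningPerformed dominates PruningInitialized, the greatest obsoleteTime
// wins, the lowest owner address wins, and seen sets are unioned for equal owners.
def mergeSketch(a: PruningState, b: PruningState): PruningState = (a, b) match {
  case (p1: PruningPerformed, p2: PruningPerformed) ⇒ if (p1.obsoleteTime >= p2.obsoleteTime) p1 else p2
  case (p: PruningPerformed, _)                     ⇒ p
  case (_, p: PruningPerformed)                     ⇒ p
  case (PruningInitialized(o1, s1), PruningInitialized(o2, s2)) ⇒
    if (o1 == o2) PruningInitialized(o1, s1 union s2)
    else if (akka.cluster.Member.addressOrdering.lt(o1.address, o2.address)) a
    else b
}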

View file

@ -70,8 +70,8 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Changed(keyA)(data1))
checkSerialization(DataEnvelope(data1))
checkSerialization(DataEnvelope(data1, pruning = Map(
  address1 → PruningPerformed(System.currentTimeMillis()),
  address3 → PruningInitialized(address2, Set(address1.address)))))
checkSerialization(Write("A", DataEnvelope(data1)))
checkSerialization(WriteAck)
checkSerialization(WriteNack)
@ -85,6 +85,9 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"A" DataEnvelope(data1),
"B" DataEnvelope(GSet() + "b" + "c")), sendBack = true))
checkSerialization(new DurableDataEnvelope(data1))
checkSerialization(new DurableDataEnvelope(DataEnvelope(data1, pruning = Map(
  address1 → PruningPerformed(System.currentTimeMillis()),
  address3 → PruningInitialized(address2, Set(address1.address))))))
}
}