From 952be31a7dee602a11c770ee7e9a94d911f024a5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 11 Jan 2017 13:19:45 +0100 Subject: [PATCH] make pruning of CRDT garbage work, #21647 * fix merge issues of DataEnvelope and its pruning * simplify by removing the tombstones, which didn't work in all cases anyway * keep the PruningPerformed markers in the DataEnvelope until configured TTL has elapsed (wall clock) * simplify PruningState structure * also store the pruning markers in durable data * collect removed nodes from the data, listing on MemberRemoved is not enough * possibility to disable pruning altogether * documented caveat for durable data --- .../protobuf/msg/ReplicatorMessages.java | 499 +++++++++++++++++- .../main/protobuf/ReplicatorMessages.proto | 2 + .../src/main/resources/reference.conf | 33 +- .../akka/cluster/ddata/DurableStore.scala | 21 +- .../scala/akka/cluster/ddata/GCounter.scala | 2 + .../scala/akka/cluster/ddata/LWWMap.scala | 3 + .../main/scala/akka/cluster/ddata/ORMap.scala | 7 + .../scala/akka/cluster/ddata/ORMultiMap.scala | 3 + .../main/scala/akka/cluster/ddata/ORSet.scala | 3 + .../scala/akka/cluster/ddata/PNCounter.scala | 3 + .../akka/cluster/ddata/PNCounterMap.scala | 3 + .../akka/cluster/ddata/PruningState.scala | 37 +- .../akka/cluster/ddata/ReplicatedData.scala | 10 + .../scala/akka/cluster/ddata/Replicator.scala | 287 +++++----- .../akka/cluster/ddata/VersionVector.scala | 6 + .../ReplicatorMessageSerializer.scala | 46 +- .../akka/cluster/ddata/DurableDataSpec.scala | 2 +- .../cluster/ddata/DurablePruningSpec.scala | 27 +- .../cluster/ddata/ReplicatorPruningSpec.scala | 23 +- .../ddata/JavaImplOfReplicatedData.java | 5 + .../akka/cluster/ddata/DataEnvelopeSpec.scala | 70 +++ .../akka/cluster/ddata/GCounterSpec.scala | 4 + .../akka/cluster/ddata/PNCounterSpec.scala | 4 + .../akka/cluster/ddata/PruningStateSpec.scala | 35 +- .../ReplicatorMessageSerializerSpec.scala | 7 +- akka-docs/rst/java/distributed-data.rst | 15 +- akka-docs/rst/scala/distributed-data.rst | 15 +- project/MiMa.scala | 8 +- 28 files changed, 951 insertions(+), 229 deletions(-) create mode 100644 akka-distributed-data/src/test/scala/akka/cluster/ddata/DataEnvelopeSpec.scala diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java index 227f148965..590254ef83 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java @@ -7622,6 +7622,16 @@ public final class ReplicatorMessages { */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.AddressOrBuilder getSeenOrBuilder( int index); + + // optional sint64 obsoleteTime = 5; + /** + * optional sint64 obsoleteTime = 5; + */ + boolean hasObsoleteTime(); + /** + * optional sint64 obsoleteTime = 5; + */ + long getObsoleteTime(); } /** * Protobuf type {@code akka.cluster.ddata.DataEnvelope.PruningEntry} @@ -7713,6 +7723,11 @@ public final class ReplicatorMessages { seen_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Address.PARSER, extensionRegistry)); break; } + case 40: { + bitField0_ |= 0x00000008; + obsoleteTime_ = input.readSInt64(); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -7852,11 +7867,28 @@ public final class ReplicatorMessages { return seen_.get(index); } + // optional sint64 obsoleteTime 
= 5; + public static final int OBSOLETETIME_FIELD_NUMBER = 5; + private long obsoleteTime_; + /** + * optional sint64 obsoleteTime = 5; + */ + public boolean hasObsoleteTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional sint64 obsoleteTime = 5; + */ + public long getObsoleteTime() { + return obsoleteTime_; + } + private void initFields() { removedAddress_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); ownerAddress_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); performed_ = false; seen_ = java.util.Collections.emptyList(); + obsoleteTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7908,6 +7940,9 @@ public final class ReplicatorMessages { for (int i = 0; i < seen_.size(); i++) { output.writeMessage(4, seen_.get(i)); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeSInt64(5, obsoleteTime_); + } getUnknownFields().writeTo(output); } @@ -7933,6 +7968,10 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeMessageSize(4, seen_.get(i)); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += akka.protobuf.CodedOutputStream + .computeSInt64Size(5, obsoleteTime_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -8072,6 +8111,8 @@ public final class ReplicatorMessages { } else { seenBuilder_.clear(); } + obsoleteTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -8129,6 +8170,10 @@ public final class ReplicatorMessages { } else { result.seen_ = seenBuilder_.build(); } + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000008; + } + result.obsoleteTime_ = obsoleteTime_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -8180,6 +8225,9 @@ public final class ReplicatorMessages { } } } + if (other.hasObsoleteTime()) { + setObsoleteTime(other.getObsoleteTime()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -8740,6 +8788,39 @@ public final class ReplicatorMessages { return seenBuilder_; } + // optional sint64 obsoleteTime = 5; + private long obsoleteTime_ ; + /** + * optional sint64 obsoleteTime = 5; + */ + public boolean hasObsoleteTime() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional sint64 obsoleteTime = 5; + */ + public long getObsoleteTime() { + return obsoleteTime_; + } + /** + * optional sint64 obsoleteTime = 5; + */ + public Builder setObsoleteTime(long value) { + bitField0_ |= 0x00000010; + obsoleteTime_ = value; + onChanged(); + return this; + } + /** + * optional sint64 obsoleteTime = 5; + */ + public Builder clearObsoleteTime() { + bitField0_ = (bitField0_ & ~0x00000010); + obsoleteTime_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DataEnvelope.PruningEntry) } @@ -14781,6 +14862,31 @@ public final class ReplicatorMessages { * required .akka.cluster.ddata.OtherMessage data = 1; */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder(); + + // repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + java.util.List + getPruningList(); + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int 
index); + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + int getPruningCount(); + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + java.util.List + getPruningOrBuilderList(); + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder( + int index); } /** * Protobuf type {@code akka.cluster.ddata.DurableDataEnvelope} @@ -14846,6 +14952,14 @@ public final class ReplicatorMessages { bitField0_ |= 0x00000001; break; } + case 18: { + if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + pruning_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000002; + } + pruning_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.PARSER, extensionRegistry)); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -14854,6 +14968,9 @@ public final class ReplicatorMessages { throw new akka.protobuf.InvalidProtocolBufferException( e.getMessage()).setUnfinishedMessage(this); } finally { + if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + pruning_ = java.util.Collections.unmodifiableList(pruning_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -14908,8 +15025,45 @@ public final class ReplicatorMessages { return data_; } + // repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + public static final int PRUNING_FIELD_NUMBER = 2; + private java.util.List pruning_; + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public java.util.List getPruningList() { + return pruning_; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public java.util.List + getPruningOrBuilderList() { + return pruning_; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public int getPruningCount() { + return pruning_.size(); + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int index) { + return pruning_.get(index); + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder( + int index) { + return pruning_.get(index); + } + private void initFields() { data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance(); + pruning_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -14924,6 +15078,12 @@ public final class ReplicatorMessages { memoizedIsInitialized = 0; return false; } + for (int i = 0; i < getPruningCount(); i++) { + if (!getPruning(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -14934,6 +15094,9 @@ public final class ReplicatorMessages { if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeMessage(1, data_); } + for (int i = 0; i < pruning_.size(); i++) { + output.writeMessage(2, pruning_.get(i)); + } getUnknownFields().writeTo(output); } @@ -14947,6 +15110,10 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeMessageSize(1, data_); } + for (int i = 0; i < pruning_.size(); i++) { + size += 
akka.protobuf.CodedOutputStream + .computeMessageSize(2, pruning_.get(i)); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -15056,6 +15223,7 @@ public final class ReplicatorMessages { private void maybeForceBuilderInitialization() { if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getDataFieldBuilder(); + getPruningFieldBuilder(); } } private static Builder create() { @@ -15070,6 +15238,12 @@ public final class ReplicatorMessages { dataBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000001); + if (pruningBuilder_ == null) { + pruning_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + } else { + pruningBuilder_.clear(); + } return this; } @@ -15106,6 +15280,15 @@ public final class ReplicatorMessages { } else { result.data_ = dataBuilder_.build(); } + if (pruningBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002)) { + pruning_ = java.util.Collections.unmodifiableList(pruning_); + bitField0_ = (bitField0_ & ~0x00000002); + } + result.pruning_ = pruning_; + } else { + result.pruning_ = pruningBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -15125,6 +15308,32 @@ public final class ReplicatorMessages { if (other.hasData()) { mergeData(other.getData()); } + if (pruningBuilder_ == null) { + if (!other.pruning_.isEmpty()) { + if (pruning_.isEmpty()) { + pruning_ = other.pruning_; + bitField0_ = (bitField0_ & ~0x00000002); + } else { + ensurePruningIsMutable(); + pruning_.addAll(other.pruning_); + } + onChanged(); + } + } else { + if (!other.pruning_.isEmpty()) { + if (pruningBuilder_.isEmpty()) { + pruningBuilder_.dispose(); + pruningBuilder_ = null; + pruning_ = other.pruning_; + bitField0_ = (bitField0_ & ~0x00000002); + pruningBuilder_ = + akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? 
+ getPruningFieldBuilder() : null; + } else { + pruningBuilder_.addAllMessages(other.pruning_); + } + } + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -15138,6 +15347,12 @@ public final class ReplicatorMessages { return false; } + for (int i = 0; i < getPruningCount(); i++) { + if (!getPruning(i).isInitialized()) { + + return false; + } + } return true; } @@ -15277,6 +15492,246 @@ public final class ReplicatorMessages { return dataBuilder_; } + // repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + private java.util.List pruning_ = + java.util.Collections.emptyList(); + private void ensurePruningIsMutable() { + if (!((bitField0_ & 0x00000002) == 0x00000002)) { + pruning_ = new java.util.ArrayList(pruning_); + bitField0_ |= 0x00000002; + } + } + + private akka.protobuf.RepeatedFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder> pruningBuilder_; + + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public java.util.List getPruningList() { + if (pruningBuilder_ == null) { + return java.util.Collections.unmodifiableList(pruning_); + } else { + return pruningBuilder_.getMessageList(); + } + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public int getPruningCount() { + if (pruningBuilder_ == null) { + return pruning_.size(); + } else { + return pruningBuilder_.getCount(); + } + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry getPruning(int index) { + if (pruningBuilder_ == null) { + return pruning_.get(index); + } else { + return pruningBuilder_.getMessage(index); + } + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder setPruning( + int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) { + if (pruningBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensurePruningIsMutable(); + pruning_.set(index, value); + onChanged(); + } else { + pruningBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder setPruning( + int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) { + if (pruningBuilder_ == null) { + ensurePruningIsMutable(); + pruning_.set(index, builderForValue.build()); + onChanged(); + } else { + pruningBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder addPruning(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) { + if (pruningBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensurePruningIsMutable(); + pruning_.add(value); + onChanged(); + } else { + pruningBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder addPruning( + int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry value) { + if (pruningBuilder_ == null) { + if (value == null) { + 
throw new NullPointerException(); + } + ensurePruningIsMutable(); + pruning_.add(index, value); + onChanged(); + } else { + pruningBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder addPruning( + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) { + if (pruningBuilder_ == null) { + ensurePruningIsMutable(); + pruning_.add(builderForValue.build()); + onChanged(); + } else { + pruningBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder addPruning( + int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder builderForValue) { + if (pruningBuilder_ == null) { + ensurePruningIsMutable(); + pruning_.add(index, builderForValue.build()); + onChanged(); + } else { + pruningBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder addAllPruning( + java.lang.Iterable values) { + if (pruningBuilder_ == null) { + ensurePruningIsMutable(); + super.addAll(values, pruning_); + onChanged(); + } else { + pruningBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder clearPruning() { + if (pruningBuilder_ == null) { + pruning_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + onChanged(); + } else { + pruningBuilder_.clear(); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public Builder removePruning(int index) { + if (pruningBuilder_ == null) { + ensurePruningIsMutable(); + pruning_.remove(index); + onChanged(); + } else { + pruningBuilder_.remove(index); + } + return this; + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder getPruningBuilder( + int index) { + return getPruningFieldBuilder().getBuilder(index); + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder getPruningOrBuilder( + int index) { + if (pruningBuilder_ == null) { + return pruning_.get(index); } else { + return pruningBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public java.util.List + getPruningOrBuilderList() { + if (pruningBuilder_ != null) { + return pruningBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(pruning_); + } + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder addPruningBuilder() { + return getPruningFieldBuilder().addBuilder( + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.getDefaultInstance()); + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder addPruningBuilder( + int index) { + return getPruningFieldBuilder().addBuilder( + index, 
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.getDefaultInstance()); + } + /** + * repeated .akka.cluster.ddata.DataEnvelope.PruningEntry pruning = 2; + */ + public java.util.List + getPruningBuilderList() { + return getPruningFieldBuilder().getBuilderList(); + } + private akka.protobuf.RepeatedFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder> + getPruningFieldBuilder() { + if (pruningBuilder_ == null) { + pruningBuilder_ = new akka.protobuf.RepeatedFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PruningEntryOrBuilder>( + pruning_, + ((bitField0_ & 0x00000002) == 0x00000002), + getParentForChildren(), + isClean()); + pruning_ = null; + } + return pruningBuilder_; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DurableDataEnvelope) } @@ -15431,32 +15886,34 @@ public final class ReplicatorMessages { "key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.clust" + "er.ddata.DataEnvelope\"\007\n\005Empty\"\023\n\004Read\022\013" + "\n\003key\030\001 \002(\t\"@\n\nReadResult\0222\n\010envelope\030\001 " + - "\001(\0132 .akka.cluster.ddata.DataEnvelope\"\301\002" + + "\001(\0132 .akka.cluster.ddata.DataEnvelope\"\327\002" + "\n\014DataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.clus" + "ter.ddata.OtherMessage\022>\n\007pruning\030\002 \003(\0132" + "-.akka.cluster.ddata.DataEnvelope.Prunin" + - "gEntry\032\300\001\n\014PruningEntry\0229\n\016removedAddres" + + "gEntry\032\326\001\n\014PruningEntry\0229\n\016removedAddres" + "s\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddre", "ss\0227\n\014ownerAddress\030\002 \002(\0132!.akka.cluster." 
+ "ddata.UniqueAddress\022\021\n\tperformed\030\003 \002(\010\022)" + "\n\004seen\030\004 \003(\0132\033.akka.cluster.ddata.Addres" + - "s\"\203\001\n\006Status\022\r\n\005chunk\030\001 \002(\r\022\021\n\ttotChunks" + - "\030\002 \002(\r\0221\n\007entries\030\003 \003(\0132 .akka.cluster.d" + - "data.Status.Entry\032$\n\005Entry\022\013\n\003key\030\001 \002(\t\022" + - "\016\n\006digest\030\002 \002(\014\"\227\001\n\006Gossip\022\020\n\010sendBack\030\001" + - " \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda" + - "ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" + - "\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat", - "aEnvelope\"X\n\rUniqueAddress\022,\n\007address\030\001 " + - "\002(\0132\033.akka.cluster.ddata.Address\022\013\n\003uid\030" + - "\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010hostna" + - "me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"V\n\014OtherMessage\022\027" + - "\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializerId\030" + - "\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nString" + - "GSet\022\020\n\010elements\030\001 \003(\t\"E\n\023DurableDataEnv" + - "elope\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata" + - ".OtherMessageB#\n\037akka.cluster.ddata.prot" + - "obuf.msgH\001" + "s\022\024\n\014obsoleteTime\030\005 \001(\022\"\203\001\n\006Status\022\r\n\005ch" + + "unk\030\001 \002(\r\022\021\n\ttotChunks\030\002 \002(\r\0221\n\007entries\030" + + "\003 \003(\0132 .akka.cluster.ddata.Status.Entry\032" + + "$\n\005Entry\022\013\n\003key\030\001 \002(\t\022\016\n\006digest\030\002 \002(\014\"\227\001" + + "\n\006Gossip\022\020\n\010sendBack\030\001 \002(\010\0221\n\007entries\030\002 " + + "\003(\0132 .akka.cluster.ddata.Gossip.Entry\032H\n" + + "\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .", + "akka.cluster.ddata.DataEnvelope\"X\n\rUniqu" + + "eAddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster" + + ".ddata.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(" + + "\017\")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002" + + " \002(\r\"V\n\014OtherMessage\022\027\n\017enclosedMessage\030" + + "\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageMa" + + "nifest\030\004 \001(\014\"\036\n\nStringGSet\022\020\n\010elements\030\001" + + " \003(\t\"\205\001\n\023DurableDataEnvelope\022.\n\004data\030\001 \002" + + "(\0132 .akka.cluster.ddata.OtherMessage\022>\n\007" + + "pruning\030\002 \003(\0132-.akka.cluster.ddata.DataE", + "nvelope.PruningEntryB#\n\037akka.cluster.dda" + + "ta.protobuf.msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -15540,7 +15997,7 @@ public final class ReplicatorMessages { internal_static_akka_cluster_ddata_DataEnvelope_PruningEntry_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_DataEnvelope_PruningEntry_descriptor, - new java.lang.String[] { "RemovedAddress", "OwnerAddress", "Performed", "Seen", }); + new java.lang.String[] { "RemovedAddress", "OwnerAddress", "Performed", "Seen", "ObsoleteTime", }); internal_static_akka_cluster_ddata_Status_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_akka_cluster_ddata_Status_fieldAccessorTable = new @@ -15594,7 +16051,7 @@ public final class 
ReplicatorMessages { internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor, - new java.lang.String[] { "Data", }); + new java.lang.String[] { "Data", "Pruning", }); return null; } }; diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto index 69755f2314..7c57b84215 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto @@ -68,6 +68,7 @@ message DataEnvelope { required UniqueAddress ownerAddress = 2; required bool performed = 3; repeated Address seen = 4; + optional sint64 obsoleteTime = 5; } required OtherMessage data = 1; @@ -119,4 +120,5 @@ message StringGSet { message DurableDataEnvelope { required OtherMessage data = 1; + repeated DataEnvelope.PruningEntry pruning = 2; } diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf index ad8a7502a1..0df2bf195f 100644 --- a/akka-distributed-data/src/main/resources/reference.conf +++ b/akka-distributed-data/src/main/resources/reference.conf @@ -32,14 +32,26 @@ akka.cluster.distributed-data { use-dispatcher = "" # How often the Replicator checks for pruning of data associated with - # removed cluster nodes. - pruning-interval = 30 s + # removed cluster nodes. If this is set to 'off' the pruning feature will + # be completely disabled. + pruning-interval = 120 s - # How long time it takes (worst case) to spread the data to all other replica nodes. + # How long time it takes to spread the data to all other replica nodes. # This is used when initiating and completing the pruning process of data associated # with removed cluster nodes. The time measurement is stopped when any replica is - # unreachable, so it should be configured to worst case in a healthy cluster. - max-pruning-dissemination = 60 s + # unreachable, but it's still recommended to configure this with certain margin. + # It should be in the magnitude of minutes even though typical dissemination time + # is shorter (grows logarithmic with number of nodes). There is no advantage of + # setting this too low. Setting it to large value will delay the pruning process. + max-pruning-dissemination = 300 s + + # The markers of that pruning has been performed for a removed node are kept for this + # time and thereafter removed. If and old data entry that was never pruned is somehow + # injected and merged with existing data after this time the value will not be correct. + # This would be possible (although unlikely) in the case of a long network partition. + # It should be in the magnitude of hours. For durable data it is configured by + # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'. + pruning-marker-time-to-live = 6 h # Serialized Write and Read messages are cached when they are sent to # several nodes. If no further activity they are removed from the cache @@ -51,6 +63,17 @@ akka.cluster.distributed-data { # end of a key. keys = [] + # The markers of that pruning has been performed for a removed node are kept for this + # time and thereafter removed. If and old data entry that was never pruned is + # injected and merged with existing data after this time the value will not be correct. 
+ # This would be possible if replica with durable data didn't participate in the pruning + # (e.g. it was shutdown) and later started after this time. A durable replica should not + # be stopped for longer time than this duration and if it is joining again after this + # duration its data should first be manually removed (from the lmdb directory). + # It should be in the magnitude of days. Note that there is a corresponding setting + # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'. + pruning-marker-time-to-live = 10 d + # Fully qualified class name of the durable store actor. It must be a subclass # of akka.actor.Actor and handle the protocol defined in # akka.cluster.ddata.DurableStore. The class must have a constructor with diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala index fd9d6437c5..04f744936e 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala @@ -19,6 +19,7 @@ import akka.actor.DeadLetterSuppression import akka.actor.Props import akka.cluster.Cluster import akka.cluster.ddata.Replicator.ReplicatorMessage +import akka.cluster.ddata.Replicator.Internal.DataEnvelope import akka.io.DirectByteBufferPool import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest @@ -52,7 +53,7 @@ object DurableStore { * should be used to signal success or failure of the operation to the contained * `replyTo` actor. */ - final case class Store(key: String, data: ReplicatedData, reply: Option[StoreReply]) + final case class Store(key: String, data: DurableDataEnvelope, reply: Option[StoreReply]) final case class StoreReply(successMsg: Any, failureMsg: Any, replyTo: ActorRef) /** @@ -65,7 +66,7 @@ object DurableStore { * will stop itself and the durable store. */ case object LoadAll - final case class LoadData(data: Map[String, ReplicatedData]) + final case class LoadData(data: Map[String, DurableDataEnvelope]) case object LoadAllCompleted class LoadFailed(message: String, cause: Throwable) extends RuntimeException(message, cause) { def this(message: String) = this(message, null) @@ -77,7 +78,13 @@ object DurableStore { * the wrapped `ReplicatedData` including its serializerId and * manifest. 
*/ - final class DurableDataEnvelope(val data: ReplicatedData) extends ReplicatorMessage { + final class DurableDataEnvelope private[akka] ( + private[akka] val dataEnvelope: DataEnvelope) extends ReplicatorMessage { + + def this(data: ReplicatedData) = this(DataEnvelope(data)) + + def data: ReplicatedData = dataEnvelope.data + override def toString(): String = s"DurableDataEnvelope($data)" override def hashCode(): Int = data.hashCode override def equals(o: Any): Boolean = o match { @@ -136,7 +143,7 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging { } // pending write behind - val pending = new java.util.HashMap[String, ReplicatedData] + val pending = new java.util.HashMap[String, DurableDataEnvelope] override def postRestart(reason: Throwable): Unit = { super.postRestart(reason) @@ -171,7 +178,7 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging { val valArray = Array.ofDim[Byte](entry.`val`.remaining) entry.`val`.get(valArray) val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope] - key → envelope.data + key → envelope }.toMap) if (loadData.data.nonEmpty) sender() ! loadData @@ -220,10 +227,10 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging { writeBehind() } - def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: String, data: ReplicatedData): Unit = { + def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: String, data: DurableDataEnvelope): Unit = { try { keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip() - val value = serializer.toBinary(new DurableDataEnvelope(data)) + val value = serializer.toBinary(data) ensureValueBufferSize(value.length) valueBuffer.put(value).flip() tx match { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala index aba050fc73..3d1148f431 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala @@ -102,6 +102,8 @@ final class GCounter private[akka] ( new GCounter(merged) } + override def modifiedByNodes: Set[UniqueAddress] = state.keySet + override def needPruningFrom(removedNode: UniqueAddress): Boolean = state.contains(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala index 144db4609e..9a2c15a050 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala @@ -143,6 +143,9 @@ final class LWWMap[A, B] private[akka] ( override def merge(that: LWWMap[A, B]): LWWMap[A, B] = new LWWMap(underlying.merge(that.underlying)) + override def modifiedByNodes: Set[UniqueAddress] = + underlying.modifiedByNodes + override def needPruningFrom(removedNode: UniqueAddress): Boolean = underlying.needPruningFrom(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index 1c756d10ec..309d052ddc 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -177,6 +177,13 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( new ORMap(mergedKeys, mergedValues) } + override def modifiedByNodes: Set[UniqueAddress] = { + keys.modifiedByNodes union 
values.foldLeft(Set.empty[UniqueAddress]) { + case (acc, (_, data: RemovedNodePruning)) ⇒ acc union data.modifiedByNodes + case (acc, _) ⇒ acc + } + } + override def needPruningFrom(removedNode: UniqueAddress): Boolean = { keys.needPruningFrom(removedNode) || values.exists { case (_, data: RemovedNodePruning) ⇒ data.needPruningFrom(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala index ae127aeb54..4bfea5b4f4 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala @@ -204,6 +204,9 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[ else this + override def modifiedByNodes: Set[UniqueAddress] = + underlying.modifiedByNodes + override def needPruningFrom(removedNode: UniqueAddress): Boolean = underlying.needPruningFrom(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index d58b83630c..17b37d018e 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -307,6 +307,9 @@ final class ORSet[A] private[akka] ( } } + override def modifiedByNodes: Set[UniqueAddress] = + vvector.modifiedByNodes + override def needPruningFrom(removedNode: UniqueAddress): Boolean = vvector.needPruningFrom(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala index 78e1124922..7135becc3b 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala @@ -94,6 +94,9 @@ final class PNCounter private[akka] ( increments = that.increments.merge(this.increments), decrements = that.decrements.merge(this.decrements)) + override def modifiedByNodes: Set[UniqueAddress] = + increments.modifiedByNodes union decrements.modifiedByNodes + override def needPruningFrom(removedNode: UniqueAddress): Boolean = increments.needPruningFrom(removedNode) || decrements.needPruningFrom(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala index 593e81f000..301800a3df 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala @@ -123,6 +123,9 @@ final class PNCounterMap[A] private[akka] ( override def merge(that: PNCounterMap[A]): PNCounterMap[A] = new PNCounterMap(underlying.merge(that.underlying)) + override def modifiedByNodes: Set[UniqueAddress] = + underlying.modifiedByNodes + override def needPruningFrom(removedNode: UniqueAddress): Boolean = underlying.needPruningFrom(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala index 0c089c2d83..d16bd79c43 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala @@ -11,36 +11,37 @@ import akka.cluster.UniqueAddress * INTERNAL API */ private[akka] 
object PruningState { - sealed trait PruningPhase - final case class PruningInitialized(seen: Set[Address]) extends PruningPhase - case object PruningPerformed extends PruningPhase + final case class PruningInitialized(owner: UniqueAddress, seen: Set[Address]) extends PruningState { + override def addSeen(node: Address): PruningState = { + if (seen(node) || owner.address == node) this + else copy(seen = seen + node) + } + } + final case class PruningPerformed(obsoleteTime: Long) extends PruningState { + def isObsolete(currentTime: Long): Boolean = obsoleteTime <= currentTime + } } /** * INTERNAL API */ -private[akka] final case class PruningState(owner: UniqueAddress, phase: PruningState.PruningPhase) { +private[akka] sealed trait PruningState { import PruningState._ def merge(that: PruningState): PruningState = - (this.phase, that.phase) match { - // FIXME this will add the PruningPerformed back again when one is None - case (PruningPerformed, _) ⇒ this - case (_, PruningPerformed) ⇒ that - case (PruningInitialized(thisSeen), PruningInitialized(thatSeen)) ⇒ - if (this.owner == that.owner) - copy(phase = PruningInitialized(thisSeen union thatSeen)) - else if (Member.addressOrdering.compare(this.owner.address, that.owner.address) > 0) + (this, that) match { + case (p1: PruningPerformed, p2: PruningPerformed) ⇒ if (p1.obsoleteTime >= p2.obsoleteTime) this else that + case (_: PruningPerformed, _) ⇒ this + case (_, _: PruningPerformed) ⇒ that + case (PruningInitialized(thisOwner, thisSeen), PruningInitialized(thatOwner, thatSeen)) ⇒ + if (thisOwner == thatOwner) + PruningInitialized(thisOwner, thisSeen union thatSeen) + else if (Member.addressOrdering.compare(thisOwner.address, thatOwner.address) > 0) that else this } - def addSeen(node: Address): PruningState = phase match { - case PruningInitialized(seen) ⇒ - if (seen(node) || owner.address == node) this - else copy(phase = PruningInitialized(seen + node)) - case _ ⇒ this - } + def addSeen(node: Address): PruningState = this } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala index 6c7cb9827a..b93793e0dd 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala @@ -67,9 +67,19 @@ abstract class AbstractReplicatedData[D <: AbstractReplicatedData[D]] extends Re * When a node is removed from the cluster these methods will be * used by the [[Replicator]] to collapse data from the removed node * into some other node in the cluster. + * + * See process description in the 'CRDT Garbage' section of the [[Replicator]] + * documentation. */ trait RemovedNodePruning extends ReplicatedData { + /** + * The nodes that have changed the state for this data + * and would need pruning when such node is no longer part + * of the cluster. + */ + def modifiedByNodes: Set[UniqueAddress] + /** * Does it have any state changes from a specific node, * which has been removed from the cluster. 
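
The rewritten PruningState above replaces the single case class with two concrete states: PruningInitialized(owner, seen), which tracks which nodes have seen the marker while pruning is in progress, and PruningPerformed(obsoleteTime), which is kept until its wall-clock TTL has elapsed. A minimal, self-contained sketch of the merge semantics (a simplified model for illustration only, not the Akka classes; the String-based addresses and the example values in main are assumptions):

object PruningStateModel {
  // Simplified model of the two pruning states introduced by this patch.
  sealed trait PruningState {
    // Merge rules: the performed marker with the latest obsoleteTime wins,
    // a performed marker wins over an in-progress state, two in-progress
    // states with the same owner union their seen sets, and otherwise the
    // state whose owner address sorts first wins.
    def merge(that: PruningState): PruningState = (this, that) match {
      case (p1: PruningPerformed, p2: PruningPerformed) =>
        if (p1.obsoleteTime >= p2.obsoleteTime) p1 else p2
      case (p: PruningPerformed, _) => p
      case (_, p: PruningPerformed) => p
      case (PruningInitialized(o1, s1), PruningInitialized(o2, s2)) =>
        if (o1 == o2) PruningInitialized(o1, s1 union s2)
        else if (o1 > o2) that
        else this
    }
  }
  final case class PruningInitialized(owner: String, seen: Set[String]) extends PruningState
  final case class PruningPerformed(obsoleteTime: Long) extends PruningState

  def main(args: Array[String]): Unit = {
    val a = PruningInitialized("node-a", Set("node-b"))
    val b = PruningInitialized("node-a", Set("node-c"))
    println(a merge b)                     // seen sets are unioned
    println(a merge PruningPerformed(42L)) // the performed marker always wins
  }
}
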
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 405d517689..86827413b9 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -43,6 +43,7 @@ import akka.actor.SupervisorStrategy import akka.actor.OneForOneStrategy import akka.actor.ActorInitializationException import java.util.concurrent.TimeUnit +import akka.util.Helpers.toRootLowerCase object ReplicatorSettings { @@ -63,6 +64,11 @@ object ReplicatorSettings { case id ⇒ id } + val pruningInterval = toRootLowerCase(config.getString("pruning-interval")) match { + case "off" | "false" ⇒ Duration.Zero + case _ ⇒ config.getDuration("pruning-interval", MILLISECONDS).millis + } + import scala.collection.JavaConverters._ new ReplicatorSettings( role = roleOption(config.getString("role")), @@ -70,10 +76,12 @@ object ReplicatorSettings { notifySubscribersInterval = config.getDuration("notify-subscribers-interval", MILLISECONDS).millis, maxDeltaElements = config.getInt("max-delta-elements"), dispatcher = dispatcher, - pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis, + pruningInterval = pruningInterval, maxPruningDissemination = config.getDuration("max-pruning-dissemination", MILLISECONDS).millis, durableStoreProps = Left((config.getString("durable.store-actor-class"), config.getConfig("durable"))), - durableKeys = config.getStringList("durable.keys").asScala.toSet) + durableKeys = config.getStringList("durable.keys").asScala.toSet, + pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis, + durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis) } /** @@ -110,21 +118,30 @@ object ReplicatorSettings { * in the `Set`. 
*/ final class ReplicatorSettings( - val role: Option[String], - val gossipInterval: FiniteDuration, - val notifySubscribersInterval: FiniteDuration, - val maxDeltaElements: Int, - val dispatcher: String, - val pruningInterval: FiniteDuration, - val maxPruningDissemination: FiniteDuration, - val durableStoreProps: Either[(String, Config), Props], - val durableKeys: Set[String]) { + val role: Option[String], + val gossipInterval: FiniteDuration, + val notifySubscribersInterval: FiniteDuration, + val maxDeltaElements: Int, + val dispatcher: String, + val pruningInterval: FiniteDuration, + val maxPruningDissemination: FiniteDuration, + val durableStoreProps: Either[(String, Config), Props], + val durableKeys: Set[String], + val pruningMarkerTimeToLive: FiniteDuration, + val durablePruningMarkerTimeToLive: FiniteDuration) { // For backwards compatibility def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration, maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) = this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, - maxPruningDissemination, Right(Props.empty), Set.empty) + maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days) + + // For backwards compatibility + def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration, + maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration, + durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) = + this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, + maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days) def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role)) @@ -150,6 +167,13 @@ final class ReplicatorSettings( def withPruning(pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration): ReplicatorSettings = copy(pruningInterval = pruningInterval, maxPruningDissemination = maxPruningDissemination) + def withPruningMarkerTimeToLive( + pruningMarkerTimeToLive: FiniteDuration, + durablePruningMarkerTimeToLive: FiniteDuration): ReplicatorSettings = + copy( + pruningMarkerTimeToLive = pruningMarkerTimeToLive, + durablePruningMarkerTimeToLive = durablePruningMarkerTimeToLive) + def withDurableStoreProps(durableStoreProps: Props): ReplicatorSettings = copy(durableStoreProps = Right(durableStoreProps)) @@ -168,17 +192,20 @@ final class ReplicatorSettings( } private def copy( - role: Option[String] = role, - gossipInterval: FiniteDuration = gossipInterval, - notifySubscribersInterval: FiniteDuration = notifySubscribersInterval, - maxDeltaElements: Int = maxDeltaElements, - dispatcher: String = dispatcher, - pruningInterval: FiniteDuration = pruningInterval, - maxPruningDissemination: FiniteDuration = maxPruningDissemination, - durableStoreProps: Either[(String, Config), Props] = durableStoreProps, - durableKeys: Set[String] = durableKeys): ReplicatorSettings = + role: Option[String] = role, + gossipInterval: FiniteDuration = gossipInterval, + notifySubscribersInterval: FiniteDuration = notifySubscribersInterval, + maxDeltaElements: Int = maxDeltaElements, + dispatcher: String = dispatcher, + pruningInterval: FiniteDuration = pruningInterval, + maxPruningDissemination: FiniteDuration = maxPruningDissemination, + durableStoreProps: 
Either[(String, Config), Props] = durableStoreProps, + durableKeys: Set[String] = durableKeys, + pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive, + durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive): ReplicatorSettings = new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, - pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys) + pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys, + pruningMarkerTimeToLive, durablePruningMarkerTimeToLive) } object Replicator { @@ -562,6 +589,9 @@ object Replicator { val LazyDigest: Digest = ByteString(0) val NotFoundDigest: Digest = ByteString(-1) + /** + * The `DataEnvelope` wraps a data entry and carries state of the pruning process for the entry. + */ final case class DataEnvelope( data: ReplicatedData, pruning: Map[UniqueAddress, PruningState] = Map.empty) @@ -576,36 +606,51 @@ object Replicator { } def initRemovedNodePruning(removed: UniqueAddress, owner: UniqueAddress): DataEnvelope = { - copy(pruning = pruning.updated(removed, PruningState(owner, PruningInitialized(Set.empty)))) + copy(pruning = pruning.updated(removed, PruningInitialized(owner, Set.empty))) } - def prune(from: UniqueAddress): DataEnvelope = { + def prune(from: UniqueAddress, pruningPerformed: PruningPerformed): DataEnvelope = { data match { case dataWithRemovedNodePruning: RemovedNodePruning ⇒ require(pruning.contains(from)) - val to = pruning(from).owner - val prunedData = dataWithRemovedNodePruning.prune(from, to) - copy(data = prunedData, pruning = pruning.updated(from, PruningState(to, PruningPerformed))) + pruning(from) match { + case PruningInitialized(owner, _) ⇒ + val prunedData = dataWithRemovedNodePruning.prune(from, owner) + copy(data = prunedData, pruning = pruning.updated(from, pruningPerformed)) + case _ ⇒ + this + } + case _ ⇒ this } - } def merge(other: DataEnvelope): DataEnvelope = if (other.data == DeletedData) DeletedEnvelope else { - var mergedRemovedNodePruning = other.pruning - for ((key, thisValue) ← pruning) { - mergedRemovedNodePruning.get(key) match { - case None ⇒ - mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue) - case Some(thatValue) ⇒ - mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue merge thatValue) + val mergedPruning = + pruning.foldLeft(other.pruning) { + case (acc, (key, thisValue)) ⇒ + acc.get(key) match { + case None ⇒ + acc.updated(key, thisValue) + case Some(thatValue) ⇒ + acc.updated(key, thisValue merge thatValue) + } + } + val filteredMergedPruning = { + if (mergedPruning.isEmpty) mergedPruning + else { + val currentTime = System.currentTimeMillis() + mergedPruning.filter { + case (_, p: PruningPerformed) ⇒ !p.isObsolete(currentTime) + case _ ⇒ true + } } } // cleanup both sides before merging, `merge((otherData: ReplicatedData)` will cleanup other.data - copy(data = cleaned(data, mergedRemovedNodePruning), pruning = mergedRemovedNodePruning).merge(other.data) + copy(data = cleaned(data, filteredMergedPruning), pruning = filteredMergedPruning).merge(other.data) } def merge(otherData: ReplicatedData): DataEnvelope = @@ -613,7 +658,7 @@ object Replicator { else copy(data = data merge cleaned(otherData, pruning).asInstanceOf[data.T]) private def cleaned(c: ReplicatedData, p: Map[UniqueAddress, PruningState]): ReplicatedData = p.foldLeft(c) { - case (c: RemovedNodePruning, (removed, PruningState(_, PruningPerformed))) ⇒ + case (c: RemovedNodePruning, 
(removed, _: PruningPerformed)) ⇒ if (c.needPruningFrom(removed)) c.pruningCleanup(removed) else c case (c, _) ⇒ c } @@ -801,7 +846,8 @@ object Replicator { *
  • When a node is removed from the cluster it is first important that all updates that were * done by that node are disseminated to all other nodes. The pruning will not start before the * `maxPruningDissemination` duration has elapsed. The time measurement is stopped when any - * replica is unreachable, so it should be configured to worst case in a healthy cluster.
  • + * replica is unreachable, but it is still recommended to configure this with a certain margin. + * It should be on the order of minutes. *
  • The nodes are ordered by their address and the node ordered first is called leader. * The leader initiates the pruning by adding a `PruningInitialized` marker in the data envelope. * This is gossiped to all other nodes and they mark it as seen when they receive it.
  • @@ -840,7 +886,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog import context.dispatcher val gossipTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, GossipTick) val notifyTask = context.system.scheduler.schedule(notifySubscribersInterval, notifySubscribersInterval, self, FlushChanges) - val pruningTask = context.system.scheduler.schedule(pruningInterval, pruningInterval, self, RemovedNodePruningTick) + val pruningTask = + if (pruningInterval >= Duration.Zero) + Some(context.system.scheduler.schedule(pruningInterval, pruningInterval, self, RemovedNodePruningTick)) + else None val clockTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, ClockTick) val serializer = SerializationExtension(context.system).serializerFor(classOf[DataEnvelope]) @@ -867,11 +916,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog // cluster weaklyUp nodes, doesn't contain selfAddress var weaklyUpNodes: Set[Address] = Set.empty - // nodes removed from cluster, to be pruned, and tombstoned var removedNodes: Map[UniqueAddress, Long] = Map.empty - var pruningPerformed: Map[UniqueAddress, Long] = Map.empty - var tombstoneNodes: Set[UniqueAddress] = Set.empty - var leader: Option[Address] = None def isLeader: Boolean = leader.exists(_ == selfAddress) @@ -921,7 +966,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog cluster.unsubscribe(self) gossipTask.cancel() notifyTask.cancel() - pruningTask.cancel() + pruningTask.foreach(_.cancel()) clockTask.cancel() } @@ -964,11 +1009,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog count += data.size data.foreach { case (key, d) ⇒ - val envelope = DataEnvelope(d) - write(key, envelope) match { + write(key, d.dataEnvelope) match { case Some(newEnvelope) ⇒ - if (newEnvelope.data ne envelope.data) - durableStore ! Store(key, newEnvelope.data, None) + if (newEnvelope.data ne d.dataEnvelope.data) + durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None) case None ⇒ } } @@ -1059,18 +1103,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog localValue match { case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key, req) case Some(envelope @ DataEnvelope(existing, _)) ⇒ - existing.merge(modify(Some(existing)).asInstanceOf[existing.T]) - case None ⇒ modify(None) + envelope.merge(modify(Some(existing)).asInstanceOf[existing.T]) + case None ⇒ DataEnvelope(modify(None)) } } match { - case Success(newData) ⇒ - log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, newData) - val envelope = DataEnvelope(pruningCleanupTombstoned(newData)) + case Success(envelope) ⇒ + log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, envelope.data) setData(key.id, envelope) val durable = isDurable(key.id) if (isLocalUpdate(writeConsistency)) { if (durable) - durableStore ! Store(key.id, envelope.data, + durableStore ! Store(key.id, new DurableDataEnvelope(envelope), Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo))) else replyTo ! UpdateSuccess(key, req) @@ -1079,7 +1122,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, replyTo, durable) .withDispatcher(context.props.dispatcher)) if (durable) { - durableStore ! 
Store(key.id, envelope.data, + durableStore ! Store(key.id, new DurableDataEnvelope(envelope), Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator))) } } @@ -1106,7 +1149,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog write(key, envelope) match { case Some(newEnvelope) ⇒ if (isDurable(key)) - durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, replyTo))) + durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), Some(StoreReply(WriteAck, WriteNack, replyTo))) else replyTo ! WriteAck case None ⇒ @@ -1115,10 +1158,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def write(key: String, writeEnvelope: DataEnvelope): Option[DataEnvelope] = getData(key) match { - case Some(DataEnvelope(DeletedData, _)) ⇒ Some(writeEnvelope) // already deleted + case Some(DataEnvelope(DeletedData, _)) ⇒ Some(DeletedEnvelope) // already deleted case Some(envelope @ DataEnvelope(existing, _)) ⇒ if (existing.getClass == writeEnvelope.data.getClass || writeEnvelope.data == DeletedData) { - val merged = envelope.merge(pruningCleanupTombstoned(writeEnvelope)).addSeen(selfAddress) + val merged = envelope.merge(writeEnvelope).addSeen(selfAddress) setData(key, merged) Some(merged) } else { @@ -1128,16 +1171,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog None } case None ⇒ - val cleaned = pruningCleanupTombstoned(writeEnvelope).addSeen(selfAddress) - setData(key, cleaned) - Some(cleaned) + val writeEnvelope2 = writeEnvelope.addSeen(selfAddress) + setData(key, writeEnvelope2) + Some(writeEnvelope2) } def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = { write(key, writeEnvelope) match { case Some(newEnvelope) ⇒ if (isDurable(key)) - durableStore ! Store(key, newEnvelope.data, None) + durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None) case None ⇒ } replyTo ! ReadRepairAck @@ -1160,7 +1203,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val durable = isDurable(key.id) if (isLocalUpdate(consistency)) { if (durable) - durableStore ! Store(key.id, DeletedData, + durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope), Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), replyTo))) else replyTo ! DeleteSuccess(key, req) @@ -1169,7 +1212,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable) .withDispatcher(context.props.dispatcher)) if (durable) { - durableStore ! Store(key.id, DeletedData, + durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope), Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), writeAggregator))) } } @@ -1313,7 +1356,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog write(key, envelope) match { case Some(newEnvelope) ⇒ if (isDurable(key)) - durableStore ! Store(key, newEnvelope.data, None) + durableStore ! 
Store(key, new DurableDataEnvelope(newEnvelope), None) case None ⇒ } if (sendBack) getData(key) match { @@ -1380,6 +1423,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog else if (matchingRole(m)) { nodes -= m.address weaklyUpNodes -= m.address + log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress) removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime) unreachable -= m.address } @@ -1402,12 +1446,31 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def receiveRemovedNodePruningTick(): Unit = { - if (isLeader && removedNodes.nonEmpty) { - initRemovedNodePruning() + // See 'CRDT Garbage' section in Replicator Scaladoc for description of the process + if (unreachable.isEmpty) { + if (isLeader) { + collectRemovedNodes() + initRemovedNodePruning() + } + performRemovedNodePruning() + deleteObsoletePruningPerformed() + } + } + + def collectRemovedNodes(): Unit = { + val knownNodes = nodes union weaklyUpNodes union removedNodes.keySet.map(_.address) + val newRemovedNodes = + dataEntries.foldLeft(Set.empty[UniqueAddress]) { + case (acc, (_, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _))) ⇒ + acc union data.modifiedByNodes.filterNot(n ⇒ n == selfUniqueAddress || knownNodes(n.address)) + case (acc, _) ⇒ + acc + } + + newRemovedNodes.foreach { n ⇒ + log.debug("Adding removed node [{}] from data", n) + removedNodes = removedNodes.updated(n, allReachableClockTime) } - performRemovedNodePruning() - // FIXME tombstoneRemovedNodePruning doesn't work, since merge of PruningState will add the PruningPerformed back again - // tombstoneRemovedNodePruning() } def initRemovedNodePruning(): Unit = { @@ -1417,22 +1480,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog }(collection.breakOut) if (removedSet.nonEmpty) { - // FIXME handle pruning of durable data, this is difficult and requires more thought for ((key, (envelope, _)) ← dataEntries; removed ← removedSet) { def init(): Unit = { val newEnvelope = envelope.initRemovedNodePruning(removed, selfUniqueAddress) - log.debug("Initiated pruning of [{}] for data key [{}]", removed, key) + log.debug("Initiated pruning of [{}] for data key [{}] to [{}]", removed, key, selfUniqueAddress) setData(key, newEnvelope) } if (envelope.needPruningFrom(removed)) { envelope.data match { case dataWithRemovedNodePruning: RemovedNodePruning ⇒ - envelope.pruning.get(removed) match { case None ⇒ init() - case Some(PruningState(owner, PruningInitialized(_))) if owner != selfUniqueAddress ⇒ init() + case Some(PruningInitialized(owner, _)) if owner != selfUniqueAddress ⇒ init() case _ ⇒ // already in progress } case _ ⇒ @@ -1444,78 +1505,44 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def performRemovedNodePruning(): Unit = { // perform pruning when all seen Init + val allNodes = nodes union weaklyUpNodes + val pruningPerformed = PruningPerformed(System.currentTimeMillis() + pruningMarkerTimeToLive.toMillis) + val durablePruningPerformed = PruningPerformed(System.currentTimeMillis() + durablePruningMarkerTimeToLive.toMillis) dataEntries.foreach { case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning), _)) ⇒ pruning.foreach { - case (removed, PruningState(owner, PruningInitialized(seen))) if owner == selfUniqueAddress - && (nodes.isEmpty || nodes.forall(seen)) ⇒ - val newEnvelope = envelope.prune(removed) - pruningPerformed = pruningPerformed.updated(removed, 
allReachableClockTime) + case (removed, PruningInitialized(owner, seen)) if owner == selfUniqueAddress + && (allNodes.isEmpty || allNodes.forall(seen)) ⇒ + val newEnvelope = envelope.prune(removed, if (isDurable(key)) durablePruningPerformed else pruningPerformed) log.debug("Perform pruning of [{}] from [{}] to [{}]", key, removed, selfUniqueAddress) setData(key, newEnvelope) if ((newEnvelope.data ne data) && isDurable(key)) - durableStore ! Store(key, newEnvelope.data, None) + durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None) case _ ⇒ } case _ ⇒ // deleted, or pruning not needed } } - def tombstoneRemovedNodePruning(): Unit = { - - def allPruningPerformed(removed: UniqueAddress): Boolean = { - dataEntries forall { - case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning), _)) ⇒ - pruning.get(removed) match { - case Some(PruningState(_, PruningInitialized(_))) ⇒ false - case _ ⇒ true - } - case _ ⇒ true // deleted, or pruning not needed - } - } - - // FIXME pruningPerformed is only updated on one node, but tombstoneNodes should be on all - pruningPerformed.foreach { - case (removed, timestamp) if ((allReachableClockTime - timestamp) > maxPruningDisseminationNanos) && - allPruningPerformed(removed) ⇒ - log.debug("All pruning performed for [{}], tombstoned", removed) - pruningPerformed -= removed - removedNodes -= removed - tombstoneNodes += removed - dataEntries.foreach { - case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _)) ⇒ - val newEnvelope = pruningCleanupTombstoned(removed, envelope) - setData(key, newEnvelope) - if ((newEnvelope.data ne data) && isDurable(key)) - durableStore ! Store(key, newEnvelope.data, None) - case _ ⇒ // deleted, or pruning not needed + def deleteObsoletePruningPerformed(): Unit = { + val currentTime = System.currentTimeMillis() + dataEntries.foreach { + case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning), _)) ⇒ + val newEnvelope = pruning.foldLeft(envelope) { + case (acc, (removed, p: PruningPerformed)) if p.isObsolete(currentTime) ⇒ + log.debug("Removing obsolete pruning marker for [{}] in [{}]", removed, key) + removedNodes -= removed + acc.copy(pruning = acc.pruning - removed) + case (acc, _) ⇒ acc } - case (removed, timestamp) ⇒ // not ready + if (newEnvelope ne envelope) + setData(key, newEnvelope) + + case _ ⇒ // deleted, or pruning not needed } + } - def pruningCleanupTombstoned(envelope: DataEnvelope): DataEnvelope = - tombstoneNodes.foldLeft(envelope)((c, removed) ⇒ pruningCleanupTombstoned(removed, c)) - - def pruningCleanupTombstoned(removed: UniqueAddress, envelope: DataEnvelope): DataEnvelope = { - val pruningCleanuped = pruningCleanupTombstoned(removed, envelope.data) - if ((pruningCleanuped ne envelope.data) || envelope.pruning.contains(removed)) - envelope.copy(data = pruningCleanuped, pruning = envelope.pruning - removed) - else - envelope - } - - def pruningCleanupTombstoned(data: ReplicatedData): ReplicatedData = - if (tombstoneNodes.isEmpty) data - else tombstoneNodes.foldLeft(data)((c, removed) ⇒ pruningCleanupTombstoned(removed, c)) - - def pruningCleanupTombstoned(removed: UniqueAddress, data: ReplicatedData): ReplicatedData = - data match { - case dataWithRemovedNodePruning: RemovedNodePruning ⇒ - if (dataWithRemovedNodePruning.needPruningFrom(removed)) dataWithRemovedNodePruning.pruningCleanup(removed) else data - case _ ⇒ data - } - def receiveGetReplicaCount(): Unit = { // selfAddress is not included in the set replyTo ! 
ReplicaCount(nodes.size + 1) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala index 8cb16d974e..e7b126047a 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala @@ -292,6 +292,9 @@ final case class OneVersionVector private[akka] (node: UniqueAddress, version: L } } + override def modifiedByNodes: Set[UniqueAddress] = + Set(node) + override def needPruningFrom(removedNode: UniqueAddress): Boolean = node == removedNode @@ -353,6 +356,9 @@ final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) exten } } + override def modifiedByNodes: Set[UniqueAddress] = + versions.keySet + override def needPruningFrom(removedNode: UniqueAddress): Boolean = versions.contains(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala index 59fd93c8cb..8ba9dabede 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala @@ -28,6 +28,7 @@ import scala.concurrent.duration.FiniteDuration import akka.cluster.ddata.DurableStore.DurableDataEnvelope import akka.cluster.ddata.DurableStore.DurableDataEnvelope import java.io.NotSerializableException +import akka.actor.Address /** * INTERNAL API @@ -155,6 +156,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) }(system.dispatcher) private val writeAckBytes = dm.Empty.getDefaultInstance.toByteArray + private val dummyAddress = UniqueAddress(Address("a", "b", "c", 2552), 1L) val GetManifest = "A" val GetSuccessManifest = "B" @@ -396,14 +398,17 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) dataEnvelope.pruning.foreach { case (removedAddress, state) ⇒ val b = dm.DataEnvelope.PruningEntry.newBuilder(). - setRemovedAddress(uniqueAddressToProto(removedAddress)). 
- setOwnerAddress(uniqueAddressToProto(state.owner)) - state.phase match { - case PruningState.PruningInitialized(seen) ⇒ + setRemovedAddress(uniqueAddressToProto(removedAddress)) + state match { + case PruningState.PruningInitialized(owner, seen) ⇒ seen.toVector.sorted(Member.addressOrdering).map(addressToProto).foreach { a ⇒ b.addSeen(a) } + b.setOwnerAddress(uniqueAddressToProto(owner)) b.setPerformed(false) - case PruningState.PruningPerformed ⇒ - b.setPerformed(true) + case PruningState.PruningPerformed(obsoleteTime) ⇒ + b.setPerformed(true).setObsoleteTime(obsoleteTime) + // TODO ownerAddress is only needed for PruningInitialized, but kept here for + // wire backwards compatibility with 2.4.16 (required field) + b.setOwnerAddress(uniqueAddressToProto(dummyAddress)) } dataEnvelopeBuilder.addPruning(b) } @@ -414,17 +419,28 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) dataEnvelopeFromProto(dm.DataEnvelope.parseFrom(bytes)) private def dataEnvelopeFromProto(dataEnvelope: dm.DataEnvelope): DataEnvelope = { - val pruning: Map[UniqueAddress, PruningState] = - dataEnvelope.getPruningList.asScala.map { pruningEntry ⇒ - val phase = - if (pruningEntry.getPerformed) PruningState.PruningPerformed - else PruningState.PruningInitialized(pruningEntry.getSeenList.asScala.map(addressFromProto)(breakOut)) - val state = PruningState(uniqueAddressFromProto(pruningEntry.getOwnerAddress), phase) + val data = otherMessageFromProto(dataEnvelope.getData).asInstanceOf[ReplicatedData] + val pruning = pruningFromProto(dataEnvelope.getPruningList) + DataEnvelope(data, pruning) + } + + private def pruningFromProto(pruningEntries: java.util.List[dm.DataEnvelope.PruningEntry]): Map[UniqueAddress, PruningState] = { + if (pruningEntries.isEmpty) + Map.empty + else + pruningEntries.asScala.map { pruningEntry ⇒ + val state = + if (pruningEntry.getPerformed) { + // for wire compatibility with Akka 2.4.x + val obsoleteTime = if (pruningEntry.hasObsoleteTime) pruningEntry.getObsoleteTime else Long.MaxValue + PruningState.PruningPerformed(obsoleteTime) + } else + PruningState.PruningInitialized( + uniqueAddressFromProto(pruningEntry.getOwnerAddress), + pruningEntry.getSeenList.asScala.map(addressFromProto)(breakOut)) val removed = uniqueAddressFromProto(pruningEntry.getRemovedAddress) removed → state }(breakOut) - val data = otherMessageFromProto(dataEnvelope.getData).asInstanceOf[ReplicatedData] - DataEnvelope(data, pruning) } private def writeToProto(write: Write): dm.Write = @@ -472,6 +488,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) private def durableDataEnvelopeFromProto(durableDataEnvelope: dm.DurableDataEnvelope): DurableDataEnvelope = { val data = otherMessageFromProto(durableDataEnvelope.getData).asInstanceOf[ReplicatedData] + val pruning = pruningFromProto(durableDataEnvelope.getPruningList) + new DurableDataEnvelope(data) } diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala index bfb8fb16f8..b4cf743c5f 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala @@ -23,7 +23,7 @@ final case class DurableDataSpecConfig(writeBehind: Boolean) extends MultiNodeCo val second = role("second") commonConfig(ConfigFactory.parseString(s""" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = 
"akka.cluster.ClusterActorRefProvider" akka.log-dead-letters-during-shutdown = off akka.cluster.distributed-data.durable.keys = ["durable*"] diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala index 9535289015..553f0d727f 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala @@ -126,18 +126,19 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN } within(15.seconds) { + var values = Set.empty[Int] awaitAssert { replicator ! Get(KeyA, ReadLocal) val counter3 = expectMsgType[GetSuccess[GCounter]].dataValue - counter3.value should be(10) + val value = counter3.value.intValue + values += value + value should be(10) counter3.state.size should be(3) } + values should ===(Set(10)) } enterBarrier("pruned") - // let it become tombstone - Thread.sleep(5000) - runOn(first) { val addr = cluster2.selfAddress val sys3 = ActorSystem(system.name, ConfigFactory.parseString(s""" @@ -150,15 +151,31 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN Cluster(sys3).join(node(first).address) within(10.seconds) { + var values = Set.empty[Int] awaitAssert { replicator3.tell(Get(KeyA, ReadLocal), probe3.ref) val counter4 = probe3.expectMsgType[GetSuccess[GCounter]].dataValue - counter4.value should be(10) + val value = counter4.value.intValue + values += value + value should be(10) counter4.state.size should be(3) } + values should ===(Set(10)) } + + // after merging with others + replicator3 ! Get(KeyA, ReadAll(remainingOrDefault)) + val counter5 = expectMsgType[GetSuccess[GCounter]].dataValue + counter5.value should be(10) + counter5.state.size should be(3) } + enterBarrier("sys3-started") + replicator ! Get(KeyA, ReadAll(remainingOrDefault)) + val counter6 = expectMsgType[GetSuccess[GCounter]].dataValue + counter6.value should be(10) + counter6.state.size should be(3) + enterBarrier("after-1") } } diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala index 6ac4dde83d..33154a59b0 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala @@ -122,14 +122,18 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST runOn(first, second) { within(15.seconds) { + var values = Set.empty[Int] awaitAssert { replicator ! Get(KeyA, ReadLocal) expectMsgPF() { case g @ GetSuccess(KeyA, _) ⇒ - g.get(KeyA).value should be(9) + val value = g.get(KeyA).value.toInt + values += value + value should be(9) g.get(KeyA).needPruningFrom(thirdUniqueAddress) should be(false) } } + values should ===(Set(9)) } within(5.seconds) { awaitAssert { @@ -154,10 +158,12 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST } enterBarrier("pruning-done") - // on one of the nodes the data has been updated by the pruning, - // client can update anyway + // after pruning performed we should not be able to update with data from removed node def updateAfterPruning(expectedValue: Int): Unit = { - replicator ! Update(KeyA, GCounter(), WriteAll(timeout), None)(_ + 1) + replicator ! 
Update(KeyA, GCounter(), WriteAll(timeout), None) { existing ⇒ + // inject data from removed node to simulate bad data + existing.merge(oldCounter) + 1 + } expectMsgPF() { case UpdateSuccess(KeyA, _) ⇒ replicator ! Get(KeyA, ReadLocal) @@ -165,6 +171,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST retrieved.value should be(expectedValue) } } + runOn(first) { updateAfterPruning(expectedValue = 10) } @@ -175,19 +182,19 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST } enterBarrier("update-second-after-pruning") - // after pruning performed and maxDissemination it is tombstoned - // and we should still not be able to update with data from removed node + // after full replication should still not be able to update with data from removed node + // but it would not work after removal of the PruningPerformed markers expectNoMsg(maxPruningDissemination + 3.seconds) runOn(first) { updateAfterPruning(expectedValue = 12) } - enterBarrier("update-first-after-tombstone") + enterBarrier("update-first-after-dissemination") runOn(second) { updateAfterPruning(expectedValue = 13) } - enterBarrier("update-second-after-tombstone") + enterBarrier("update-second-after-dissemination") enterBarrier("after-1") } diff --git a/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java b/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java index dc40be084f..e7762f85a3 100644 --- a/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java +++ b/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java @@ -13,6 +13,11 @@ public class JavaImplOfReplicatedData extends AbstractReplicatedData modifiedByNodes() { + return akka.japi.Util.immutableSeq(new java.util.ArrayList()).toSet(); + } + @Override public boolean needPruningFrom(UniqueAddress removedNode) { return false; diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DataEnvelopeSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DataEnvelopeSpec.scala new file mode 100644 index 0000000000..7f1ef32452 --- /dev/null +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DataEnvelopeSpec.scala @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. 
+ */ + +package akka.cluster.ddata + +import akka.actor.Address +import akka.cluster.UniqueAddress +import org.scalatest.Matchers +import org.scalatest.WordSpec +import akka.cluster.ddata.Replicator.Internal.DataEnvelope + +class DataEnvelopeSpec extends WordSpec with Matchers { + import PruningState._ + + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) + val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L) + val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4L) + val obsoleteTimeInFuture = System.currentTimeMillis() + 3600 * 1000 + val oldObsoleteTime = System.currentTimeMillis() - 3600 * 1000 + + "DataEnvelope" must { + + "handle pruning transitions" in { + val g1 = GCounter.empty.increment(node1, 1) + val d1 = DataEnvelope(g1) + + val d2 = d1.initRemovedNodePruning(node1, node2) + d2.pruning(node1).isInstanceOf[PruningInitialized] should ===(true) + d2.pruning(node1).asInstanceOf[PruningInitialized].owner should ===(node2) + + val d3 = d2.addSeen(node3.address) + d3.pruning(node1).asInstanceOf[PruningInitialized].seen should ===(Set(node3.address)) + + val d4 = d3.prune(node1, PruningPerformed(obsoleteTimeInFuture)) + d4.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2)) + } + + "merge correctly" in { + val g1 = GCounter.empty.increment(node1, 1) + val d1 = DataEnvelope(g1) + val g2 = GCounter.empty.increment(node2, 2) + val d2 = DataEnvelope(g2) + + val d3 = d1.merge(d2) + d3.data.asInstanceOf[GCounter].value should ===(3) + d3.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node1, node2)) + val d4 = d3.initRemovedNodePruning(node1, node2) + val d5 = d4.prune(node1, PruningPerformed(obsoleteTimeInFuture)) + d5.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2)) + + // late update from node1 + val g11 = g1.increment(node1, 10) + val d6 = d5.merge(DataEnvelope(g11)) + d6.data.asInstanceOf[GCounter].value should ===(3) + d6.data.asInstanceOf[GCounter].modifiedByNodes should ===(Set(node2)) + + // remove obsolete + val d7 = d5.copy(pruning = d5.pruning.updated(node1, PruningPerformed(oldObsoleteTime))) + val d8 = d5.copy(pruning = Map.empty) + d8.merge(d7).pruning should ===(Map.empty) + d7.merge(d8).pruning should ===(Map.empty) + + d5.merge(d7).pruning(node1) should ===(PruningPerformed(obsoleteTimeInFuture)) + d7.merge(d5).pruning(node1) should ===(PruningPerformed(obsoleteTimeInFuture)) + } + + } +} diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala index 60571f1542..46a54f3661 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala @@ -139,18 +139,22 @@ class GCounterSpec extends WordSpec with Matchers { val c1 = GCounter() val c2 = c1 increment node1 val c3 = c2 increment node2 + c2.modifiedByNodes should ===(Set(node1)) c2.needPruningFrom(node1) should be(true) c2.needPruningFrom(node2) should be(false) + c3.modifiedByNodes should ===(Set(node1, node2)) c3.needPruningFrom(node1) should be(true) c3.needPruningFrom(node2) should be(true) c3.value should be(2) val c4 = c3.prune(node1, node2) + c4.modifiedByNodes should ===(Set(node2)) c4.needPruningFrom(node2) should be(true) c4.needPruningFrom(node1) should be(false) c4.value should be(2) val c5 = (c4 increment node1).pruningCleanup(node1) + 
c5.modifiedByNodes should ===(Set(node2)) c5.needPruningFrom(node1) should be(false) c4.value should be(2) } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala index 211c43c093..4d56d7dee6 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala @@ -143,16 +143,20 @@ class PNCounterSpec extends WordSpec with Matchers { val c1 = PNCounter() val c2 = c1 increment node1 val c3 = c2 decrement node2 + c2.modifiedByNodes should ===(Set(node1)) c2.needPruningFrom(node1) should be(true) c2.needPruningFrom(node2) should be(false) + c3.modifiedByNodes should ===(Set(node1, node2)) c3.needPruningFrom(node1) should be(true) c3.needPruningFrom(node2) should be(true) val c4 = c3.prune(node1, node2) + c4.modifiedByNodes should ===(Set(node2)) c4.needPruningFrom(node2) should be(true) c4.needPruningFrom(node1) should be(false) val c5 = (c4 increment node1).pruningCleanup(node1) + c5.modifiedByNodes should ===(Set(node2)) c5.needPruningFrom(node1) should be(false) } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PruningStateSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PruningStateSpec.scala index 709cea1880..af37267baf 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PruningStateSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PruningStateSpec.scala @@ -12,32 +12,37 @@ import org.scalatest.WordSpec class PruningStateSpec extends WordSpec with Matchers { import PruningState._ - val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1) - val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2) - val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3) - val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4) + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) + val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L) + val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4L) "Pruning state" must { - "merge phase correctly" in { - val p1 = PruningState(node1, PruningInitialized(Set.empty)) - val p2 = PruningState(node1, PruningPerformed) - p1.merge(p2).phase should be(PruningPerformed) - p2.merge(p1).phase should be(PruningPerformed) + "merge state correctly" in { + val p1 = PruningInitialized(node1, Set.empty) + val p2 = PruningPerformed(System.currentTimeMillis() + 3600 * 1000) + p1.merge(p2) should be(p2) + p2.merge(p1) should be(p2) + + val p3 = p2.copy(p2.obsoleteTime - 1) + p2.merge(p3) should be(p2) // keep greatest obsoleteTime + p3.merge(p2) should be(p2) + } "merge owner correctly" in { - val p1 = PruningState(node1, PruningInitialized(Set.empty)) - val p2 = PruningState(node2, PruningInitialized(Set.empty)) - val expected = PruningState(node1, PruningInitialized(Set.empty)) + val p1 = PruningInitialized(node1, Set.empty) + val p2 = PruningInitialized(node2, Set.empty) + val expected = PruningInitialized(node1, Set.empty) p1.merge(p2) should be(expected) p2.merge(p1) should be(expected) } "merge seen correctly" in { - val p1 = PruningState(node1, PruningInitialized(Set(node2.address))) - val p2 = PruningState(node1, PruningInitialized(Set(node4.address))) - val expected = PruningState(node1, 
PruningInitialized(Set(node2.address, node4.address))) + val p1 = PruningInitialized(node1, Set(node2.address)) + val p2 = PruningInitialized(node1, Set(node4.address)) + val expected = PruningInitialized(node1, Set(node2.address, node4.address)) p1.merge(p2) should be(expected) p2.merge(p1) should be(expected) } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala index b47e9a5332..46bdf27e7b 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala @@ -70,8 +70,8 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( checkSerialization(Changed(keyA)(data1)) checkSerialization(DataEnvelope(data1)) checkSerialization(DataEnvelope(data1, pruning = Map( - address1 → PruningState(address2, PruningPerformed), - address3 → PruningState(address2, PruningInitialized(Set(address1.address)))))) + address1 → PruningPerformed(System.currentTimeMillis()), + address3 → PruningInitialized(address2, Set(address1.address))))) checkSerialization(Write("A", DataEnvelope(data1))) checkSerialization(WriteAck) checkSerialization(WriteNack) @@ -85,6 +85,9 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( "A" → DataEnvelope(data1), "B" → DataEnvelope(GSet() + "b" + "c")), sendBack = true)) checkSerialization(new DurableDataEnvelope(data1)) + checkSerialization(new DurableDataEnvelope(DataEnvelope(data1, pruning = Map( + address1 → PruningPerformed(System.currentTimeMillis()), + address3 → PruningInitialized(address2, Set(address1.address)))))) } } diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst index 320a996616..777c0605cc 100644 --- a/akka-docs/rst/java/distributed-data.rst +++ b/akka-docs/rst/java/distributed-data.rst @@ -511,6 +511,18 @@ Note that you should be prepared to receive ``WriteFailure`` as reply to an ``Up durable entry if the data could not be stored for some reason. When enabling ``write-behind-interval`` such errors will only be logged and ``UpdateSuccess`` will still be the reply to the ``Update``. +There is one important caveat when it comes to pruning of :ref:`crdt_garbage_java` for durable data. +If an old data entry that was never pruned is injected and merged with existing data after +the pruning markers have been removed, the value will not be correct. The time-to-live +of the markers is defined by the configuration property +``akka.cluster.distributed-data.durable.remove-pruning-marker-after`` and is on the order of days. +This can happen if a node with durable data did not participate in the pruning +(e.g. it was shut down) and is started again after this time. A node with durable data should not +be stopped for longer than this duration, and if it joins again after this +duration its data should first be manually removed (from the lmdb directory). + +.. _crdt_garbage_java: + CRDT Garbage ------------ @@ -519,7 +531,8 @@ For example a ``GCounter`` keeps track of one counter per node. If a ``GCounter` from one node it will associate the identifier of that node forever. That can become a problem for long running systems with many cluster nodes being added and removed. To solve this problem the ``Replicator`` performs pruning of data associated with nodes that have been removed from the -cluster. Data types that need pruning have to implement the ``RemovedNodePruning`` trait. +cluster. Data types that need pruning have to implement the ``RemovedNodePruning`` trait. See the +API documentation of the ``Replicator`` for details. Samples =======
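As a minimal sketch of how the durable-data caveat documented above translates into settings: the keys ``akka.cluster.distributed-data.durable.keys`` and ``akka.cluster.distributed-data.durable.remove-pruning-marker-after`` are the ones referenced in this patch, while the system name and the concrete values are made-up examples, not defaults taken from the patch.

import akka.actor.ActorSystem
import akka.cluster.ddata.DistributedData
import com.typesafe.config.ConfigFactory

object DurablePruningConfigSketch extends App {
  // Hypothetical settings: durable keys plus a pruning-marker TTL that is longer
  // than the longest expected downtime of any node holding durable data.
  val config = ConfigFactory.parseString("""
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.cluster.distributed-data {
      durable.keys = ["durable*"]
      durable.remove-pruning-marker-after = 10 d
    }
    """)

  val system = ActorSystem("DDataSample", config)
  // The replicator started by the extension picks up the settings above.
  val replicator = DistributedData(system).replicator
}

If a node holding durable data can be down for longer than that TTL, its lmdb directory should be cleared before the node rejoins, as described in the caveat.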
diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst index 366ada5788..4033e17e51 100644 --- a/akka-docs/rst/scala/distributed-data.rst +++ b/akka-docs/rst/scala/distributed-data.rst @@ -523,6 +523,18 @@ Note that you should be prepared to receive ``WriteFailure`` as reply to an ``Up durable entry if the data could not be stored for some reason. When enabling ``write-behind-interval`` such errors will only be logged and ``UpdateSuccess`` will still be the reply to the ``Update``. +There is one important caveat when it comes to pruning of :ref:`crdt_garbage_scala` for durable data. +If an old data entry that was never pruned is injected and merged with existing data after +the pruning markers have been removed, the value will not be correct. The time-to-live +of the markers is defined by the configuration property +``akka.cluster.distributed-data.durable.remove-pruning-marker-after`` and is on the order of days. +This can happen if a node with durable data did not participate in the pruning +(e.g. it was shut down) and is started again after this time. A node with durable data should not +be stopped for longer than this duration, and if it joins again after this +duration its data should first be manually removed (from the lmdb directory). + +.. _crdt_garbage_scala: + CRDT Garbage ------------ @@ -531,7 +543,8 @@ For example a ``GCounter`` keeps track of one counter per node. If a ``GCounter` from one node it will associate the identifier of that node forever. That can become a problem for long running systems with many cluster nodes being added and removed. To solve this problem the ``Replicator`` performs pruning of data associated with nodes that have been removed from the -cluster. Data types that need pruning have to implement the ``RemovedNodePruning`` trait. +cluster. Data types that need pruning have to implement the ``RemovedNodePruning`` trait. See the +API documentation of the ``Replicator`` for details. Samples ======= diff --git a/project/MiMa.scala b/project/MiMa.scala index 23bf5d8847..83adab3687 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -162,10 +162,16 @@ object MiMa extends AutoPlugin { FilterAnyProblemStartingWith("akka.cluster.sharding.ClusterShardingGuardian"), FilterAnyProblemStartingWith("akka.cluster.sharding.ShardRegion"), + // #21647 pruning + FilterAnyProblemStartingWith("akka.cluster.ddata.PruningState"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.RemovedNodePruning.modifiedByNodes"), + FilterAnyProblemStartingWith("akka.cluster.ddata.Replicator"), + FilterAnyProblemStartingWith("akka.cluster.ddata.protobuf.msg"), + // #21537 coordinated shutdown ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.removed"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.convergence"), - + // #21423 removal of deprecated stages (in 2.5.x) ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.transform"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SubSource.transform"),
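To illustrate the ``RemovedNodePruning`` contract that the documentation points implementors at, including the ``modifiedByNodes`` method this patch adds to the trait (see the MiMa filter above), here is a minimal sketch of a custom replicated data type wired for pruning. The ``MaxPerNode`` type, its fields, and its merge semantics are invented for the example; only the trait and its method signatures come from the patch.

import akka.cluster.UniqueAddress
import akka.cluster.ddata.{ RemovedNodePruning, ReplicatedData }

// Hypothetical CRDT: tracks the maximum value reported per node.
final case class MaxPerNode(values: Map[UniqueAddress, Long] = Map.empty)
  extends ReplicatedData with RemovedNodePruning {

  type T = MaxPerNode

  def put(node: UniqueAddress, value: Long): MaxPerNode =
    copy(values = values.updated(node, math.max(value, values.getOrElse(node, 0L))))

  def max: Long = if (values.isEmpty) 0L else values.values.max

  override def merge(that: MaxPerNode): MaxPerNode =
    copy(values = that.values.foldLeft(values) {
      case (acc, (node, value)) ⇒ acc.updated(node, math.max(value, acc.getOrElse(node, 0L)))
    })

  // New in this patch: lets the Replicator discover removed nodes from the data itself.
  override def modifiedByNodes: Set[UniqueAddress] = values.keySet

  override def needPruningFrom(removedNode: UniqueAddress): Boolean =
    values.contains(removedNode)

  // Collapse the removed node's contribution into the node performing the pruning.
  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): MaxPerNode =
    pruningCleanup(removedNode).put(collapseInto, values.getOrElse(removedNode, 0L))

  // Drop late contributions that still refer to the removed node.
  override def pruningCleanup(removedNode: UniqueAddress): MaxPerNode =
    copy(values = values - removedNode)
}

Because ``modifiedByNodes`` reports every contributing node, the Replicator can now collect removed nodes from the stored data itself instead of relying only on observing ``MemberRemoved`` events.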