diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
index 590254ef83..4e66f63096 100644
--- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
+++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
@@ -12476,6 +12476,1379 @@ public final class ReplicatorMessages {
// @@protoc_insertion_point(class_scope:akka.cluster.ddata.Gossip)
}
+ public interface DeltaPropagationOrBuilder
+ extends akka.protobuf.MessageOrBuilder {
+
+ // repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry>
+ getEntriesList();
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry getEntries(int index);
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ int getEntriesCount();
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ java.util.List<? extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder>
+ getEntriesOrBuilderList();
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder getEntriesOrBuilder(
+ int index);
+ }
+ /**
+ * Protobuf type {@code akka.cluster.ddata.DeltaPropagation}
+ */
+ public static final class DeltaPropagation extends
+ akka.protobuf.GeneratedMessage
+ implements DeltaPropagationOrBuilder {
+ // Use DeltaPropagation.newBuilder() to construct.
+ private DeltaPropagation(akka.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private DeltaPropagation(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final DeltaPropagation defaultInstance;
+ public static DeltaPropagation getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public DeltaPropagation getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final akka.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final akka.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private DeltaPropagation(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ akka.protobuf.UnknownFieldSet.Builder unknownFields =
+ akka.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+ entries_ = new java.util.ArrayList<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry>();
+ mutable_bitField0_ |= 0x00000001;
+ }
+ entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.PARSER, extensionRegistry));
+ break;
+ }
+ }
+ }
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new akka.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+ entries_ = java.util.Collections.unmodifiableList(entries_);
+ }
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.class, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Builder.class);
+ }
+
+ public static akka.protobuf.Parser<DeltaPropagation> PARSER =
+ new akka.protobuf.AbstractParser<DeltaPropagation>() {
+ public DeltaPropagation parsePartialFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return new DeltaPropagation(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public akka.protobuf.Parser<DeltaPropagation> getParserForType() {
+ return PARSER;
+ }
+
+ public interface EntryOrBuilder
+ extends akka.protobuf.MessageOrBuilder {
+
+ // required string key = 1;
+ /**
+ * required string key = 1;
+ */
+ boolean hasKey();
+ /**
+ * required string key = 1;
+ */
+ java.lang.String getKey();
+ /**
+ * required string key = 1;
+ */
+ akka.protobuf.ByteString
+ getKeyBytes();
+
+ // required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ boolean hasEnvelope();
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope getEnvelope();
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder getEnvelopeOrBuilder();
+ }
+ /**
+ * Protobuf type {@code akka.cluster.ddata.DeltaPropagation.Entry}
+ */
+ public static final class Entry extends
+ akka.protobuf.GeneratedMessage
+ implements EntryOrBuilder {
+ // Use Entry.newBuilder() to construct.
+ private Entry(akka.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private Entry(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final Entry defaultInstance;
+ public static Entry getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public Entry getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final akka.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final akka.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private Entry(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ akka.protobuf.UnknownFieldSet.Builder unknownFields =
+ akka.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ bitField0_ |= 0x00000001;
+ key_ = input.readBytes();
+ break;
+ }
+ case 18: {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.Builder subBuilder = null;
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ subBuilder = envelope_.toBuilder();
+ }
+ envelope_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.PARSER, extensionRegistry);
+ if (subBuilder != null) {
+ subBuilder.mergeFrom(envelope_);
+ envelope_ = subBuilder.buildPartial();
+ }
+ bitField0_ |= 0x00000002;
+ break;
+ }
+ }
+ }
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new akka.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.class, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder.class);
+ }
+
+ public static akka.protobuf.Parser<Entry> PARSER =
+ new akka.protobuf.AbstractParser<Entry>() {
+ public Entry parsePartialFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return new Entry(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public akka.protobuf.Parser<Entry> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // required string key = 1;
+ public static final int KEY_FIELD_NUMBER = 1;
+ private java.lang.Object key_;
+ /**
+ * required string key = 1;
+ */
+ public boolean hasKey() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required string key = 1;
+ */
+ public java.lang.String getKey() {
+ java.lang.Object ref = key_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ akka.protobuf.ByteString bs =
+ (akka.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ key_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * required string key = 1;
+ */
+ public akka.protobuf.ByteString
+ getKeyBytes() {
+ java.lang.Object ref = key_;
+ if (ref instanceof java.lang.String) {
+ akka.protobuf.ByteString b =
+ akka.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ key_ = b;
+ return b;
+ } else {
+ return (akka.protobuf.ByteString) ref;
+ }
+ }
+
+ // required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ public static final int ENVELOPE_FIELD_NUMBER = 2;
+ private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope envelope_;
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public boolean hasEnvelope() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope getEnvelope() {
+ return envelope_;
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder getEnvelopeOrBuilder() {
+ return envelope_;
+ }
+
+ private void initFields() {
+ key_ = "";
+ envelope_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.getDefaultInstance();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasKey()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasEnvelope()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!getEnvelope().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(akka.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getKeyBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeMessage(2, envelope_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeBytesSize(1, getKeyBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeMessageSize(2, envelope_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseFrom(
+ akka.protobuf.ByteString data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseFrom(
+ akka.protobuf.ByteString data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseFrom(byte[] data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseFrom(
+ byte[] data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseDelimitedFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseFrom(
+ akka.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parseFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code akka.cluster.ddata.DeltaPropagation.Entry}
+ */
+ public static final class Builder extends
+ akka.protobuf.GeneratedMessage.Builder<Builder>
+ implements akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder {
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.class, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder.class);
+ }
+
+ // Construct using akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getEnvelopeFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ key_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
+ if (envelopeBuilder_ == null) {
+ envelope_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.getDefaultInstance();
+ } else {
+ envelopeBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public akka.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor;
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry getDefaultInstanceForType() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.getDefaultInstance();
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry build() {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry buildPartial() {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry result = new akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.key_ = key_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ if (envelopeBuilder_ == null) {
+ result.envelope_ = envelope_;
+ } else {
+ result.envelope_ = envelopeBuilder_.build();
+ }
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(akka.protobuf.Message other) {
+ if (other instanceof akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry) {
+ return mergeFrom((akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry other) {
+ if (other == akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.getDefaultInstance()) return this;
+ if (other.hasKey()) {
+ bitField0_ |= 0x00000001;
+ key_ = other.key_;
+ onChanged();
+ }
+ if (other.hasEnvelope()) {
+ mergeEnvelope(other.getEnvelope());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasKey()) {
+
+ return false;
+ }
+ if (!hasEnvelope()) {
+
+ return false;
+ }
+ if (!getEnvelope().isInitialized()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required string key = 1;
+ private java.lang.Object key_ = "";
+ /**
+ * required string key = 1;
+ */
+ public boolean hasKey() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required string key = 1;
+ */
+ public java.lang.String getKey() {
+ java.lang.Object ref = key_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((akka.protobuf.ByteString) ref)
+ .toStringUtf8();
+ key_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * required string key = 1;
+ */
+ public akka.protobuf.ByteString
+ getKeyBytes() {
+ java.lang.Object ref = key_;
+ if (ref instanceof String) {
+ akka.protobuf.ByteString b =
+ akka.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ key_ = b;
+ return b;
+ } else {
+ return (akka.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * required string key = 1;
+ */
+ public Builder setKey(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ key_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required string key = 1;
+ */
+ public Builder clearKey() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ key_ = getDefaultInstance().getKey();
+ onChanged();
+ return this;
+ }
+ /**
+ * required string key = 1;
+ */
+ public Builder setKeyBytes(
+ akka.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ key_ = value;
+ onChanged();
+ return this;
+ }
+
+ // required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope envelope_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.getDefaultInstance();
+ private akka.protobuf.SingleFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder> envelopeBuilder_;
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public boolean hasEnvelope() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope getEnvelope() {
+ if (envelopeBuilder_ == null) {
+ return envelope_;
+ } else {
+ return envelopeBuilder_.getMessage();
+ }
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public Builder setEnvelope(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope value) {
+ if (envelopeBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ envelope_ = value;
+ onChanged();
+ } else {
+ envelopeBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public Builder setEnvelope(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.Builder builderForValue) {
+ if (envelopeBuilder_ == null) {
+ envelope_ = builderForValue.build();
+ onChanged();
+ } else {
+ envelopeBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public Builder mergeEnvelope(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope value) {
+ if (envelopeBuilder_ == null) {
+ if (((bitField0_ & 0x00000002) == 0x00000002) &&
+ envelope_ != akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.getDefaultInstance()) {
+ envelope_ =
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.newBuilder(envelope_).mergeFrom(value).buildPartial();
+ } else {
+ envelope_ = value;
+ }
+ onChanged();
+ } else {
+ envelopeBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public Builder clearEnvelope() {
+ if (envelopeBuilder_ == null) {
+ envelope_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.getDefaultInstance();
+ onChanged();
+ } else {
+ envelopeBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.Builder getEnvelopeBuilder() {
+ bitField0_ |= 0x00000002;
+ onChanged();
+ return getEnvelopeFieldBuilder().getBuilder();
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder getEnvelopeOrBuilder() {
+ if (envelopeBuilder_ != null) {
+ return envelopeBuilder_.getMessageOrBuilder();
+ } else {
+ return envelope_;
+ }
+ }
+ /**
+ * required .akka.cluster.ddata.DataEnvelope envelope = 2;
+ */
+ private akka.protobuf.SingleFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder>
+ getEnvelopeFieldBuilder() {
+ if (envelopeBuilder_ == null) {
+ envelopeBuilder_ = new akka.protobuf.SingleFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder>(
+ envelope_,
+ getParentForChildren(),
+ isClean());
+ envelope_ = null;
+ }
+ return envelopeBuilder_;
+ }
+
+ // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DeltaPropagation.Entry)
+ }
+
+ static {
+ defaultInstance = new Entry(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:akka.cluster.ddata.DeltaPropagation.Entry)
+ }
+
+ // repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ public static final int ENTRIES_FIELD_NUMBER = 1;
+ private java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry> entries_;
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry> getEntriesList() {
+ return entries_;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public java.util.List<? extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder>
+ getEntriesOrBuilderList() {
+ return entries_;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public int getEntriesCount() {
+ return entries_.size();
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry getEntries(int index) {
+ return entries_.get(index);
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder getEntriesOrBuilder(
+ int index) {
+ return entries_.get(index);
+ }
+
+ private void initFields() {
+ entries_ = java.util.Collections.emptyList();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ for (int i = 0; i < getEntriesCount(); i++) {
+ if (!getEntries(i).isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(akka.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ for (int i = 0; i < entries_.size(); i++) {
+ output.writeMessage(1, entries_.get(i));
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ for (int i = 0; i < entries_.size(); i++) {
+ size += akka.protobuf.CodedOutputStream
+ .computeMessageSize(1, entries_.get(i));
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseFrom(
+ akka.protobuf.ByteString data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseFrom(
+ akka.protobuf.ByteString data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseFrom(byte[] data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseFrom(
+ byte[] data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseDelimitedFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseFrom(
+ akka.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parseFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code akka.cluster.ddata.DeltaPropagation}
+ */
+ public static final class Builder extends
+ akka.protobuf.GeneratedMessage.Builder<Builder>
+ implements akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagationOrBuilder {
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.class, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Builder.class);
+ }
+
+ // Construct using akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getEntriesFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ if (entriesBuilder_ == null) {
+ entries_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000001);
+ } else {
+ entriesBuilder_.clear();
+ }
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public akka.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DeltaPropagation_descriptor;
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation getDefaultInstanceForType() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.getDefaultInstance();
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation build() {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation buildPartial() {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation result = new akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation(this);
+ int from_bitField0_ = bitField0_;
+ if (entriesBuilder_ == null) {
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ entries_ = java.util.Collections.unmodifiableList(entries_);
+ bitField0_ = (bitField0_ & ~0x00000001);
+ }
+ result.entries_ = entries_;
+ } else {
+ result.entries_ = entriesBuilder_.build();
+ }
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(akka.protobuf.Message other) {
+ if (other instanceof akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation) {
+ return mergeFrom((akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation other) {
+ if (other == akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.getDefaultInstance()) return this;
+ if (entriesBuilder_ == null) {
+ if (!other.entries_.isEmpty()) {
+ if (entries_.isEmpty()) {
+ entries_ = other.entries_;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ } else {
+ ensureEntriesIsMutable();
+ entries_.addAll(other.entries_);
+ }
+ onChanged();
+ }
+ } else {
+ if (!other.entries_.isEmpty()) {
+ if (entriesBuilder_.isEmpty()) {
+ entriesBuilder_.dispose();
+ entriesBuilder_ = null;
+ entries_ = other.entries_;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ entriesBuilder_ =
+ akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+ getEntriesFieldBuilder() : null;
+ } else {
+ entriesBuilder_.addAllMessages(other.entries_);
+ }
+ }
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ for (int i = 0; i < getEntriesCount(); i++) {
+ if (!getEntries(i).isInitialized()) {
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ private java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry> entries_ =
+ java.util.Collections.emptyList();
+ private void ensureEntriesIsMutable() {
+ if (!((bitField0_ & 0x00000001) == 0x00000001)) {
+ entries_ = new java.util.ArrayList<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry>(entries_);
+ bitField0_ |= 0x00000001;
+ }
+ }
+
+ private akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder> entriesBuilder_;
+
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry> getEntriesList() {
+ if (entriesBuilder_ == null) {
+ return java.util.Collections.unmodifiableList(entries_);
+ } else {
+ return entriesBuilder_.getMessageList();
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public int getEntriesCount() {
+ if (entriesBuilder_ == null) {
+ return entries_.size();
+ } else {
+ return entriesBuilder_.getCount();
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry getEntries(int index) {
+ if (entriesBuilder_ == null) {
+ return entries_.get(index);
+ } else {
+ return entriesBuilder_.getMessage(index);
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder setEntries(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry value) {
+ if (entriesBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureEntriesIsMutable();
+ entries_.set(index, value);
+ onChanged();
+ } else {
+ entriesBuilder_.setMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder setEntries(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder builderForValue) {
+ if (entriesBuilder_ == null) {
+ ensureEntriesIsMutable();
+ entries_.set(index, builderForValue.build());
+ onChanged();
+ } else {
+ entriesBuilder_.setMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder addEntries(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry value) {
+ if (entriesBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureEntriesIsMutable();
+ entries_.add(value);
+ onChanged();
+ } else {
+ entriesBuilder_.addMessage(value);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder addEntries(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry value) {
+ if (entriesBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureEntriesIsMutable();
+ entries_.add(index, value);
+ onChanged();
+ } else {
+ entriesBuilder_.addMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder addEntries(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder builderForValue) {
+ if (entriesBuilder_ == null) {
+ ensureEntriesIsMutable();
+ entries_.add(builderForValue.build());
+ onChanged();
+ } else {
+ entriesBuilder_.addMessage(builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder addEntries(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder builderForValue) {
+ if (entriesBuilder_ == null) {
+ ensureEntriesIsMutable();
+ entries_.add(index, builderForValue.build());
+ onChanged();
+ } else {
+ entriesBuilder_.addMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder addAllEntries(
+ java.lang.Iterable<? extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry> values) {
+ if (entriesBuilder_ == null) {
+ ensureEntriesIsMutable();
+ super.addAll(values, entries_);
+ onChanged();
+ } else {
+ entriesBuilder_.addAllMessages(values);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder clearEntries() {
+ if (entriesBuilder_ == null) {
+ entries_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000001);
+ onChanged();
+ } else {
+ entriesBuilder_.clear();
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public Builder removeEntries(int index) {
+ if (entriesBuilder_ == null) {
+ ensureEntriesIsMutable();
+ entries_.remove(index);
+ onChanged();
+ } else {
+ entriesBuilder_.remove(index);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder getEntriesBuilder(
+ int index) {
+ return getEntriesFieldBuilder().getBuilder(index);
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder getEntriesOrBuilder(
+ int index) {
+ if (entriesBuilder_ == null) {
+ return entries_.get(index); } else {
+ return entriesBuilder_.getMessageOrBuilder(index);
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public java.util.List<? extends akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder>
+ getEntriesOrBuilderList() {
+ if (entriesBuilder_ != null) {
+ return entriesBuilder_.getMessageOrBuilderList();
+ } else {
+ return java.util.Collections.unmodifiableList(entries_);
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder addEntriesBuilder() {
+ return getEntriesFieldBuilder().addBuilder(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.getDefaultInstance());
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder addEntriesBuilder(
+ int index) {
+ return getEntriesFieldBuilder().addBuilder(
+ index, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.getDefaultInstance());
+ }
+ /**
+ * repeated .akka.cluster.ddata.DeltaPropagation.Entry entries = 1;
+ */
+ public java.util.List<akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder>
+ getEntriesBuilderList() {
+ return getEntriesFieldBuilder().getBuilderList();
+ }
+ private akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder>
+ getEntriesFieldBuilder() {
+ if (entriesBuilder_ == null) {
+ entriesBuilder_ = new akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder>(
+ entries_,
+ ((bitField0_ & 0x00000001) == 0x00000001),
+ getParentForChildren(),
+ isClean());
+ entries_ = null;
+ }
+ return entriesBuilder_;
+ }
+
+ // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DeltaPropagation)
+ }
+
+ static {
+ defaultInstance = new DeltaPropagation(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:akka.cluster.ddata.DeltaPropagation)
+ }
+
public interface UniqueAddressOrBuilder
extends akka.protobuf.MessageOrBuilder {
@@ -15828,6 +17201,16 @@ public final class ReplicatorMessages {
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_akka_cluster_ddata_Gossip_Entry_fieldAccessorTable;
+ private static akka.protobuf.Descriptors.Descriptor
+ internal_static_akka_cluster_ddata_DeltaPropagation_descriptor;
+ private static
+ akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable;
+ private static akka.protobuf.Descriptors.Descriptor
+ internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor;
+ private static
+ akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable;
private static akka.protobuf.Descriptors.Descriptor
internal_static_akka_cluster_ddata_UniqueAddress_descriptor;
private static
@@ -15902,18 +17285,22 @@ public final class ReplicatorMessages {
"\n\006Gossip\022\020\n\010sendBack\030\001 \002(\010\0221\n\007entries\030\002 " +
"\003(\0132 .akka.cluster.ddata.Gossip.Entry\032H\n" +
"\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .",
- "akka.cluster.ddata.DataEnvelope\"X\n\rUniqu" +
- "eAddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster" +
- ".ddata.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(" +
- "\017\")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002" +
- " \002(\r\"V\n\014OtherMessage\022\027\n\017enclosedMessage\030" +
- "\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageMa" +
- "nifest\030\004 \001(\014\"\036\n\nStringGSet\022\020\n\010elements\030\001" +
- " \003(\t\"\205\001\n\023DurableDataEnvelope\022.\n\004data\030\001 \002" +
- "(\0132 .akka.cluster.ddata.OtherMessage\022>\n\007" +
- "pruning\030\002 \003(\0132-.akka.cluster.ddata.DataE",
- "nvelope.PruningEntryB#\n\037akka.cluster.dda" +
- "ta.protobuf.msgH\001"
+ "akka.cluster.ddata.DataEnvelope\"\231\001\n\020Delt" +
+ "aPropagation\022;\n\007entries\030\001 \003(\0132*.akka.clu" +
+ "ster.ddata.DeltaPropagation.Entry\032H\n\005Ent" +
+ "ry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka" +
+ ".cluster.ddata.DataEnvelope\"X\n\rUniqueAdd" +
+ "ress\022,\n\007address\030\001 \002(\0132\033.akka.cluster.dda" +
+ "ta.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n" +
+ "\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r" +
+ "\"V\n\014OtherMessage\022\027\n\017enclosedMessage\030\001 \002(" +
+ "\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageManife",
+ "st\030\004 \001(\014\"\036\n\nStringGSet\022\020\n\010elements\030\001 \003(\t" +
+ "\"\205\001\n\023DurableDataEnvelope\022.\n\004data\030\001 \002(\0132 " +
+ ".akka.cluster.ddata.OtherMessage\022>\n\007prun" +
+ "ing\030\002 \003(\0132-.akka.cluster.ddata.DataEnvel" +
+ "ope.PruningEntryB#\n\037akka.cluster.ddata.p" +
+ "rotobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -16022,32 +17409,44 @@ public final class ReplicatorMessages {
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_Gossip_Entry_descriptor,
new java.lang.String[] { "Key", "Envelope", });
- internal_static_akka_cluster_ddata_UniqueAddress_descriptor =
+ internal_static_akka_cluster_ddata_DeltaPropagation_descriptor =
getDescriptor().getMessageTypes().get(14);
+ internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable = new
+ akka.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_akka_cluster_ddata_DeltaPropagation_descriptor,
+ new java.lang.String[] { "Entries", });
+ internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor =
+ internal_static_akka_cluster_ddata_DeltaPropagation_descriptor.getNestedTypes().get(0);
+ internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable = new
+ akka.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor,
+ new java.lang.String[] { "Key", "Envelope", });
+ internal_static_akka_cluster_ddata_UniqueAddress_descriptor =
+ getDescriptor().getMessageTypes().get(15);
internal_static_akka_cluster_ddata_UniqueAddress_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_UniqueAddress_descriptor,
new java.lang.String[] { "Address", "Uid", "Uid2", });
internal_static_akka_cluster_ddata_Address_descriptor =
- getDescriptor().getMessageTypes().get(15);
+ getDescriptor().getMessageTypes().get(16);
internal_static_akka_cluster_ddata_Address_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_Address_descriptor,
new java.lang.String[] { "Hostname", "Port", });
internal_static_akka_cluster_ddata_OtherMessage_descriptor =
- getDescriptor().getMessageTypes().get(16);
+ getDescriptor().getMessageTypes().get(17);
internal_static_akka_cluster_ddata_OtherMessage_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_OtherMessage_descriptor,
new java.lang.String[] { "EnclosedMessage", "SerializerId", "MessageManifest", });
internal_static_akka_cluster_ddata_StringGSet_descriptor =
- getDescriptor().getMessageTypes().get(17);
+ getDescriptor().getMessageTypes().get(18);
internal_static_akka_cluster_ddata_StringGSet_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_StringGSet_descriptor,
new java.lang.String[] { "Elements", });
internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor =
- getDescriptor().getMessageTypes().get(18);
+ getDescriptor().getMessageTypes().get(19);
internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor,
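
Reviewer note: the Java above is generated from the new DeltaPropagation message in ReplicatorMessages.proto below. A minimal sketch of how serializer code could drive the generated builder API (the helper name and Map shape are illustrative, not part of this patch):

    import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages ⇒ rm }

    def deltaPropagationToProto(entries: Map[String, rm.DataEnvelope]): rm.DeltaPropagation = {
      val b = rm.DeltaPropagation.newBuilder()
      entries.foreach {
        case (key, envelope) ⇒
          // both fields are `required` in the .proto definition
          b.addEntries(rm.DeltaPropagation.Entry.newBuilder().setKey(key).setEnvelope(envelope))
      }
      b.build()
    }
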
diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
index 7c57b84215..4b1f2084b4 100644
--- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
+++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
@@ -96,6 +96,15 @@ message Gossip {
repeated Entry entries = 2;
}
+message DeltaPropagation {
+ message Entry {
+ required string key = 1;
+ required DataEnvelope envelope = 2;
+ }
+
+ repeated Entry entries = 1;
+}
+
message UniqueAddress {
required Address address = 1;
required sfixed32 uid = 2;
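
Reviewer note: decoding on the receiving side is the mirror image; a sketch assuming the bytes were produced by the DeltaPropagation message defined above (helper name is illustrative):

    import scala.collection.JavaConverters._
    import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages ⇒ rm }

    def deltaPropagationFromBinary(bytes: Array[Byte]): Map[String, rm.DataEnvelope] =
      rm.DeltaPropagation.parseFrom(bytes).getEntriesList.asScala
        .map(e ⇒ e.getKey → e.getEnvelope).toMap
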
diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf
index 0df2bf195f..6d39269ead 100644
--- a/akka-distributed-data/src/main/resources/reference.conf
+++ b/akka-distributed-data/src/main/resources/reference.conf
@@ -18,7 +18,7 @@ akka.cluster.distributed-data {
# How often the Replicator should send out gossip information
gossip-interval = 2 s
-
+
# How often the subscribers will be notified of changes, if any
notify-subscribers-interval = 500 ms
@@ -58,6 +58,12 @@ akka.cluster.distributed-data {
# after this duration.
serializer-cache-time-to-live = 10s
+ # Settings for delta-CRDT
+ delta-crdt {
+ # enable or disable delta-CRDT replication
+ enabled = on
+ }
+
durable {
# List of keys that are durable. Prefix matching is supported by using * at the
# end of a key.
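
Reviewer note: this hunk only defines the delta-crdt.enabled flag; a sketch of how settings code might read it (class and method names are assumptions, the actual ReplicatorSettings wiring is not part of this diff):

    import com.typesafe.config.Config

    final case class DeltaCrdtSettings(enabled: Boolean)

    def deltaCrdtSettings(config: Config): DeltaCrdtSettings =
      DeltaCrdtSettings(config.getBoolean("akka.cluster.distributed-data.delta-crdt.enabled"))
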
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala
new file mode 100644
index 0000000000..a8c2dfdf5e
--- /dev/null
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala
@@ -0,0 +1,170 @@
+/**
+ * Copyright (C) 2017 Lightbend Inc.
+ */
+package akka.cluster.ddata
+
+import scala.collection.immutable.TreeMap
+import akka.cluster.ddata.Replicator.Internal.DeltaPropagation
+import akka.actor.Address
+import akka.cluster.ddata.Replicator.Internal.DataEnvelope
+
+/**
+ * INTERNAL API: Used by the Replicator actor.
+ * Extracted to separate trait to make it easy to test.
+ */
+private[akka] trait DeltaPropagationSelector {
+
+ private var _propagationCount = 0L
+ def propagationCount: Long = _propagationCount
+ private var deltaCounter = Map.empty[String, Long]
+ private var deltaEntries = Map.empty[String, TreeMap[Long, ReplicatedData]]
+ private var deltaSentToNode = Map.empty[String, Map[Address, Long]]
+ private var deltaNodeRoundRobinCounter = 0L
+
+ def divisor: Int
+
+ def allNodes: Vector[Address]
+
+ def createDeltaPropagation(deltas: Map[String, ReplicatedData]): DeltaPropagation
+
+ def update(key: String, delta: ReplicatedData): Unit = {
+ val c = deltaCounter.get(key) match {
+ case Some(c) ⇒ c
+ case None ⇒
+ deltaCounter = deltaCounter.updated(key, 1L)
+ 1L
+ }
+ val deltaEntriesForKey = deltaEntries.getOrElse(key, TreeMap.empty[Long, ReplicatedData])
+ val updatedEntriesForKey =
+ deltaEntriesForKey.get(c) match {
+ case Some(existingDelta) ⇒
+ deltaEntriesForKey.updated(c, existingDelta.merge(delta.asInstanceOf[existingDelta.T]))
+ case None ⇒
+ deltaEntriesForKey.updated(c, delta)
+ }
+ deltaEntries = deltaEntries.updated(key, updatedEntriesForKey)
+ }
+
+ def delete(key: String): Unit = {
+ deltaEntries -= key
+ deltaCounter -= key
+ deltaSentToNode -= key
+ }
+
+ def nodesSliceSize(allNodesSize: Int): Int = {
+ // 2 - 10 nodes
+ math.min(math.max((allNodesSize / divisor) + 1, 2), math.min(allNodesSize, 10))
+ }
+
+ def collectPropagations(): Map[Address, DeltaPropagation] = {
+ _propagationCount += 1
+ val all = allNodes
+ if (all.isEmpty)
+ Map.empty
+ else {
+ // For each tick we pick a few nodes in round-robin fashion, 2 - 10 nodes for each tick.
+ // Normally the delta is propagated to all nodes within the gossip tick, so that
+ // full state gossip is not needed.
+ val sliceSize = nodesSliceSize(all.size)
+ val slice = {
+ if (all.size <= sliceSize)
+ all
+ else {
+ val i = (deltaNodeRoundRobinCounter % all.size).toInt
+ val first = all.slice(i, i + sliceSize)
+ if (first.size == sliceSize) first
+ else first ++ all.take(sliceSize - first.size)
+ }
+ }
+ deltaNodeRoundRobinCounter += sliceSize
+
+ var result = Map.empty[Address, DeltaPropagation]
+
+ slice.foreach { node ⇒
+ // collect the deltas that have not already been sent to the node and merge
+ // them into a delta group
+ var deltas = Map.empty[String, ReplicatedData]
+ deltaEntries.foreach {
+ case (key, entries) ⇒
+ val deltaSentToNodeForKey = deltaSentToNode.getOrElse(key, TreeMap.empty[Address, Long])
+ val j = deltaSentToNodeForKey.getOrElse(node, 0L)
+ val deltaEntriesAfterJ = deltaEntriesAfter(entries, j)
+ if (deltaEntriesAfterJ.nonEmpty) {
+ val deltaGroup = deltaEntriesAfterJ.valuesIterator.reduceLeft {
+ (d1, d2) ⇒ d1.merge(d2.asInstanceOf[d1.T])
+ }
+ deltas = deltas.updated(key, deltaGroup)
+ deltaSentToNode = deltaSentToNode.updated(key, deltaSentToNodeForKey.updated(node, deltaEntriesAfterJ.lastKey))
+ }
+ }
+
+ if (deltas.nonEmpty) {
+ // Important to include the pruning state in the deltas. For example, the delta may be
+ // based on an entry that has been pruned locally while that pruning has not yet been
+ // performed on the target node.
+ val deltaPropagation = createDeltaPropagation(deltas)
+ result = result.updated(node, deltaPropagation)
+ }
+ }
+
+ // bump the counter for keys that still have delta entries
+ deltaCounter = deltaCounter.map {
+ case (key, value) ⇒
+ if (deltaEntries.contains(key))
+ key → (value + 1)
+ else
+ key → value
+ }
+
+ result
+ }
+ }
+
+ private def deltaEntriesAfter(entries: TreeMap[Long, ReplicatedData], version: Long): TreeMap[Long, ReplicatedData] =
+ entries.from(version) match {
+ case ntrs if ntrs.isEmpty ⇒ ntrs
+ case ntrs if ntrs.firstKey == version ⇒ ntrs.tail // exclude first, i.e. version j that was already sent
+ case ntrs ⇒ ntrs
+ }
+
+ def hasDeltaEntries(key: String): Boolean = {
+ deltaEntries.get(key) match {
+ case Some(m) ⇒ m.nonEmpty
+ case None ⇒ false
+ }
+ }
+
+ private def findSmallestVersionPropagatedToAllNodes(key: String, all: Vector[Address]): Long = {
+ deltaSentToNode.get(key) match {
+ case None ⇒ 0L
+ case Some(deltaSentToNodeForKey) ⇒
+ if (deltaSentToNodeForKey.isEmpty) 0L
+ else if (all.exists(node ⇒ !deltaSentToNodeForKey.contains(node))) 0L
+ else deltaSentToNodeForKey.valuesIterator.min
+ }
+ }
+
+ def cleanupDeltaEntries(): Unit = {
+ val all = allNodes
+ if (all.isEmpty)
+ deltaEntries = Map.empty
+ else {
+ deltaEntries = deltaEntries.map {
+ case (key, entries) ⇒
+ val minVersion = findSmallestVersionPropagatedToAllNodes(key, all)
+
+ val deltaEntriesAfterMin = deltaEntriesAfter(entries, minVersion)
+
+ // TODO perhaps also remove the oldest entries when deltaCounter is too far ahead (e.g. 10 cycles)
+
+ key → deltaEntriesAfterMin
+ }
+ }
+ }
+
+ def cleanupRemovedNode(address: Address): Unit = {
+ deltaSentToNode = deltaSentToNode.map {
+ case (key, deltaSentToNodeForKey) ⇒
+ key → (deltaSentToNodeForKey - address)
+ }
+ }
+}
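The selector is pure bookkeeping with no actor dependencies, which is what makes it easy to test in isolation. A minimal sketch of how it is driven, written from within the `akka.cluster.ddata` package (the trait is `private[akka]`), in the style of the `TestSelector` in `DeltaPropagationSelectorSpec` further down; the three addresses are made up:

    import akka.actor.Address
    import akka.cluster.ddata.Replicator.Internal.{ DataEnvelope, DeltaPropagation }

    val testNodes = Vector(2551, 2552, 2553).map(port ⇒ Address("akka", "Sys", "localhost", port))

    val selector = new DeltaPropagationSelector {
      override val divisor = 5
      override def allNodes: Vector[Address] = testNodes
      override def createDeltaPropagation(deltas: Map[String, ReplicatedData]): DeltaPropagation =
        DeltaPropagation(deltas.map { case (key, d) ⇒ key → DataEnvelope(d) })
    }

    selector.update("A", GSet.empty[String] + "a")
    // nodesSliceSize(3) is 2: the first tick covers two nodes, the next tick the remaining node
    selector.collectPropagations() // propagations for testNodes(0) and testNodes(1)
    selector.collectPropagations() // propagation for testNodes(2)
    selector.collectPropagations() // Map.empty: the delta has reached every node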
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala
index 3d1148f431..58c2243d7d 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala
@@ -39,8 +39,9 @@ object GCounter {
*/
@SerialVersionUID(1L)
final class GCounter private[akka] (
- private[akka] val state: Map[UniqueAddress, BigInt] = Map.empty)
- extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning with FastMerge {
+ private[akka] val state: Map[UniqueAddress, BigInt] = Map.empty,
+ private[akka] val _delta: Option[GCounter] = None)
+ extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning with FastMerge {
import GCounter.Zero
@@ -57,17 +58,17 @@ final class GCounter private[akka] (
def getValue: BigInteger = value.bigInteger
/**
- * Increment the counter with the delta specified.
+ * Increment the counter with the delta `n` specified.
* The delta must be zero or positive.
*/
- def +(delta: Long)(implicit node: Cluster): GCounter = increment(node, delta)
+ def +(n: Long)(implicit node: Cluster): GCounter = increment(node, n)
/**
- * Increment the counter with the delta specified.
- * The delta must be zero or positive.
+ * Increment the counter with the delta `n` specified.
+ * The delta `n` must be zero or positive.
*/
- def increment(node: Cluster, delta: Long = 1): GCounter =
- increment(node.selfUniqueAddress, delta)
+ def increment(node: Cluster, n: Long = 1): GCounter =
+ increment(node.selfUniqueAddress, n)
/**
* INTERNAL API
@@ -77,14 +78,19 @@ final class GCounter private[akka] (
/**
* INTERNAL API
*/
- private[akka] def increment(key: UniqueAddress, delta: BigInt): GCounter = {
- require(delta >= 0, "Can't decrement a GCounter")
- if (delta == 0) this
- else state.get(key) match {
- case Some(v) ⇒
- val tot = v + delta
- assignAncestor(new GCounter(state + (key → tot)))
- case None ⇒ assignAncestor(new GCounter(state + (key → delta)))
+ private[akka] def increment(key: UniqueAddress, n: BigInt): GCounter = {
+ require(n >= 0, "Can't decrement a GCounter")
+ if (n == 0) this
+ else {
+ val nextValue = state.get(key) match {
+ case Some(v) ⇒ v + n
+ case None ⇒ n
+ }
+ val newDelta = _delta match {
+ case Some(d) ⇒ Some(new GCounter(d.state + (key → nextValue)))
+ case None ⇒ Some(new GCounter(Map(key → nextValue)))
+ }
+ assignAncestor(new GCounter(state + (key → nextValue), newDelta))
}
}
@@ -102,6 +108,13 @@ final class GCounter private[akka] (
new GCounter(merged)
}
+ override def delta: GCounter = _delta match {
+ case Some(d) ⇒ d
+ case None ⇒ GCounter.empty
+ }
+
+ override def resetDelta: GCounter = new GCounter(state)
+
override def modifiedByNodes: Set[UniqueAddress] = state.keySet
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
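Note that the per-node delta entry stores the node's resulting value (`nextValue` above) rather than the increment amount, so applying the same delta twice is harmless: `merge` takes the max per node. A small sketch of the accumulate/reset contract, using the package-internal `increment(UniqueAddress, n)` and `state` accessors the same way the updated `GCounterSpec` below does:

    import akka.actor.Address
    import akka.cluster.UniqueAddress

    val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)

    val c1 = GCounter.empty.increment(node1, 2)
    c1.delta.state(node1) // 2: node1's full entry, not the diff amount

    val c2 = c1.increment(node1, 3)
    c2.delta.state(node1) // 5: the delta keeps accumulating until resetDelta

    val c3 = c2.resetDelta.increment(node1, 1)
    c3.delta.state(node1) // 6: a fresh delta, again carrying the full entry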
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala
index 7135becc3b..6bf92fdfd8 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala
@@ -39,7 +39,7 @@ object PNCounter {
@SerialVersionUID(1L)
final class PNCounter private[akka] (
private[akka] val increments: GCounter, private[akka] val decrements: GCounter)
- extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
+ extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
type T = PNCounter
@@ -54,39 +54,39 @@ final class PNCounter private[akka] (
def getValue: BigInteger = value.bigInteger
/**
- * Increment the counter with the delta specified.
+ * Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
- def +(delta: Long)(implicit node: Cluster): PNCounter = increment(node, delta)
+ def +(n: Long)(implicit node: Cluster): PNCounter = increment(node, n)
/**
- * Increment the counter with the delta specified.
+ * Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
- def increment(node: Cluster, delta: Long = 1): PNCounter =
- increment(node.selfUniqueAddress, delta)
+ def increment(node: Cluster, n: Long = 1): PNCounter =
+ increment(node.selfUniqueAddress, n)
/**
- * Decrement the counter with the delta specified.
+ * Decrement the counter with the delta `n` specified.
* If the delta is negative then it will increment instead of decrement.
*/
- def -(delta: Long)(implicit node: Cluster): PNCounter = decrement(node, delta)
+ def -(n: Long)(implicit node: Cluster): PNCounter = decrement(node, n)
/**
- * Decrement the counter with the delta specified.
- * If the delta is negative then it will increment instead of decrement.
+ * Decrement the counter with the delta `n` specified.
+ * If the delta `n` is negative then it will increment instead of decrement.
*/
- def decrement(node: Cluster, delta: Long = 1): PNCounter =
- decrement(node.selfUniqueAddress, delta)
+ def decrement(node: Cluster, n: Long = 1): PNCounter =
+ decrement(node.selfUniqueAddress, n)
- private[akka] def increment(key: UniqueAddress, delta: Long): PNCounter = change(key, delta)
+ private[akka] def increment(key: UniqueAddress, n: Long): PNCounter = change(key, n)
private[akka] def increment(key: UniqueAddress): PNCounter = increment(key, 1)
- private[akka] def decrement(key: UniqueAddress, delta: Long): PNCounter = change(key, -delta)
+ private[akka] def decrement(key: UniqueAddress, n: Long): PNCounter = change(key, -n)
private[akka] def decrement(key: UniqueAddress): PNCounter = decrement(key, 1)
- private[akka] def change(key: UniqueAddress, delta: Long): PNCounter =
- if (delta > 0) copy(increments = increments.increment(key, delta))
- else if (delta < 0) copy(decrements = decrements.increment(key, -delta))
+ private[akka] def change(key: UniqueAddress, n: Long): PNCounter =
+ if (n > 0) copy(increments = increments.increment(key, n))
+ else if (n < 0) copy(decrements = decrements.increment(key, -n))
else this
override def merge(that: PNCounter): PNCounter =
@@ -94,6 +94,10 @@ final class PNCounter private[akka] (
increments = that.increments.merge(this.increments),
decrements = that.decrements.merge(this.decrements))
+ override def delta: PNCounter = new PNCounter(increments.delta, decrements.delta)
+
+ override def resetDelta: PNCounter = new PNCounter(increments.resetDelta, decrements.resetDelta)
+
override def modifiedByNodes: Set[UniqueAddress] =
increments.modifiedByNodes union decrements.modifiedByNodes
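`PNCounter.delta` simply pairs the deltas of the two internal `GCounter`s, so a decrement shows up as a positive entry in `decrements` and as a negative delta value. A short sketch in the style of the updated `PNCounterSpec` below (package-internal accessors again):

    import akka.actor.Address
    import akka.cluster.UniqueAddress

    val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)

    val c1 = PNCounter.empty.decrement(node1, 2)
    c1.value // -2
    c1.delta.value // -2, composed as increments.delta minus decrements.delta
    c1.delta.decrements.state(node1) // 2: stored as a positive G-counter entry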
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala
index b93793e0dd..117e9c5a43 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala
@@ -22,6 +22,9 @@ import akka.cluster.UniqueAddress
*
* ReplicatedData types should be immutable, i.e. "modifying" methods should return
* a new instance.
+ *
+ * Implement the additional methods of [[DeltaReplicatedData]] if
+ * the data type should support delta-CRDT replication.
*/
trait ReplicatedData {
/**
@@ -37,6 +40,42 @@ trait ReplicatedData {
}
+/**
+ * [[ReplicatedData]] with additional support for delta-CRDT replication.
+ * delta-CRDT is a way to reduce the need for sending the full state
+ * for updates. For example, adding elements 'c' and 'd' to the set {'a', 'b'} results
+ * in sending only the delta {'c', 'd'} and merging it with the state on the
+ * receiving side, resulting in the set {'a', 'b', 'c', 'd'}.
+ *
+ * Learn more about this in the paper
+ * <a href="http://arxiv.org/abs/1603.01529">Delta State Replicated Data Types</a>.
+ */
+trait DeltaReplicatedData extends ReplicatedData {
+
+ /**
+ * The accumulated delta of mutator operations since the previous
+ * [[#resetDelta]]. When the `Replicator` invokes the `modify` function
+ * of the `Update` message and the user code invokes one or more mutator
+ * operations, the data instance collects the delta of those operations and
+ * makes it available to the `Replicator` via this accessor. The
+ * `modify` function shall still return the full state in the same way as
+ * `ReplicatedData` without support for deltas.
+ */
+ def delta: T
+
+ /**
+ * Reset collection of deltas from mutator operations. When the `Replicator`
+ * invokes the `modify` function of the `Update` message the delta is always
+ * "reset", and when the user code invokes one or more mutator operations the
+ * data instance collects the delta of those operations and makes it available
+ * to the `Replicator` via the [[#delta]] accessor. When the `Replicator` has
+ * grabbed the `delta` it invokes this method to get a clean data instance
+ * without the delta.
+ */
+ def resetDelta: T
+
+}
+
/**
* Java API: Interface for implementing a [[ReplicatedData]] in Java.
*
@@ -61,6 +100,17 @@ abstract class AbstractReplicatedData[D <: AbstractReplicatedData[D]] extends Re
}
+/**
+ * Java API: Interface for implementing a [[DeltaReplicatedData]] in Java.
+ *
+ * The type parameter `D` is a self-recursive type to be defined by the
+ * concrete implementation.
+ * E.g. `class TwoPhaseSet extends AbstractDeltaReplicatedData<TwoPhaseSet>`
+ */
+abstract class AbstractDeltaReplicatedData[D <: AbstractDeltaReplicatedData[D]]
+ extends AbstractReplicatedData[D] with DeltaReplicatedData {
+}
+
/**
* [[ReplicatedData]] that has support for pruning of data
* belonging to a specific node may implement this interface.
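For a custom data type the contract is: mutators accumulate into an internal delta, `delta` exposes it, `resetDelta` drops it, and `merge` must accept a delta group as well as the full state. A hypothetical sketch (not part of this changeset; `GSet` itself is not delta-aware here) of a delta-capable grow-only string set:

    final case class DeltaStringSet(
      elements:    Set[String],
      accumulated: Set[String] = Set.empty) extends DeltaReplicatedData {

      type T = DeltaStringSet

      def add(element: String): DeltaStringSet =
        DeltaStringSet(elements + element, accumulated + element)

      // a delta group is itself a DeltaStringSet, so one merge covers deltas and full state
      override def merge(that: DeltaStringSet): DeltaStringSet =
        DeltaStringSet(elements union that.elements)

      override def delta: DeltaStringSet = DeltaStringSet(accumulated)

      override def resetDelta: DeltaStringSet = DeltaStringSet(elements)
    }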
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
index 86827413b9..1870092830 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
@@ -44,6 +44,7 @@ import akka.actor.OneForOneStrategy
import akka.actor.ActorInitializationException
import java.util.concurrent.TimeUnit
import akka.util.Helpers.toRootLowerCase
+import akka.actor.Cancellable
object ReplicatorSettings {
@@ -81,7 +82,8 @@ object ReplicatorSettings {
durableStoreProps = Left((config.getString("durable.store-actor-class"), config.getConfig("durable"))),
durableKeys = config.getStringList("durable.keys").asScala.toSet,
pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis,
- durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis)
+ durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis,
+ deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled"))
}
/**
@@ -128,20 +130,21 @@ final class ReplicatorSettings(
val durableStoreProps: Either[(String, Config), Props],
val durableKeys: Set[String],
val pruningMarkerTimeToLive: FiniteDuration,
- val durablePruningMarkerTimeToLive: FiniteDuration) {
+ val durablePruningMarkerTimeToLive: FiniteDuration,
+ val deltaCrdtEnabled: Boolean) {
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
- maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days)
+ maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true)
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration,
durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
- maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days)
+ maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days, true)
def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role))
@@ -191,6 +194,9 @@ final class ReplicatorSettings(
withDurableKeys(durableKeys.asScala.toSet)
}
+ def withDeltaCrdtEnabled(deltaCrdtEnabled: Boolean): ReplicatorSettings =
+ copy(deltaCrdtEnabled = deltaCrdtEnabled)
+
private def copy(
role: Option[String] = role,
gossipInterval: FiniteDuration = gossipInterval,
@@ -202,10 +208,11 @@ final class ReplicatorSettings(
durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
durableKeys: Set[String] = durableKeys,
pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive,
- durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive): ReplicatorSettings =
+ durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive,
+ deltaCrdtEnabled: Boolean = deltaCrdtEnabled): ReplicatorSettings =
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys,
- pruningMarkerTimeToLive, durablePruningMarkerTimeToLive)
+ pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled)
}
object Replicator {
@@ -572,6 +579,7 @@ object Replicator {
private[akka] object Internal {
case object GossipTick
+ case object DeltaPropagationTick
case object RemovedNodePruningTick
case object ClockTick
final case class Write(key: String, envelope: DataEnvelope) extends ReplicatorMessage
@@ -581,6 +589,8 @@ object Replicator {
final case class ReadResult(envelope: Option[DataEnvelope]) extends ReplicatorMessage with DeadLetterSuppression
final case class ReadRepair(key: String, envelope: DataEnvelope)
case object ReadRepairAck
+ // for testing purposes
+ final case class TestFullStateGossip(enabled: Boolean)
// Gossip Status message contains SHA-1 digests of the data to determine when
// to send the full data
@@ -691,6 +701,8 @@ object Replicator {
}
final case class Gossip(updatedData: Map[String, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage
+ final case class DeltaPropagation(deltas: Map[String, DataEnvelope]) extends ReplicatorMessage
+
}
}
@@ -704,8 +716,8 @@ object Replicator {
* The data types must be convergent CRDTs and implement [[ReplicatedData]], i.e.
* they provide a monotonic merge function and the state changes always converge.
*
- * You can use your own custom [[ReplicatedData]] types, and several types are provided
- * by this package, such as:
+ * You can use your own custom [[ReplicatedData]] or [[DeltaReplicatedData]] types,
+ * and several types are provided by this package, such as:
*
*
* - Counters: [[GCounter]], [[PNCounter]]
@@ -726,7 +738,24 @@ object Replicator {
* The `Replicator` actor must be started on each node in the cluster, or group of
* nodes tagged with a specific role. It communicates with other `Replicator` instances
* with the same path (without address) that are running on other nodes . For convenience it
- * can be used with the [[DistributedData]] extension.
+ * can be used with the [[DistributedData]] extension but it can also be started as an ordinary
+ * actor using `Replicator.props`. If it is started as an ordinary actor it is important
+ * that it is given the same name, and thereby runs on the same path, on all nodes.
+ *
+ * <a href="http://arxiv.org/abs/1603.01529">Delta State Replicated Data Types</a>
+ * are supported. delta-CRDT is a way to reduce the need for sending the full state
+ * for updates. For example, adding elements 'c' and 'd' to the set {'a', 'b'} results
+ * in sending only the delta {'c', 'd'} and merging it with the state on the
+ * receiving side, resulting in the set {'a', 'b', 'c', 'd'}.
+ *
+ * The current protocol for replicating the deltas does not support causal consistency.
+ * It is only eventually consistent. This means that if elements 'c' and 'd' are
+ * added in two separate `Update` operations these deltas may occasionally be propagated
+ * to nodes in a different order than the causal order of the updates. For this example it
+ * can mean that the set {'a', 'b', 'd'} is seen before element 'c' arrives. Eventually
+ * it will be {'a', 'b', 'c', 'd'}. If causal consistency is needed the delta propagation
+ * should be disabled with the configuration property
+ * `akka.cluster.distributed-data.delta-crdt.enabled=off`.
*
* == Update ==
*
@@ -910,6 +939,34 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
} else
context.system.deadLetters // not used
+ val deltaPropagationSelector = new DeltaPropagationSelector {
+ override val divisor = 5
+ override def allNodes: Vector[Address] = {
// TODO optimize by maintaining a sorted instance variable instead
+ nodes.union(weaklyUpNodes).toVector.sorted
+ }
+
+ override def createDeltaPropagation(deltas: Map[String, ReplicatedData]): DeltaPropagation = {
+ // Important to include the pruning state in the deltas, since a delta may be based
+ // on an entry for which pruning has been performed locally but not yet on the target node.
+ DeltaPropagation(deltas.map {
+ case (key, d) ⇒ getData(key) match {
+ case Some(envelope) ⇒ key → envelope.copy(data = d)
+ case None ⇒ key → DataEnvelope(d)
+ }
+ }(collection.breakOut))
+ }
+ }
+ val deltaPropagationTask: Option[Cancellable] =
+ if (deltaCrdtEnabled) {
+ // Derive the deltaPropagationInterval from the gossipInterval.
+ // Normally the delta is propagated to all nodes within the gossip tick, so that
+ // full state gossip is not needed.
+ val deltaPropagationInterval = (gossipInterval / deltaPropagationSelector.divisor).max(200.millis)
+ Some(context.system.scheduler.schedule(deltaPropagationInterval, deltaPropagationInterval,
+ self, DeltaPropagationTick))
+ } else None
+
// cluster nodes, doesn't contain selfAddress
var nodes: Set[Address] = Set.empty
@@ -933,6 +990,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// for splitting up gossip in chunks
var statusCount = 0L
var statusTotChunks = 0
+ // possibility to disable Gossip for testing purposes
+ var fullStateGossipEnabled = true
val subscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
val newSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
@@ -965,6 +1024,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
override def postStop(): Unit = {
cluster.unsubscribe(self)
gossipTask.cancel()
+ deltaPropagationTask.foreach(_.cancel())
notifyTask.cancel()
pruningTask.foreach(_.cancel())
clockTask.cancel()
@@ -1011,7 +1071,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case (key, d) ⇒
write(key, d.dataEnvelope) match {
case Some(newEnvelope) ⇒
- if (newEnvelope.data ne d.dataEnvelope.data)
+ if (newEnvelope ne d.dataEnvelope)
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
@@ -1030,6 +1090,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case RemovedNodePruningTick | FlushChanges | GossipTick ⇒
// ignore scheduled ticks when loading durable data
+ case TestFullStateGossip(enabled) ⇒
+ fullStateGossipEnabled = enabled
case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒
// ignore gossip and replication when loading durable data
log.debug("ignoring message [{}] when loading durable data", m.getClass.getName)
@@ -1046,7 +1108,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Read(key) ⇒ receiveRead(key)
case Write(key, envelope) ⇒ receiveWrite(key, envelope)
case ReadRepair(key, envelope) ⇒ receiveReadRepair(key, envelope)
+ case DeltaPropagation(deltas) ⇒ receiveDeltaPropagation(deltas)
case FlushChanges ⇒ receiveFlushChanges()
+ case DeltaPropagationTick ⇒ receiveDeltaPropagationTick()
case GossipTick ⇒ receiveGossipTick()
case ClockTick ⇒ receiveClockTick()
case Status(otherDigests, chunk, totChunks) ⇒ receiveStatus(otherDigests, chunk, totChunks)
@@ -1066,6 +1130,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Delete(key, consistency, req) ⇒ receiveDelete(key, consistency, req)
case RemovedNodePruningTick ⇒ receiveRemovedNodePruningTick()
case GetReplicaCount ⇒ receiveGetReplicaCount()
+ case TestFullStateGossip(enabled) ⇒ fullStateGossipEnabled = enabled
}
def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit = {
@@ -1103,13 +1168,28 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
localValue match {
case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key, req)
case Some(envelope @ DataEnvelope(existing, _)) ⇒
- envelope.merge(modify(Some(existing)).asInstanceOf[existing.T])
- case None ⇒ DataEnvelope(modify(None))
+ modify(Some(existing)) match {
+ case d: DeltaReplicatedData if deltaCrdtEnabled ⇒
+ (envelope.merge(d.resetDelta.asInstanceOf[existing.T]), Some(d.delta))
+ case d ⇒
+ (envelope.merge(d.asInstanceOf[existing.T]), None)
+ }
+ case None ⇒ modify(None) match {
+ case d: DeltaReplicatedData if deltaCrdtEnabled ⇒ (DataEnvelope(d.resetDelta), Some(d.delta))
+ case d ⇒ (DataEnvelope(d), None)
+ }
}
} match {
- case Success(envelope) ⇒
- log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, envelope.data)
+ case Success((envelope, delta)) ⇒
+ log.debug("Received Update for key [{}], old data [{}], new data [{}], delta [{}]", key, localValue, envelope.data, delta)
setData(key.id, envelope)
+
+ // handle the delta
+ delta match {
+ case Some(d) ⇒ deltaPropagationSelector.update(key.id, d)
+ case None ⇒ // not DeltaReplicatedData
+ }
+
val durable = isDurable(key.id)
if (isLocalUpdate(writeConsistency)) {
if (durable)
@@ -1118,8 +1198,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else
replyTo ! UpdateSuccess(key, req)
} else {
+ val writeEnvelope = delta match {
+ case Some(d) ⇒ DataEnvelope(d)
+ case None ⇒ envelope
+ }
val writeAggregator =
- context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
+ context.actorOf(WriteAggregator.props(key, writeEnvelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, new DurableDataEnvelope(envelope),
@@ -1176,13 +1260,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
Some(writeEnvelope2)
}
- def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
+ def writeAndStore(key: String, writeEnvelope: DataEnvelope): Unit = {
write(key, writeEnvelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
+ }
+
+ def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
+ writeAndStore(key, writeEnvelope)
replyTo ! ReadRepairAck
}
@@ -1231,6 +1319,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else LazyDigest
dataEntries = dataEntries.updated(key, (envelope, dig))
+ if (envelope.data == DeletedData)
+ deltaPropagationSelector.delete(key)
}
def getDigest(key: String): Digest = {
@@ -1282,7 +1372,28 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
changed = Set.empty[String]
}
- def receiveGossipTick(): Unit = selectRandomNode(nodes.union(weaklyUpNodes).toVector) foreach gossipTo
+ def receiveDeltaPropagationTick(): Unit = {
+ deltaPropagationSelector.collectPropagations().foreach {
+ case (node, deltaPropagation) ⇒
+ // TODO split it into several DeltaPropagation messages if there are too many entries
+ replica(node) ! deltaPropagation
+ }
+ if (deltaPropagationSelector.propagationCount % deltaPropagationSelector.divisor == 0)
+ deltaPropagationSelector.cleanupDeltaEntries()
+ }
+
+ def receiveDeltaPropagation(deltas: Map[String, DataEnvelope]): Unit = {
+ if (log.isDebugEnabled)
+ log.debug("Received DeltaPropagation from [{}], containing [{}]", sender().path.address, deltas.keys.mkString(", "))
+ deltas.foreach {
+ case (key, envelope) ⇒ writeAndStore(key, envelope)
+ }
+ }
+
+ def receiveGossipTick(): Unit = {
+ if (fullStateGossipEnabled)
+ selectRandomNode(nodes.union(weaklyUpNodes).toVector) foreach gossipTo
+ }
def gossipTo(address: Address): Unit = {
val to = replica(address)
@@ -1353,12 +1464,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
updatedData.foreach {
case (key, envelope) ⇒
val hadData = dataEntries.contains(key)
- write(key, envelope) match {
- case Some(newEnvelope) ⇒
- if (isDurable(key))
- durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
- case None ⇒
- }
+ writeAndStore(key, envelope)
if (sendBack) getData(key) match {
case Some(d) ⇒
if (hadData || d.pruning.nonEmpty)
@@ -1426,6 +1532,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
unreachable -= m.address
+ deltaPropagationSelector.cleanupRemovedNode(m.address)
}
}
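Delta propagation is enabled by default, as seen in the `true` passed by the backwards-compatible constructors above. A sketch of the two ways to opt out when causal consistency is required, using the `withDeltaCrdtEnabled` setter and the `delta-crdt.enabled` key added in this changeset:

    import akka.actor.ActorSystem
    import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
    import com.typesafe.config.ConfigFactory

    val system = ActorSystem("sys")

    // programmatically, via the new setting
    val replicator = system.actorOf(
      Replicator.props(ReplicatorSettings(system).withDeltaCrdtEnabled(false)), "replicator")

    // or via configuration, before the Replicator is started
    val config = ConfigFactory.parseString(
      "akka.cluster.distributed-data.delta-crdt.enabled = off")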
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
index 8ba9dabede..93e1202d29 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
@@ -174,6 +174,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
val GossipManifest = "N"
val WriteNackManifest = "O"
val DurableDataEnvelopeManifest = "P"
+ val DeltaPropagationManifest = "Q"
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
GetManifest → getFromBinary,
@@ -190,6 +191,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
ReadResultManifest → readResultFromBinary,
StatusManifest → statusFromBinary,
GossipManifest → gossipFromBinary,
+ DeltaPropagationManifest → deltaPropagationFromBinary,
WriteNackManifest → (_ ⇒ WriteNack),
DurableDataEnvelopeManifest → durableDataEnvelopeFromBinary)
@@ -199,6 +201,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
case WriteAck ⇒ WriteAckManifest
case _: Read ⇒ ReadManifest
case _: ReadResult ⇒ ReadResultManifest
+ case _: DeltaPropagation ⇒ DeltaPropagationManifest
case _: Status ⇒ StatusManifest
case _: Get[_] ⇒ GetManifest
case _: GetSuccess[_] ⇒ GetSuccessManifest
@@ -221,6 +224,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
case m: Read ⇒ readCache.getOrAdd(m)
case m: ReadResult ⇒ readResultToProto(m).toByteArray
case m: Status ⇒ statusToProto(m).toByteArray
+ case m: DeltaPropagation ⇒ deltaPropagationToProto(m).toByteArray
case m: Get[_] ⇒ getToProto(m).toByteArray
case m: GetSuccess[_] ⇒ getSuccessToProto(m).toByteArray
case m: DurableDataEnvelope ⇒ durableDataEnvelopeToProto(m).toByteArray
@@ -281,6 +285,24 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
sendBack = gossip.getSendBack)
}
+ private def deltaPropagationToProto(deltaPropagation: DeltaPropagation): dm.DeltaPropagation = {
+ val b = dm.DeltaPropagation.newBuilder()
+ deltaPropagation.deltas.foreach {
+ case (key, data) ⇒
+ b.addEntries(dm.DeltaPropagation.Entry.newBuilder().
+ setKey(key).
+ setEnvelope(dataEnvelopeToProto(data)))
+ }
+ b.build()
+ }
+
+ private def deltaPropagationFromBinary(bytes: Array[Byte]): DeltaPropagation = {
+ val deltaPropagation = dm.DeltaPropagation.parseFrom(bytes)
+ DeltaPropagation(
+ deltaPropagation.getEntriesList.asScala.map(e ⇒
+ e.getKey → dataEnvelopeFromProto(e.getEnvelope))(breakOut))
+ }
+
private def getToProto(get: Get[_]): dm.Get = {
val consistencyValue = get.consistency match {
case ReadLocal ⇒ 1
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala
new file mode 100644
index 0000000000..ebb4265d93
--- /dev/null
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala
@@ -0,0 +1,205 @@
+/**
+ * Copyright (C) 2009-2016 Lightbend Inc.
+ */
+package akka.cluster.ddata
+
+import java.util.concurrent.ThreadLocalRandom
+
+import scala.concurrent.duration._
+
+import akka.cluster.Cluster
+import akka.cluster.ddata.Replicator._
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+object ReplicatorDeltaSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+ val fourth = role("fourth")
+
+ commonConfig(ConfigFactory.parseString("""
+ akka.actor.provider = "cluster"
+ akka.log-dead-letters-during-shutdown = off
+ """))
+
+ testTransport(on = true)
+
+ sealed trait Op
+ final case class Delay(n: Int) extends Op
+ final case class Incr(key: PNCounterKey, n: Int, consistency: WriteConsistency) extends Op
+ final case class Decr(key: PNCounterKey, n: Int, consistency: WriteConsistency) extends Op
+
+ val timeout = 5.seconds
+ val writeTwo = WriteTo(2, timeout)
+ val writeMajority = WriteMajority(timeout)
+
+ val KeyA = PNCounterKey("A")
+ val KeyB = PNCounterKey("B")
+ val KeyC = PNCounterKey("C")
+
+ def generateOperations(): Vector[Op] = {
+ val rnd = ThreadLocalRandom.current()
+
+ def consistency(): WriteConsistency = {
+ rnd.nextInt(100) match {
+ case n if n < 90 ⇒ WriteLocal
+ case n if n < 95 ⇒ writeTwo
+ case n if n < 100 ⇒ writeMajority
+ }
+ }
+
+ def key(): PNCounterKey = {
+ rnd.nextInt(3) match {
+ case 0 ⇒ KeyA
+ case 1 ⇒ KeyB
+ case 2 ⇒ KeyC
+ }
+ }
+
+ (0 to (20 + rnd.nextInt(10))).map { _ ⇒
+ rnd.nextInt(3) match {
+ case 0 ⇒ Delay(rnd.nextInt(500))
+ case 1 ⇒ Incr(key(), rnd.nextInt(100), consistency())
+ case 2 ⇒ Decr(key(), rnd.nextInt(10), consistency())
+ }
+ }.toVector
+ }
+
+}
+
+class ReplicatorDeltaSpecMultiJvmNode1 extends ReplicatorDeltaSpec
+class ReplicatorDeltaSpecMultiJvmNode2 extends ReplicatorDeltaSpec
+class ReplicatorDeltaSpecMultiJvmNode3 extends ReplicatorDeltaSpec
+class ReplicatorDeltaSpecMultiJvmNode4 extends ReplicatorDeltaSpec
+
+class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMultiNodeSpec with ImplicitSender {
+ import Replicator._
+ import ReplicatorDeltaSpec._
+
+ override def initialParticipants = roles.size
+
+ implicit val cluster = Cluster(system)
+ val fullStateReplicator = system.actorOf(Replicator.props(
+ ReplicatorSettings(system).withGossipInterval(1.second).withDeltaCrdtEnabled(false)), "fullStateReplicator")
+ val deltaReplicator = {
+ val r = system.actorOf(Replicator.props(ReplicatorSettings(system)), "deltaReplicator")
+ r ! Replicator.Internal.TestFullStateGossip(enabled = false)
+ r
+ }
+
+ var afterCounter = 0
+ def enterBarrierAfterTestStep(): Unit = {
+ afterCounter += 1
+ enterBarrier("after-" + afterCounter)
+ }
+
+ def join(from: RoleName, to: RoleName): Unit = {
+ runOn(from) {
+ cluster join node(to).address
+ }
+ enterBarrier(from.name + "-joined")
+ }
+
+ "delta-CRDT" must {
+ "join cluster" in {
+ join(first, first)
+ join(second, first)
+ join(third, first)
+ join(fourth, first)
+
+ within(15.seconds) {
+ awaitAssert {
+ fullStateReplicator ! GetReplicaCount
+ expectMsg(ReplicaCount(4))
+ }
+ }
+
+ enterBarrierAfterTestStep()
+ }
+
+ "propagate delta" in {
+ enterBarrier("ready")
+
+ runOn(first) {
+ fullStateReplicator ! Update(KeyA, PNCounter.empty, WriteLocal)(_ + 1)
+ deltaReplicator ! Update(KeyA, PNCounter.empty, WriteLocal)(_ + 1)
+ }
+ enterBarrier("updated-1")
+
+ within(5.seconds) {
+ awaitAssert {
+ val p = TestProbe()
+ fullStateReplicator.tell(Get(KeyA, ReadLocal), p.ref)
+ p.expectMsgType[GetSuccess[PNCounter]].dataValue.getValue.intValue should be(1)
+ }
+ awaitAssert {
+ val p = TestProbe()
+ deltaReplicator.tell(Get(KeyA, ReadLocal), p.ref)
+ p.expectMsgType[GetSuccess[PNCounter]].dataValue.getValue.intValue should be(1)
+ }
+ }
+
+ enterBarrierAfterTestStep()
+ }
+
+ "be eventually consistent" in {
+ val operations = generateOperations()
+ log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}")
+ try {
+ // perform random operations with both delta and full-state replicators
+ // and compare that the end result is the same
+
+ for (op ← operations) {
+ log.debug("operation: {}", op)
+ op match {
+ case Delay(d) ⇒ Thread.sleep(d)
+ case Incr(key, n, consistency) ⇒
+ fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ + n)
+ deltaReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ + n)
+ case Decr(key, n, consistency) ⇒
+ fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ - n)
+ deltaReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ - n)
+ }
+ }
+
+ enterBarrier("updated-2")
+
+ List(KeyA, KeyB, KeyC).foreach { key ⇒
+ within(5.seconds) {
+ awaitAssert {
+ val p = TestProbe()
+ fullStateReplicator.tell(Get(key, ReadLocal), p.ref)
+ val fullStateValue = p.expectMsgType[GetSuccess[PNCounter]].dataValue
+ deltaReplicator.tell(Get(key, ReadLocal), p.ref)
+ val deltaValue = p.expectMsgType[GetSuccess[PNCounter]].dataValue
+ deltaValue should ===(fullStateValue)
+ }
+ }
+ }
+
+ enterBarrierAfterTestStep()
+ } catch {
+ case e: Throwable ⇒
+ info(s"random operations on [${myself.name}]: ${operations.mkString(", ")}")
+ throw e
+ }
+ }
+ }
+
+}
+
diff --git a/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfDeltaReplicatedData.java b/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfDeltaReplicatedData.java
new file mode 100644
index 0000000000..8db3842051
--- /dev/null
+++ b/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfDeltaReplicatedData.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2017 Lightbend Inc.
+ */
+package akka.cluster.ddata;
+
+import akka.cluster.UniqueAddress;
+
+public class JavaImplOfDeltaReplicatedData extends AbstractDeltaReplicatedData<JavaImplOfDeltaReplicatedData>
+ implements RemovedNodePruning {
+
+ @Override
+ public JavaImplOfDeltaReplicatedData mergeData(JavaImplOfDeltaReplicatedData other) {
+ return this;
+ }
+
+ @Override
+ public JavaImplOfDeltaReplicatedData delta() {
+ return this;
+ }
+
+ @Override
+ public JavaImplOfDeltaReplicatedData resetDelta() {
+ return this;
+ }
+
+ @Override
+ public scala.collection.immutable.Set<UniqueAddress> modifiedByNodes() {
+ return akka.japi.Util.immutableSeq(new java.util.ArrayList<UniqueAddress>()).toSet();
+ }
+
+ @Override
+ public boolean needPruningFrom(UniqueAddress removedNode) {
+ return false;
+ }
+
+ @Override
+ public JavaImplOfDeltaReplicatedData prune(UniqueAddress removedNode, UniqueAddress collapseInto) {
+ return this;
+ }
+
+ @Override
+ public JavaImplOfDeltaReplicatedData pruningCleanup(UniqueAddress removedNode) {
+ return this;
+ }
+}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala
new file mode 100644
index 0000000000..f58db38454
--- /dev/null
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala
@@ -0,0 +1,185 @@
+/**
+ * Copyright (C) 2017 Lightbend Inc.
+ */
+package akka.cluster.ddata
+
+import akka.actor.Address
+import akka.cluster.ddata.Replicator.Internal.DataEnvelope
+import akka.cluster.ddata.Replicator.Internal.DeltaPropagation
+import org.scalactic.TypeCheckedTripleEquals
+import org.scalatest.Matchers
+import org.scalatest.WordSpec
+
+object DeltaPropagationSelectorSpec {
+ class TestSelector(override val allNodes: Vector[Address]) extends DeltaPropagationSelector {
+ override val divisor = 5
+ override def createDeltaPropagation(deltas: Map[String, ReplicatedData]): DeltaPropagation =
+ DeltaPropagation(deltas.mapValues(d ⇒ DataEnvelope(d)))
+ }
+
+ val deltaA = GSet.empty[String] + "a"
+ val deltaB = GSet.empty[String] + "b"
+ val deltaC = GSet.empty[String] + "c"
+}
+
+class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheckedTripleEquals {
+ import DeltaPropagationSelectorSpec._
+ val nodes = (2500 until 2600).map(n ⇒ Address("akka", "Sys", "localhost", n)).toVector
+
+ "DeltaPropagationSelector" must {
+ "collect none when no nodes" in {
+ val selector = new TestSelector(Vector.empty)
+ selector.update("A", deltaA)
+ selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
+ selector.cleanupDeltaEntries()
+ selector.hasDeltaEntries("A") should ===(false)
+ }
+
+ "collect 1 when one node" in {
+ val selector = new TestSelector(nodes.take(1))
+ selector.update("A", deltaA)
+ selector.update("B", deltaB)
+ selector.cleanupDeltaEntries()
+ selector.hasDeltaEntries("A") should ===(true)
+ selector.hasDeltaEntries("B") should ===(true)
+ val expected = DeltaPropagation(Map("A" → DataEnvelope(deltaA), "B" → DataEnvelope(deltaB)))
+ selector.collectPropagations() should ===(Map(nodes(0) → expected))
+ selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
+ selector.cleanupDeltaEntries()
+ selector.hasDeltaEntries("A") should ===(false)
+ selector.hasDeltaEntries("B") should ===(false)
+ }
+
+ "collect 2+1 when three nodes" in {
+ val selector = new TestSelector(nodes.take(3))
+ selector.update("A", deltaA)
+ selector.update("B", deltaB)
+ val expected = DeltaPropagation(Map("A" → DataEnvelope(deltaA), "B" → DataEnvelope(deltaB)))
+ selector.collectPropagations() should ===(Map(nodes(0) → expected, nodes(1) → expected))
+ selector.cleanupDeltaEntries()
+ selector.hasDeltaEntries("A") should ===(true)
+ selector.hasDeltaEntries("B") should ===(true)
+ selector.collectPropagations() should ===(Map(nodes(2) → expected))
+ selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
+ selector.cleanupDeltaEntries()
+ selector.hasDeltaEntries("A") should ===(false)
+ selector.hasDeltaEntries("B") should ===(false)
+ }
+
+ "keep track of deltas per node" in {
+ val selector = new TestSelector(nodes.take(3))
+ selector.update("A", deltaA)
+ selector.update("B", deltaB)
+ val expected1 = DeltaPropagation(Map("A" → DataEnvelope(deltaA), "B" → DataEnvelope(deltaB)))
+ selector.collectPropagations() should ===(Map(nodes(0) → expected1, nodes(1) → expected1))
+ // new update before previous was propagated to all nodes
+ selector.update("C", deltaC)
+ val expected2 = DeltaPropagation(Map("A" → DataEnvelope(deltaA), "B" → DataEnvelope(deltaB),
+ "C" → DataEnvelope(deltaC)))
+ val expected3 = DeltaPropagation(Map("C" → DataEnvelope(deltaC)))
+ selector.collectPropagations() should ===(Map(nodes(2) → expected2, nodes(0) → expected3))
+ selector.cleanupDeltaEntries()
+ selector.hasDeltaEntries("A") should ===(false)
+ selector.hasDeltaEntries("B") should ===(false)
+ selector.hasDeltaEntries("C") should ===(true)
+ selector.collectPropagations() should ===(Map(nodes(1) → expected3))
+ selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
+ selector.cleanupDeltaEntries()
+ selector.hasDeltaEntries("C") should ===(false)
+ }
+
+ "merge updates that occur within same tick" in {
+ val delta1 = GSet.empty[String] + "a1"
+ val delta2 = GSet.empty[String] + "a2"
+ val delta3 = GSet.empty[String] + "a3"
+ val selector = new TestSelector(nodes.take(1))
+ selector.update("A", delta1)
+ selector.update("A", delta2)
+ val expected1 = DeltaPropagation(Map("A" → DataEnvelope(delta1.merge(delta2))))
+ selector.collectPropagations() should ===(Map(nodes(0) → expected1))
+ selector.update("A", delta3)
+ val expected2 = DeltaPropagation(Map("A" → DataEnvelope(delta3)))
+ selector.collectPropagations() should ===(Map(nodes(0) → expected2))
+ selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
+ }
+
+ "merge deltas" in {
+ val delta1 = GSet.empty[String] + "a1"
+ val delta2 = GSet.empty[String] + "a2"
+ val delta3 = GSet.empty[String] + "a3"
+ val selector = new TestSelector(nodes.take(3)) {
+ override def nodesSliceSize(allNodesSize: Int): Int = 1
+ }
+ selector.update("A", delta1)
+ val expected1 = DeltaPropagation(Map("A" → DataEnvelope(delta1)))
+ selector.collectPropagations() should ===(Map(nodes(0) → expected1))
+
+ selector.update("A", delta2)
+ val expected2 = DeltaPropagation(Map("A" → DataEnvelope(delta1.merge(delta2))))
+ selector.collectPropagations() should ===(Map(nodes(1) → expected2))
+
+ selector.update("A", delta3)
+ val expected3 = DeltaPropagation(Map("A" → DataEnvelope(delta1.merge(delta2).merge(delta3))))
+ selector.collectPropagations() should ===(Map(nodes(2) → expected3))
+
+ val expected4 = DeltaPropagation(Map("A" → DataEnvelope(delta2.merge(delta3))))
+ selector.collectPropagations() should ===(Map(nodes(0) → expected4))
+
+ val expected5 = DeltaPropagation(Map("A" → DataEnvelope(delta3)))
+ selector.collectPropagations() should ===(Map(nodes(1) → expected5))
+
+ selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
+ }
+
+ "calcualte right slice size" in {
+ val selector = new TestSelector(nodes)
+ selector.nodesSliceSize(0) should ===(0)
+ selector.nodesSliceSize(1) should ===(1)
+ (2 to 9).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(2)
+ }
+ }
+ (10 to 14).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(3)
+ }
+ }
+ (15 to 19).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(4)
+ }
+ }
+ (20 to 24).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(5)
+ }
+ }
+ (25 to 29).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(6)
+ }
+ }
+ (30 to 34).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(7)
+ }
+ }
+ (35 to 39).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(8)
+ }
+ }
+ (40 to 44).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(9)
+ }
+ }
+ (45 to 200).foreach { n ⇒
+ withClue(s"n=$n") {
+ selector.nodesSliceSize(n) should ===(10)
+ }
+ }
+ }
+ }
+}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala
index 46a54f3661..9f7402ca65 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala
@@ -11,9 +11,9 @@ import org.scalatest.Matchers
import org.scalatest.WordSpec
class GCounterSpec extends WordSpec with Matchers {
- val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1)
- val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2)
- val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3)
+ val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)
+ val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L)
+ val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L)
"A GCounter" must {
@@ -25,10 +25,14 @@ class GCounterSpec extends WordSpec with Matchers {
val c4 = c3 increment node2
val c5 = c4 increment node2
- val c6 = c5 increment node2
+ val c6 = c5.resetDelta increment node2
c6.state(node1) should be(2)
c6.state(node2) should be(3)
+
+ c2.delta.state(node1) should be(1)
+ c3.delta.state(node1) should be(2)
+ c6.delta.state(node2) should be(3)
}
"be able to increment each node's record by arbitrary delta" in {
@@ -74,7 +78,7 @@ class GCounterSpec extends WordSpec with Matchers {
c16.state(node2) should be(10)
c16.value should be(17)
- // counter 1
+ // counter 2
val c21 = GCounter()
val c22 = c21 increment (node1, 2)
val c23 = c22 increment (node1, 2)
@@ -91,11 +95,13 @@ class GCounterSpec extends WordSpec with Matchers {
merged1.state(node1) should be(7)
merged1.state(node2) should be(10)
merged1.value should be(17)
+ merged1.delta should be(GCounter.empty)
val merged2 = c26 merge c16
merged2.state(node1) should be(7)
merged2.state(node2) should be(10)
merged2.value should be(17)
+ merged2.delta should be(GCounter.empty)
}
"be able to have its history correctly merged with another GCounter 2" in {
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala
index 4d56d7dee6..40648ea29f 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala
@@ -11,8 +11,8 @@ import org.scalatest.Matchers
import org.scalatest.WordSpec
class PNCounterSpec extends WordSpec with Matchers {
- val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1)
- val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2)
+ val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)
+ val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L)
"A PNCounter" must {
@@ -24,10 +24,18 @@ class PNCounterSpec extends WordSpec with Matchers {
val c4 = c3 increment node2
val c5 = c4 increment node2
- val c6 = c5 increment node2
+ val c6 = c5.resetDelta increment node2
c6.increments.state(node1) should be(2)
c6.increments.state(node2) should be(3)
+
+ c2.delta.value.toLong should be(1)
+ c2.delta.increments.state(node1) should be(1)
+ c3.delta.value should be(2)
+ c3.delta.increments.state(node1) should be(2)
+
+ c6.delta.value should be(3)
+ c6.delta.increments.state(node2) should be(3)
}
"be able to decrement each node's record by one" in {
@@ -38,10 +46,16 @@ class PNCounterSpec extends WordSpec with Matchers {
val c4 = c3 decrement node2
val c5 = c4 decrement node2
- val c6 = c5 decrement node2
+ val c6 = c5.resetDelta decrement node2
c6.decrements.state(node1) should be(2)
c6.decrements.state(node2) should be(3)
+
+ c3.delta.value should be(-2)
+ c3.delta.decrements.state(node1) should be(2)
+
+ c6.delta.value should be(-3)
+ c6.delta.decrements.state(node2) should be(3)
}
"be able to increment each node's record by arbitrary delta" in {
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
index 46bdf27e7b..ea32de0d1b 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
@@ -24,6 +24,7 @@ import akka.cluster.UniqueAddress
import akka.remote.RARP
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.DurableStore.DurableDataEnvelope
+import akka.cluster.ddata.GCounter
class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"ReplicatorMessageSerializerSpec",
@@ -58,6 +59,8 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"serialize Replicator messages" in {
val ref1 = system.actorOf(Props.empty, "ref1")
val data1 = GSet.empty[String] + "a"
+ val delta1 = GCounter.empty.increment(address1, 17).increment(address2, 2)
+ val delta2 = delta1.increment(address2, 1)
checkSerialization(Get(keyA, ReadLocal))
checkSerialization(Get(keyA, ReadMajority(2.seconds), Some("x")))
@@ -84,6 +87,9 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Gossip(Map(
"A" → DataEnvelope(data1),
"B" → DataEnvelope(GSet() + "b" + "c")), sendBack = true))
+ checkSerialization(DeltaPropagation(Map(
+ "A" → DataEnvelope(delta1),
+ "B" → DataEnvelope(delta2))))
checkSerialization(new DurableDataEnvelope(data1))
checkSerialization(new DurableDataEnvelope(DataEnvelope(data1, pruning = Map(
address1 → PruningPerformed(System.currentTimeMillis()),
diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst
index 777c0605cc..df9444a4d7 100644
--- a/akka-docs/rst/java/distributed-data.rst
+++ b/akka-docs/rst/java/distributed-data.rst
@@ -38,7 +38,9 @@ The ``akka.cluster.ddata.Replicator`` actor provides the API for interacting wit
The ``Replicator`` actor must be started on each node in the cluster, or group of nodes tagged
with a specific role. It communicates with other ``Replicator`` instances with the same path
(without address) that are running on other nodes . For convenience it can be used with the
-``akka.cluster.ddata.DistributedData`` extension.
+``akka.cluster.ddata.DistributedData`` extension but it can also be started as an ordinary
+actor using ``Replicator.props``. If it is started as an ordinary actor it is important
+that it is given the same name, and thereby runs on the same path, on all nodes.
Cluster members with status :ref:`WeaklyUp `, if that feature is enabled,
will participate in Distributed Data. This means that the data will be replicated to the
@@ -256,14 +258,38 @@ Subscribers will receive ``Replicator.DataDeleted``.
where frequent adds and removes are required, you should use a fixed number of top-level data
types that support both updates and removals, for example ``ORMap`` or ``ORSet``.
+.. _delta_crdt_java:
+
+delta-CRDT
+----------
+
+`Delta State Replicated Data Types <http://arxiv.org/abs/1603.01529>`_
+are supported. delta-CRDT is a way to reduce the need for sending the full state
+for updates. For example, adding elements ``'c'`` and ``'d'`` to the set ``{'a', 'b'}`` results
+in sending only the delta ``{'c', 'd'}`` and merging it with the state on the
+receiving side, resulting in the set ``{'a', 'b', 'c', 'd'}``.
+
+The current protocol for replicating the deltas does not support causal consistency.
+It is only eventually consistent. This means that if elements ``'c'`` and ``'d'`` are
+added in two separate ``Update`` operations these deltas may occasionally be propagated
+to nodes in a different order than the causal order of the updates. For this example it
+can mean that the set ``{'a', 'b', 'd'}`` is seen before element ``'c'`` arrives. Eventually
+it will be ``{'a', 'b', 'c', 'd'}``. If causal consistency is needed the delta propagation
+should be disabled with the configuration property
+``akka.cluster.distributed-data.delta-crdt.enabled=off``.
+
+Note that the full state is occasionally also replicated for delta-CRDTs, for example when
+new nodes are added to the cluster or when deltas could not be propagated because
+of network partitions or similar problems.
+
Data Types
==========
The data types must be convergent (stateful) CRDTs and implement the ``ReplicatedData`` trait,
i.e. they provide a monotonic merge function and the state changes always converge.
-You can use your own custom ``ReplicatedData`` types, and several types are provided
-by this package, such as:
+You can use your own custom ``AbstractReplicatedData`` or ``AbstractDeltaReplicatedData`` types,
+and several types are provided by this package, such as:
* Counters: ``GCounter``, ``PNCounter``
* Sets: ``GSet``, ``ORSet``
@@ -287,6 +313,8 @@ The value of the counter is the value of the P counter minus the value of the N
.. includecode:: code/docs/ddata/DistributedDataDocTest.java#pncounter
+``GCounter`` and ``PNCounter`` have support for :ref:`delta_crdt_java`.
+
Several related counters can be managed in a map with the ``PNCounterMap`` data type.
When the counters are placed in a ``PNCounterMap`` as opposed to placing them as separate top level
values they are guaranteed to be replicated together as one unit, which is sometimes necessary for
@@ -406,6 +434,8 @@ removed, but never added again thereafter.
Data types should be immutable, i.e. "modifying" methods should return a new instance.
+Implement the additional methods of ``AbstractDeltaReplicatedData`` if the data type should support delta-CRDT replication.
+
Serialization
^^^^^^^^^^^^^
@@ -563,11 +593,11 @@ be able to improve this if needed, but the design is still not intended for bill
All data is held in memory, which is another reason why it is not intended for *Big Data*.
-When a data entry is changed the full state of that entry is replicated to other nodes. For example,
-if you add one element to a Set with 100 existing elements, all 101 elements are transferred to
-other nodes. This means that you cannot have too large data entries, because then the remote message
-size will be too large. We might be able to make this more efficient by implementing
-`Efficient State-based CRDTs by Delta-Mutation <http://arxiv.org/abs/1410.2803>`_.
+When a data entry is changed the full state of that entry may be replicated to other nodes
+if it doesn't support :ref:`delta_crdt_java`. The full state is also replicated for delta-CRDTs,
+for example when new nodes are added to the cluster or when deltas could not be propagated because
+of network partitions or similar problems. This means that you cannot have too large
+data entries, because then the remote message size will be too large.
Learn More about CRDTs
======================
diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst
index 4033e17e51..e49b17753b 100644
--- a/akka-docs/rst/scala/distributed-data.rst
+++ b/akka-docs/rst/scala/distributed-data.rst
@@ -38,7 +38,9 @@ The ``akka.cluster.ddata.Replicator`` actor provides the API for interacting wit
The ``Replicator`` actor must be started on each node in the cluster, or group of nodes tagged
with a specific role. It communicates with other ``Replicator`` instances with the same path
(without address) that are running on other nodes . For convenience it can be used with the
-``akka.cluster.ddata.DistributedData`` extension.
+``akka.cluster.ddata.DistributedData`` extension but it can also be started as an ordinary
+actor using ``Replicator.props``. If it is started as an ordinary actor it is important
+that it is given the same name, and thereby runs on the same path, on all nodes.
Cluster members with status :ref:`WeaklyUp `, if that feature is enabled,
will participate in Distributed Data. This means that the data will be replicated to the
@@ -268,13 +270,37 @@ to after receiving and transforming `DeleteSuccess`.
where frequent adds and removes are required, you should use a fixed number of top-level data
types that support both updates and removals, for example ``ORMap`` or ``ORSet``.
+.. _delta_crdt_scala:
+
+delta-CRDT
+----------
+
+`Delta State Replicated Data Types `_
+are supported. delta-CRDT is a way to reduce the need for sending the full state
+for updates. For example, adding elements ``'c'`` and ``'d'`` to the set ``{'a', 'b'}`` would
+result in sending only the delta ``{'c', 'd'}``, which is merged with the state on the
+receiving side, resulting in the set ``{'a', 'b', 'c', 'd'}``.
+
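+To make the mechanics concrete, a standalone sketch (a toy, not Akka's actual
+API) of a grow-only set that accumulates such a delta::
+
+  // `delta` holds only the elements added since the last propagation,
+  // so that is all that has to be sent to other nodes
+  final case class DeltaGSet[A](elements: Set[A], delta: Set[A]) {
+    def add(a: A): DeltaGSet[A] = DeltaGSet(elements + a, delta + a)
+    def mergeDelta(thatDelta: Set[A]): DeltaGSet[A] =
+      copy(elements = elements union thatDelta)
+    def resetDelta: DeltaGSet[A] = copy(delta = Set.empty)
+  }
+
+  val local = DeltaGSet(Set('a', 'b'), Set.empty).add('c').add('d')
+  local.delta // Set('c', 'd') is what goes over the wire
+  val remote = DeltaGSet(Set('a', 'b'), Set.empty).mergeDelta(local.delta)
+  remote.elements // Set('a', 'b', 'c', 'd')
+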
+The current protocol for replicating the deltas does not support causal consistency.
+It is only eventually consistent. This means that if elements ``'c'`` and ``'d'`` are
+added in two separate ``Update`` operations, these deltas may occasionally be propagated
+to nodes in a different order than the causal order of the updates. For this example it
+can mean that the set ``{'a', 'b', 'd'}`` is seen before element ``'c'`` is seen. Eventually
+it will be ``{'a', 'b', 'c', 'd'}``. If causal consistency is needed, delta propagation
+should be disabled with the configuration property
+``akka.cluster.distributed-data.delta-crdt.enabled=off``.
+
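+For example, in ``application.conf``::
+
+  akka.cluster.distributed-data {
+    delta-crdt.enabled = off
+  }
+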
+Note that the full state is occasionally also replicated for delta-CRDTs, for example when
+new nodes are added to the cluster or when deltas could not be propagated because
+of network partitions or similar problems.
+
Data Types
==========
The data types must be convergent (stateful) CRDTs and implement the ``ReplicatedData`` trait,
i.e. they provide a monotonic merge function and the state changes always converge.
-You can use your own custom ``ReplicatedData`` types, and several types are provided
+You can use your own custom ``ReplicatedData`` or ``DeltaReplicatedData`` types, and several types are provided
by this package, such as:
* Counters: ``GCounter``, ``PNCounter``
@@ -299,6 +325,8 @@ The value of the counter is the value of the P counter minus the value of the N
.. includecode:: code/docs/ddata/DistributedDataDocSpec.scala#pncounter
+``GCounter`` and ``PNCounter`` have support for :ref:`delta_crdt_scala`.
+
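+Delta propagation is transparent in the API; updates are performed in the same
+way as for any other data type. A small sketch, assuming a ``replicator`` actor
+reference and an implicit ``Cluster`` in scope::
+
+  import akka.cluster.ddata.{ GCounter, GCounterKey }
+  import akka.cluster.ddata.Replicator.{ Update, WriteLocal }
+
+  val counterKey = GCounterKey("hits")
+  // only the increment is propagated as a delta,
+  // not the full state of the counter
+  replicator ! Update(counterKey, GCounter.empty, WriteLocal)(_ + 1)
+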
Several related counters can be managed in a map with the ``PNCounterMap`` data type.
When the counters are placed in a ``PNCounterMap`` as opposed to placing them as separate top level
values they are guaranteed to be replicated together as one unit, which is sometimes necessary for
@@ -418,6 +446,8 @@ removed, but never added again thereafter.
Data types should be immutable, i.e. "modifying" methods should return a new instance.
+Implement the additional methods of ``DeltaReplicatedData`` if the data type should support delta-CRDT replication.
+
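+A standalone sketch of the shape of those methods on a toy grow-only counter
+(hypothetical, not the actual trait; see ``DeltaReplicatedData`` for the exact
+signatures)::
+
+  final case class DeltaGCounter(
+      state: Map[String, Long],   // full per-node increments
+      delta: Map[String, Long]) { // entries changed since last propagation
+
+    def increment(node: String): DeltaGCounter = {
+      val n = state.getOrElse(node, 0L) + 1
+      DeltaGCounter(state.updated(node, n), delta.updated(node, n))
+    }
+
+    // full-state merge: pointwise max, as for any state-based G-Counter
+    def merge(that: DeltaGCounter): DeltaGCounter =
+      copy(state = pointwiseMax(state, that.state))
+
+    // an incoming delta is merged in the same way,
+    // but over a much smaller map than the full state
+    def mergeDelta(thatDelta: Map[String, Long]): DeltaGCounter =
+      copy(state = pointwiseMax(state, thatDelta))
+
+    // called after the accumulated delta has been handed off for propagation
+    def resetDelta: DeltaGCounter = copy(delta = Map.empty)
+
+    private def pointwiseMax(a: Map[String, Long], b: Map[String, Long]) =
+      (a.keySet ++ b.keySet).iterator
+        .map(k => k -> math.max(a.getOrElse(k, 0L), b.getOrElse(k, 0L)))
+        .toMap
+  }
+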
Serialization
^^^^^^^^^^^^^
@@ -575,11 +605,11 @@ be able to improve this if needed, but the design is still not intended for bill
All data is held in memory, which is another reason why it is not intended for *Big Data*.
-When a data entry is changed the full state of that entry is replicated to other nodes. For example,
-if you add one element to a Set with 100 existing elements, all 101 elements are transferred to
-other nodes. This means that you cannot have too large data entries, because then the remote message
-size will be too large. We might be able to make this more efficient by implementing
-`Efficient State-based CRDTs by Delta-Mutation `_.
+When a data entry is changed the full state of that entry is replicated to other nodes
+if it doesn't support :ref:`delta_crdt_scala`. The full state is also replicated for delta-CRDTs,
+for example when new nodes are added to the cluster or when deltas could not be propagated
+because of network partitions or similar problems. This means that data entries cannot be
+too large, because then the remote message size will be too large.
Learn More about CRDTs
======================
diff --git a/project/MiMa.scala b/project/MiMa.scala
index 3cf1ab73d6..5a57c4176e 100644
--- a/project/MiMa.scala
+++ b/project/MiMa.scala
@@ -82,6 +82,9 @@ object MiMa extends AutoPlugin {
import com.typesafe.tools.mima.core._
val bcIssuesBetween24and25 = Seq(
+
+ // #21875 delta-CRDT
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GCounter.this"),
// #21423 Remove deprecated metrics
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.clusterMetrics"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$MetricsTick$"),
@@ -168,6 +171,12 @@ object MiMa extends AutoPlugin {
FilterAnyProblemStartingWith("akka.cluster.ddata.Replicator"),
FilterAnyProblemStartingWith("akka.cluster.ddata.protobuf.msg"),
+ // #21647 pruning
+ FilterAnyProblemStartingWith("akka.cluster.ddata.PruningState"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.RemovedNodePruning.usingNodes"),
+ FilterAnyProblemStartingWith("akka.cluster.ddata.Replicator"),
+ FilterAnyProblemStartingWith("akka.cluster.ddata.protobuf.msg"),
+
// #21537 coordinated shutdown
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.removed"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.convergence"),