diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java
index 2cbf071334..6252502e9c 100644
--- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java
+++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java
@@ -10906,19 +10906,30 @@ public final class ReplicatedDataMessages {
*/
int getZeroTag();
- // optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ // repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- boolean hasEntryData();
+ java.util.List
+ getEntryDataList();
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData();
+ akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData(int index);
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder();
+ int getEntryDataCount();
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ java.util.List extends akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder>
+ getEntryDataOrBuilderList();
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder(
+ int index);
}
/**
* Protobuf type {@code akka.cluster.ddata.ORMapDeltaGroup.Entry}
@@ -11001,16 +11012,11 @@ public final class ReplicatedDataMessages {
break;
}
case 34: {
- akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder subBuilder = null;
- if (((bitField0_ & 0x00000008) == 0x00000008)) {
- subBuilder = entryData_.toBuilder();
+ if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+ entryData_ = new java.util.ArrayList();
+ mutable_bitField0_ |= 0x00000008;
}
- entryData_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.PARSER, extensionRegistry);
- if (subBuilder != null) {
- subBuilder.mergeFrom(entryData_);
- entryData_ = subBuilder.buildPartial();
- }
- bitField0_ |= 0x00000008;
+ entryData_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.PARSER, extensionRegistry));
break;
}
}
@@ -11021,6 +11027,9 @@ public final class ReplicatedDataMessages {
throw new akka.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
+ if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+ entryData_ = java.util.Collections.unmodifiableList(entryData_);
+ }
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@@ -11107,33 +11116,47 @@ public final class ReplicatedDataMessages {
return zeroTag_;
}
- // optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ // repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
public static final int ENTRYDATA_FIELD_NUMBER = 4;
- private akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry entryData_;
+ private java.util.List entryData_;
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- public boolean hasEntryData() {
- return ((bitField0_ & 0x00000008) == 0x00000008);
- }
- /**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
- */
- public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData() {
+ public java.util.List getEntryDataList() {
return entryData_;
}
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder() {
+ public java.util.List extends akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder>
+ getEntryDataOrBuilderList() {
return entryData_;
}
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public int getEntryDataCount() {
+ return entryData_.size();
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData(int index) {
+ return entryData_.get(index);
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder(
+ int index) {
+ return entryData_.get(index);
+ }
private void initFields() {
operation_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaOp.ORMapPut;
underlying_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance();
zeroTag_ = 0;
- entryData_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance();
+ entryData_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -11156,8 +11179,8 @@ public final class ReplicatedDataMessages {
memoizedIsInitialized = 0;
return false;
}
- if (hasEntryData()) {
- if (!getEntryData().isInitialized()) {
+ for (int i = 0; i < getEntryDataCount(); i++) {
+ if (!getEntryData(i).isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
@@ -11178,8 +11201,8 @@ public final class ReplicatedDataMessages {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeSInt32(3, zeroTag_);
}
- if (((bitField0_ & 0x00000008) == 0x00000008)) {
- output.writeMessage(4, entryData_);
+ for (int i = 0; i < entryData_.size(); i++) {
+ output.writeMessage(4, entryData_.get(i));
}
getUnknownFields().writeTo(output);
}
@@ -11202,9 +11225,9 @@ public final class ReplicatedDataMessages {
size += akka.protobuf.CodedOutputStream
.computeSInt32Size(3, zeroTag_);
}
- if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ for (int i = 0; i < entryData_.size(); i++) {
size += akka.protobuf.CodedOutputStream
- .computeMessageSize(4, entryData_);
+ .computeMessageSize(4, entryData_.get(i));
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
@@ -11335,11 +11358,11 @@ public final class ReplicatedDataMessages {
zeroTag_ = 0;
bitField0_ = (bitField0_ & ~0x00000004);
if (entryDataBuilder_ == null) {
- entryData_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance();
+ entryData_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000008);
} else {
entryDataBuilder_.clear();
}
- bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -11384,10 +11407,11 @@ public final class ReplicatedDataMessages {
to_bitField0_ |= 0x00000004;
}
result.zeroTag_ = zeroTag_;
- if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
- to_bitField0_ |= 0x00000008;
- }
if (entryDataBuilder_ == null) {
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ entryData_ = java.util.Collections.unmodifiableList(entryData_);
+ bitField0_ = (bitField0_ & ~0x00000008);
+ }
result.entryData_ = entryData_;
} else {
result.entryData_ = entryDataBuilder_.build();
@@ -11417,8 +11441,31 @@ public final class ReplicatedDataMessages {
if (other.hasZeroTag()) {
setZeroTag(other.getZeroTag());
}
- if (other.hasEntryData()) {
- mergeEntryData(other.getEntryData());
+ if (entryDataBuilder_ == null) {
+ if (!other.entryData_.isEmpty()) {
+ if (entryData_.isEmpty()) {
+ entryData_ = other.entryData_;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ } else {
+ ensureEntryDataIsMutable();
+ entryData_.addAll(other.entryData_);
+ }
+ onChanged();
+ }
+ } else {
+ if (!other.entryData_.isEmpty()) {
+ if (entryDataBuilder_.isEmpty()) {
+ entryDataBuilder_.dispose();
+ entryDataBuilder_ = null;
+ entryData_ = other.entryData_;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ entryDataBuilder_ =
+ akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+ getEntryDataFieldBuilder() : null;
+ } else {
+ entryDataBuilder_.addAllMessages(other.entryData_);
+ }
+ }
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
@@ -11441,8 +11488,8 @@ public final class ReplicatedDataMessages {
return false;
}
- if (hasEntryData()) {
- if (!getEntryData().isInitialized()) {
+ for (int i = 0; i < getEntryDataCount(); i++) {
+ if (!getEntryData(i).isInitialized()) {
return false;
}
@@ -11655,116 +11702,239 @@ public final class ReplicatedDataMessages {
return this;
}
- // optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
- private akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry entryData_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance();
- private akka.protobuf.SingleFieldBuilder<
- akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder> entryDataBuilder_;
- /**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
- */
- public boolean hasEntryData() {
- return ((bitField0_ & 0x00000008) == 0x00000008);
+ // repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ private java.util.List entryData_ =
+ java.util.Collections.emptyList();
+ private void ensureEntryDataIsMutable() {
+ if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+ entryData_ = new java.util.ArrayList(entryData_);
+ bitField0_ |= 0x00000008;
+ }
}
+
+ private akka.protobuf.RepeatedFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder> entryDataBuilder_;
+
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData() {
+ public java.util.List getEntryDataList() {
if (entryDataBuilder_ == null) {
- return entryData_;
+ return java.util.Collections.unmodifiableList(entryData_);
} else {
- return entryDataBuilder_.getMessage();
+ return entryDataBuilder_.getMessageList();
}
}
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- public Builder setEntryData(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) {
+ public int getEntryDataCount() {
+ if (entryDataBuilder_ == null) {
+ return entryData_.size();
+ } else {
+ return entryDataBuilder_.getCount();
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData(int index) {
+ if (entryDataBuilder_ == null) {
+ return entryData_.get(index);
+ } else {
+ return entryDataBuilder_.getMessage(index);
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public Builder setEntryData(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) {
if (entryDataBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
- entryData_ = value;
+ ensureEntryDataIsMutable();
+ entryData_.set(index, value);
onChanged();
} else {
- entryDataBuilder_.setMessage(value);
+ entryDataBuilder_.setMessage(index, value);
}
- bitField0_ |= 0x00000008;
return this;
}
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
public Builder setEntryData(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder builderForValue) {
+ if (entryDataBuilder_ == null) {
+ ensureEntryDataIsMutable();
+ entryData_.set(index, builderForValue.build());
+ onChanged();
+ } else {
+ entryDataBuilder_.setMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public Builder addEntryData(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) {
+ if (entryDataBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureEntryDataIsMutable();
+ entryData_.add(value);
+ onChanged();
+ } else {
+ entryDataBuilder_.addMessage(value);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public Builder addEntryData(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) {
+ if (entryDataBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureEntryDataIsMutable();
+ entryData_.add(index, value);
+ onChanged();
+ } else {
+ entryDataBuilder_.addMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public Builder addEntryData(
akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder builderForValue) {
if (entryDataBuilder_ == null) {
- entryData_ = builderForValue.build();
+ ensureEntryDataIsMutable();
+ entryData_.add(builderForValue.build());
onChanged();
} else {
- entryDataBuilder_.setMessage(builderForValue.build());
+ entryDataBuilder_.addMessage(builderForValue.build());
}
- bitField0_ |= 0x00000008;
return this;
}
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- public Builder mergeEntryData(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) {
+ public Builder addEntryData(
+ int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder builderForValue) {
if (entryDataBuilder_ == null) {
- if (((bitField0_ & 0x00000008) == 0x00000008) &&
- entryData_ != akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance()) {
- entryData_ =
- akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.newBuilder(entryData_).mergeFrom(value).buildPartial();
- } else {
- entryData_ = value;
- }
+ ensureEntryDataIsMutable();
+ entryData_.add(index, builderForValue.build());
onChanged();
} else {
- entryDataBuilder_.mergeFrom(value);
+ entryDataBuilder_.addMessage(index, builderForValue.build());
}
- bitField0_ |= 0x00000008;
return this;
}
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public Builder addAllEntryData(
+ java.lang.Iterable extends akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry> values) {
+ if (entryDataBuilder_ == null) {
+ ensureEntryDataIsMutable();
+ super.addAll(values, entryData_);
+ onChanged();
+ } else {
+ entryDataBuilder_.addAllMessages(values);
+ }
+ return this;
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
public Builder clearEntryData() {
if (entryDataBuilder_ == null) {
- entryData_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance();
+ entryData_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000008);
onChanged();
} else {
entryDataBuilder_.clear();
}
- bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder getEntryDataBuilder() {
- bitField0_ |= 0x00000008;
- onChanged();
- return getEntryDataFieldBuilder().getBuilder();
+ public Builder removeEntryData(int index) {
+ if (entryDataBuilder_ == null) {
+ ensureEntryDataIsMutable();
+ entryData_.remove(index);
+ onChanged();
+ } else {
+ entryDataBuilder_.remove(index);
+ }
+ return this;
}
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder() {
- if (entryDataBuilder_ != null) {
- return entryDataBuilder_.getMessageOrBuilder();
- } else {
- return entryData_;
+ public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder getEntryDataBuilder(
+ int index) {
+ return getEntryDataFieldBuilder().getBuilder(index);
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder(
+ int index) {
+ if (entryDataBuilder_ == null) {
+ return entryData_.get(index); } else {
+ return entryDataBuilder_.getMessageOrBuilder(index);
}
}
/**
- * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
*/
- private akka.protobuf.SingleFieldBuilder<
+ public java.util.List extends akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder>
+ getEntryDataOrBuilderList() {
+ if (entryDataBuilder_ != null) {
+ return entryDataBuilder_.getMessageOrBuilderList();
+ } else {
+ return java.util.Collections.unmodifiableList(entryData_);
+ }
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder addEntryDataBuilder() {
+ return getEntryDataFieldBuilder().addBuilder(
+ akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance());
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder addEntryDataBuilder(
+ int index) {
+ return getEntryDataFieldBuilder().addBuilder(
+ index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance());
+ }
+ /**
+ * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4;
+ */
+ public java.util.List
+ getEntryDataBuilderList() {
+ return getEntryDataFieldBuilder().getBuilderList();
+ }
+ private akka.protobuf.RepeatedFieldBuilder<
akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder>
getEntryDataFieldBuilder() {
if (entryDataBuilder_ == null) {
- entryDataBuilder_ = new akka.protobuf.SingleFieldBuilder<
+ entryDataBuilder_ = new akka.protobuf.RepeatedFieldBuilder<
akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder>(
entryData_,
+ ((bitField0_ & 0x00000008) == 0x00000008),
getParentForChildren(),
isClean());
entryData_ = null;
@@ -18398,7 +18568,7 @@ public final class ReplicatedDataMessages {
"erMessage\032\275\001\n\005Entry\0223\n\toperation\030\001 \002(\0162 ",
".akka.cluster.ddata.ORMapDeltaOp\022-\n\nunde" +
"rlying\030\002 \002(\0132\031.akka.cluster.ddata.ORSet\022" +
- "\017\n\007zeroTag\030\003 \002(\021\022?\n\tentryData\030\004 \001(\0132,.ak" +
+ "\017\n\007zeroTag\030\003 \002(\021\022?\n\tentryData\030\004 \003(\0132,.ak" +
"ka.cluster.ddata.ORMapDeltaGroup.MapEntr" +
"y\"\206\002\n\006LWWMap\022\'\n\004keys\030\001 \002(\0132\031.akka.cluste" +
"r.ddata.ORSet\0221\n\007entries\030\002 \003(\0132 .akka.cl" +
diff --git a/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto
index ca6b2fbdca..d02371b4fa 100644
--- a/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto
+++ b/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto
@@ -87,7 +87,7 @@ message ORMapDeltaGroup {
required ORMapDeltaOp operation = 1;
required ORSet underlying = 2;
required sint32 zeroTag = 3;
- optional MapEntry entryData = 4;
+ repeated MapEntry entryData = 4;
}
repeated Entry entries = 1;
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala
index 2af60a8aad..3a4d34e54d 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala
@@ -6,10 +6,18 @@ package akka.cluster.ddata
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import akka.annotation.InternalApi
-import akka.cluster.ddata.ORMap.LWWMapTag
+import akka.cluster.ddata.ORMap.ZeroTag
object LWWMap {
- private val _empty: LWWMap[Any, Any] = new LWWMap(ORMap.emptyWithLWWMapTag)
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[akka] case object LWWMapTag extends ZeroTag {
+ override def zero: DeltaReplicatedData = LWWMap.empty
+ override final val value: Int = 4
+ }
+
+ private val _empty: LWWMap[Any, Any] = new LWWMap(new ORMap(ORSet.empty, Map.empty, zeroTag = LWWMapTag))
def empty[A, B]: LWWMap[A, B] = _empty.asInstanceOf[LWWMap[A, B]]
def apply(): LWWMap[Any, Any] = _empty
/**
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala
index 4f1fe0f478..b817a41955 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala
@@ -12,7 +12,7 @@ import akka.cluster.ddata.ORMap.{ AtomicDeltaOp, ZeroTag }
import scala.collection.immutable
object ORMap {
- private val _empty: ORMap[Any, ReplicatedData] = new ORMap(ORSet.empty, Map.empty)
+ private val _empty: ORMap[Any, ReplicatedData] = new ORMap(ORSet.empty, Map.empty, VanillaORMapTag)
def empty[A, B <: ReplicatedData]: ORMap[A, B] = _empty.asInstanceOf[ORMap[A, B]]
def apply(): ORMap[Any, ReplicatedData] = _empty
/**
@@ -30,26 +30,6 @@ object ORMap {
override def zero: DeltaReplicatedData
}
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] def emptyWithPNCounterMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = PNCounterMapTag)
-
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] def emptyWithORMultiMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapTag)
-
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] def emptyWithORMultiMapWithValueDeltasTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapWithValueDeltasTag)
-
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] def emptyWithLWWMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = LWWMapTag)
-
/**
* INTERNAL API
* Tags for ORMap.DeltaOp's, so that when the Replicator needs to re-create full value from delta,
@@ -68,38 +48,6 @@ object ORMap {
override final val value: Int = 0
}
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] case object PNCounterMapTag extends ZeroTag {
- override def zero: DeltaReplicatedData = PNCounterMap.empty
- override final val value: Int = 1
- }
-
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] case object ORMultiMapTag extends ZeroTag {
- override def zero: DeltaReplicatedData = ORMultiMap.empty
- override final val value: Int = 2
- }
-
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] case object ORMultiMapWithValueDeltasTag extends ZeroTag {
- override def zero: DeltaReplicatedData = ORMultiMap.emptyWithValueDeltas
- override final val value: Int = 3
- }
-
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] case object LWWMapTag extends ZeroTag {
- override def zero: DeltaReplicatedData = LWWMap.empty
- override final val value: Int = 4
- }
-
/**
* INTERNAL API
*/
@@ -107,7 +55,6 @@ object ORMap {
def underlying: ORSet.DeltaOp
def zeroTag: ZeroTag
override def zero: DeltaReplicatedData = zeroTag.zero
-
override def merge(that: DeltaOp): DeltaOp = that match {
case other: AtomicDeltaOp[A, B] ⇒ DeltaGroup(Vector(this, other))
case DeltaGroup(ops) ⇒ DeltaGroup(this +: ops)
@@ -116,30 +63,79 @@ object ORMap {
// PutDeltaOp contains ORSet delta and full value
/** INTERNAL API */
- @InternalApi private[akka] final case class PutDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, value: (A, B), zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
+ @InternalApi private[akka] final case class PutDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, value: (A, B), zeroTag: ZeroTag) extends AtomicDeltaOp[A, B] {
+ override def merge(that: DeltaOp): DeltaOp = that match {
+ case put: PutDeltaOp[A, B] if this.value._1 == put.value._1 ⇒
+ new PutDeltaOp[A, B](this.underlying.merge(put.underlying), put.value, zeroTag)
+ case update: UpdateDeltaOp[A, B] if update.values.size == 1 && update.values.contains(this.value._1) ⇒
+ val (key, elem1) = this.value
+ val newValue = elem1 match {
+ case e1: DeltaReplicatedData ⇒
+ val e2 = update.values.head._2.asInstanceOf[e1.D]
+ (key, e1.mergeDelta(e2).asInstanceOf[B])
+ case _ ⇒
+ val elem2 = update.values.head._2.asInstanceOf[elem1.T]
+ (key, elem1.merge(elem2).asInstanceOf[B])
+ }
+ new PutDeltaOp[A, B](this.underlying.merge(update.underlying), newValue, zeroTag)
+ case other: AtomicDeltaOp[A, B] ⇒ DeltaGroup(Vector(this, other))
+ case DeltaGroup(ops) ⇒ DeltaGroup(this +: ops)
+ }
}
// UpdateDeltaOp contains ORSet delta and either delta of value (in case where underlying type supports deltas) or full value
/** INTERNAL API */
- @InternalApi private[akka] final case class UpdateDeltaOp[A, X <: ReplicatedDelta](underlying: ORSet.DeltaOp, values: Map[A, X], zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, X] {
+ @InternalApi private[akka] final case class UpdateDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, values: Map[A, B], zeroTag: ZeroTag) extends AtomicDeltaOp[A, B] {
+ override def merge(that: DeltaOp): DeltaOp = that match {
+ case update: UpdateDeltaOp[A, B] ⇒
+ new UpdateDeltaOp[A, B](
+ this.underlying.merge(update.underlying),
+ update.values.foldLeft(this.values) {
+ (map, pair) ⇒
+ val (key, value) = pair
+ if (this.values.contains(key)) {
+ val elem1 = this.values(key)
+ val elem2 = value.asInstanceOf[elem1.T]
+ map + (key → elem1.merge(elem2).asInstanceOf[B])
+ } else map + pair
+ },
+ zeroTag)
+ case put: PutDeltaOp[A, B] if this.values.size == 1 && this.values.contains(put.value._1) ⇒
+ new PutDeltaOp[A, B](this.underlying.merge(put.underlying), put.value, zeroTag)
+ case other: AtomicDeltaOp[A, B] ⇒ DeltaGroup(Vector(this, other))
+ case DeltaGroup(ops) ⇒ DeltaGroup(this +: ops)
+ }
}
- // RemoveDeltaOp does not contain any value at all - the propagated 'value' map is empty
+ // RemoveDeltaOp does not contain any value at all - the propagated 'value' map would be empty
/** INTERNAL API */
- @InternalApi private[akka] final case class RemoveDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
- }
+ @InternalApi private[akka] final case class RemoveDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, zeroTag: ZeroTag) extends AtomicDeltaOp[A, B]
// RemoveKeyDeltaOp contains a single value - to provide the recipient with the removed key for value map
/** INTERNAL API */
- @InternalApi private[akka] final case class RemoveKeyDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, removedKey: A, zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
- }
+ @InternalApi private[akka] final case class RemoveKeyDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, removedKey: A, zeroTag: ZeroTag) extends AtomicDeltaOp[A, B]
// DeltaGroup is effectively a causally ordered list of individual deltas
/** INTERNAL API */
@InternalApi private[akka] final case class DeltaGroup[A, B <: ReplicatedData](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp {
override def merge(that: DeltaOp): DeltaOp = that match {
- case DeltaGroup(thatOps) ⇒ DeltaGroup(ops ++ thatOps)
- case that: AtomicDeltaOp[A, B] ⇒ DeltaGroup(ops :+ that)
+ case that: AtomicDeltaOp[A, B] ⇒
+ ops.last match {
+ case thisPut: PutDeltaOp[A, B] ⇒
+ val merged = thisPut.merge(that)
+ merged match {
+ case op: AtomicDeltaOp[A, B] ⇒ DeltaGroup(ops.dropRight(1) :+ op)
+ case DeltaGroup(thatOps) ⇒ DeltaGroup(ops.dropRight(1) ++ thatOps)
+ }
+ case thisUpdate: UpdateDeltaOp[A, B] ⇒
+ val merged = thisUpdate.merge(that)
+ merged match {
+ case op: AtomicDeltaOp[A, B] ⇒ DeltaGroup(ops.dropRight(1) :+ op)
+ case DeltaGroup(thatOps) ⇒ DeltaGroup(ops.dropRight(1) ++ thatOps)
+ }
+ case _ ⇒ DeltaGroup(ops :+ that)
+ }
+ case DeltaGroup(thatOps) ⇒ DeltaGroup(ops ++ thatOps)
}
override def zero: DeltaReplicatedData = ops.headOption.fold(ORMap.empty[A, B].asInstanceOf[DeltaReplicatedData])(_.zero)
@@ -158,7 +154,7 @@ object ORMap {
final class ORMap[A, B <: ReplicatedData] private[akka] (
private[akka] val keys: ORSet[A],
private[akka] val values: Map[A, B],
- private[akka] val zeroTag: ZeroTag = ORMap.VanillaORMapTag,
+ private[akka] val zeroTag: ZeroTag,
override val delta: Option[ORMap.DeltaOp] = None)
extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
@@ -390,12 +386,13 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
case removeKeyOp: RemoveKeyDeltaOp[A, B] ⇒
tombstonedVals = tombstonedVals + removeKeyOp.removedKey
case updateOp: UpdateDeltaOp[A, _] ⇒
- val key = updateOp.values.head._1
- val value = (key, updateOp.values.head._2)
- if (thatValueDeltas.contains(key))
- thatValueDeltas = thatValueDeltas + (key → (thatValueDeltas(key) :+ value))
- else
- thatValueDeltas += (key → (value :: Nil))
+ updateOp.values.foreach {
+ case (key, value) ⇒
+ if (thatValueDeltas.contains(key))
+ thatValueDeltas = thatValueDeltas + (key → (thatValueDeltas(key) :+ (key, value)))
+ else
+ thatValueDeltas += (key → ((key, value) :: Nil))
+ }
}
val processNestedDelta: PartialFunction[ORMap.DeltaOp, Unit] = {
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala
index aa1c022d95..25651954a2 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala
@@ -8,9 +8,24 @@ import akka.annotation.InternalApi
import akka.cluster.ddata.ORMap._
object ORMultiMap {
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[akka] case object ORMultiMapTag extends ZeroTag {
+ override def zero: DeltaReplicatedData = ORMultiMap.empty
+ override final val value: Int = 2
+ }
- val _empty: ORMultiMap[Any, Any] = new ORMultiMap(ORMap.emptyWithORMultiMapTag, false)
- val _emptyWithValueDeltas: ORMultiMap[Any, Any] = new ORMultiMap(ORMap.emptyWithORMultiMapTag, true)
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[akka] case object ORMultiMapWithValueDeltasTag extends ZeroTag {
+ override def zero: DeltaReplicatedData = ORMultiMap.emptyWithValueDeltas
+ override final val value: Int = 3
+ }
+
+ val _empty: ORMultiMap[Any, Any] = new ORMultiMap(new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapTag), false)
+ val _emptyWithValueDeltas: ORMultiMap[Any, Any] = new ORMultiMap(new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapWithValueDeltasTag), true)
/**
* Provides an empty multimap.
*/
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala
index 694616edb2..e4ea85e70d 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala
@@ -11,7 +11,15 @@ import akka.annotation.InternalApi
import akka.cluster.ddata.ORMap._
object PNCounterMap {
- def empty[A]: PNCounterMap[A] = new PNCounterMap(ORMap.emptyWithPNCounterMapTag)
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[akka] case object PNCounterMapTag extends ZeroTag {
+ override def zero: DeltaReplicatedData = PNCounterMap.empty
+ override final val value: Int = 1
+ }
+
+ def empty[A]: PNCounterMap[A] = new PNCounterMap(new ORMap(ORSet.empty, Map.empty, zeroTag = PNCounterMapTag))
def apply[A](): PNCounterMap[A] = empty
/**
* Java API
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
index 8d713e4633..194b5e08c5 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
@@ -8,6 +8,7 @@ import java.util.ArrayList
import java.util.Collections
import java.util.Comparator
import java.util.TreeSet
+
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.breakOut
@@ -24,6 +25,7 @@ import akka.util.ByteString.UTF_8
import scala.collection.immutable.TreeMap
import akka.cluster.UniqueAddress
import java.io.NotSerializableException
+
import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage
import akka.cluster.ddata.ORSet.DeltaOp
@@ -570,129 +572,134 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val entries = mapTypeFromProto(ormap.getEntriesList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData])
new ORMap(
keys = orsetFromProto(ormap.getKeys),
- entries)
+ entries,
+ ORMap.VanillaORMapTag)
}
- def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](entry: PEntry, valueCreator: A ⇒ B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
- val elem = if (eh.hasStringKey(entry)) Some(eh.getStringKey(entry) → valueCreator(eh.getValue(entry)))
- else if (eh.hasIntKey(entry)) Some(eh.getIntKey(entry) → valueCreator(eh.getValue(entry)))
- else if (eh.hasLongKey(entry)) Some(eh.getLongKey(entry) → valueCreator(eh.getValue(entry)))
- else if (eh.hasOtherKey(entry)) Some(otherMessageFromProto(eh.getOtherKey(entry)) → valueCreator(eh.getValue(entry)))
- else None
- elem match {
- case Some(e) ⇒ Map(e)
- case _ ⇒ Map.empty[Any, B]
+ def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](input: util.List[PEntry], valueCreator: A ⇒ B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
+ val map = mapTypeFromProto(input, valueCreator)
+ if (map.size > 1)
+ throw new IllegalArgumentException(s"Can't deserialize the key/value pair in the ORMap delta - too many pairs on the wire")
+ else
+ map
+ }
+
+ def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entryOption: Option[PEntry])(implicit eh: ProtoMapEntryReader[PEntry, A]): Any =
+ entryOption match {
+ case Some(entry) ⇒ if (eh.hasStringKey(entry)) eh.getStringKey(entry)
+ else if (eh.hasIntKey(entry)) eh.getIntKey(entry)
+ else if (eh.hasLongKey(entry)) eh.getLongKey(entry)
+ else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry))
+ else throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta")
+ case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta")
}
- }
-
- def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entry: PEntry)(implicit eh: ProtoMapEntryReader[PEntry, A]): Any =
- if (eh.hasStringKey(entry)) eh.getStringKey(entry)
- else if (eh.hasIntKey(entry)) eh.getIntKey(entry)
- else if (eh.hasLongKey(entry)) eh.getLongKey(entry)
- else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry))
- else throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta")
// wire protocol is always DeltaGroup
private def ormapPutFromBinary(bytes: Array[Byte]): ORMap.PutDeltaOp[Any, ReplicatedData] = {
- val group = ormapDeltaGroupFromBinary(bytes)
- if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.PutDeltaOp[_, _]])
- group.ops.head.asInstanceOf[ORMap.PutDeltaOp[Any, ReplicatedData]]
+ val ops = ormapDeltaGroupOpsFromBinary(bytes)
+ if (ops.size == 1 && ops.head.isInstanceOf[ORMap.PutDeltaOp[_, _]])
+ ops.head.asInstanceOf[ORMap.PutDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta put operation size or kind")
}
// wire protocol is always delta group
private def ormapRemoveFromBinary(bytes: Array[Byte]): ORMap.RemoveDeltaOp[Any, ReplicatedData] = {
- val group = ormapDeltaGroupFromBinary(bytes)
- if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.RemoveDeltaOp[_, _]])
- group.ops.head.asInstanceOf[ORMap.RemoveDeltaOp[Any, ReplicatedData]]
+ val ops = ormapDeltaGroupOpsFromBinary(bytes)
+ if (ops.size == 1 && ops.head.isInstanceOf[ORMap.RemoveDeltaOp[_, _]])
+ ops.head.asInstanceOf[ORMap.RemoveDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta remove operation size or kind")
}
// wire protocol is always delta group
private def ormapRemoveKeyFromBinary(bytes: Array[Byte]): ORMap.RemoveKeyDeltaOp[Any, ReplicatedData] = {
- val group = ormapDeltaGroupFromBinary(bytes)
- if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.RemoveKeyDeltaOp[_, _]])
- group.ops.head.asInstanceOf[ORMap.RemoveKeyDeltaOp[Any, ReplicatedData]]
+ val ops = ormapDeltaGroupOpsFromBinary(bytes)
+ if (ops.size == 1 && ops.head.isInstanceOf[ORMap.RemoveKeyDeltaOp[_, _]])
+ ops.head.asInstanceOf[ORMap.RemoveKeyDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta remove key operation size or kind")
}
// wire protocol is always delta group
private def ormapUpdateFromBinary(bytes: Array[Byte]): ORMap.UpdateDeltaOp[Any, ReplicatedDelta] = {
- val group = ormapDeltaGroupFromBinary(bytes)
- if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.UpdateDeltaOp[_, _]])
- group.ops.head.asInstanceOf[ORMap.UpdateDeltaOp[Any, ReplicatedDelta]]
+ val ops = ormapDeltaGroupOpsFromBinary(bytes)
+ if (ops.size == 1 && ops.head.isInstanceOf[ORMap.UpdateDeltaOp[_, _]])
+ ops.head.asInstanceOf[ORMap.UpdateDeltaOp[Any, ReplicatedDelta]]
else
throw new NotSerializableException("Improper ORMap delta update operation size or kind")
}
// this can be made client-extendable in the same way as Http codes in Spray are
private def zeroTagFromCode(code: Int) = code match {
- case ORMap.VanillaORMapTag.value ⇒ ORMap.VanillaORMapTag
- case ORMap.PNCounterMapTag.value ⇒ ORMap.PNCounterMapTag
- case ORMap.ORMultiMapTag.value ⇒ ORMap.ORMultiMapTag
- case ORMap.ORMultiMapWithValueDeltasTag.value ⇒ ORMap.ORMultiMapWithValueDeltasTag
- case ORMap.LWWMapTag.value ⇒ ORMap.LWWMapTag
- case _ ⇒ throw new IllegalArgumentException("Invalid ZeroTag code")
+ case ORMap.VanillaORMapTag.value ⇒ ORMap.VanillaORMapTag
+ case PNCounterMap.PNCounterMapTag.value ⇒ PNCounterMap.PNCounterMapTag
+ case ORMultiMap.ORMultiMapTag.value ⇒ ORMultiMap.ORMultiMapTag
+ case ORMultiMap.ORMultiMapWithValueDeltasTag.value ⇒ ORMultiMap.ORMultiMapWithValueDeltasTag
+ case LWWMap.LWWMapTag.value ⇒ LWWMap.LWWMapTag
+ case _ ⇒ throw new IllegalArgumentException("Invalid ZeroTag code")
}
private def ormapDeltaGroupFromBinary(bytes: Array[Byte]): ORMap.DeltaGroup[Any, ReplicatedData] = {
+ ORMap.DeltaGroup(ormapDeltaGroupOpsFromBinary(bytes))
+ }
+
+ private def ormapDeltaGroupOpsFromBinary(bytes: Array[Byte]): scala.collection.immutable.IndexedSeq[ORMap.DeltaOp] = {
val deltaGroup = rd.ORMapDeltaGroup.parseFrom(bytes)
val ops: Vector[ORMap.DeltaOp] =
deltaGroup.getEntriesList.asScala.map { entry ⇒
if (entry.getOperation == rd.ORMapDeltaOp.ORMapPut) {
- val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData])
+ val map = singleMapEntryFromProto(entry.getEntryDataList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData])
ORMap.PutDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map.head, zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemove) {
ORMap.RemoveDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemoveKey) {
- val elem = singleKeyEntryFromProto(entry.getEntryData)
+ val elem = singleKeyEntryFromProto(entry.getEntryDataList.asScala.headOption)
ORMap.RemoveKeyDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), elem, zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapUpdate) {
- val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedDelta])
+ val map = mapTypeFromProto(entry.getEntryDataList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedDelta])
ORMap.UpdateDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map, zeroTagFromCode(entry.getZeroTag))
} else
throw new NotSerializableException(s"Unknown ORMap delta operation ${entry.getOperation}")
}(collection.breakOut)
- ORMap.DeltaGroup(ops)
+ ops
}
private def ormapPutToProto(deltaOp: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = {
- ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
+ ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
}
private def ormapRemoveToProto(deltaOp: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = {
- ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
+ ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
}
private def ormapRemoveKeyToProto(deltaOp: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = {
- ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
+ ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
}
private def ormapUpdateToProto(deltaOp: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = {
- ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
+ ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
}
private def ormapDeltaGroupToProto(deltaGroup: ORMap.DeltaGroup[_, _]): rd.ORMapDeltaGroup = {
+ ormapDeltaGroupOpsToProto(deltaGroup.ops)
+ }
+
+ private def ormapDeltaGroupOpsToProto(deltaGroupOps: scala.collection.immutable.IndexedSeq[ORMap.DeltaOp]): rd.ORMapDeltaGroup = {
def createEntry(opType: rd.ORMapDeltaOp, u: ORSet[_], m: Map[_, _], zt: Int) = {
- if (m.size > 1)
+ if (m.size > 1 && opType != rd.ORMapDeltaOp.ORMapUpdate)
throw new IllegalArgumentException("Invalid size of ORMap delta map")
else {
- val entryDataBuilder = rd.ORMapDeltaGroup.MapEntry.newBuilder()
- m.headOption.map {
- case (key: String, value) ⇒ entryDataBuilder.setStringKey(key).setValue(otherMessageToProto(value))
- case (key: Int, value) ⇒ entryDataBuilder.setIntKey(key).setValue(otherMessageToProto(value))
- case (key: Long, value) ⇒ entryDataBuilder.setLongKey(key).setValue(otherMessageToProto(value))
- case (key, value) ⇒ entryDataBuilder.setOtherKey(otherMessageToProto(key)).setValue(otherMessageToProto(value))
- }
val builder = rd.ORMapDeltaGroup.Entry.newBuilder()
.setOperation(opType)
.setUnderlying(orsetToProto(u))
.setZeroTag(zt)
- if (m.size > 0)
- builder.setEntryData(entryDataBuilder.build())
+ m.foreach {
+ case (key: String, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setStringKey(key).setValue(otherMessageToProto(value)).build())
+ case (key: Int, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setIntKey(key).setValue(otherMessageToProto(value)).build())
+ case (key: Long, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setLongKey(key).setValue(otherMessageToProto(value)).build())
+ case (key, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setOtherKey(otherMessageToProto(key)).setValue(otherMessageToProto(value)).build())
+ }
builder
}
}
@@ -709,12 +716,12 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
.setOperation(opType)
.setUnderlying(orsetToProto(u))
.setZeroTag(zt)
- builder.setEntryData(entryDataBuilder.build())
+ builder.addEntryData(entryDataBuilder.build())
builder
}
val b = rd.ORMapDeltaGroup.newBuilder()
- deltaGroup.ops.foreach {
+ deltaGroupOps.foreach {
case ORMap.PutDeltaOp(op, pair, zt) ⇒
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapPut, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, Map(pair), zt.value))
case ORMap.RemoveDeltaOp(op, zt) ⇒
@@ -742,7 +749,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val entries = mapTypeFromProto(lwwmap.getEntriesList, lwwRegisterFromProto)
new LWWMap(new ORMap(
keys = orsetFromProto(lwwmap.getKeys),
- entries, ORMap.LWWMapTag))
+ entries, LWWMap.LWWMapTag))
}
def pncountermapToProto(pncountermap: PNCounterMap[_]): rd.PNCounterMap = {
@@ -758,7 +765,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val entries = mapTypeFromProto(pncountermap.getEntriesList, pncounterFromProto)
new PNCounterMap(new ORMap(
keys = orsetFromProto(pncountermap.getKeys),
- entries, ORMap.PNCounterMapTag))
+ entries, PNCounterMap.PNCounterMapTag))
}
def multimapToProto(multimap: ORMultiMap[_, _]): rd.ORMultiMap = {
@@ -783,9 +790,9 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
keys = orsetFromProto(multimap.getKeys),
entries,
if (withValueDeltas)
- ORMap.ORMultiMapWithValueDeltasTag
+ ORMultiMap.ORMultiMapWithValueDeltasTag
else
- ORMap.ORMultiMapTag),
+ ORMultiMap.ORMultiMapTag),
withValueDeltas)
}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala
index 262c3da232..68ba4011ed 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala
@@ -439,6 +439,77 @@ class ORMapSpec extends WordSpec with Matchers {
merged6.entries("c").elements should be(Set("C"))
}
+ "work with delta-coalescing scenario 1" in {
+ val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B")
+ val m2 = m1.resetDelta.put(node2, "b", GSet.empty + "B2").updated(node2, "b", GSet.empty[String])(_.add("B3"))
+
+ val merged1 = m1 merge m2
+
+ merged1.entries("a").elements should be(Set("A"))
+ merged1.entries("b").elements should be(Set("B", "B2", "B3"))
+
+ val merged2 = m1 mergeDelta m2.delta.get
+
+ merged2.entries("a").elements should be(Set("A"))
+ merged2.entries("b").elements should be(Set("B", "B2", "B3"))
+
+ val m3 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B")
+ val m4 = m3.resetDelta.put(node2, "b", GSet.empty + "B2").put(node2, "b", GSet.empty + "B3")
+
+ val merged3 = m3 merge m4
+
+ merged3.entries("a").elements should be(Set("A"))
+ merged3.entries("b").elements should be(Set("B", "B3"))
+
+ val merged4 = m3 mergeDelta m4.delta.get
+
+ merged4.entries("a").elements should be(Set("A"))
+ merged4.entries("b").elements should be(Set("B", "B3"))
+
+ val m5 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B")
+ val m6 = m5.resetDelta.put(node2, "b", GSet.empty + "B2").updated(node2, "b", GSet.empty[String])(_.add("B3"))
+ .updated(node2, "b", GSet.empty[String])(_.add("B4"))
+
+ val merged5 = m5 merge m6
+
+ merged5.entries("a").elements should be(Set("A"))
+ merged5.entries("b").elements should be(Set("B", "B2", "B3", "B4"))
+
+ val merged6 = m5 mergeDelta m6.delta.get
+
+ merged6.entries("a").elements should be(Set("A"))
+ merged6.entries("b").elements should be(Set("B", "B2", "B3", "B4"))
+
+ val m7 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B")
+ val m8 = m7.resetDelta.put(node2, "b", GSet.empty + "B2").put(node2, "d", GSet.empty + "D").put(node2, "b", GSet.empty + "B3")
+
+ val merged7 = m7 merge m8
+
+ merged7.entries("a").elements should be(Set("A"))
+ merged7.entries("b").elements should be(Set("B", "B3"))
+ merged7.entries("d").elements should be(Set("D"))
+
+ val merged8 = m7 mergeDelta m8.delta.get
+
+ merged8.entries("a").elements should be(Set("A"))
+ merged8.entries("b").elements should be(Set("B", "B3"))
+ merged8.entries("d").elements should be(Set("D"))
+
+ val m9 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B")
+ val m10 = m9.resetDelta.put(node2, "b", GSet.empty + "B2").put(node2, "d", GSet.empty + "D")
+ .remove(node2, "d").put(node2, "b", GSet.empty + "B3")
+
+ val merged9 = m9 merge m10
+
+ merged9.entries("a").elements should be(Set("A"))
+ merged9.entries("b").elements should be(Set("B", "B3"))
+
+ val merged10 = m9 mergeDelta m10.delta.get
+
+ merged10.entries("a").elements should be(Set("A"))
+ merged10.entries("b").elements should be(Set("B", "B3"))
+ }
+
"work with deltas and updated for GSet elements type" in {
val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A")
val m2 = m1.resetDelta.updated(node1, "a", GSet.empty[String])(_.add("B"))
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala
index f95af2c934..441c115a4e 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala
@@ -133,6 +133,12 @@ class ORMultiMapSpec extends WordSpec with Matchers {
merged3.entries("a") should be(Set("A"))
merged3.entries("b") should be(Set("B2"))
merged3.entries("c") should be(Set("C"))
+
+ val merged4 = merged1 mergeDelta m3.delta.get.merge(m4.delta.get)
+
+ merged4.entries("a") should be(Set("A"))
+ merged4.entries("b") should be(Set("B2"))
+ merged4.entries("c") should be(Set("C"))
}
"not have usual anomalies for remove+addBinding scenario and delta-deltas 2" in {
@@ -181,6 +187,217 @@ class ORMultiMapSpec extends WordSpec with Matchers {
merged6.entries("c") should be(Set("C"))
}
+ "work with delta-coalescing scenario 1" in {
+ val m1 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m2 = m1.resetDelta.put(node2, "b", Set("B2")).addBinding(node2, "b", "B3")
+
+ val merged1 = m1 merge m2
+
+ merged1.entries("a") should be(Set("A"))
+ merged1.entries("b") should be(Set("B2", "B3"))
+
+ val merged2 = m1 mergeDelta m2.delta.get
+
+ merged2.entries("a") should be(Set("A"))
+ merged2.entries("b") should be(Set("B2", "B3"))
+
+ val m3 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m4 = m3.resetDelta.put(node2, "b", Set("B2")).put(node2, "b", Set("B3"))
+
+ val merged3 = m3 merge m4
+
+ merged3.entries("a") should be(Set("A"))
+ merged3.entries("b") should be(Set("B3"))
+
+ val merged4 = m3 mergeDelta m4.delta.get
+
+ merged4.entries("a") should be(Set("A"))
+ merged4.entries("b") should be(Set("B3"))
+
+ val m5 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m6 = m5.resetDelta.put(node2, "b", Set("B2")).addBinding(node2, "b", "B3").addBinding(node2, "b", "B4")
+
+ val merged5 = m5 merge m6
+
+ merged5.entries("a") should be(Set("A"))
+ merged5.entries("b") should be(Set("B2", "B3", "B4"))
+
+ val merged6 = m5 mergeDelta m6.delta.get
+
+ merged6.entries("a") should be(Set("A"))
+ merged6.entries("b") should be(Set("B2", "B3", "B4"))
+
+ val m7 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m8 = m7.resetDelta.put(node2, "d", Set("D")).addBinding(node2, "b", "B3").put(node2, "b", Set("B4"))
+
+ val merged7 = m7 merge m8
+
+ merged7.entries("a") should be(Set("A"))
+ merged7.entries("b") should be(Set("B4"))
+ merged7.entries("d") should be(Set("D"))
+
+ val merged8 = m7 mergeDelta m8.delta.get
+
+ merged8.entries("a") should be(Set("A"))
+ merged8.entries("b") should be(Set("B4"))
+ merged8.entries("d") should be(Set("D"))
+
+ val m9 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m10 = m9.resetDelta.addBinding(node2, "b", "B3").addBinding(node2, "b", "B4")
+
+ val merged9 = m9 merge m10
+
+ merged9.entries("a") should be(Set("A"))
+ merged9.entries("b") should be(Set("B", "B3", "B4"))
+
+ val merged10 = m9 mergeDelta m10.delta.get
+
+ merged10.entries("a") should be(Set("A"))
+ merged10.entries("b") should be(Set("B", "B3", "B4"))
+
+ val m11 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1"))
+ .remove(node1, "b")
+ val m12 = m11.resetDelta.addBinding(node2, "b", "B2").addBinding(node2, "b", "B3")
+
+ val merged11 = m11 merge m12
+
+ merged11.entries("a") should be(Set("A"))
+ merged11.entries("b") should be(Set("B2", "B3"))
+
+ val merged12 = m11 mergeDelta m12.delta.get
+
+ merged12.entries("a") should be(Set("A"))
+ merged12.entries("b") should be(Set("B2", "B3"))
+
+ val m13 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1"))
+ .remove(node1, "b")
+ val m14 = m13.resetDelta.addBinding(node2, "b", "B2").put(node2, "b", Set("B3"))
+
+ val merged13 = m13 merge m14
+
+ merged13.entries("a") should be(Set("A"))
+ merged13.entries("b") should be(Set("B3"))
+
+ val merged14 = m13 mergeDelta m14.delta.get
+
+ merged14.entries("a") should be(Set("A"))
+ merged14.entries("b") should be(Set("B3"))
+
+ val m15 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1"))
+ .put(node1, "c", Set("C"))
+ val m16 = m15.resetDelta.addBinding(node2, "b", "B2").addBinding(node2, "c", "C1")
+
+ val merged15 = m15 merge m16
+
+ merged15.entries("a") should be(Set("A"))
+ merged15.entries("b") should be(Set("B", "B1", "B2"))
+ merged15.entries("c") should be(Set("C", "C1"))
+
+ val merged16 = m15 mergeDelta m16.delta.get
+
+ merged16.entries("a") should be(Set("A"))
+ merged16.entries("b") should be(Set("B", "B1", "B2"))
+ merged16.entries("c") should be(Set("C", "C1"))
+
+ // somewhat artificial setup
+ val m17 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1"))
+ val m18 = m17.resetDelta.addBinding(node2, "b", "B2")
+ val m19 = ORMultiMap.emptyWithValueDeltas[String, String].resetDelta.put(node2, "b", Set("B3"))
+
+ val merged17 = m17 merge m18 merge m19
+
+ merged17.entries("a") should be(Set("A"))
+ merged17.entries("b") should be(Set("B", "B1", "B3"))
+
+ val merged18 = m17 mergeDelta m18.delta.get.merge(m19.delta.get)
+
+ merged18.entries("a") should be(Set("A"))
+ merged18.entries("b") should be(Set("B", "B1", "B3"))
+ }
+
+ "work with delta-coalescing scenario 2" in {
+ val m1 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m2 = m1.resetDelta.put(node2, "b", Set("B2")).addBinding(node2, "b", "B3")
+
+ val merged1 = m1 merge m2
+
+ merged1.entries("a") should be(Set("A"))
+ merged1.entries("b") should be(Set("B2", "B3"))
+
+ val merged2 = m1 mergeDelta m2.delta.get
+
+ merged2.entries("a") should be(Set("A"))
+ merged2.entries("b") should be(Set("B2", "B3"))
+
+ val m3 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m4 = m3.resetDelta.put(node2, "b", Set("B2")).put(node2, "b", Set("B3"))
+
+ val merged3 = m3 merge m4
+
+ merged3.entries("a") should be(Set("A"))
+ merged3.entries("b") should be(Set("B3"))
+
+ val merged4 = m3 mergeDelta m4.delta.get
+
+ merged4.entries("a") should be(Set("A"))
+ merged4.entries("b") should be(Set("B3"))
+
+ val m5 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m6 = m5.resetDelta.put(node2, "b", Set("B2")).addBinding(node2, "b", "B3").addBinding(node2, "b", "B4")
+
+ val merged5 = m5 merge m6
+
+ merged5.entries("a") should be(Set("A"))
+ merged5.entries("b") should be(Set("B2", "B3", "B4"))
+
+ val merged6 = m5 mergeDelta m6.delta.get
+
+ merged6.entries("a") should be(Set("A"))
+ merged6.entries("b") should be(Set("B2", "B3", "B4"))
+
+ val m7 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m8 = m7.resetDelta.put(node2, "d", Set("D")).addBinding(node2, "b", "B3").put(node2, "b", Set("B4"))
+
+ val merged7 = m7 merge m8
+
+ merged7.entries("a") should be(Set("A"))
+ merged7.entries("b") should be(Set("B4"))
+ merged7.entries("d") should be(Set("D"))
+
+ val merged8 = m7 mergeDelta m8.delta.get
+
+ merged8.entries("a") should be(Set("A"))
+ merged8.entries("b") should be(Set("B4"))
+ merged8.entries("d") should be(Set("D"))
+
+ val m9 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B"))
+ val m10 = m9.resetDelta.addBinding(node2, "b", "B3").addBinding(node2, "b", "B4")
+
+ val merged9 = m9 merge m10
+
+ merged9.entries("a") should be(Set("A"))
+ merged9.entries("b") should be(Set("B", "B3", "B4"))
+
+ val merged10 = m9 mergeDelta m10.delta.get
+
+ merged10.entries("a") should be(Set("A"))
+ merged10.entries("b") should be(Set("B", "B3", "B4"))
+
+ val m11 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1"))
+ .remove(node1, "b")
+ val m12 = ORMultiMap.empty[String, String].addBinding(node2, "b", "B2").addBinding(node2, "b", "B3")
+
+ val merged11 = m11 merge m12
+
+ merged11.entries("a") should be(Set("A"))
+ merged11.entries("b") should be(Set("B2", "B3"))
+
+ val merged12 = m11 mergeDelta m12.delta.get
+
+ merged12.entries("a") should be(Set("A"))
+ merged12.entries("b") should be(Set("B2", "B3"))
+ }
+
"have unapply extractor" in {
val m1 = ORMultiMap.empty.put(node1, "a", Set(1L, 2L)).put(node2, "b", Set(3L))
val m2: ORMultiMap[String, Long] = m1
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
index da86a68dc2..5a63607339 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
@@ -263,6 +263,10 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
val m1 = ORMultiMap.empty[String, String].addBinding(address1, "a", "A1").addBinding(address2, "a", "A2")
val m2 = ORMultiMap.empty[String, String].put(address2, "b", Set("B1", "B2", "B3"))
checkSameContent(m1.merge(m2), m2.merge(m1))
+ checkSerialization(ORMultiMap.empty[String, String].addBinding(address1, "a", "A1").addBinding(address1, "a", "A2").delta.get)
+ val m3 = ORMultiMap.empty[String, String].addBinding(address1, "a", "A1")
+ val d3 = m3.resetDelta.addBinding(address1, "a", "A2").addBinding(address1, "a", "A3").delta.get
+ checkSerialization(d3)
}
"be compatible with old ORMultiMap serialization" in {