From 7c42627ea9e28bf24ec39f1fa82fc4b75c545ef4 Mon Sep 17 00:00:00 2001 From: gosubpl Date: Fri, 24 Mar 2017 16:18:01 +0100 Subject: [PATCH] delta-aggregation in the ORMap deltas (#22633) --- .../protobuf/msg/ReplicatedDataMessages.java | 360 +++++++++++++----- .../protobuf/ReplicatedDataMessages.proto | 2 +- .../scala/akka/cluster/ddata/LWWMap.scala | 12 +- .../main/scala/akka/cluster/ddata/ORMap.scala | 137 ++++--- .../scala/akka/cluster/ddata/ORMultiMap.scala | 19 +- .../akka/cluster/ddata/PNCounterMap.scala | 10 +- .../protobuf/ReplicatedDataSerializer.scala | 127 +++--- .../scala/akka/cluster/ddata/ORMapSpec.scala | 71 ++++ .../akka/cluster/ddata/ORMultiMapSpec.scala | 217 +++++++++++ .../ReplicatedDataSerializerSpec.scala | 4 + 10 files changed, 728 insertions(+), 231 deletions(-) diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java index 2cbf071334..6252502e9c 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java @@ -10906,19 +10906,30 @@ public final class ReplicatedDataMessages { */ int getZeroTag(); - // optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + // repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - boolean hasEntryData(); + java.util.List + getEntryDataList(); /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData(); + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData(int index); /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder(); + int getEntryDataCount(); + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + java.util.List + getEntryDataOrBuilderList(); + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder( + int index); } /** * Protobuf type {@code akka.cluster.ddata.ORMapDeltaGroup.Entry} @@ -11001,16 +11012,11 @@ public final class ReplicatedDataMessages { break; } case 34: { - akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder subBuilder = null; - if (((bitField0_ & 0x00000008) == 0x00000008)) { - subBuilder = entryData_.toBuilder(); + if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) { + entryData_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000008; } - entryData_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.PARSER, extensionRegistry); - if (subBuilder != null) { - subBuilder.mergeFrom(entryData_); - entryData_ = subBuilder.buildPartial(); - } - bitField0_ |= 0x00000008; + entryData_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.PARSER, extensionRegistry)); break; } } @@ -11021,6 +11027,9 @@ public final class ReplicatedDataMessages { throw new akka.protobuf.InvalidProtocolBufferException( e.getMessage()).setUnfinishedMessage(this); } finally { + if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) { + entryData_ = java.util.Collections.unmodifiableList(entryData_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -11107,33 +11116,47 @@ public final class ReplicatedDataMessages { return zeroTag_; } - // optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + // repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; public static final int ENTRYDATA_FIELD_NUMBER = 4; - private akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry entryData_; + private java.util.List entryData_; /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - public boolean hasEntryData() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; - */ - public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData() { + public java.util.List getEntryDataList() { return entryData_; } /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder() { + public java.util.List + getEntryDataOrBuilderList() { return entryData_; } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public int getEntryDataCount() { + return entryData_.size(); + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData(int index) { + return entryData_.get(index); + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder( + int index) { + return entryData_.get(index); + } private void initFields() { operation_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaOp.ORMapPut; underlying_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); zeroTag_ = 0; - entryData_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance(); + entryData_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -11156,8 +11179,8 @@ public final class ReplicatedDataMessages { memoizedIsInitialized = 0; return false; } - if (hasEntryData()) { - if (!getEntryData().isInitialized()) { + for (int i = 0; i < getEntryDataCount(); i++) { + if (!getEntryData(i).isInitialized()) { memoizedIsInitialized = 0; return false; } @@ -11178,8 +11201,8 @@ public final class ReplicatedDataMessages { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeSInt32(3, zeroTag_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - output.writeMessage(4, entryData_); + for (int i = 0; i < entryData_.size(); i++) { + output.writeMessage(4, entryData_.get(i)); } getUnknownFields().writeTo(output); } @@ -11202,9 +11225,9 @@ public final class ReplicatedDataMessages { size += akka.protobuf.CodedOutputStream .computeSInt32Size(3, zeroTag_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { + for (int i = 0; i < entryData_.size(); i++) { size += akka.protobuf.CodedOutputStream - .computeMessageSize(4, entryData_); + .computeMessageSize(4, entryData_.get(i)); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -11335,11 +11358,11 @@ public final class ReplicatedDataMessages { zeroTag_ = 0; bitField0_ = (bitField0_ & ~0x00000004); if (entryDataBuilder_ == null) { - entryData_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance(); + entryData_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000008); } else { entryDataBuilder_.clear(); } - bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -11384,10 +11407,11 @@ public final class ReplicatedDataMessages { to_bitField0_ |= 0x00000004; } result.zeroTag_ = zeroTag_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } if (entryDataBuilder_ == null) { + if (((bitField0_ & 0x00000008) == 0x00000008)) { + entryData_ = java.util.Collections.unmodifiableList(entryData_); + bitField0_ = (bitField0_ & ~0x00000008); + } result.entryData_ = entryData_; } else { result.entryData_ = entryDataBuilder_.build(); @@ -11417,8 +11441,31 @@ public final class ReplicatedDataMessages { if (other.hasZeroTag()) { setZeroTag(other.getZeroTag()); } - if (other.hasEntryData()) { - mergeEntryData(other.getEntryData()); + if (entryDataBuilder_ == null) { + if (!other.entryData_.isEmpty()) { + if (entryData_.isEmpty()) { + entryData_ = other.entryData_; + bitField0_ = (bitField0_ & ~0x00000008); + } else { + ensureEntryDataIsMutable(); + entryData_.addAll(other.entryData_); + } + onChanged(); + } + } else { + if (!other.entryData_.isEmpty()) { + if (entryDataBuilder_.isEmpty()) { + entryDataBuilder_.dispose(); + entryDataBuilder_ = null; + entryData_ = other.entryData_; + bitField0_ = (bitField0_ & ~0x00000008); + entryDataBuilder_ = + akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? + getEntryDataFieldBuilder() : null; + } else { + entryDataBuilder_.addAllMessages(other.entryData_); + } + } } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -11441,8 +11488,8 @@ public final class ReplicatedDataMessages { return false; } - if (hasEntryData()) { - if (!getEntryData().isInitialized()) { + for (int i = 0; i < getEntryDataCount(); i++) { + if (!getEntryData(i).isInitialized()) { return false; } @@ -11655,116 +11702,239 @@ public final class ReplicatedDataMessages { return this; } - // optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; - private akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry entryData_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance(); - private akka.protobuf.SingleFieldBuilder< - akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder> entryDataBuilder_; - /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; - */ - public boolean hasEntryData() { - return ((bitField0_ & 0x00000008) == 0x00000008); + // repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + private java.util.List entryData_ = + java.util.Collections.emptyList(); + private void ensureEntryDataIsMutable() { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { + entryData_ = new java.util.ArrayList(entryData_); + bitField0_ |= 0x00000008; + } } + + private akka.protobuf.RepeatedFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder> entryDataBuilder_; + /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData() { + public java.util.List getEntryDataList() { if (entryDataBuilder_ == null) { - return entryData_; + return java.util.Collections.unmodifiableList(entryData_); } else { - return entryDataBuilder_.getMessage(); + return entryDataBuilder_.getMessageList(); } } /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - public Builder setEntryData(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) { + public int getEntryDataCount() { + if (entryDataBuilder_ == null) { + return entryData_.size(); + } else { + return entryDataBuilder_.getCount(); + } + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry getEntryData(int index) { + if (entryDataBuilder_ == null) { + return entryData_.get(index); + } else { + return entryDataBuilder_.getMessage(index); + } + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public Builder setEntryData( + int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) { if (entryDataBuilder_ == null) { if (value == null) { throw new NullPointerException(); } - entryData_ = value; + ensureEntryDataIsMutable(); + entryData_.set(index, value); onChanged(); } else { - entryDataBuilder_.setMessage(value); + entryDataBuilder_.setMessage(index, value); } - bitField0_ |= 0x00000008; return this; } /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ public Builder setEntryData( + int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder builderForValue) { + if (entryDataBuilder_ == null) { + ensureEntryDataIsMutable(); + entryData_.set(index, builderForValue.build()); + onChanged(); + } else { + entryDataBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public Builder addEntryData(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) { + if (entryDataBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntryDataIsMutable(); + entryData_.add(value); + onChanged(); + } else { + entryDataBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public Builder addEntryData( + int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) { + if (entryDataBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntryDataIsMutable(); + entryData_.add(index, value); + onChanged(); + } else { + entryDataBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public Builder addEntryData( akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder builderForValue) { if (entryDataBuilder_ == null) { - entryData_ = builderForValue.build(); + ensureEntryDataIsMutable(); + entryData_.add(builderForValue.build()); onChanged(); } else { - entryDataBuilder_.setMessage(builderForValue.build()); + entryDataBuilder_.addMessage(builderForValue.build()); } - bitField0_ |= 0x00000008; return this; } /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - public Builder mergeEntryData(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry value) { + public Builder addEntryData( + int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder builderForValue) { if (entryDataBuilder_ == null) { - if (((bitField0_ & 0x00000008) == 0x00000008) && - entryData_ != akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance()) { - entryData_ = - akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.newBuilder(entryData_).mergeFrom(value).buildPartial(); - } else { - entryData_ = value; - } + ensureEntryDataIsMutable(); + entryData_.add(index, builderForValue.build()); onChanged(); } else { - entryDataBuilder_.mergeFrom(value); + entryDataBuilder_.addMessage(index, builderForValue.build()); } - bitField0_ |= 0x00000008; return this; } /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public Builder addAllEntryData( + java.lang.Iterable values) { + if (entryDataBuilder_ == null) { + ensureEntryDataIsMutable(); + super.addAll(values, entryData_); + onChanged(); + } else { + entryDataBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ public Builder clearEntryData() { if (entryDataBuilder_ == null) { - entryData_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance(); + entryData_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000008); onChanged(); } else { entryDataBuilder_.clear(); } - bitField0_ = (bitField0_ & ~0x00000008); return this; } /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder getEntryDataBuilder() { - bitField0_ |= 0x00000008; - onChanged(); - return getEntryDataFieldBuilder().getBuilder(); + public Builder removeEntryData(int index) { + if (entryDataBuilder_ == null) { + ensureEntryDataIsMutable(); + entryData_.remove(index); + onChanged(); + } else { + entryDataBuilder_.remove(index); + } + return this; } /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder() { - if (entryDataBuilder_ != null) { - return entryDataBuilder_.getMessageOrBuilder(); - } else { - return entryData_; + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder getEntryDataBuilder( + int index) { + return getEntryDataFieldBuilder().getBuilder(index); + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder getEntryDataOrBuilder( + int index) { + if (entryDataBuilder_ == null) { + return entryData_.get(index); } else { + return entryDataBuilder_.getMessageOrBuilder(index); } } /** - * optional .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; */ - private akka.protobuf.SingleFieldBuilder< + public java.util.List + getEntryDataOrBuilderList() { + if (entryDataBuilder_ != null) { + return entryDataBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(entryData_); + } + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder addEntryDataBuilder() { + return getEntryDataFieldBuilder().addBuilder( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance()); + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder addEntryDataBuilder( + int index) { + return getEntryDataFieldBuilder().addBuilder( + index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.getDefaultInstance()); + } + /** + * repeated .akka.cluster.ddata.ORMapDeltaGroup.MapEntry entryData = 4; + */ + public java.util.List + getEntryDataBuilderList() { + return getEntryDataFieldBuilder().getBuilderList(); + } + private akka.protobuf.RepeatedFieldBuilder< akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder> getEntryDataFieldBuilder() { if (entryDataBuilder_ == null) { - entryDataBuilder_ = new akka.protobuf.SingleFieldBuilder< + entryDataBuilder_ = new akka.protobuf.RepeatedFieldBuilder< akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMapDeltaGroup.MapEntryOrBuilder>( entryData_, + ((bitField0_ & 0x00000008) == 0x00000008), getParentForChildren(), isClean()); entryData_ = null; @@ -18398,7 +18568,7 @@ public final class ReplicatedDataMessages { "erMessage\032\275\001\n\005Entry\0223\n\toperation\030\001 \002(\0162 ", ".akka.cluster.ddata.ORMapDeltaOp\022-\n\nunde" + "rlying\030\002 \002(\0132\031.akka.cluster.ddata.ORSet\022" + - "\017\n\007zeroTag\030\003 \002(\021\022?\n\tentryData\030\004 \001(\0132,.ak" + + "\017\n\007zeroTag\030\003 \002(\021\022?\n\tentryData\030\004 \003(\0132,.ak" + "ka.cluster.ddata.ORMapDeltaGroup.MapEntr" + "y\"\206\002\n\006LWWMap\022\'\n\004keys\030\001 \002(\0132\031.akka.cluste" + "r.ddata.ORSet\0221\n\007entries\030\002 \003(\0132 .akka.cl" + diff --git a/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto index ca6b2fbdca..d02371b4fa 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto @@ -87,7 +87,7 @@ message ORMapDeltaGroup { required ORMapDeltaOp operation = 1; required ORSet underlying = 2; required sint32 zeroTag = 3; - optional MapEntry entryData = 4; + repeated MapEntry entryData = 4; } repeated Entry entries = 1; diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala index 2af60a8aad..3a4d34e54d 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala @@ -6,10 +6,18 @@ package akka.cluster.ddata import akka.cluster.Cluster import akka.cluster.UniqueAddress import akka.annotation.InternalApi -import akka.cluster.ddata.ORMap.LWWMapTag +import akka.cluster.ddata.ORMap.ZeroTag object LWWMap { - private val _empty: LWWMap[Any, Any] = new LWWMap(ORMap.emptyWithLWWMapTag) + /** + * INTERNAL API + */ + @InternalApi private[akka] case object LWWMapTag extends ZeroTag { + override def zero: DeltaReplicatedData = LWWMap.empty + override final val value: Int = 4 + } + + private val _empty: LWWMap[Any, Any] = new LWWMap(new ORMap(ORSet.empty, Map.empty, zeroTag = LWWMapTag)) def empty[A, B]: LWWMap[A, B] = _empty.asInstanceOf[LWWMap[A, B]] def apply(): LWWMap[Any, Any] = _empty /** diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index 4f1fe0f478..b817a41955 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -12,7 +12,7 @@ import akka.cluster.ddata.ORMap.{ AtomicDeltaOp, ZeroTag } import scala.collection.immutable object ORMap { - private val _empty: ORMap[Any, ReplicatedData] = new ORMap(ORSet.empty, Map.empty) + private val _empty: ORMap[Any, ReplicatedData] = new ORMap(ORSet.empty, Map.empty, VanillaORMapTag) def empty[A, B <: ReplicatedData]: ORMap[A, B] = _empty.asInstanceOf[ORMap[A, B]] def apply(): ORMap[Any, ReplicatedData] = _empty /** @@ -30,26 +30,6 @@ object ORMap { override def zero: DeltaReplicatedData } - /** - * INTERNAL API - */ - @InternalApi private[akka] def emptyWithPNCounterMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = PNCounterMapTag) - - /** - * INTERNAL API - */ - @InternalApi private[akka] def emptyWithORMultiMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapTag) - - /** - * INTERNAL API - */ - @InternalApi private[akka] def emptyWithORMultiMapWithValueDeltasTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapWithValueDeltasTag) - - /** - * INTERNAL API - */ - @InternalApi private[akka] def emptyWithLWWMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = LWWMapTag) - /** * INTERNAL API * Tags for ORMap.DeltaOp's, so that when the Replicator needs to re-create full value from delta, @@ -68,38 +48,6 @@ object ORMap { override final val value: Int = 0 } - /** - * INTERNAL API - */ - @InternalApi private[akka] case object PNCounterMapTag extends ZeroTag { - override def zero: DeltaReplicatedData = PNCounterMap.empty - override final val value: Int = 1 - } - - /** - * INTERNAL API - */ - @InternalApi private[akka] case object ORMultiMapTag extends ZeroTag { - override def zero: DeltaReplicatedData = ORMultiMap.empty - override final val value: Int = 2 - } - - /** - * INTERNAL API - */ - @InternalApi private[akka] case object ORMultiMapWithValueDeltasTag extends ZeroTag { - override def zero: DeltaReplicatedData = ORMultiMap.emptyWithValueDeltas - override final val value: Int = 3 - } - - /** - * INTERNAL API - */ - @InternalApi private[akka] case object LWWMapTag extends ZeroTag { - override def zero: DeltaReplicatedData = LWWMap.empty - override final val value: Int = 4 - } - /** * INTERNAL API */ @@ -107,7 +55,6 @@ object ORMap { def underlying: ORSet.DeltaOp def zeroTag: ZeroTag override def zero: DeltaReplicatedData = zeroTag.zero - override def merge(that: DeltaOp): DeltaOp = that match { case other: AtomicDeltaOp[A, B] ⇒ DeltaGroup(Vector(this, other)) case DeltaGroup(ops) ⇒ DeltaGroup(this +: ops) @@ -116,30 +63,79 @@ object ORMap { // PutDeltaOp contains ORSet delta and full value /** INTERNAL API */ - @InternalApi private[akka] final case class PutDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, value: (A, B), zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] { + @InternalApi private[akka] final case class PutDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, value: (A, B), zeroTag: ZeroTag) extends AtomicDeltaOp[A, B] { + override def merge(that: DeltaOp): DeltaOp = that match { + case put: PutDeltaOp[A, B] if this.value._1 == put.value._1 ⇒ + new PutDeltaOp[A, B](this.underlying.merge(put.underlying), put.value, zeroTag) + case update: UpdateDeltaOp[A, B] if update.values.size == 1 && update.values.contains(this.value._1) ⇒ + val (key, elem1) = this.value + val newValue = elem1 match { + case e1: DeltaReplicatedData ⇒ + val e2 = update.values.head._2.asInstanceOf[e1.D] + (key, e1.mergeDelta(e2).asInstanceOf[B]) + case _ ⇒ + val elem2 = update.values.head._2.asInstanceOf[elem1.T] + (key, elem1.merge(elem2).asInstanceOf[B]) + } + new PutDeltaOp[A, B](this.underlying.merge(update.underlying), newValue, zeroTag) + case other: AtomicDeltaOp[A, B] ⇒ DeltaGroup(Vector(this, other)) + case DeltaGroup(ops) ⇒ DeltaGroup(this +: ops) + } } // UpdateDeltaOp contains ORSet delta and either delta of value (in case where underlying type supports deltas) or full value /** INTERNAL API */ - @InternalApi private[akka] final case class UpdateDeltaOp[A, X <: ReplicatedDelta](underlying: ORSet.DeltaOp, values: Map[A, X], zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, X] { + @InternalApi private[akka] final case class UpdateDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, values: Map[A, B], zeroTag: ZeroTag) extends AtomicDeltaOp[A, B] { + override def merge(that: DeltaOp): DeltaOp = that match { + case update: UpdateDeltaOp[A, B] ⇒ + new UpdateDeltaOp[A, B]( + this.underlying.merge(update.underlying), + update.values.foldLeft(this.values) { + (map, pair) ⇒ + val (key, value) = pair + if (this.values.contains(key)) { + val elem1 = this.values(key) + val elem2 = value.asInstanceOf[elem1.T] + map + (key → elem1.merge(elem2).asInstanceOf[B]) + } else map + pair + }, + zeroTag) + case put: PutDeltaOp[A, B] if this.values.size == 1 && this.values.contains(put.value._1) ⇒ + new PutDeltaOp[A, B](this.underlying.merge(put.underlying), put.value, zeroTag) + case other: AtomicDeltaOp[A, B] ⇒ DeltaGroup(Vector(this, other)) + case DeltaGroup(ops) ⇒ DeltaGroup(this +: ops) + } } - // RemoveDeltaOp does not contain any value at all - the propagated 'value' map is empty + // RemoveDeltaOp does not contain any value at all - the propagated 'value' map would be empty /** INTERNAL API */ - @InternalApi private[akka] final case class RemoveDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] { - } + @InternalApi private[akka] final case class RemoveDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, zeroTag: ZeroTag) extends AtomicDeltaOp[A, B] // RemoveKeyDeltaOp contains a single value - to provide the recipient with the removed key for value map /** INTERNAL API */ - @InternalApi private[akka] final case class RemoveKeyDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, removedKey: A, zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] { - } + @InternalApi private[akka] final case class RemoveKeyDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, removedKey: A, zeroTag: ZeroTag) extends AtomicDeltaOp[A, B] // DeltaGroup is effectively a causally ordered list of individual deltas /** INTERNAL API */ @InternalApi private[akka] final case class DeltaGroup[A, B <: ReplicatedData](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp { override def merge(that: DeltaOp): DeltaOp = that match { - case DeltaGroup(thatOps) ⇒ DeltaGroup(ops ++ thatOps) - case that: AtomicDeltaOp[A, B] ⇒ DeltaGroup(ops :+ that) + case that: AtomicDeltaOp[A, B] ⇒ + ops.last match { + case thisPut: PutDeltaOp[A, B] ⇒ + val merged = thisPut.merge(that) + merged match { + case op: AtomicDeltaOp[A, B] ⇒ DeltaGroup(ops.dropRight(1) :+ op) + case DeltaGroup(thatOps) ⇒ DeltaGroup(ops.dropRight(1) ++ thatOps) + } + case thisUpdate: UpdateDeltaOp[A, B] ⇒ + val merged = thisUpdate.merge(that) + merged match { + case op: AtomicDeltaOp[A, B] ⇒ DeltaGroup(ops.dropRight(1) :+ op) + case DeltaGroup(thatOps) ⇒ DeltaGroup(ops.dropRight(1) ++ thatOps) + } + case _ ⇒ DeltaGroup(ops :+ that) + } + case DeltaGroup(thatOps) ⇒ DeltaGroup(ops ++ thatOps) } override def zero: DeltaReplicatedData = ops.headOption.fold(ORMap.empty[A, B].asInstanceOf[DeltaReplicatedData])(_.zero) @@ -158,7 +154,7 @@ object ORMap { final class ORMap[A, B <: ReplicatedData] private[akka] ( private[akka] val keys: ORSet[A], private[akka] val values: Map[A, B], - private[akka] val zeroTag: ZeroTag = ORMap.VanillaORMapTag, + private[akka] val zeroTag: ZeroTag, override val delta: Option[ORMap.DeltaOp] = None) extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning { @@ -390,12 +386,13 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( case removeKeyOp: RemoveKeyDeltaOp[A, B] ⇒ tombstonedVals = tombstonedVals + removeKeyOp.removedKey case updateOp: UpdateDeltaOp[A, _] ⇒ - val key = updateOp.values.head._1 - val value = (key, updateOp.values.head._2) - if (thatValueDeltas.contains(key)) - thatValueDeltas = thatValueDeltas + (key → (thatValueDeltas(key) :+ value)) - else - thatValueDeltas += (key → (value :: Nil)) + updateOp.values.foreach { + case (key, value) ⇒ + if (thatValueDeltas.contains(key)) + thatValueDeltas = thatValueDeltas + (key → (thatValueDeltas(key) :+ (key, value))) + else + thatValueDeltas += (key → ((key, value) :: Nil)) + } } val processNestedDelta: PartialFunction[ORMap.DeltaOp, Unit] = { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala index aa1c022d95..25651954a2 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala @@ -8,9 +8,24 @@ import akka.annotation.InternalApi import akka.cluster.ddata.ORMap._ object ORMultiMap { + /** + * INTERNAL API + */ + @InternalApi private[akka] case object ORMultiMapTag extends ZeroTag { + override def zero: DeltaReplicatedData = ORMultiMap.empty + override final val value: Int = 2 + } - val _empty: ORMultiMap[Any, Any] = new ORMultiMap(ORMap.emptyWithORMultiMapTag, false) - val _emptyWithValueDeltas: ORMultiMap[Any, Any] = new ORMultiMap(ORMap.emptyWithORMultiMapTag, true) + /** + * INTERNAL API + */ + @InternalApi private[akka] case object ORMultiMapWithValueDeltasTag extends ZeroTag { + override def zero: DeltaReplicatedData = ORMultiMap.emptyWithValueDeltas + override final val value: Int = 3 + } + + val _empty: ORMultiMap[Any, Any] = new ORMultiMap(new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapTag), false) + val _emptyWithValueDeltas: ORMultiMap[Any, Any] = new ORMultiMap(new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapWithValueDeltasTag), true) /** * Provides an empty multimap. */ diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala index 694616edb2..e4ea85e70d 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala @@ -11,7 +11,15 @@ import akka.annotation.InternalApi import akka.cluster.ddata.ORMap._ object PNCounterMap { - def empty[A]: PNCounterMap[A] = new PNCounterMap(ORMap.emptyWithPNCounterMapTag) + /** + * INTERNAL API + */ + @InternalApi private[akka] case object PNCounterMapTag extends ZeroTag { + override def zero: DeltaReplicatedData = PNCounterMap.empty + override final val value: Int = 1 + } + + def empty[A]: PNCounterMap[A] = new PNCounterMap(new ORMap(ORSet.empty, Map.empty, zeroTag = PNCounterMapTag)) def apply[A](): PNCounterMap[A] = empty /** * Java API diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index 8d713e4633..194b5e08c5 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -8,6 +8,7 @@ import java.util.ArrayList import java.util.Collections import java.util.Comparator import java.util.TreeSet + import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.breakOut @@ -24,6 +25,7 @@ import akka.util.ByteString.UTF_8 import scala.collection.immutable.TreeMap import akka.cluster.UniqueAddress import java.io.NotSerializableException + import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage import akka.cluster.ddata.ORSet.DeltaOp @@ -570,129 +572,134 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) val entries = mapTypeFromProto(ormap.getEntriesList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData]) new ORMap( keys = orsetFromProto(ormap.getKeys), - entries) + entries, + ORMap.VanillaORMapTag) } - def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](entry: PEntry, valueCreator: A ⇒ B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = { - val elem = if (eh.hasStringKey(entry)) Some(eh.getStringKey(entry) → valueCreator(eh.getValue(entry))) - else if (eh.hasIntKey(entry)) Some(eh.getIntKey(entry) → valueCreator(eh.getValue(entry))) - else if (eh.hasLongKey(entry)) Some(eh.getLongKey(entry) → valueCreator(eh.getValue(entry))) - else if (eh.hasOtherKey(entry)) Some(otherMessageFromProto(eh.getOtherKey(entry)) → valueCreator(eh.getValue(entry))) - else None - elem match { - case Some(e) ⇒ Map(e) - case _ ⇒ Map.empty[Any, B] + def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](input: util.List[PEntry], valueCreator: A ⇒ B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = { + val map = mapTypeFromProto(input, valueCreator) + if (map.size > 1) + throw new IllegalArgumentException(s"Can't deserialize the key/value pair in the ORMap delta - too many pairs on the wire") + else + map + } + + def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entryOption: Option[PEntry])(implicit eh: ProtoMapEntryReader[PEntry, A]): Any = + entryOption match { + case Some(entry) ⇒ if (eh.hasStringKey(entry)) eh.getStringKey(entry) + else if (eh.hasIntKey(entry)) eh.getIntKey(entry) + else if (eh.hasLongKey(entry)) eh.getLongKey(entry) + else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry)) + else throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta") + case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta") } - } - - def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entry: PEntry)(implicit eh: ProtoMapEntryReader[PEntry, A]): Any = - if (eh.hasStringKey(entry)) eh.getStringKey(entry) - else if (eh.hasIntKey(entry)) eh.getIntKey(entry) - else if (eh.hasLongKey(entry)) eh.getLongKey(entry) - else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry)) - else throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta") // wire protocol is always DeltaGroup private def ormapPutFromBinary(bytes: Array[Byte]): ORMap.PutDeltaOp[Any, ReplicatedData] = { - val group = ormapDeltaGroupFromBinary(bytes) - if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.PutDeltaOp[_, _]]) - group.ops.head.asInstanceOf[ORMap.PutDeltaOp[Any, ReplicatedData]] + val ops = ormapDeltaGroupOpsFromBinary(bytes) + if (ops.size == 1 && ops.head.isInstanceOf[ORMap.PutDeltaOp[_, _]]) + ops.head.asInstanceOf[ORMap.PutDeltaOp[Any, ReplicatedData]] else throw new NotSerializableException("Improper ORMap delta put operation size or kind") } // wire protocol is always delta group private def ormapRemoveFromBinary(bytes: Array[Byte]): ORMap.RemoveDeltaOp[Any, ReplicatedData] = { - val group = ormapDeltaGroupFromBinary(bytes) - if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.RemoveDeltaOp[_, _]]) - group.ops.head.asInstanceOf[ORMap.RemoveDeltaOp[Any, ReplicatedData]] + val ops = ormapDeltaGroupOpsFromBinary(bytes) + if (ops.size == 1 && ops.head.isInstanceOf[ORMap.RemoveDeltaOp[_, _]]) + ops.head.asInstanceOf[ORMap.RemoveDeltaOp[Any, ReplicatedData]] else throw new NotSerializableException("Improper ORMap delta remove operation size or kind") } // wire protocol is always delta group private def ormapRemoveKeyFromBinary(bytes: Array[Byte]): ORMap.RemoveKeyDeltaOp[Any, ReplicatedData] = { - val group = ormapDeltaGroupFromBinary(bytes) - if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.RemoveKeyDeltaOp[_, _]]) - group.ops.head.asInstanceOf[ORMap.RemoveKeyDeltaOp[Any, ReplicatedData]] + val ops = ormapDeltaGroupOpsFromBinary(bytes) + if (ops.size == 1 && ops.head.isInstanceOf[ORMap.RemoveKeyDeltaOp[_, _]]) + ops.head.asInstanceOf[ORMap.RemoveKeyDeltaOp[Any, ReplicatedData]] else throw new NotSerializableException("Improper ORMap delta remove key operation size or kind") } // wire protocol is always delta group private def ormapUpdateFromBinary(bytes: Array[Byte]): ORMap.UpdateDeltaOp[Any, ReplicatedDelta] = { - val group = ormapDeltaGroupFromBinary(bytes) - if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.UpdateDeltaOp[_, _]]) - group.ops.head.asInstanceOf[ORMap.UpdateDeltaOp[Any, ReplicatedDelta]] + val ops = ormapDeltaGroupOpsFromBinary(bytes) + if (ops.size == 1 && ops.head.isInstanceOf[ORMap.UpdateDeltaOp[_, _]]) + ops.head.asInstanceOf[ORMap.UpdateDeltaOp[Any, ReplicatedDelta]] else throw new NotSerializableException("Improper ORMap delta update operation size or kind") } // this can be made client-extendable in the same way as Http codes in Spray are private def zeroTagFromCode(code: Int) = code match { - case ORMap.VanillaORMapTag.value ⇒ ORMap.VanillaORMapTag - case ORMap.PNCounterMapTag.value ⇒ ORMap.PNCounterMapTag - case ORMap.ORMultiMapTag.value ⇒ ORMap.ORMultiMapTag - case ORMap.ORMultiMapWithValueDeltasTag.value ⇒ ORMap.ORMultiMapWithValueDeltasTag - case ORMap.LWWMapTag.value ⇒ ORMap.LWWMapTag - case _ ⇒ throw new IllegalArgumentException("Invalid ZeroTag code") + case ORMap.VanillaORMapTag.value ⇒ ORMap.VanillaORMapTag + case PNCounterMap.PNCounterMapTag.value ⇒ PNCounterMap.PNCounterMapTag + case ORMultiMap.ORMultiMapTag.value ⇒ ORMultiMap.ORMultiMapTag + case ORMultiMap.ORMultiMapWithValueDeltasTag.value ⇒ ORMultiMap.ORMultiMapWithValueDeltasTag + case LWWMap.LWWMapTag.value ⇒ LWWMap.LWWMapTag + case _ ⇒ throw new IllegalArgumentException("Invalid ZeroTag code") } private def ormapDeltaGroupFromBinary(bytes: Array[Byte]): ORMap.DeltaGroup[Any, ReplicatedData] = { + ORMap.DeltaGroup(ormapDeltaGroupOpsFromBinary(bytes)) + } + + private def ormapDeltaGroupOpsFromBinary(bytes: Array[Byte]): scala.collection.immutable.IndexedSeq[ORMap.DeltaOp] = { val deltaGroup = rd.ORMapDeltaGroup.parseFrom(bytes) val ops: Vector[ORMap.DeltaOp] = deltaGroup.getEntriesList.asScala.map { entry ⇒ if (entry.getOperation == rd.ORMapDeltaOp.ORMapPut) { - val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData]) + val map = singleMapEntryFromProto(entry.getEntryDataList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData]) ORMap.PutDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map.head, zeroTagFromCode(entry.getZeroTag)) } else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemove) { ORMap.RemoveDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), zeroTagFromCode(entry.getZeroTag)) } else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemoveKey) { - val elem = singleKeyEntryFromProto(entry.getEntryData) + val elem = singleKeyEntryFromProto(entry.getEntryDataList.asScala.headOption) ORMap.RemoveKeyDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), elem, zeroTagFromCode(entry.getZeroTag)) } else if (entry.getOperation == rd.ORMapDeltaOp.ORMapUpdate) { - val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedDelta]) + val map = mapTypeFromProto(entry.getEntryDataList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedDelta]) ORMap.UpdateDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map, zeroTagFromCode(entry.getZeroTag)) } else throw new NotSerializableException(s"Unknown ORMap delta operation ${entry.getOperation}") }(collection.breakOut) - ORMap.DeltaGroup(ops) + ops } private def ormapPutToProto(deltaOp: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = { - ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))) + ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])) } private def ormapRemoveToProto(deltaOp: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = { - ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))) + ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])) } private def ormapRemoveKeyToProto(deltaOp: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = { - ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))) + ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])) } private def ormapUpdateToProto(deltaOp: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = { - ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))) + ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])) } private def ormapDeltaGroupToProto(deltaGroup: ORMap.DeltaGroup[_, _]): rd.ORMapDeltaGroup = { + ormapDeltaGroupOpsToProto(deltaGroup.ops) + } + + private def ormapDeltaGroupOpsToProto(deltaGroupOps: scala.collection.immutable.IndexedSeq[ORMap.DeltaOp]): rd.ORMapDeltaGroup = { def createEntry(opType: rd.ORMapDeltaOp, u: ORSet[_], m: Map[_, _], zt: Int) = { - if (m.size > 1) + if (m.size > 1 && opType != rd.ORMapDeltaOp.ORMapUpdate) throw new IllegalArgumentException("Invalid size of ORMap delta map") else { - val entryDataBuilder = rd.ORMapDeltaGroup.MapEntry.newBuilder() - m.headOption.map { - case (key: String, value) ⇒ entryDataBuilder.setStringKey(key).setValue(otherMessageToProto(value)) - case (key: Int, value) ⇒ entryDataBuilder.setIntKey(key).setValue(otherMessageToProto(value)) - case (key: Long, value) ⇒ entryDataBuilder.setLongKey(key).setValue(otherMessageToProto(value)) - case (key, value) ⇒ entryDataBuilder.setOtherKey(otherMessageToProto(key)).setValue(otherMessageToProto(value)) - } val builder = rd.ORMapDeltaGroup.Entry.newBuilder() .setOperation(opType) .setUnderlying(orsetToProto(u)) .setZeroTag(zt) - if (m.size > 0) - builder.setEntryData(entryDataBuilder.build()) + m.foreach { + case (key: String, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setStringKey(key).setValue(otherMessageToProto(value)).build()) + case (key: Int, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setIntKey(key).setValue(otherMessageToProto(value)).build()) + case (key: Long, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setLongKey(key).setValue(otherMessageToProto(value)).build()) + case (key, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setOtherKey(otherMessageToProto(key)).setValue(otherMessageToProto(value)).build()) + } builder } } @@ -709,12 +716,12 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) .setOperation(opType) .setUnderlying(orsetToProto(u)) .setZeroTag(zt) - builder.setEntryData(entryDataBuilder.build()) + builder.addEntryData(entryDataBuilder.build()) builder } val b = rd.ORMapDeltaGroup.newBuilder() - deltaGroup.ops.foreach { + deltaGroupOps.foreach { case ORMap.PutDeltaOp(op, pair, zt) ⇒ b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapPut, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, Map(pair), zt.value)) case ORMap.RemoveDeltaOp(op, zt) ⇒ @@ -742,7 +749,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) val entries = mapTypeFromProto(lwwmap.getEntriesList, lwwRegisterFromProto) new LWWMap(new ORMap( keys = orsetFromProto(lwwmap.getKeys), - entries, ORMap.LWWMapTag)) + entries, LWWMap.LWWMapTag)) } def pncountermapToProto(pncountermap: PNCounterMap[_]): rd.PNCounterMap = { @@ -758,7 +765,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) val entries = mapTypeFromProto(pncountermap.getEntriesList, pncounterFromProto) new PNCounterMap(new ORMap( keys = orsetFromProto(pncountermap.getKeys), - entries, ORMap.PNCounterMapTag)) + entries, PNCounterMap.PNCounterMapTag)) } def multimapToProto(multimap: ORMultiMap[_, _]): rd.ORMultiMap = { @@ -783,9 +790,9 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) keys = orsetFromProto(multimap.getKeys), entries, if (withValueDeltas) - ORMap.ORMultiMapWithValueDeltasTag + ORMultiMap.ORMultiMapWithValueDeltasTag else - ORMap.ORMultiMapTag), + ORMultiMap.ORMultiMapTag), withValueDeltas) } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala index 262c3da232..68ba4011ed 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala @@ -439,6 +439,77 @@ class ORMapSpec extends WordSpec with Matchers { merged6.entries("c").elements should be(Set("C")) } + "work with delta-coalescing scenario 1" in { + val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B") + val m2 = m1.resetDelta.put(node2, "b", GSet.empty + "B2").updated(node2, "b", GSet.empty[String])(_.add("B3")) + + val merged1 = m1 merge m2 + + merged1.entries("a").elements should be(Set("A")) + merged1.entries("b").elements should be(Set("B", "B2", "B3")) + + val merged2 = m1 mergeDelta m2.delta.get + + merged2.entries("a").elements should be(Set("A")) + merged2.entries("b").elements should be(Set("B", "B2", "B3")) + + val m3 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B") + val m4 = m3.resetDelta.put(node2, "b", GSet.empty + "B2").put(node2, "b", GSet.empty + "B3") + + val merged3 = m3 merge m4 + + merged3.entries("a").elements should be(Set("A")) + merged3.entries("b").elements should be(Set("B", "B3")) + + val merged4 = m3 mergeDelta m4.delta.get + + merged4.entries("a").elements should be(Set("A")) + merged4.entries("b").elements should be(Set("B", "B3")) + + val m5 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B") + val m6 = m5.resetDelta.put(node2, "b", GSet.empty + "B2").updated(node2, "b", GSet.empty[String])(_.add("B3")) + .updated(node2, "b", GSet.empty[String])(_.add("B4")) + + val merged5 = m5 merge m6 + + merged5.entries("a").elements should be(Set("A")) + merged5.entries("b").elements should be(Set("B", "B2", "B3", "B4")) + + val merged6 = m5 mergeDelta m6.delta.get + + merged6.entries("a").elements should be(Set("A")) + merged6.entries("b").elements should be(Set("B", "B2", "B3", "B4")) + + val m7 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B") + val m8 = m7.resetDelta.put(node2, "b", GSet.empty + "B2").put(node2, "d", GSet.empty + "D").put(node2, "b", GSet.empty + "B3") + + val merged7 = m7 merge m8 + + merged7.entries("a").elements should be(Set("A")) + merged7.entries("b").elements should be(Set("B", "B3")) + merged7.entries("d").elements should be(Set("D")) + + val merged8 = m7 mergeDelta m8.delta.get + + merged8.entries("a").elements should be(Set("A")) + merged8.entries("b").elements should be(Set("B", "B3")) + merged8.entries("d").elements should be(Set("D")) + + val m9 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node2, "b", GSet.empty + "B") + val m10 = m9.resetDelta.put(node2, "b", GSet.empty + "B2").put(node2, "d", GSet.empty + "D") + .remove(node2, "d").put(node2, "b", GSet.empty + "B3") + + val merged9 = m9 merge m10 + + merged9.entries("a").elements should be(Set("A")) + merged9.entries("b").elements should be(Set("B", "B3")) + + val merged10 = m9 mergeDelta m10.delta.get + + merged10.entries("a").elements should be(Set("A")) + merged10.entries("b").elements should be(Set("B", "B3")) + } + "work with deltas and updated for GSet elements type" in { val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A") val m2 = m1.resetDelta.updated(node1, "a", GSet.empty[String])(_.add("B")) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala index f95af2c934..441c115a4e 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala @@ -133,6 +133,12 @@ class ORMultiMapSpec extends WordSpec with Matchers { merged3.entries("a") should be(Set("A")) merged3.entries("b") should be(Set("B2")) merged3.entries("c") should be(Set("C")) + + val merged4 = merged1 mergeDelta m3.delta.get.merge(m4.delta.get) + + merged4.entries("a") should be(Set("A")) + merged4.entries("b") should be(Set("B2")) + merged4.entries("c") should be(Set("C")) } "not have usual anomalies for remove+addBinding scenario and delta-deltas 2" in { @@ -181,6 +187,217 @@ class ORMultiMapSpec extends WordSpec with Matchers { merged6.entries("c") should be(Set("C")) } + "work with delta-coalescing scenario 1" in { + val m1 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m2 = m1.resetDelta.put(node2, "b", Set("B2")).addBinding(node2, "b", "B3") + + val merged1 = m1 merge m2 + + merged1.entries("a") should be(Set("A")) + merged1.entries("b") should be(Set("B2", "B3")) + + val merged2 = m1 mergeDelta m2.delta.get + + merged2.entries("a") should be(Set("A")) + merged2.entries("b") should be(Set("B2", "B3")) + + val m3 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m4 = m3.resetDelta.put(node2, "b", Set("B2")).put(node2, "b", Set("B3")) + + val merged3 = m3 merge m4 + + merged3.entries("a") should be(Set("A")) + merged3.entries("b") should be(Set("B3")) + + val merged4 = m3 mergeDelta m4.delta.get + + merged4.entries("a") should be(Set("A")) + merged4.entries("b") should be(Set("B3")) + + val m5 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m6 = m5.resetDelta.put(node2, "b", Set("B2")).addBinding(node2, "b", "B3").addBinding(node2, "b", "B4") + + val merged5 = m5 merge m6 + + merged5.entries("a") should be(Set("A")) + merged5.entries("b") should be(Set("B2", "B3", "B4")) + + val merged6 = m5 mergeDelta m6.delta.get + + merged6.entries("a") should be(Set("A")) + merged6.entries("b") should be(Set("B2", "B3", "B4")) + + val m7 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m8 = m7.resetDelta.put(node2, "d", Set("D")).addBinding(node2, "b", "B3").put(node2, "b", Set("B4")) + + val merged7 = m7 merge m8 + + merged7.entries("a") should be(Set("A")) + merged7.entries("b") should be(Set("B4")) + merged7.entries("d") should be(Set("D")) + + val merged8 = m7 mergeDelta m8.delta.get + + merged8.entries("a") should be(Set("A")) + merged8.entries("b") should be(Set("B4")) + merged8.entries("d") should be(Set("D")) + + val m9 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m10 = m9.resetDelta.addBinding(node2, "b", "B3").addBinding(node2, "b", "B4") + + val merged9 = m9 merge m10 + + merged9.entries("a") should be(Set("A")) + merged9.entries("b") should be(Set("B", "B3", "B4")) + + val merged10 = m9 mergeDelta m10.delta.get + + merged10.entries("a") should be(Set("A")) + merged10.entries("b") should be(Set("B", "B3", "B4")) + + val m11 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1")) + .remove(node1, "b") + val m12 = m11.resetDelta.addBinding(node2, "b", "B2").addBinding(node2, "b", "B3") + + val merged11 = m11 merge m12 + + merged11.entries("a") should be(Set("A")) + merged11.entries("b") should be(Set("B2", "B3")) + + val merged12 = m11 mergeDelta m12.delta.get + + merged12.entries("a") should be(Set("A")) + merged12.entries("b") should be(Set("B2", "B3")) + + val m13 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1")) + .remove(node1, "b") + val m14 = m13.resetDelta.addBinding(node2, "b", "B2").put(node2, "b", Set("B3")) + + val merged13 = m13 merge m14 + + merged13.entries("a") should be(Set("A")) + merged13.entries("b") should be(Set("B3")) + + val merged14 = m13 mergeDelta m14.delta.get + + merged14.entries("a") should be(Set("A")) + merged14.entries("b") should be(Set("B3")) + + val m15 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1")) + .put(node1, "c", Set("C")) + val m16 = m15.resetDelta.addBinding(node2, "b", "B2").addBinding(node2, "c", "C1") + + val merged15 = m15 merge m16 + + merged15.entries("a") should be(Set("A")) + merged15.entries("b") should be(Set("B", "B1", "B2")) + merged15.entries("c") should be(Set("C", "C1")) + + val merged16 = m15 mergeDelta m16.delta.get + + merged16.entries("a") should be(Set("A")) + merged16.entries("b") should be(Set("B", "B1", "B2")) + merged16.entries("c") should be(Set("C", "C1")) + + // somewhat artificial setup + val m17 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1")) + val m18 = m17.resetDelta.addBinding(node2, "b", "B2") + val m19 = ORMultiMap.emptyWithValueDeltas[String, String].resetDelta.put(node2, "b", Set("B3")) + + val merged17 = m17 merge m18 merge m19 + + merged17.entries("a") should be(Set("A")) + merged17.entries("b") should be(Set("B", "B1", "B3")) + + val merged18 = m17 mergeDelta m18.delta.get.merge(m19.delta.get) + + merged18.entries("a") should be(Set("A")) + merged18.entries("b") should be(Set("B", "B1", "B3")) + } + + "work with delta-coalescing scenario 2" in { + val m1 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m2 = m1.resetDelta.put(node2, "b", Set("B2")).addBinding(node2, "b", "B3") + + val merged1 = m1 merge m2 + + merged1.entries("a") should be(Set("A")) + merged1.entries("b") should be(Set("B2", "B3")) + + val merged2 = m1 mergeDelta m2.delta.get + + merged2.entries("a") should be(Set("A")) + merged2.entries("b") should be(Set("B2", "B3")) + + val m3 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m4 = m3.resetDelta.put(node2, "b", Set("B2")).put(node2, "b", Set("B3")) + + val merged3 = m3 merge m4 + + merged3.entries("a") should be(Set("A")) + merged3.entries("b") should be(Set("B3")) + + val merged4 = m3 mergeDelta m4.delta.get + + merged4.entries("a") should be(Set("A")) + merged4.entries("b") should be(Set("B3")) + + val m5 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m6 = m5.resetDelta.put(node2, "b", Set("B2")).addBinding(node2, "b", "B3").addBinding(node2, "b", "B4") + + val merged5 = m5 merge m6 + + merged5.entries("a") should be(Set("A")) + merged5.entries("b") should be(Set("B2", "B3", "B4")) + + val merged6 = m5 mergeDelta m6.delta.get + + merged6.entries("a") should be(Set("A")) + merged6.entries("b") should be(Set("B2", "B3", "B4")) + + val m7 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m8 = m7.resetDelta.put(node2, "d", Set("D")).addBinding(node2, "b", "B3").put(node2, "b", Set("B4")) + + val merged7 = m7 merge m8 + + merged7.entries("a") should be(Set("A")) + merged7.entries("b") should be(Set("B4")) + merged7.entries("d") should be(Set("D")) + + val merged8 = m7 mergeDelta m8.delta.get + + merged8.entries("a") should be(Set("A")) + merged8.entries("b") should be(Set("B4")) + merged8.entries("d") should be(Set("D")) + + val m9 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) + val m10 = m9.resetDelta.addBinding(node2, "b", "B3").addBinding(node2, "b", "B4") + + val merged9 = m9 merge m10 + + merged9.entries("a") should be(Set("A")) + merged9.entries("b") should be(Set("B", "B3", "B4")) + + val merged10 = m9 mergeDelta m10.delta.get + + merged10.entries("a") should be(Set("A")) + merged10.entries("b") should be(Set("B", "B3", "B4")) + + val m11 = ORMultiMap.empty[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B", "B1")) + .remove(node1, "b") + val m12 = ORMultiMap.empty[String, String].addBinding(node2, "b", "B2").addBinding(node2, "b", "B3") + + val merged11 = m11 merge m12 + + merged11.entries("a") should be(Set("A")) + merged11.entries("b") should be(Set("B2", "B3")) + + val merged12 = m11 mergeDelta m12.delta.get + + merged12.entries("a") should be(Set("A")) + merged12.entries("b") should be(Set("B2", "B3")) + } + "have unapply extractor" in { val m1 = ORMultiMap.empty.put(node1, "a", Set(1L, 2L)).put(node2, "b", Set(3L)) val m2: ORMultiMap[String, Long] = m1 diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala index da86a68dc2..5a63607339 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala @@ -263,6 +263,10 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( val m1 = ORMultiMap.empty[String, String].addBinding(address1, "a", "A1").addBinding(address2, "a", "A2") val m2 = ORMultiMap.empty[String, String].put(address2, "b", Set("B1", "B2", "B3")) checkSameContent(m1.merge(m2), m2.merge(m1)) + checkSerialization(ORMultiMap.empty[String, String].addBinding(address1, "a", "A1").addBinding(address1, "a", "A2").delta.get) + val m3 = ORMultiMap.empty[String, String].addBinding(address1, "a", "A1") + val d3 = m3.resetDelta.addBinding(address1, "a", "A2").addBinding(address1, "a", "A3").delta.get + checkSerialization(d3) } "be compatible with old ORMultiMap serialization" in {