From 7041c76ba910e4474b1004060c5cbc08a121b5e1 Mon Sep 17 00:00:00 2001 From: Christopher Hunt Date: Thu, 18 Jun 2015 16:17:53 +0200 Subject: [PATCH] +cdd #16799 Add ORMultiMap data type --- .../protobuf/msg/ReplicatedDataMessages.java | 1615 ++++++++++++++++- .../protobuf/ReplicatedDataMessages.proto | 10 +- .../main/scala/akka/cluster/ddata/ORMap.scala | 4 +- .../scala/akka/cluster/ddata/ORMultiMap.scala | 233 +++ .../scala/akka/cluster/ddata/Replicator.scala | 2 +- .../protobuf/ReplicatedDataSerializer.scala | 29 +- .../akka/cluster/ddata/ORMultiMapSpec.scala | 122 ++ .../ReplicatedDataSerializerSpec.scala | 14 + .../docs/ddata/DistributedDataDocTest.java | 77 +- akka-docs/rst/java/distributed-data.rst | 7 +- .../docs/ddata/DistributedDataDocSpec.scala | 13 + akka-docs/rst/scala/distributed-data.rst | 7 +- 12 files changed, 2083 insertions(+), 50 deletions(-) create mode 100644 akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala create mode 100644 akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java index a9d111c12a..929b0c17fa 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java @@ -12309,6 +12309,1590 @@ public final class ReplicatedDataMessages { // @@protoc_insertion_point(class_scope:akka.cluster.ddata.PNCounterMap) } + public interface ORMultiMapOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required .akka.cluster.ddata.ORSet keys = 1; + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + boolean hasKeys(); + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet getKeys(); + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder getKeysOrBuilder(); + + // repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + java.util.List + getEntriesList(); + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry getEntries(int index); + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + int getEntriesCount(); + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + java.util.List + getEntriesOrBuilderList(); + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.EntryOrBuilder getEntriesOrBuilder( + int index); + } + /** + * Protobuf type {@code akka.cluster.ddata.ORMultiMap} + */ + public static final class ORMultiMap extends + com.google.protobuf.GeneratedMessage + implements ORMultiMapOrBuilder { + // Use ORMultiMap.newBuilder() to construct. + private ORMultiMap(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private ORMultiMap(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final ORMultiMap defaultInstance; + public static ORMultiMap getDefaultInstance() { + return defaultInstance; + } + + public ORMultiMap getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private ORMultiMap( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + subBuilder = keys_.toBuilder(); + } + keys_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(keys_); + keys_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000001; + break; + } + case 18: { + if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + entries_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000002; + } + entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.PARSER, extensionRegistry)); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + entries_ = java.util.Collections.unmodifiableList(entries_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.class, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public ORMultiMap parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new ORMultiMap(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + public interface EntryOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required string key = 1; + /** + * required string key = 1; + */ + boolean hasKey(); + /** + * required string key = 1; + */ + java.lang.String getKey(); + /** + * required string key = 1; + */ + com.google.protobuf.ByteString + getKeyBytes(); + + // required .akka.cluster.ddata.ORSet value = 2; + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + boolean hasValue(); + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet getValue(); + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder getValueOrBuilder(); + } + /** + * Protobuf type {@code akka.cluster.ddata.ORMultiMap.Entry} + */ + public static final class Entry extends + com.google.protobuf.GeneratedMessage + implements EntryOrBuilder { + // Use Entry.newBuilder() to construct. + private Entry(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private Entry(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final Entry defaultInstance; + public static Entry getDefaultInstance() { + return defaultInstance; + } + + public Entry getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private Entry( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + key_ = input.readBytes(); + break; + } + case 18: { + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder subBuilder = null; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + subBuilder = value_.toBuilder(); + } + value_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(value_); + value_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000002; + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_Entry_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_Entry_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.class, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public Entry parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new Entry(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string key = 1; + public static final int KEY_FIELD_NUMBER = 1; + private java.lang.Object key_; + /** + * required string key = 1; + */ + public boolean hasKey() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string key = 1; + */ + public java.lang.String getKey() { + java.lang.Object ref = key_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + key_ = s; + } + return s; + } + } + /** + * required string key = 1; + */ + public com.google.protobuf.ByteString + getKeyBytes() { + java.lang.Object ref = key_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + key_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required .akka.cluster.ddata.ORSet value = 2; + public static final int VALUE_FIELD_NUMBER = 2; + private akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet value_; + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public boolean hasValue() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet getValue() { + return value_; + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder getValueOrBuilder() { + return value_; + } + + private void initFields() { + key_ = ""; + value_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasKey()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasValue()) { + memoizedIsInitialized = 0; + return false; + } + if (!getValue().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getKeyBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(2, value_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getKeyBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(2, value_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code akka.cluster.ddata.ORMultiMap.Entry} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.EntryOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_Entry_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_Entry_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.class, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder.class); + } + + // Construct using akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getValueFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + key_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + if (valueBuilder_ == null) { + value_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); + } else { + valueBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_Entry_descriptor; + } + + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry getDefaultInstanceForType() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.getDefaultInstance(); + } + + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry build() { + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry buildPartial() { + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry result = new akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.key_ = key_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + if (valueBuilder_ == null) { + result.value_ = value_; + } else { + result.value_ = valueBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry) { + return mergeFrom((akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry other) { + if (other == akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.getDefaultInstance()) return this; + if (other.hasKey()) { + bitField0_ |= 0x00000001; + key_ = other.key_; + onChanged(); + } + if (other.hasValue()) { + mergeValue(other.getValue()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasKey()) { + + return false; + } + if (!hasValue()) { + + return false; + } + if (!getValue().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string key = 1; + private java.lang.Object key_ = ""; + /** + * required string key = 1; + */ + public boolean hasKey() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string key = 1; + */ + public java.lang.String getKey() { + java.lang.Object ref = key_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + key_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string key = 1; + */ + public com.google.protobuf.ByteString + getKeyBytes() { + java.lang.Object ref = key_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + key_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string key = 1; + */ + public Builder setKey( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + key_ = value; + onChanged(); + return this; + } + /** + * required string key = 1; + */ + public Builder clearKey() { + bitField0_ = (bitField0_ & ~0x00000001); + key_ = getDefaultInstance().getKey(); + onChanged(); + return this; + } + /** + * required string key = 1; + */ + public Builder setKeyBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + key_ = value; + onChanged(); + return this; + } + + // required .akka.cluster.ddata.ORSet value = 2; + private akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet value_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder> valueBuilder_; + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public boolean hasValue() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet getValue() { + if (valueBuilder_ == null) { + return value_; + } else { + return valueBuilder_.getMessage(); + } + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public Builder setValue(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet value) { + if (valueBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + value_ = value; + onChanged(); + } else { + valueBuilder_.setMessage(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public Builder setValue( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder builderForValue) { + if (valueBuilder_ == null) { + value_ = builderForValue.build(); + onChanged(); + } else { + valueBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public Builder mergeValue(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet value) { + if (valueBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + value_ != akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance()) { + value_ = + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.newBuilder(value_).mergeFrom(value).buildPartial(); + } else { + value_ = value; + } + onChanged(); + } else { + valueBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public Builder clearValue() { + if (valueBuilder_ == null) { + value_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); + onChanged(); + } else { + valueBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder getValueBuilder() { + bitField0_ |= 0x00000002; + onChanged(); + return getValueFieldBuilder().getBuilder(); + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder getValueOrBuilder() { + if (valueBuilder_ != null) { + return valueBuilder_.getMessageOrBuilder(); + } else { + return value_; + } + } + /** + * required .akka.cluster.ddata.ORSet value = 2; + */ + private com.google.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder> + getValueFieldBuilder() { + if (valueBuilder_ == null) { + valueBuilder_ = new com.google.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder>( + value_, + getParentForChildren(), + isClean()); + value_ = null; + } + return valueBuilder_; + } + + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.ORMultiMap.Entry) + } + + static { + defaultInstance = new Entry(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:akka.cluster.ddata.ORMultiMap.Entry) + } + + private int bitField0_; + // required .akka.cluster.ddata.ORSet keys = 1; + public static final int KEYS_FIELD_NUMBER = 1; + private akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet keys_; + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public boolean hasKeys() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet getKeys() { + return keys_; + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder getKeysOrBuilder() { + return keys_; + } + + // repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + public static final int ENTRIES_FIELD_NUMBER = 2; + private java.util.List entries_; + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public java.util.List getEntriesList() { + return entries_; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public java.util.List + getEntriesOrBuilderList() { + return entries_; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public int getEntriesCount() { + return entries_.size(); + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry getEntries(int index) { + return entries_.get(index); + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.EntryOrBuilder getEntriesOrBuilder( + int index) { + return entries_.get(index); + } + + private void initFields() { + keys_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); + entries_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasKeys()) { + memoizedIsInitialized = 0; + return false; + } + if (!getKeys().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + for (int i = 0; i < getEntriesCount(); i++) { + if (!getEntries(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeMessage(1, keys_); + } + for (int i = 0; i < entries_.size(); i++) { + output.writeMessage(2, entries_.get(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, keys_); + } + for (int i = 0; i < entries_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(2, entries_.get(i)); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code akka.cluster.ddata.ORMultiMap} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMapOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.class, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Builder.class); + } + + // Construct using akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getKeysFieldBuilder(); + getEntriesFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (keysBuilder_ == null) { + keys_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); + } else { + keysBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + if (entriesBuilder_ == null) { + entries_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + } else { + entriesBuilder_.clear(); + } + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.internal_static_akka_cluster_ddata_ORMultiMap_descriptor; + } + + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap getDefaultInstanceForType() { + return akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.getDefaultInstance(); + } + + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap build() { + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap buildPartial() { + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap result = new akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + if (keysBuilder_ == null) { + result.keys_ = keys_; + } else { + result.keys_ = keysBuilder_.build(); + } + if (entriesBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002)) { + entries_ = java.util.Collections.unmodifiableList(entries_); + bitField0_ = (bitField0_ & ~0x00000002); + } + result.entries_ = entries_; + } else { + result.entries_ = entriesBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap) { + return mergeFrom((akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap other) { + if (other == akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.getDefaultInstance()) return this; + if (other.hasKeys()) { + mergeKeys(other.getKeys()); + } + if (entriesBuilder_ == null) { + if (!other.entries_.isEmpty()) { + if (entries_.isEmpty()) { + entries_ = other.entries_; + bitField0_ = (bitField0_ & ~0x00000002); + } else { + ensureEntriesIsMutable(); + entries_.addAll(other.entries_); + } + onChanged(); + } + } else { + if (!other.entries_.isEmpty()) { + if (entriesBuilder_.isEmpty()) { + entriesBuilder_.dispose(); + entriesBuilder_ = null; + entries_ = other.entries_; + bitField0_ = (bitField0_ & ~0x00000002); + entriesBuilder_ = + com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? + getEntriesFieldBuilder() : null; + } else { + entriesBuilder_.addAllMessages(other.entries_); + } + } + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasKeys()) { + + return false; + } + if (!getKeys().isInitialized()) { + + return false; + } + for (int i = 0; i < getEntriesCount(); i++) { + if (!getEntries(i).isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required .akka.cluster.ddata.ORSet keys = 1; + private akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet keys_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder> keysBuilder_; + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public boolean hasKeys() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet getKeys() { + if (keysBuilder_ == null) { + return keys_; + } else { + return keysBuilder_.getMessage(); + } + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public Builder setKeys(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet value) { + if (keysBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + keys_ = value; + onChanged(); + } else { + keysBuilder_.setMessage(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public Builder setKeys( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder builderForValue) { + if (keysBuilder_ == null) { + keys_ = builderForValue.build(); + onChanged(); + } else { + keysBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public Builder mergeKeys(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet value) { + if (keysBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + keys_ != akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance()) { + keys_ = + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.newBuilder(keys_).mergeFrom(value).buildPartial(); + } else { + keys_ = value; + } + onChanged(); + } else { + keysBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public Builder clearKeys() { + if (keysBuilder_ == null) { + keys_ = akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.getDefaultInstance(); + onChanged(); + } else { + keysBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder getKeysBuilder() { + bitField0_ |= 0x00000001; + onChanged(); + return getKeysFieldBuilder().getBuilder(); + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder getKeysOrBuilder() { + if (keysBuilder_ != null) { + return keysBuilder_.getMessageOrBuilder(); + } else { + return keys_; + } + } + /** + * required .akka.cluster.ddata.ORSet keys = 1; + */ + private com.google.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder> + getKeysFieldBuilder() { + if (keysBuilder_ == null) { + keysBuilder_ = new com.google.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSet.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORSetOrBuilder>( + keys_, + getParentForChildren(), + isClean()); + keys_ = null; + } + return keysBuilder_; + } + + // repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + private java.util.List entries_ = + java.util.Collections.emptyList(); + private void ensureEntriesIsMutable() { + if (!((bitField0_ & 0x00000002) == 0x00000002)) { + entries_ = new java.util.ArrayList(entries_); + bitField0_ |= 0x00000002; + } + } + + private com.google.protobuf.RepeatedFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.EntryOrBuilder> entriesBuilder_; + + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public java.util.List getEntriesList() { + if (entriesBuilder_ == null) { + return java.util.Collections.unmodifiableList(entries_); + } else { + return entriesBuilder_.getMessageList(); + } + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public int getEntriesCount() { + if (entriesBuilder_ == null) { + return entries_.size(); + } else { + return entriesBuilder_.getCount(); + } + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry getEntries(int index) { + if (entriesBuilder_ == null) { + return entries_.get(index); + } else { + return entriesBuilder_.getMessage(index); + } + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder setEntries( + int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry value) { + if (entriesBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntriesIsMutable(); + entries_.set(index, value); + onChanged(); + } else { + entriesBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder setEntries( + int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder builderForValue) { + if (entriesBuilder_ == null) { + ensureEntriesIsMutable(); + entries_.set(index, builderForValue.build()); + onChanged(); + } else { + entriesBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder addEntries(akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry value) { + if (entriesBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntriesIsMutable(); + entries_.add(value); + onChanged(); + } else { + entriesBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder addEntries( + int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry value) { + if (entriesBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntriesIsMutable(); + entries_.add(index, value); + onChanged(); + } else { + entriesBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder addEntries( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder builderForValue) { + if (entriesBuilder_ == null) { + ensureEntriesIsMutable(); + entries_.add(builderForValue.build()); + onChanged(); + } else { + entriesBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder addEntries( + int index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder builderForValue) { + if (entriesBuilder_ == null) { + ensureEntriesIsMutable(); + entries_.add(index, builderForValue.build()); + onChanged(); + } else { + entriesBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder addAllEntries( + java.lang.Iterable values) { + if (entriesBuilder_ == null) { + ensureEntriesIsMutable(); + super.addAll(values, entries_); + onChanged(); + } else { + entriesBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder clearEntries() { + if (entriesBuilder_ == null) { + entries_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + onChanged(); + } else { + entriesBuilder_.clear(); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public Builder removeEntries(int index) { + if (entriesBuilder_ == null) { + ensureEntriesIsMutable(); + entries_.remove(index); + onChanged(); + } else { + entriesBuilder_.remove(index); + } + return this; + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder getEntriesBuilder( + int index) { + return getEntriesFieldBuilder().getBuilder(index); + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.EntryOrBuilder getEntriesOrBuilder( + int index) { + if (entriesBuilder_ == null) { + return entries_.get(index); } else { + return entriesBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public java.util.List + getEntriesOrBuilderList() { + if (entriesBuilder_ != null) { + return entriesBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(entries_); + } + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder addEntriesBuilder() { + return getEntriesFieldBuilder().addBuilder( + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.getDefaultInstance()); + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder addEntriesBuilder( + int index) { + return getEntriesFieldBuilder().addBuilder( + index, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.getDefaultInstance()); + } + /** + * repeated .akka.cluster.ddata.ORMultiMap.Entry entries = 2; + */ + public java.util.List + getEntriesBuilderList() { + return getEntriesFieldBuilder().getBuilderList(); + } + private com.google.protobuf.RepeatedFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.EntryOrBuilder> + getEntriesFieldBuilder() { + if (entriesBuilder_ == null) { + entriesBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.Entry.Builder, akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages.ORMultiMap.EntryOrBuilder>( + entries_, + ((bitField0_ & 0x00000002) == 0x00000002), + getParentForChildren(), + isClean()); + entries_ = null; + } + return entriesBuilder_; + } + + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.ORMultiMap) + } + + static { + defaultInstance = new ORMultiMap(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:akka.cluster.ddata.ORMultiMap) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_akka_cluster_ddata_GSet_descriptor; private static @@ -12384,6 +13968,16 @@ public final class ReplicatedDataMessages { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_akka_cluster_ddata_PNCounterMap_Entry_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_akka_cluster_ddata_ORMultiMap_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_akka_cluster_ddata_ORMultiMap_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_akka_cluster_ddata_ORMultiMap_Entry_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_akka_cluster_ddata_ORMultiMap_Entry_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -12431,8 +14025,13 @@ public final class ReplicatedDataMessages { "akka.cluster.ddata.ORSet\0227\n\007entries\030\002 \003(" + "\0132&.akka.cluster.ddata.PNCounterMap.Entr" + "y\032B\n\005Entry\022\013\n\003key\030\001 \002(\t\022,\n\005value\030\002 \002(\0132\035" + - ".akka.cluster.ddata.PNCounterB#\n\037akka.cl" + - "uster.ddata.protobuf.msgH\001" + ".akka.cluster.ddata.PNCounter\"\254\001\n\nORMult" + + "iMap\022\'\n\004keys\030\001 \002(\0132\031.akka.cluster.ddata.", + "ORSet\0225\n\007entries\030\002 \003(\0132$.akka.cluster.dd" + + "ata.ORMultiMap.Entry\032>\n\005Entry\022\013\n\003key\030\001 \002" + + "(\t\022(\n\005value\030\002 \002(\0132\031.akka.cluster.ddata.O" + + "RSetB#\n\037akka.cluster.ddata.protobuf.msgH" + + "\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -12529,6 +14128,18 @@ public final class ReplicatedDataMessages { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_PNCounterMap_Entry_descriptor, new java.lang.String[] { "Key", "Value", }); + internal_static_akka_cluster_ddata_ORMultiMap_descriptor = + getDescriptor().getMessageTypes().get(10); + internal_static_akka_cluster_ddata_ORMultiMap_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_akka_cluster_ddata_ORMultiMap_descriptor, + new java.lang.String[] { "Keys", "Entries", }); + internal_static_akka_cluster_ddata_ORMultiMap_Entry_descriptor = + internal_static_akka_cluster_ddata_ORMultiMap_descriptor.getNestedTypes().get(0); + internal_static_akka_cluster_ddata_ORMultiMap_Entry_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_akka_cluster_ddata_ORMultiMap_Entry_descriptor, + new java.lang.String[] { "Key", "Value", }); return null; } }; diff --git a/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto index 146e227104..9f612f47ff 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto @@ -86,7 +86,15 @@ message PNCounterMap { repeated Entry entries = 2; } - +message ORMultiMap { + message Entry { + required string key = 1; + required ORSet value = 2; + } + + required ORSet keys = 1; + repeated Entry entries = 2; +} diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index fdc52f4da1..4bd44ca720 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -87,7 +87,7 @@ final class ORMap[A <: ReplicatedData] private[akka] ( * * `IllegalArgumentException` is thrown if you try to replace an existing `ORSet` * value, because important history can be lost when replacing the `ORSet` and - * undesired effects of merging will occur. + * undesired effects of merging will occur. Use [[ORMultiMap]] or [[#updated]] instead. */ def put(node: Cluster, key: String, value: A): ORMap[A] = put(node.selfUniqueAddress, key, value) @@ -99,7 +99,7 @@ final class ORMap[A <: ReplicatedData] private[akka] ( throw new IllegalArgumentException( "`ORMap.put` must not be used to replace an existing `ORSet` " + "value, because important history can be lost when replacing the `ORSet` and " + - "undesired effects of merging will occur. Use `ORMap.updated` instead.") + "undesired effects of merging will occur. Use `ORMultiMap` or `ORMap.updated` instead.") else new ORMap(keys.add(node, key), values.updated(key, value)) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala new file mode 100644 index 0000000000..71ad5f02e1 --- /dev/null +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala @@ -0,0 +1,233 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.cluster.ddata + +import akka.cluster.{ UniqueAddress, Cluster } + +object ORMultiMap { + + val _empty: ORMultiMap[Any] = new ORMultiMap(ORMap.empty) + /** + * Provides an empty multimap. + */ + def empty[A]: ORMultiMap[A] = _empty.asInstanceOf[ORMultiMap[A]] + def apply(): ORMultiMap[Any] = _empty + + /** + * Java API + */ + def create[A](): ORMultiMap[A] = empty[A] + + /** + * Extract the [[ORMultiMap#entries]]. + */ + def unapply[A](m: ORMultiMap[A]): Option[Map[String, Set[A]]] = Some(m.entries) + + /** + * Extract the [[ORMultiMap#entries]] of an `ORMultiMap`. + */ + def unapply(value: Any): Option[Map[String, Set[Any]]] = value match { + case m: ORMultiMap[Any] @unchecked ⇒ Some(m.entries) + case _ ⇒ None + } +} + +/** + * An immutable multi-map implementation. This class wraps an + * [[ORMap]] with an [[ORSet]] for the map's value. + * + * This class is immutable, i.e. "modifying" methods return a new instance. + */ +@SerialVersionUID(1L) +final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORSet[A]]) + extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning { + + override type T = ORMultiMap[A] + + override def merge(that: T): T = + new ORMultiMap(underlying.merge(that.underlying)) + + /** + * Scala API: All entries of a multimap where keys are strings and values are sets. + */ + def entries: Map[String, Set[A]] = + underlying.entries.map { case (k, v) ⇒ k -> v.elements } + + /** + * Java API: All entries of a multimap where keys are strings and values are sets. + */ + def getEntries(): java.util.Map[String, java.util.Set[A]] = { + import scala.collection.JavaConverters._ + val result = new java.util.HashMap[String, java.util.Set[A]] + underlying.entries.foreach { + case (k, v) ⇒ result.put(k, v.elements.asJava) + } + result + } + + /** + * Get the set associated with the key if there is one. + */ + def get(key: String): Option[Set[A]] = + underlying.get(key).map(_.elements) + + /** + * Scala API: Get the set associated with the key if there is one, + * else return the given default. + */ + def getOrElse(key: String, default: ⇒ Set[A]): Set[A] = + get(key).getOrElse(default) + + def contains(key: String): Boolean = underlying.contains(key) + + def isEmpty: Boolean = underlying.isEmpty + + def size: Int = underlying.size + + /** + * Convenience for put. Requires an implicit Cluster. + * @see [[#put]] + */ + def +(entry: (String, Set[A]))(implicit node: Cluster): ORMultiMap[A] = { + val (key, value) = entry + put(node, key, value) + } + + /** + * Scala API: Associate an entire set with the key while retaining the history of the previous + * replicated data set. + */ + def put(node: Cluster, key: String, value: Set[A]): ORMultiMap[A] = + put(node.selfUniqueAddress, key, value) + + /** + * Java API: Associate an entire set with the key while retaining the history of the previous + * replicated data set. + */ + def put(node: Cluster, key: String, value: java.util.Set[A]): ORMultiMap[A] = { + import scala.collection.JavaConverters._ + put(node, key, value.asScala.toSet) + } + + /** + * INTERNAL API + */ + private[akka] def put(node: UniqueAddress, key: String, value: Set[A]): ORMultiMap[A] = { + val newUnderlying = underlying.updated(node, key, ORSet.empty[A]) { existing ⇒ + value.foldLeft(existing.clear(node)) { (s, element) ⇒ s.add(node, element) } + } + new ORMultiMap(newUnderlying) + } + + /** + * Convenience for remove. Requires an implicit Cluster. + * @see [[#remove]] + */ + def -(key: String)(implicit node: Cluster): ORMultiMap[A] = + remove(node, key) + + /** + * Remove an entire set associated with the key. + */ + def remove(node: Cluster, key: String): ORMultiMap[A] = + remove(node.selfUniqueAddress, key) + + /** + * INTERNAL API + */ + private[akka] def remove(node: UniqueAddress, key: String): ORMultiMap[A] = + new ORMultiMap(underlying.remove(node, key)) + + /** + * Scala API: Add an element to a set associated with a key. If there is no existing set then one will be initialised. + */ + def addBinding(key: String, element: A)(implicit node: Cluster): ORMultiMap[A] = + addBinding(node.selfUniqueAddress, key, element) + + /** + * Java API: Add an element to a set associated with a key. If there is no existing set then one will be initialised. + */ + def addBinding(node: Cluster, key: String, element: A): ORMultiMap[A] = + addBinding(key, element)(node) + + /** + * INTERNAL API + */ + private[akka] def addBinding(node: UniqueAddress, key: String, element: A): ORMultiMap[A] = { + val newUnderlying = underlying.updated(node, key, ORSet.empty[A])(_.add(node, element)) + new ORMultiMap(newUnderlying) + } + + /** + * Scala API: Remove an element of a set associated with a key. If there are no more elements in the set then the + * entire set will be removed. + */ + def removeBinding(key: String, element: A)(implicit node: Cluster): ORMultiMap[A] = + removeBinding(node.selfUniqueAddress, key, element) + + /** + * Java API: Remove an element of a set associated with a key. If there are no more elements in the set then the + * entire set will be removed. + */ + def removeBinding(node: Cluster, key: String, element: A): ORMultiMap[A] = + removeBinding(key, element)(node) + + /** + * INTERNAL API + */ + private[akka] def removeBinding(node: UniqueAddress, key: String, element: A): ORMultiMap[A] = { + val newUnderlying = { + val u = underlying.updated(node, key, ORSet.empty[A])(_.remove(node, element)) + u.get(key) match { + case Some(s) if s.isEmpty ⇒ u.remove(node, key) + case _ ⇒ u + } + } + new ORMultiMap(newUnderlying) + } + + /** + * Replace an element of a set associated with a key with a new one if it is different. This is useful when an element is removed + * and another one is added within the same Update. The order of addition and removal is important in order + * to retain history for replicated data. + */ + def replaceBinding(key: String, oldElement: A, newElement: A)(implicit node: Cluster): ORMultiMap[A] = + replaceBinding(node.selfUniqueAddress, key, oldElement, newElement) + + /** + * INTERNAL API + */ + private[akka] def replaceBinding(node: UniqueAddress, key: String, oldElement: A, newElement: A): ORMultiMap[A] = + if (newElement != oldElement) + addBinding(node, key, newElement).removeBinding(node, key, oldElement) + else + this + + override def needPruningFrom(removedNode: UniqueAddress): Boolean = + underlying.needPruningFrom(removedNode) + + override def pruningCleanup(removedNode: UniqueAddress): T = + new ORMultiMap(underlying.pruningCleanup(removedNode)) + + override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): T = + new ORMultiMap(underlying.prune(removedNode, collapseInto)) + + // this class cannot be a `case class` because we need different `unapply` + + override def toString: String = s"ORMulti$entries" + + override def equals(o: Any): Boolean = o match { + case other: ORMultiMap[_] ⇒ underlying == other.underlying + case _ ⇒ false + } + + override def hashCode: Int = underlying.hashCode +} + +object ORMultiMapKey { + def create[A](id: String): Key[ORMultiMap[A]] = ORMultiMapKey(id) +} + +@SerialVersionUID(1L) +final case class ORMultiMapKey[A](_id: String) extends Key[ORMultiMap[A]](_id) with ReplicatedDataSerialization diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 3859cab017..93d72bdafd 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -568,7 +568,7 @@ object Replicator { *
  • Counters: [[GCounter]], [[PNCounter]]
  • *
  • Registers: [[LWWRegister]], [[Flag]]
  • *
  • Sets: [[GSet]], [[ORSet]]
  • - *
  • Maps: [[ORMap]], [[LWWMap]], [[PNCounterMap]]
  • + *
  • Maps: [[ORMap]], [[ORMultiMap]], [[LWWMap]], [[PNCounterMap]]
  • * * * For good introduction to the CRDT subject watch the diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index 5a192cc1ea..353c4a79a8 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -46,6 +46,8 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) private val LWWMapKeyManifest = "i" private val PNCounterMapManifest = "J" private val PNCounterMapKeyManifest = "j" + private val ORMultiMapManifest = "K" + private val ORMultiMapKeyManifest = "k" private val VersionVectorManifest = "L" private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef]( @@ -58,6 +60,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) ORMapManifest -> ormapFromBinary, LWWMapManifest -> lwwmapFromBinary, PNCounterMapManifest -> pncountermapFromBinary, + ORMultiMapManifest -> multimapFromBinary, DeletedDataManifest -> (_ ⇒ DeletedData), VersionVectorManifest -> versionVectorFromBinary, @@ -69,7 +72,8 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) PNCounterKeyManifest -> (bytes ⇒ PNCounterKey(keyIdFromBinary(bytes))), ORMapKeyManifest -> (bytes ⇒ ORMapKey(keyIdFromBinary(bytes))), LWWMapKeyManifest -> (bytes ⇒ LWWMapKey(keyIdFromBinary(bytes))), - PNCounterMapKeyManifest -> (bytes ⇒ PNCounterMapKey(keyIdFromBinary(bytes)))) + PNCounterMapKeyManifest -> (bytes ⇒ PNCounterMapKey(keyIdFromBinary(bytes))), + ORMultiMapKeyManifest -> (bytes ⇒ ORMultiMapKey(keyIdFromBinary(bytes)))) override def manifest(obj: AnyRef): String = obj match { case _: ORSet[_] ⇒ ORSetManifest @@ -81,6 +85,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) case _: ORMap[_] ⇒ ORMapManifest case _: LWWMap[_] ⇒ LWWMapManifest case _: PNCounterMap ⇒ PNCounterMapManifest + case _: ORMultiMap[_] ⇒ ORMultiMapManifest case DeletedData ⇒ DeletedDataManifest case _: VersionVector ⇒ VersionVectorManifest @@ -93,6 +98,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) case _: ORMapKey[_] ⇒ ORMapKeyManifest case _: LWWMapKey[_] ⇒ LWWMapKeyManifest case _: PNCounterMapKey ⇒ PNCounterMapKeyManifest + case _: ORMultiMapKey[_] ⇒ ORMultiMapKeyManifest case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") @@ -108,6 +114,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) case m: ORMap[_] ⇒ compress(ormapToProto(m)) case m: LWWMap[_] ⇒ compress(lwwmapToProto(m)) case m: PNCounterMap ⇒ compress(pncountermapToProto(m)) + case m: ORMultiMap[_] ⇒ compress(multimapToProto(m)) case DeletedData ⇒ dm.Empty.getDefaultInstance.toByteArray case m: VersionVector ⇒ versionVectorToProto(m).toByteArray case Key(id) ⇒ keyIdToBinary(id) @@ -371,6 +378,26 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) entries)) } + def multimapToProto(multimap: ORMultiMap[_]): rd.ORMultiMap = { + val b = rd.ORMultiMap.newBuilder().setKeys(orsetToProto(multimap.underlying.keys)) + multimap.underlying.entries.toVector.sortBy { case (key, _) ⇒ key }.foreach { + case (key, value) ⇒ b.addEntries(rd.ORMultiMap.Entry.newBuilder(). + setKey(key).setValue(orsetToProto(value))) + } + b.build() + } + + def multimapFromBinary(bytes: Array[Byte]): ORMultiMap[Any] = + multimapFromProto(rd.ORMultiMap.parseFrom(decompress(bytes))) + + def multimapFromProto(multimap: rd.ORMultiMap): ORMultiMap[Any] = { + val entries = multimap.getEntriesList.asScala.map(entry ⇒ + entry.getKey -> orsetFromProto(entry.getValue)).toMap + new ORMultiMap(new ORMap( + keys = orsetFromProto(multimap.getKeys).asInstanceOf[ORSet[String]], + entries)) + } + def keyIdToBinary(id: String): Array[Byte] = id.getBytes(UTF_8) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala new file mode 100644 index 0000000000..fb08b36097 --- /dev/null +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.cluster.ddata + +import akka.actor.Address +import akka.cluster.UniqueAddress +import akka.cluster.ddata.Replicator.Changed +import org.scalatest.{ Matchers, WordSpec } + +class ORMultiMapSpec extends WordSpec with Matchers { + + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2) + + "A ORMultiMap" must { + + "be able to add entries" in { + val m = ORMultiMap().addBinding(node1, "a", "A").addBinding(node1, "b", "B") + m.entries should be(Map("a" -> Set("A"), "b" -> Set("B"))) + + val m2 = m.addBinding(node1, "a", "C") + m2.entries should be(Map("a" -> Set("A", "C"), "b" -> Set("B"))) + } + + "be able to remove entry" in { + val m = ORMultiMap().addBinding(node1, "a", "A").addBinding(node1, "b", "B").removeBinding(node1, "a", "A") + m.entries should be(Map("b" -> Set("B"))) + } + + "be able to replace an entry" in { + val m = ORMultiMap().addBinding(node1, "a", "A").replaceBinding(node1, "a", "A", "B") + m.entries should be(Map("a" -> Set("B"))) + } + + "be able to have its entries correctly merged with another ORMultiMap with other entries" in { + val m1 = ORMultiMap().addBinding(node1, "a", "A").addBinding(node1, "b", "B") + val m2 = ORMultiMap().addBinding(node2, "c", "C") + + // merge both ways + + val expectedMerge = Map( + "a" -> Set("A"), + "b" -> Set("B"), + "c" -> Set("C")) + + val merged1 = m1 merge m2 + merged1.entries should be(expectedMerge) + + val merged2 = m2 merge m1 + merged2.entries should be(expectedMerge) + } + + "be able to have its entries correctly merged with another ORMultiMap with overlapping entries" in { + val m1 = ORMultiMap() + .addBinding(node1, "a", "A1") + .addBinding(node1, "b", "B1") + .removeBinding(node1, "a", "A1") + .addBinding(node1, "d", "D1") + val m2 = ORMultiMap() + .addBinding(node2, "c", "C2") + .addBinding(node2, "a", "A2") + .addBinding(node2, "b", "B2") + .removeBinding(node2, "b", "B2") + .addBinding(node2, "d", "D2") + + // merge both ways + + val expectedMerged = Map( + "a" -> Set("A2"), + "b" -> Set("B1"), + "c" -> Set("C2"), + "d" -> Set("D1", "D2")) + + val merged1 = m1 merge m2 + merged1.entries should be(expectedMerged) + + val merged2 = m2 merge m1 + merged2.entries should be(expectedMerged) + } + } + + "be able to get all bindings for an entry and then reduce them upon putting them back" in { + val m = ORMultiMap().addBinding(node1, "a", "A1").addBinding(node1, "a", "A2").addBinding(node1, "b", "B1") + val Some(a) = m.get("a") + + a should be(Set("A1", "A2")) + + val m2 = m.put(node1, "a", a - "A1") + + val expectedMerged = Map( + "a" -> Set("A2"), + "b" -> Set("B1")) + + m2.entries should be(expectedMerged) + } + + "return the value for an existing key and the default for a non-existing one when using getOrElse" in { + val m = ORMultiMap().addBinding(node1, "a", "A") + m.getOrElse("a", Set("B")) shouldBe Set("A") + m.getOrElse("b", Set("B")) shouldBe Set("B") + } + + "remove all bindings for a given key" in { + val m = ORMultiMap().addBinding(node1, "a", "A1").addBinding(node1, "a", "A2").addBinding(node1, "b", "B1") + val m2 = m.remove(node1, "a") + m2.entries should be(Map("b" -> Set("B1"))) + } + + "have unapply extractor" in { + val m1 = ORMultiMap.empty.put(node1, "a", Set(1L, 2L)).put(node2, "b", Set(3L)) + val m2: ORMultiMap[Long] = m1 + val ORMultiMap(entries1) = m1 + val entries2: Map[String, Set[Long]] = entries1 + Changed(ORMultiMapKey[Long]("key"))(m1) match { + case c @ Changed(ORMultiMapKey("key")) ⇒ + val ORMultiMap(entries3) = c.dataValue + val entries4: Map[String, Set[Long]] = entries3 + entries4 should be(Map("a" -> Set(1L, 2L), "b" -> Set(3L))) + } + } +} diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala index f6b9b829fb..6fbbd4f288 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala @@ -16,6 +16,7 @@ import akka.cluster.ddata.GSet import akka.cluster.ddata.LWWMap import akka.cluster.ddata.LWWRegister import akka.cluster.ddata.ORMap +import akka.cluster.ddata.ORMultiMap import akka.cluster.ddata.ORSet import akka.cluster.ddata.PNCounter import akka.cluster.ddata.PNCounterMap @@ -154,6 +155,19 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem("ReplicatedDataSe increment(address2, "b", 5)) } + "serialize ORMultiMap" in { + checkSerialization(ORMultiMap()) + checkSerialization(ORMultiMap().addBinding(address1, "a", "A")) + checkSerialization(ORMultiMap.empty[String] + .addBinding(address1, "a", "A1") + .put(address2, "b", Set("B1", "B2", "B3")) + .addBinding(address2, "a", "A2")) + + val m1 = ORMultiMap.empty[String].addBinding(address1, "a", "A1").addBinding(address2, "a", "A2") + val m2 = ORMultiMap.empty[String].put(address2, "b", Set("B1", "B2", "B3")) + checkSameContent(m1.merge(m2), m2.merge(m1)) + } + "serialize DeletedData" in { checkSerialization(DeletedData) } diff --git a/akka-docs/rst/java/code/docs/ddata/DistributedDataDocTest.java b/akka-docs/rst/java/code/docs/ddata/DistributedDataDocTest.java index 2521405d06..ac70471ee6 100644 --- a/akka-docs/rst/java/code/docs/ddata/DistributedDataDocTest.java +++ b/akka-docs/rst/java/code/docs/ddata/DistributedDataDocTest.java @@ -3,56 +3,38 @@ */ package docs.ddata; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.assertEquals; - -import com.typesafe.config.ConfigFactory; -import java.math.BigInteger; -import java.util.Arrays; import java.util.HashSet; -import java.util.Optional; +import java.util.Arrays; import java.util.Set; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import scala.PartialFunction; +import java.math.BigInteger; +import java.util.Optional; +import com.typesafe.config.ConfigFactory; import scala.concurrent.duration.Duration; import scala.runtime.BoxedUnit; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertEquals; +import scala.PartialFunction; +import org.junit.Test; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import scala.concurrent.duration.FiniteDuration; +import scala.concurrent.forkjoin.ThreadLocalRandom; -import akka.actor.ActorRef; +import akka.actor.Actor; +import akka.actor.ActorLogging; import akka.actor.ActorSystem; import akka.cluster.Cluster; -import akka.cluster.ddata.DistributedData; -import akka.cluster.ddata.Flag; -import akka.cluster.ddata.FlagKey; -import akka.cluster.ddata.GSet; -import akka.cluster.ddata.GSetKey; -import akka.cluster.ddata.Key; -import akka.cluster.ddata.LWWRegister; -import akka.cluster.ddata.ORSet; -import akka.cluster.ddata.ORSetKey; -import akka.cluster.ddata.PNCounter; -import akka.cluster.ddata.PNCounterKey; -import akka.cluster.ddata.PNCounterMap; -import akka.cluster.ddata.Replicator; -import akka.cluster.ddata.Replicator.Changed; -import akka.cluster.ddata.Replicator.Delete; -import akka.cluster.ddata.Replicator.GetFailure; -import akka.cluster.ddata.Replicator.GetSuccess; -import akka.cluster.ddata.Replicator.NotFound; -import akka.cluster.ddata.Replicator.ReadAll; -import akka.cluster.ddata.Replicator.ReadConsistency; -import akka.cluster.ddata.Replicator.ReadFrom; -import akka.cluster.ddata.Replicator.ReadMajority; -import akka.cluster.ddata.Replicator.Subscribe; -import akka.cluster.ddata.Replicator.UpdateSuccess; -import akka.cluster.ddata.Replicator.UpdateTimeout; -import akka.cluster.ddata.Replicator.WriteAll; -import akka.cluster.ddata.Replicator.WriteConsistency; -import akka.cluster.ddata.Replicator.WriteMajority; -import akka.cluster.ddata.Replicator.WriteTo; +import akka.cluster.ddata.*; import akka.japi.pf.ReceiveBuilder; + +import static akka.cluster.ddata.Replicator.*; + +import akka.testkit.AkkaSpec; +import akka.testkit.ImplicitSender; import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import akka.actor.ActorRef; +import akka.serialization.SerializationExtension; public class DistributedDataDocTest { @@ -349,6 +331,19 @@ public class DistributedDataDocTest { //#orset } + public void demonstrateORMultiMap() { + //#ormultimap + final Cluster node = Cluster.get(system); + final ORMultiMap m0 = ORMultiMap.create(); + final ORMultiMap m1 = m0.put(node, "a", + new HashSet(Arrays.asList(1, 2, 3))); + final ORMultiMap m2 = m1.addBinding(node, "a", 4); + final ORMultiMap m3 = m2.removeBinding(node, "a", 2); + final ORMultiMap m4 = m3.addBinding(node, "b", 1); + System.out.println(m4.getEntries()); + //#ormultimap + } + public void demonstrateFlag() { //#flag final Flag f0 = Flag.create(); diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst index d902a9c7c2..125f3494c7 100644 --- a/akka-docs/rst/java/distributed-data.rst +++ b/akka-docs/rst/java/distributed-data.rst @@ -244,7 +244,7 @@ by this package, such as: * Counters: ``GCounter``, ``PNCounter`` * Sets: ``GSet``, ``ORSet`` -* Maps: ``ORMap``, ``LWWMap``, ``PNCounterMap`` +* Maps: ``ORMap``, ``ORMultiMap``, ``LWWMap``, ``PNCounterMap`` * Registers: ``LWWRegister``, ``Flag`` Counters @@ -307,12 +307,17 @@ It is rather inconvenient to use the ``ORMap`` directly since it does not expose of the values. The ``ORMap`` is intended as a low level tool for building more specific maps, such as the following specialized maps. +``ORMultiMap`` (observed-remove multi-map) is a multi-map implementation that wraps an +``ORMap`` with an ``ORSet`` for the map's value. + ``PNCounterMap`` (positive negative counter map) is a map of named counters. It is a specialized ``ORMap`` with ``PNCounter`` values. ``LWWMap`` (last writer wins map) is a specialized ``ORMap`` with ``LWWRegister`` (last writer wins register) values. +.. includecode:: code/docs/ddata/DistributedDataDocTest.java#ormultimap + Note that ``LWWRegister`` and therefore ``LWWMap`` relies on synchronized clocks and should only be used when the choice of value is not important for concurrent updates occurring within the clock skew. diff --git a/akka-docs/rst/scala/code/docs/ddata/DistributedDataDocSpec.scala b/akka-docs/rst/scala/code/docs/ddata/DistributedDataDocSpec.scala index 045ee0aeb1..2b4163c9f5 100644 --- a/akka-docs/rst/scala/code/docs/ddata/DistributedDataDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/ddata/DistributedDataDocSpec.scala @@ -325,6 +325,19 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) { //#orset } + "demonstrate ORMultiMap" in { + def println(o: Any): Unit = () + //#ormultimap + implicit val node = Cluster(system) + val m0 = ORMultiMap.empty[Int] + val m1 = m0 + ("a" -> Set(1, 2, 3)) + val m2 = m1.addBinding("a", 4) + val m3 = m2.removeBinding("a", 2) + val m4 = m3.addBinding("b", 1) + println(m4.entries) + //#ormultimap + } + "demonstrate Flag" in { def println(o: Any): Unit = () //#flag diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst index 7862204acd..78d14da06b 100644 --- a/akka-docs/rst/scala/distributed-data.rst +++ b/akka-docs/rst/scala/distributed-data.rst @@ -240,7 +240,7 @@ by this package, such as: * Counters: ``GCounter``, ``PNCounter`` * Sets: ``GSet``, ``ORSet`` -* Maps: ``ORMap``, ``LWWMap``, ``PNCounterMap`` +* Maps: ``ORMap``, ``ORMultiMap``, ``LWWMap``, ``PNCounterMap`` * Registers: ``LWWRegister``, ``Flag`` Counters @@ -303,12 +303,17 @@ It is rather inconvenient to use the ``ORMap`` directly since it does not expose of the values. The ``ORMap`` is intended as a low level tool for building more specific maps, such as the following specialized maps. +``ORMultiMap`` (observed-remove multi-map) is a multi-map implementation that wraps an +``ORMap`` with an ``ORSet`` for the map's value. + ``PNCounterMap`` (positive negative counter map) is a map of named counters. It is a specialized ``ORMap`` with ``PNCounter`` values. ``LWWMap`` (last writer wins map) is a specialized ``ORMap`` with ``LWWRegister`` (last writer wins register) values. +.. includecode:: code/docs/ddata/DistributedDataDocSpec.scala#ormultimap + Note that ``LWWRegister`` and therefore ``LWWMap`` relies on synchronized clocks and should only be used when the choice of value is not important for concurrent updates occurring within the clock skew.