diff --git a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java index 139dca9c1a..0b27cd1048 100644 --- a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java +++ b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java @@ -6320,6 +6320,643 @@ public final class ClusterShardingMessages { } + public interface EntitiesStartedOrBuilder extends + // @@protoc_insertion_point(interface_extends:EntitiesStarted) + akka.protobufv3.internal.MessageOrBuilder { + + /** + * repeated string entityId = 1; + * @return A list containing the entityId. + */ + java.util.List + getEntityIdList(); + /** + * repeated string entityId = 1; + * @return The count of entityId. + */ + int getEntityIdCount(); + /** + * repeated string entityId = 1; + * @param index The index of the element to return. + * @return The entityId at the given index. + */ + java.lang.String getEntityId(int index); + /** + * repeated string entityId = 1; + * @param index The index of the value to return. + * @return The bytes of the entityId at the given index. + */ + akka.protobufv3.internal.ByteString + getEntityIdBytes(int index); + } + /** + * Protobuf type {@code EntitiesStarted} + */ + public static final class EntitiesStarted extends + akka.protobufv3.internal.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:EntitiesStarted) + EntitiesStartedOrBuilder { + private static final long serialVersionUID = 0L; + // Use EntitiesStarted.newBuilder() to construct. + private EntitiesStarted(akka.protobufv3.internal.GeneratedMessageV3.Builder builder) { + super(builder); + } + private EntitiesStarted() { + entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + } + + @java.lang.Override + @SuppressWarnings({"unused"}) + protected java.lang.Object newInstance( + akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter unused) { + return new EntitiesStarted(); + } + + @java.lang.Override + public final akka.protobufv3.internal.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private EntitiesStarted( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + akka.protobufv3.internal.UnknownFieldSet.Builder unknownFields = + akka.protobufv3.internal.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 10: { + akka.protobufv3.internal.ByteString bs = input.readBytes(); + if (!((mutable_bitField0_ & 0x00000001) != 0)) { + entityId_ = new akka.protobufv3.internal.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000001; + } + entityId_.add(bs); + break; + } + default: { + if (!parseUnknownField( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobufv3.internal.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) != 0)) { + entityId_ = entityId_.getUnmodifiableView(); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStarted_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStarted_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted.Builder.class); + } + + public static final int ENTITYID_FIELD_NUMBER = 1; + private akka.protobufv3.internal.LazyStringList entityId_; + /** + * repeated string entityId = 1; + * @return A list containing the entityId. + */ + public akka.protobufv3.internal.ProtocolStringList + getEntityIdList() { + return entityId_; + } + /** + * repeated string entityId = 1; + * @return The count of entityId. + */ + public int getEntityIdCount() { + return entityId_.size(); + } + /** + * repeated string entityId = 1; + * @param index The index of the element to return. + * @return The entityId at the given index. + */ + public java.lang.String getEntityId(int index) { + return entityId_.get(index); + } + /** + * repeated string entityId = 1; + * @param index The index of the value to return. + * @return The bytes of the entityId at the given index. + */ + public akka.protobufv3.internal.ByteString + getEntityIdBytes(int index) { + return entityId_.getByteString(index); + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(akka.protobufv3.internal.CodedOutputStream output) + throws java.io.IOException { + for (int i = 0; i < entityId_.size(); i++) { + akka.protobufv3.internal.GeneratedMessageV3.writeString(output, 1, entityId_.getRaw(i)); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + { + int dataSize = 0; + for (int i = 0; i < entityId_.size(); i++) { + dataSize += computeStringSizeNoTag(entityId_.getRaw(i)); + } + size += dataSize; + size += 1 * getEntityIdList().size(); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted)) { + return super.equals(obj); + } + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted other = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted) obj; + + if (!getEntityIdList() + .equals(other.getEntityIdList())) return false; + if (!unknownFields.equals(other.unknownFields)) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (getEntityIdCount() > 0) { + hash = (37 * hash) + ENTITYID_FIELD_NUMBER; + hash = (53 * hash) + getEntityIdList().hashCode(); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom( + java.nio.ByteBuffer data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom( + java.nio.ByteBuffer data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom( + akka.protobufv3.internal.ByteString data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom( + akka.protobufv3.internal.ByteString data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom(byte[] data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom( + byte[] data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseDelimitedFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom( + akka.protobufv3.internal.CodedInputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parseFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code EntitiesStarted} + */ + public static final class Builder extends + akka.protobufv3.internal.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:EntitiesStarted) + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStartedOrBuilder { + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStarted_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStarted_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted.Builder.class); + } + + // Construct using akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobufv3.internal.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + @java.lang.Override + public akka.protobufv3.internal.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStarted_descriptor; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted getDefaultInstanceForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted.getDefaultInstance(); + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted build() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted buildPartial() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted result = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted(this); + int from_bitField0_ = bitField0_; + if (((bitField0_ & 0x00000001) != 0)) { + entityId_ = entityId_.getUnmodifiableView(); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.entityId_ = entityId_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return super.clone(); + } + @java.lang.Override + public Builder setField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + akka.protobufv3.internal.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(akka.protobufv3.internal.Message other) { + if (other instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted) { + return mergeFrom((akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted other) { + if (other == akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted.getDefaultInstance()) return this; + if (!other.entityId_.isEmpty()) { + if (entityId_.isEmpty()) { + entityId_ = other.entityId_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureEntityIdIsMutable(); + entityId_.addAll(other.entityId_); + } + onChanged(); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + private akka.protobufv3.internal.LazyStringList entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + private void ensureEntityIdIsMutable() { + if (!((bitField0_ & 0x00000001) != 0)) { + entityId_ = new akka.protobufv3.internal.LazyStringArrayList(entityId_); + bitField0_ |= 0x00000001; + } + } + /** + * repeated string entityId = 1; + * @return A list containing the entityId. + */ + public akka.protobufv3.internal.ProtocolStringList + getEntityIdList() { + return entityId_.getUnmodifiableView(); + } + /** + * repeated string entityId = 1; + * @return The count of entityId. + */ + public int getEntityIdCount() { + return entityId_.size(); + } + /** + * repeated string entityId = 1; + * @param index The index of the element to return. + * @return The entityId at the given index. + */ + public java.lang.String getEntityId(int index) { + return entityId_.get(index); + } + /** + * repeated string entityId = 1; + * @param index The index of the value to return. + * @return The bytes of the entityId at the given index. + */ + public akka.protobufv3.internal.ByteString + getEntityIdBytes(int index) { + return entityId_.getByteString(index); + } + /** + * repeated string entityId = 1; + * @param index The index to set the value at. + * @param value The entityId to set. + * @return This builder for chaining. + */ + public Builder setEntityId( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntityIdIsMutable(); + entityId_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string entityId = 1; + * @param value The entityId to add. + * @return This builder for chaining. + */ + public Builder addEntityId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntityIdIsMutable(); + entityId_.add(value); + onChanged(); + return this; + } + /** + * repeated string entityId = 1; + * @param values The entityId to add. + * @return This builder for chaining. + */ + public Builder addAllEntityId( + java.lang.Iterable values) { + ensureEntityIdIsMutable(); + akka.protobufv3.internal.AbstractMessageLite.Builder.addAll( + values, entityId_); + onChanged(); + return this; + } + /** + * repeated string entityId = 1; + * @return This builder for chaining. + */ + public Builder clearEntityId() { + entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + return this; + } + /** + * repeated string entityId = 1; + * @param value The bytes of the entityId to add. + * @return This builder for chaining. + */ + public Builder addEntityIdBytes( + akka.protobufv3.internal.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntityIdIsMutable(); + entityId_.add(value); + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:EntitiesStarted) + } + + // @@protoc_insertion_point(class_scope:EntitiesStarted) + private static final akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted(); + } + + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + @java.lang.Deprecated public static final akka.protobufv3.internal.Parser + PARSER = new akka.protobufv3.internal.AbstractParser() { + @java.lang.Override + public EntitiesStarted parsePartialFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return new EntitiesStarted(input, extensionRegistry); + } + }; + + public static akka.protobufv3.internal.Parser parser() { + return PARSER; + } + + @java.lang.Override + public akka.protobufv3.internal.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStarted getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + public interface EntityStoppedOrBuilder extends // @@protoc_insertion_point(interface_extends:EntityStopped) akka.protobufv3.internal.MessageOrBuilder { @@ -18267,6 +18904,11 @@ public final class ClusterShardingMessages { private static final akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable internal_static_EntityStarted_fieldAccessorTable; + private static final akka.protobufv3.internal.Descriptors.Descriptor + internal_static_EntitiesStarted_descriptor; + private static final + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internal_static_EntitiesStarted_fieldAccessorTable; private static final akka.protobufv3.internal.Descriptors.Descriptor internal_static_EntityStopped_descriptor; private static final @@ -18362,29 +19004,30 @@ public final class ClusterShardingMessages { "\006region\030\002 \002(\t\"*\n\tShardHome\022\r\n\005shard\030\001 \002(" + "\t\022\016\n\006region\030\002 \002(\t\"\037\n\013EntityState\022\020\n\010enti" + "ties\030\001 \003(\t\"!\n\rEntityStarted\022\020\n\010entityId\030" + - "\001 \002(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t" + - "\"0\n\nShardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityC" + - "ount\030\002 \002(\005\"A\n\020ShardRegionStats\022\035\n\005stats\030" + - "\001 \003(\0132\016.MapFieldEntry\022\016\n\006failed\030\002 \003(\t\"+\n" + - "\rMapFieldEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001" + - "(\005\"/\n\027GetClusterShardingStats\022\024\n\014timeout" + - "Nanos\030\001 \002(\003\"A\n\024ClusterShardingStats\022)\n\005s" + - "tats\030\001 \003(\0132\032.ClusterShardingStatsEntry\"X" + - "\n\031ClusterShardingStatsEntry\022\031\n\007address\030\001" + - " \002(\0132\010.Address\022 \n\005stats\030\002 \002(\0132\021.ShardReg" + - "ionStats\"+\n\016CurrentRegions\022\031\n\007regions\030\001 " + - "\003(\0132\010.Address\"K\n\007Address\022\020\n\010protocol\030\001 \002" + - "(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004" + - "port\030\004 \002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001 " + - "\002(\t\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022" + - "\017\n\007shardId\030\002 \002(\t\"7\n\021CurrentShardState\022\017\n" + - "\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"0\n\nSha" + - "rdState\022\017\n\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 " + - "\003(\t\"F\n\027CurrentShardRegionState\022\033\n\006shards" + - "\030\001 \003(\0132\013.ShardState\022\016\n\006failed\030\002 \003(\t\"7\n\024R" + - "ememberedShardState\022\017\n\007shardId\030\001 \003(\t\022\016\n\006" + - "marker\030\002 \001(\010B&\n\"akka.cluster.sharding.pr" + - "otobuf.msgH\001" + "\001 \002(\t\"#\n\017EntitiesStarted\022\020\n\010entityId\030\001 \003" + + "(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t\"0\n" + + "\nShardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityCoun" + + "t\030\002 \002(\005\"A\n\020ShardRegionStats\022\035\n\005stats\030\001 \003" + + "(\0132\016.MapFieldEntry\022\016\n\006failed\030\002 \003(\t\"+\n\rMa" + + "pFieldEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\005\"" + + "/\n\027GetClusterShardingStats\022\024\n\014timeoutNan" + + "os\030\001 \002(\003\"A\n\024ClusterShardingStats\022)\n\005stat" + + "s\030\001 \003(\0132\032.ClusterShardingStatsEntry\"X\n\031C" + + "lusterShardingStatsEntry\022\031\n\007address\030\001 \002(" + + "\0132\010.Address\022 \n\005stats\030\002 \002(\0132\021.ShardRegion" + + "Stats\"+\n\016CurrentRegions\022\031\n\007regions\030\001 \003(\013" + + "2\010.Address\"K\n\007Address\022\020\n\010protocol\030\001 \002(\t\022" + + "\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004por" + + "t\030\004 \002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001 \002(\t" + + "\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022\017\n\007" + + "shardId\030\002 \002(\t\"7\n\021CurrentShardState\022\017\n\007sh" + + "ardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"0\n\nShardS" + + "tate\022\017\n\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t" + + "\"F\n\027CurrentShardRegionState\022\033\n\006shards\030\001 " + + "\003(\0132\013.ShardState\022\016\n\006failed\030\002 \003(\t\"7\n\024Reme" + + "mberedShardState\022\017\n\007shardId\030\001 \003(\t\022\016\n\006mar" + + "ker\030\002 \001(\010B&\n\"akka.cluster.sharding.proto" + + "buf.msgH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -18438,92 +19081,98 @@ public final class ClusterShardingMessages { akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_EntityStarted_descriptor, new java.lang.String[] { "EntityId", }); - internal_static_EntityStopped_descriptor = + internal_static_EntitiesStarted_descriptor = getDescriptor().getMessageTypes().get(7); + internal_static_EntitiesStarted_fieldAccessorTable = new + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( + internal_static_EntitiesStarted_descriptor, + new java.lang.String[] { "EntityId", }); + internal_static_EntityStopped_descriptor = + getDescriptor().getMessageTypes().get(8); internal_static_EntityStopped_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_EntityStopped_descriptor, new java.lang.String[] { "EntityId", }); internal_static_ShardStats_descriptor = - getDescriptor().getMessageTypes().get(8); + getDescriptor().getMessageTypes().get(9); internal_static_ShardStats_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ShardStats_descriptor, new java.lang.String[] { "Shard", "EntityCount", }); internal_static_ShardRegionStats_descriptor = - getDescriptor().getMessageTypes().get(9); + getDescriptor().getMessageTypes().get(10); internal_static_ShardRegionStats_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ShardRegionStats_descriptor, new java.lang.String[] { "Stats", "Failed", }); internal_static_MapFieldEntry_descriptor = - getDescriptor().getMessageTypes().get(10); + getDescriptor().getMessageTypes().get(11); internal_static_MapFieldEntry_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_MapFieldEntry_descriptor, new java.lang.String[] { "Key", "Value", }); internal_static_GetClusterShardingStats_descriptor = - getDescriptor().getMessageTypes().get(11); + getDescriptor().getMessageTypes().get(12); internal_static_GetClusterShardingStats_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_GetClusterShardingStats_descriptor, new java.lang.String[] { "TimeoutNanos", }); internal_static_ClusterShardingStats_descriptor = - getDescriptor().getMessageTypes().get(12); + getDescriptor().getMessageTypes().get(13); internal_static_ClusterShardingStats_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ClusterShardingStats_descriptor, new java.lang.String[] { "Stats", }); internal_static_ClusterShardingStatsEntry_descriptor = - getDescriptor().getMessageTypes().get(13); + getDescriptor().getMessageTypes().get(14); internal_static_ClusterShardingStatsEntry_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ClusterShardingStatsEntry_descriptor, new java.lang.String[] { "Address", "Stats", }); internal_static_CurrentRegions_descriptor = - getDescriptor().getMessageTypes().get(14); + getDescriptor().getMessageTypes().get(15); internal_static_CurrentRegions_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_CurrentRegions_descriptor, new java.lang.String[] { "Regions", }); internal_static_Address_descriptor = - getDescriptor().getMessageTypes().get(15); + getDescriptor().getMessageTypes().get(16); internal_static_Address_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_Address_descriptor, new java.lang.String[] { "Protocol", "System", "Hostname", "Port", }); internal_static_StartEntity_descriptor = - getDescriptor().getMessageTypes().get(16); + getDescriptor().getMessageTypes().get(17); internal_static_StartEntity_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_StartEntity_descriptor, new java.lang.String[] { "EntityId", }); internal_static_StartEntityAck_descriptor = - getDescriptor().getMessageTypes().get(17); + getDescriptor().getMessageTypes().get(18); internal_static_StartEntityAck_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_StartEntityAck_descriptor, new java.lang.String[] { "EntityId", "ShardId", }); internal_static_CurrentShardState_descriptor = - getDescriptor().getMessageTypes().get(18); + getDescriptor().getMessageTypes().get(19); internal_static_CurrentShardState_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_CurrentShardState_descriptor, new java.lang.String[] { "ShardId", "EntityIds", }); internal_static_ShardState_descriptor = - getDescriptor().getMessageTypes().get(19); + getDescriptor().getMessageTypes().get(20); internal_static_ShardState_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ShardState_descriptor, new java.lang.String[] { "ShardId", "EntityIds", }); internal_static_CurrentShardRegionState_descriptor = - getDescriptor().getMessageTypes().get(20); + getDescriptor().getMessageTypes().get(21); internal_static_CurrentShardRegionState_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_CurrentShardRegionState_descriptor, new java.lang.String[] { "Shards", "Failed", }); internal_static_RememberedShardState_descriptor = - getDescriptor().getMessageTypes().get(21); + getDescriptor().getMessageTypes().get(22); internal_static_RememberedShardState_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_RememberedShardState_descriptor, diff --git a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto index 2d5deae395..f22582a815 100644 --- a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto +++ b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto @@ -46,6 +46,10 @@ message EntityStarted { required string entityId = 1; } +message EntitiesStarted { + repeated string entityId = 1; +} + message EntityStopped { required string entityId = 1; } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 7128fc1c07..cd4ff16afb 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -326,7 +326,7 @@ private[akka] class Shard( case Terminated(ref) => receiveTerminated(ref) case msg: CoordinatorMessage => receiveCoordinatorMessage(msg) case msg: RememberEntityCommand => receiveRememberEntityCommand(msg) - case msg: ShardRegion.StartEntity => receiveStartEntity(msg) + case msg: ShardRegion.StartEntity => startEntities(Map(msg.entityId -> Some(sender()))) case msg: ShardRegion.StartEntityAck => receiveStartEntityAck(msg) case msg: ShardRegionCommand => receiveShardRegionCommand(msg) case msg: ShardQuery => receiveShardQuery(msg) @@ -338,12 +338,18 @@ private[akka] class Shard( def waitForAsyncWrite(entityId: EntityId, command: RememberEntitiesShardStore.Command)( whenDone: EntityId => Unit): Unit = { + waitForAsyncWrite(Set(entityId), command)(_ => whenDone(entityId)) + } + + def waitForAsyncWrite(entityIds: Set[EntityId], command: RememberEntitiesShardStore.Command)( + whenDone: Set[EntityId] => Unit): Unit = { + rememberEntitiesStore match { case None => - whenDone(entityId) + whenDone(entityIds) case Some(store) => - if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityId, command) + if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds, command) store ! command timers.startSingleTimer( RememberEntityTimeoutKey, @@ -352,35 +358,51 @@ private[akka] class Shard( // and this could always fail before ddata store completes retrying writes settings.tuningParameters.updatingStateTimeout) - context.become { - case RememberEntitiesShardStore.UpdateDone(`entityId`) => - if (VerboseDebug) log.debug("Update of [{}] {} done", entityId, command) + context.become(waitingForUpdate(Map.empty)) + + def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = { + // none of the current impls will send back a partial update, yet! + case RememberEntitiesShardStore.UpdateDone(ids) if ids == ids => timers.cancel(RememberEntityTimeoutKey) - whenDone(entityId) - context.become(idle) - unstashAll() + whenDone(entityIds) + if (pendingStarts.isEmpty) { + context.become(idle) + unstashAll() + } else { + startEntities(pendingStarts) + // FIXME what if all these are already in entities? Need a become idle/unstashAll + } case RememberEntityTimeout(`command`) => throw new RuntimeException( - s"Async write for entityId $entityId timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") + s"Async write for entityIds $entityIds timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") + case msg: ShardRegion.StartEntity => + log.error("Start entity while a write already in progress") + context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender())))) // below cases should handle same messages as in idle - case _: Terminated => stash() - case _: CoordinatorMessage => stash() - case _: RememberEntityCommand => stash() - case _: ShardRegion.StartEntity => stash() - case _: ShardRegion.StartEntityAck => stash() - case _: ShardRegionCommand => stash() - case msg: ShardQuery => receiveShardQuery(msg) - case PassivateIdleTick => stash() - case msg: LeaseLost => receiveLeaseLost(msg) - case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) - case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender(), OptionVal.Some(entityId)) - case msg => + case _: Terminated => stash() + case _: CoordinatorMessage => stash() + case _: RememberEntityCommand => stash() + case _: ShardRegion.StartEntityAck => stash() + case _: ShardRegionCommand => stash() + case msg: ShardQuery => receiveShardQuery(msg) + case PassivateIdleTick => stash() + case msg: LeaseLost => receiveLeaseLost(msg) + case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) + case msg if extractEntityId.isDefinedAt(msg) => + val (id, _) = extractEntityId(msg) + if (entityIds.contains(id)) { + deliverMessage(msg, sender(), OptionVal.Some(entityIds)) + } else { + stash() + context.become(waitingForUpdate(pendingStarts + (id -> None))) + } + case msg => // shouldn't be any other message types, but just in case - log.debug( + log.warning( "Stashing unexpected message [{}] while waiting for remember entities update of {}", msg.getClass, - entityId) + entityIds) stash() } } @@ -410,19 +432,32 @@ private[akka] class Shard( case RestartEntities(ids) => restartEntities(ids) } - private def receiveStartEntity(start: ShardRegion.StartEntity): Unit = { - val requester = sender() - log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", requester, start.entityId, shardId) - touchLastMessageTimestamp(start.entityId) + // this could be because of a start message or due to a new message for the entity + // if it is a start entity then start entity ack is sent after it is created + private def startEntities(entities: Map[EntityId, Option[ActorRef]]): Unit = { + val alreadyStarted = entities.filterKeys(entity => entityIds(entity)) + val needStarting = entities -- alreadyStarted.keySet + log.debug( + "Request to start entities {}. Already started {}. Need starting {}", + entities, + alreadyStarted, + needStarting) - if (entityIds(start.entityId)) { - getOrCreateEntity(start.entityId) - requester ! ShardRegion.StartEntityAck(start.entityId, shardId) - } else { - waitForAsyncWrite(start.entityId, RememberEntitiesShardStore.AddEntity(start.entityId)) { id => - getOrCreateEntity(id) - sendMsgBuffer(id) - requester ! ShardRegion.StartEntityAck(id, shardId) + alreadyStarted.foreach { + case (entityId, requestor) => + getOrCreateEntity(entityId) + touchLastMessageTimestamp(entityId) + requestor.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) + } + + if (needStarting.nonEmpty) { + waitForAsyncWrite(needStarting.keySet, RememberEntitiesShardStore.AddEntities(needStarting.keySet)) { _ => + needStarting.foreach { + case (entityId, requestor) => + getOrCreateEntity(entityId) + sendMsgBuffer(entityId) + requestor.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) + } } } } @@ -431,9 +466,9 @@ private[akka] class Shard( if (ack.shardId != shardId && entityIds(ack.entityId)) { log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId) - waitForAsyncWrite(ack.entityId, RememberEntitiesShardStore.RemoveEntity(ack.entityId)) { id => - entityIds = entityIds - id - messageBuffers.remove(id) + waitForAsyncWrite(ack.entityId, RememberEntitiesShardStore.RemoveEntity(ack.entityId)) { _ => + entityIds = entityIds - ack.entityId + messageBuffers.remove(ack.entityId) } } } @@ -505,7 +540,7 @@ private[akka] class Shard( context.system.scheduler.scheduleOnce(entityRestartBackoff, self, RestartEntity(id)) } else { // FIXME optional wait for completion as optimization where stops are not critical - waitForAsyncWrite(id, RememberEntitiesShardStore.RemoveEntity(id))(passivateCompleted) + waitForAsyncWrite(id, RememberEntitiesShardStore.RemoveEntity(id))(_ => passivateCompleted(id)) } } @@ -551,7 +586,9 @@ private[akka] class Shard( entityIds = entityIds - entityId if (hasBufferedMessages) { log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId) - waitForAsyncWrite(entityId, RememberEntitiesShardStore.AddEntity(entityId))(sendMsgBuffer) + waitForAsyncWrite(entityId, RememberEntitiesShardStore.AddEntities(Set(entityId))) { _ => + sendMsgBuffer(entityId) + } } else { log.debug("Entity stopped after passivation [{}]", entityId) dropBufferFor(entityId) @@ -559,10 +596,10 @@ private[akka] class Shard( } /** - * @param entityIdWaitingForWrite an id for an remember entity write in progress, if non empty messages for that id + * @param entityIdsWaitingForWrite ids for remembered entities that have a write in progress, if non empty messages for that id * will be buffered */ - private def deliverMessage(msg: Any, snd: ActorRef, entityIdWaitingForWrite: OptionVal[EntityId]): Unit = { + private def deliverMessage(msg: Any, snd: ActorRef, entityIdsWaitingForWrite: OptionVal[Set[EntityId]]): Unit = { val (id, payload) = extractEntityId(msg) if (id == null || id == "") { log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName) @@ -571,17 +608,18 @@ private[akka] class Shard( payload match { case start: ShardRegion.StartEntity => // we can only start a new entity if we are not currently waiting for another write - if (entityIdWaitingForWrite.isEmpty) receiveStartEntity(start) + if (entityIdsWaitingForWrite.isEmpty) startEntities(Map(start.entityId -> Some(sender()))) // write in progress, see waitForAsyncWrite for unstash else stash() case _ => - if (messageBuffers.contains(id) || entityIdWaitingForWrite.contains(id)) { + if (messageBuffers.contains(id) || (entityIdsWaitingForWrite.isDefined && entityIdsWaitingForWrite.get + .contains(id))) { // either: // 1. entity is passivating, buffer until passivation complete (id in message buffers) // 2. we are waiting for storing entity start or stop with remember entities to complete // and want to buffer until write completes if (VerboseDebug) { - if (entityIdWaitingForWrite.contains(id)) + if (entityIdsWaitingForWrite.isDefined && entityIdsWaitingForWrite.get.contains(id)) log.debug("Buffering message [{}] to [{}] because of write in progress for it", msg.getClass, id) else log.debug("Buffering message [{}] to [{}] because passivation in progress for it", msg.getClass, id) @@ -614,16 +652,16 @@ private[akka] class Shard( actor.tell(payload, snd) } else { - entityIdWaitingForWrite match { + entityIdsWaitingForWrite match { case OptionVal.None => // No actor running and no write in progress, start actor and deliver message when started // Note; we only do this if remembering, otherwise the buffer is an overhead if (VerboseDebug) log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id) appendToMessageBuffer(id, msg, snd) - waitForAsyncWrite(id, RememberEntitiesShardStore.AddEntity(id))(sendMsgBuffer) + waitForAsyncWrite(id, RememberEntitiesShardStore.AddEntities(Set(id)))(sendMsgBuffer) - case OptionVal.Some(`id`) => + case OptionVal.Some(ids) if ids.contains(id) => // No actor running and write in progress for this particular id, buffer message for deliver when // write completes if (VerboseDebug) @@ -633,7 +671,7 @@ private[akka] class Shard( id) appendToMessageBuffer(id, msg, snd) - case OptionVal.Some(otherId) => + case OptionVal.Some(otherIds) => // No actor running and write in progress for some other entity id, stash message for deliver when // unstash happens on async write complete if (VerboseDebug) @@ -641,7 +679,7 @@ private[akka] class Shard( "Stashing message [{}] to [{}] because of write in progress for [{}]", payload.getClass, id, - otherId) + otherIds) stash() } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index c15eed411c..b5184ff749 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -978,6 +978,9 @@ abstract class ShardCoordinator( /** * Singleton coordinator that decides where to allocate shards. * + * Users can migrate to using DData to store state then either event sourcing or ddata to store + * the remembered entities. + * * @see [[ClusterSharding$ ClusterSharding extension]] */ @deprecated("Use `ddata` mode, persistence mode is deprecated.", "2.6.0") @@ -1103,6 +1106,9 @@ private[akka] object DDataShardCoordinator { * INTERNAL API * Singleton coordinator (with state based on ddata) that decides where to allocate shards. * + * The plan is for this to be the only type of ShardCoordinator. A full cluster shutdown will rely + * on remembered entities to re-initialize and reallocate the existing shards. + * * @see [[ClusterSharding$ ClusterSharding extension]] */ @InternalApi diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala index 8e31ae1dd0..54455bcb49 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala @@ -29,6 +29,7 @@ import akka.cluster.ddata.SelfUniqueAddress import akka.cluster.sharding.ClusterShardingSettings import akka.cluster.sharding.ShardRegion.EntityId import akka.cluster.sharding.ShardRegion.ShardId +import akka.cluster.sharding.internal.RememberEntitiesShardStore.{ AddEntities, RemoveEntity } import akka.util.PrettyDuration._ import scala.concurrent.ExecutionContext @@ -103,8 +104,9 @@ private[akka] final class DDataRememberEntitiesShardStore( override def receive: Receive = idle def idle: Receive = { - case update: RememberEntitiesShardStore.UpdateEntityCommand => onUpdate(update) - case RememberEntitiesShardStore.GetEntities => onGetEntities() + case RememberEntitiesShardStore.GetEntities => onGetEntities() + case AddEntities(ids) => addEntities(ids) + case RemoveEntity(id) => removeEntity(id) } def waitingForAllEntityIds(requestor: ActorRef, gotKeys: Set[Int], ids: Set[EntityId]): Receive = { @@ -139,35 +141,54 @@ private[akka] final class DDataRememberEntitiesShardStore( } } - private def onUpdate(update: RememberEntitiesShardStore.UpdateEntityCommand): Unit = { - val keyForEntity = key(update.entityId) - val sendUpdate = () => - replicator ! Update(keyForEntity, ORSet.empty[EntityId], writeMajority, Some(update)) { existing => - update match { - case RememberEntitiesShardStore.AddEntity(id) => existing :+ id - case RememberEntitiesShardStore.RemoveEntity(id) => existing.remove(id) - } - } + private def addEntities(ids: Set[EntityId]): Unit = { + val updates: Map[Set[EntityId], (Update[ORSet[EntityId]], Int)] = ids.groupBy(key).map { + case (key, ids) => + (ids, (Update(key, ORSet.empty[EntityId], writeMajority, Some(ids)) { existing => + ids.foldLeft(existing) { + case (acc, nextId) => acc :+ nextId + } + }, maxUpdateAttempts)) + } - sendUpdate() - context.become(waitingForUpdate(sender(), update, keyForEntity, maxUpdateAttempts, sendUpdate)) + updates.foreach { + case (_, (update, _)) => + replicator ! update + } + + context.become(waitingForUpdates(sender(), ids, updates)) } - private def waitingForUpdate( - requestor: ActorRef, - update: RememberEntitiesShardStore.UpdateEntityCommand, - keyForEntity: ORSetKey[EntityId], - retriesLeft: Int, - retry: () => Unit): Receive = { - case UpdateSuccess(`keyForEntity`, Some(`update`)) => - log.debug("The DDataShard state was successfully updated for [{}]", update.entityId) - requestor ! RememberEntitiesShardStore.UpdateDone(update.entityId) - context.become(idle) + private def removeEntity(id: EntityId): Unit = { + val keyForEntity = key(id) + val update = Update(keyForEntity, ORSet.empty[EntityId], writeMajority, Some(Set(id))) { existing => + existing.remove(id) + } + replicator ! update - case UpdateTimeout(`keyForEntity`, Some(`update`)) => + context.become(waitingForUpdates(sender(), Set(id), Map((Set(id), (update, maxUpdateAttempts))))) + } + + private def waitingForUpdates( + requestor: ActorRef, + allIds: Set[EntityId], + updates: Map[Set[EntityId], (Update[ORSet[EntityId]], Int)]): Receive = { + case UpdateSuccess(_, Some(ids: Set[EntityId] @unchecked)) => + log.debug("The DDataShard state was successfully updated for [{}]", ids) + val remaining = updates - ids + if (remaining.isEmpty) { + requestor ! RememberEntitiesShardStore.UpdateDone(allIds) + context.become(idle) + } else { + context.become(waitingForUpdates(requestor, allIds, remaining)) + } + + case UpdateTimeout(_, Some(ids: Set[EntityId] @unchecked)) => + val (update, retriesLeft) = updates(ids) if (retriesLeft > 0) { log.debug("Retrying update because of write timeout, tries left [{}]", retriesLeft) - retry() + replicator ! update + context.become(waitingForUpdates(requestor, allIds, updates.updated(ids, (update, retriesLeft - 1)))) } else { log.error( "Unable to update state, within 'updating-state-timeout'= [{}], gave up after [{}] retries", @@ -176,15 +197,15 @@ private[akka] final class DDataRememberEntitiesShardStore( // will trigger shard restart context.stop(self) } - case StoreFailure(`keyForEntity`, Some(`update`)) => + case StoreFailure(_, _) => log.error("Unable to update state, due to store failure") // will trigger shard restart context.stop(self) - case ModifyFailure(`keyForEntity`, error, cause, Some(`update`)) => + case ModifyFailure(_, error, cause, _) => log.error(cause, "Unable to update state, due to modify failure: {}", error) // will trigger shard restart context.stop(self) - case UpdateDataDeleted(`keyForEntity`, Some(`update`)) => + case UpdateDataDeleted(_, _) => log.error("Unable to update state, due to delete") // will trigger shard restart context.stop(self) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala index ff8783b9c5..dfa54b66e9 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala @@ -30,7 +30,8 @@ import akka.persistence.SnapshotSelectionCriteria private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings) extends RememberEntitiesProvider { - // this is backed by an actor using the same events as for state-store-mode=persistence + // this is backed by an actor using the same events, at the serailisation level, as the now removed PersistentShard when state-store-mode=persistence + // new events can be added but the old events should continue to be handled override def shardStoreProps(shardId: ShardId): Props = EventSourcedRememberEntitiesStore.props(typeName, shardId, settings) @@ -42,15 +43,14 @@ private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String, /** * INTERNAL API + * */ private[akka] object EventSourcedRememberEntitiesStore { /** * A case class which represents a state change for the Shard */ - sealed trait StateChange extends ClusterShardingSerializable { - val entityId: EntityId - } + sealed trait StateChange extends ClusterShardingSerializable /** * Persistent state of the Shard. @@ -58,9 +58,9 @@ private[akka] object EventSourcedRememberEntitiesStore { final case class State private[akka] (entities: Set[EntityId] = Set.empty) extends ClusterShardingSerializable /** - * `State` change for starting an entity in this `Shard` + * `State` change for starting a set of entities in this `Shard` */ - final case class EntityStarted(entityId: EntityId) extends StateChange + final case class EntitiesStarted(entities: Set[String]) extends StateChange case object StartedAck @@ -97,7 +97,7 @@ private[akka] final class EventSourcedRememberEntitiesStore( override def snapshotPluginId: String = settings.snapshotPluginId override def receiveRecover: Receive = { - case EntityStarted(id) => state = state.copy(state.entities + id) + case EntitiesStarted(ids) => state = state.copy(state.entities ++ ids) case EntityStopped(id) => state = state.copy(state.entities - id) case SnapshotOffer(_, snapshot: State) => state = snapshot case RecoveryCompleted => @@ -105,15 +105,15 @@ private[akka] final class EventSourcedRememberEntitiesStore( } override def receiveCommand: Receive = { - case RememberEntitiesShardStore.AddEntity(id) => - persist(EntityStarted(id)) { started => - sender() ! RememberEntitiesShardStore.UpdateDone(id) - state.copy(state.entities + started.entityId) + case RememberEntitiesShardStore.AddEntities(ids) => + persist(EntitiesStarted(ids)) { started => + sender() ! RememberEntitiesShardStore.UpdateDone(ids) + state.copy(state.entities ++ started.entities) saveSnapshotWhenNeeded() } case RememberEntitiesShardStore.RemoveEntity(id) => persist(EntityStopped(id)) { stopped => - sender() ! RememberEntitiesShardStore.UpdateDone(id) + sender() ! RememberEntitiesShardStore.UpdateDone(Set(id)) state.copy(state.entities - stopped.entityId) saveSnapshotWhenNeeded() } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntitiesStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntitiesStore.scala index f1dca5b549..eff58885d9 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntitiesStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntitiesStore.scala @@ -46,13 +46,10 @@ private[akka] object RememberEntitiesShardStore { // SPI protocol for a remember entities shard store sealed trait Command - sealed trait UpdateEntityCommand extends Command { - def entityId: EntityId - } - final case class AddEntity(entityId: EntityId) extends UpdateEntityCommand - final case class RemoveEntity(entityId: EntityId) extends UpdateEntityCommand + final case class AddEntities(entityIds: Set[EntityId]) extends Command + final case class RemoveEntity(entityId: EntityId) extends Command // responses for UpdateEntity add and remove - final case class UpdateDone(entityId: EntityId) + final case class UpdateDone(entityIds: Set[EntityId]) case object GetEntities extends Command final case class RememberedEntities(entities: Set[EntityId]) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index 91beeb0552..0e60819549 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -10,10 +10,9 @@ import java.util.zip.GZIPOutputStream import scala.annotation.tailrec import scala.concurrent.duration._ - import akka.util.ccompat.JavaConverters._ -import scala.collection.immutable +import scala.collection.immutable import akka.actor.ActorRef import akka.actor.ExtendedActorSystem import akka.cluster.sharding.Shard @@ -29,6 +28,7 @@ import java.io.NotSerializableException import akka.actor.Address import akka.cluster.sharding.ShardRegion._ +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.EntitiesStarted import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages /** @@ -41,11 +41,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy import ShardCoordinator.Internal._ import Shard.{ CurrentShardState, GetCurrentShardState } import Shard.{ GetShardStats, ShardStats } - import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ - State => EntityState, - EntityStarted, - EntityStopped - } + import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ State => EntityState, EntityStopped } private final val BufferSize = 1024 * 4 @@ -73,6 +69,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private val EntityStateManifest = "CA" private val EntityStartedManifest = "CB" private val EntityStoppedManifest = "CD" + private val EntitiesStartedManifest = "CE" private val StartEntityManifest = "EA" private val StartEntityAckManifest = "EB" @@ -98,6 +95,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] => AnyRef]( EntityStateManifest -> entityStateFromBinary, EntityStartedManifest -> entityStartedFromBinary, + EntitiesStartedManifest -> entitiesStartedFromBinary, EntityStoppedManifest -> entityStoppedFromBinary, CoordinatorStateManifest -> coordinatorStateFromBinary, ShardRegionRegisteredManifest -> { bytes => @@ -199,9 +197,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy }) override def manifest(obj: AnyRef): String = obj match { - case _: EntityState => EntityStateManifest - case _: EntityStarted => EntityStartedManifest - case _: EntityStopped => EntityStoppedManifest + case _: EntityState => EntityStateManifest + case _: EntitiesStarted => EntitiesStartedManifest + case _: EntityStopped => EntityStoppedManifest case _: State => CoordinatorStateManifest case _: ShardRegionRegistered => ShardRegionRegisteredManifest @@ -272,9 +270,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy case GracefulShutdownReq(ref) => actorRefMessageToProto(ref).toByteArray - case m: EntityState => entityStateToProto(m).toByteArray - case m: EntityStarted => entityStartedToProto(m).toByteArray - case m: EntityStopped => entityStoppedToProto(m).toByteArray + case m: EntityState => entityStateToProto(m).toByteArray + case m: EntitiesStarted => entitiesStartedToProto(m).toByteArray + case m: EntityStopped => entityStoppedToProto(m).toByteArray case s: StartEntity => startEntityToByteArray(s) case s: StartEntityAck => startEntityAckToByteArray(s) @@ -406,11 +404,14 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private def entityStateFromBinary(bytes: Array[Byte]): EntityState = EntityState(sm.EntityState.parseFrom(bytes).getEntitiesList.asScala.toSet) - private def entityStartedToProto(evt: EntityStarted): sm.EntityStarted = - sm.EntityStarted.newBuilder().setEntityId(evt.entityId).build() + private def entityStartedFromBinary(bytes: Array[Byte]): EntitiesStarted = + EntitiesStarted(Set(sm.EntityStarted.parseFrom(bytes).getEntityId)) - private def entityStartedFromBinary(bytes: Array[Byte]): EntityStarted = - EntityStarted(sm.EntityStarted.parseFrom(bytes).getEntityId) + private def entitiesStartedToProto(evt: EntitiesStarted): sm.EntitiesStarted = + sm.EntitiesStarted.newBuilder().addAllEntityId(evt.entities.asJava).build() + + private def entitiesStartedFromBinary(bytes: Array[Byte]): EntitiesStarted = + EntitiesStarted(sm.EntitiesStarted.parseFrom(bytes).getEntityIdList.asScala.toSet) private def entityStoppedToProto(evt: EntityStopped): sm.EntityStopped = sm.EntityStopped.newBuilder().setEntityId(evt.entityId).build() diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala index 28e5e2255d..cc7cfb8b46 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala @@ -32,20 +32,25 @@ object ClusterShardingRememberEntitiesPerfSpec { case id: Int => (id.toString, id) } - val extractShardId: ShardRegion.ExtractShardId = msg => - msg match { - case _: Int => "0" // only one shard - case ShardRegion.StartEntity(_) => "0" - } + val extractShardId: ShardRegion.ExtractShardId = { + case _: Int => "0" // only one shard + case ShardRegion.StartEntity(_) => "0" + } } -object ClusterShardingRememberEntitiesPerfSpecConfig extends MultiNodeClusterShardingConfig(additionalConfig = s""" +object ClusterShardingRememberEntitiesPerfSpecConfig + extends MultiNodeClusterShardingConfig( + rememberEntities = true, + additionalConfig = s""" akka.testconductor.barrier-timeout = 3 minutes akka.remote.artery.advanced.outbound-message-queue-size = 10000 akka.remote.artery.advanced.maximum-frame-size = 512 KiB # comment next line to enable durable lmdb storage akka.cluster.sharding.distributed-data.durable.keys = [] + akka.cluster.sharding { + remember-entities = on + } """) { val first = role("first") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index a68ac89b30..c5d9a49aa4 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -59,6 +59,7 @@ class PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities: Bool rememberEntities) class DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities: Boolean) extends ClusterShardingRememberEntitiesSpecConfig(ClusterShardingSettings.StateStoreModeDData, rememberEntities) + class DDataClusterShardingEventSourcedRememberEntitiesSpecConfig(rememberEntities: Boolean) extends ClusterShardingRememberEntitiesSpecConfig( ClusterShardingSettings.StateStoreModeDData, diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala index 7edb69b0bc..2243198621 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala @@ -88,15 +88,16 @@ object RememberEntitiesFailureSpec { object FakeShardStoreActor { def props(shardId: ShardId): Props = Props(new FakeShardStoreActor(shardId)) - case class FailAddEntity(entityId: EntityId, whichWay: Fail) + case class FailAddEntity(entityId: Set[EntityId], whichWay: Fail) case class FailRemoveEntity(entityId: EntityId, whichWay: Fail) - case class ClearFail(entityId: EntityId) + case class ClearAddFail(entityId: Set[EntityId]) + case class ClearRemoveFail(entityId: EntityId) } class FakeShardStoreActor(shardId: ShardId) extends Actor with ActorLogging { import FakeShardStoreActor._ implicit val ec = context.system.dispatcher - private var failAddEntity = Map.empty[EntityId, Fail] + private var failAddEntity = Map.empty[Set[EntityId], Fail] private var failRemoveEntity = Map.empty[EntityId, Fail] context.system.eventStream.publish(ShardStoreCreated(self, shardId)) @@ -109,7 +110,7 @@ object RememberEntitiesFailureSpec { case Some(CrashStore) => throw TestException("store crash on GetEntities") case Some(StopStore) => context.stop(self) } - case RememberEntitiesShardStore.AddEntity(entityId) => + case RememberEntitiesShardStore.AddEntities(entityId) => failAddEntity.get(entityId) match { case None => sender ! RememberEntitiesShardStore.UpdateDone(entityId) case Some(NoResponse) => log.debug("Sending no response for AddEntity") @@ -118,7 +119,7 @@ object RememberEntitiesFailureSpec { } case RememberEntitiesShardStore.RemoveEntity(entityId) => failRemoveEntity.get(entityId) match { - case None => sender ! RememberEntitiesShardStore.UpdateDone(entityId) + case None => sender ! RememberEntitiesShardStore.UpdateDone(Set(entityId)) case Some(NoResponse) => log.debug("Sending no response for RemoveEntity") case Some(CrashStore) => throw TestException("store crash on AddEntity") case Some(StopStore) => context.stop(self) @@ -129,8 +130,10 @@ object RememberEntitiesFailureSpec { case FailRemoveEntity(id, whichWay) => failRemoveEntity = failRemoveEntity.updated(id, whichWay) sender() ! Done - case ClearFail(id) => + case ClearAddFail(id) => failAddEntity = failAddEntity - id + sender() ! Done + case ClearRemoveFail(id) => failRemoveEntity = failRemoveEntity - id sender() ! Done } @@ -243,7 +246,7 @@ class RememberEntitiesFailureSpec probe.expectMsg("hello-1") // hit shard with other entity that will fail - shardStore.tell(FakeShardStoreActor.FailAddEntity("11", wayToFail), storeProbe.ref) + shardStore.tell(FakeShardStoreActor.FailAddEntity(Set("11"), wayToFail), storeProbe.ref) storeProbe.expectMsg(Done) sharding.tell(EntityEnvelope(11, "hello-11"), probe.ref) @@ -256,7 +259,7 @@ class RememberEntitiesFailureSpec } val stopFailingProbe = TestProbe() - shardStore.tell(FakeShardStoreActor.ClearFail("11"), stopFailingProbe.ref) + shardStore.tell(FakeShardStoreActor.ClearAddFail(Set("11")), stopFailingProbe.ref) stopFailingProbe.expectMsg(Done) // it takes a while - timeout hits and then backoff @@ -292,7 +295,7 @@ class RememberEntitiesFailureSpec // FIXME restart without passivating is not saved and re-started again without storing the stop so this isn't testing anything sharding ! EntityEnvelope(1, "stop") - shard1Store.tell(FakeShardStoreActor.ClearFail("1"), storeProbe.ref) + shard1Store.tell(FakeShardStoreActor.ClearRemoveFail("1"), storeProbe.ref) storeProbe.expectMsg(Done) // it takes a while - timeout hits and then backoff @@ -329,7 +332,7 @@ class RememberEntitiesFailureSpec sharding ! EntityEnvelope(1, "graceful-stop") - shard1Store.tell(FakeShardStoreActor.ClearFail("1"), storeProbe.ref) + shard1Store.tell(FakeShardStoreActor.ClearRemoveFail("1"), storeProbe.ref) storeProbe.expectMsg(Done) // it takes a while? diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala index ea2404380a..fa0d6a42b1 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala @@ -13,6 +13,7 @@ import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.ShardId import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.EntitiesStarted import akka.serialization.SerializationExtension import akka.testkit.AkkaSpec @@ -74,10 +75,17 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec { } "be able to serialize PersistentShard domain events" in { - checkSerialization(EventSourcedRememberEntitiesStore.EntityStarted("e1")) + checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStarted(Set("e1", "e2"))) checkSerialization(EventSourcedRememberEntitiesStore.EntityStopped("e1")) } + "be able to deserialize old entity started event into entities started" in { + import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm } + + val asBytes = sm.EntityStarted.newBuilder().setEntityId("e1").build().toByteArray + SerializationExtension(system).deserialize(asBytes, 13, "CB").get shouldEqual EntitiesStarted(Set("e1")) + } + "be able to serialize GetShardStats" in { checkSerialization(Shard.GetShardStats) }